import boto3
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
import logging
# Function to terminate the EMR clusters
def terminate_emr_clusters():
    """Terminate every WAITING EMR cluster whose name ends with
    '-viooh-Custom-EMR-cluster'.

    The target AWS region is read from the Airflow Variable ``aws_region``.
    Each matching cluster is terminated via ``terminate_job_flows`` and the
    action is logged. Returns None.
    """
    # Initialize Boto3 client for EMR in the configured region.
    aws_region = Variable.get("aws_region")
    emr_client = boto3.client('emr', region_name=aws_region)

    logging.info("Retrieving EMR clusters...")
    # list_clusters is paginated (~50 entries per page); iterate all pages
    # so clusters beyond the first page are not silently skipped.
    cluster_ids = []
    paginator = emr_client.get_paginator('list_clusters')
    for page in paginator.paginate(ClusterStates=['WAITING']):
        cluster_ids.extend(
            cluster['Id']
            for cluster in page['Clusters']
            if cluster['Name'].endswith('-viooh-Custom-EMR-cluster')
        )

    if not cluster_ids:
        logging.info("No clusters found for Custom EMR.")
        return

    logging.info(f"Found {len(cluster_ids)} clusters to terminate.")
    # Terminate each matching cluster individually so one failure does not
    # abort the remaining terminations' log trail mid-batch.
    for cluster_id in cluster_ids:
        logging.info(f"Terminating cluster with ID: {cluster_id}")
        emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
        # Timezone-aware timestamp; datetime.utcnow() is deprecated.
        logging.info(f"Cluster with ID {cluster_id} terminated successfully at {datetime.now(timezone.utc)}.")
# Defaults applied to every task in this DAG.
default_args = dict(
    owner='data.engineers@viooh.com',       # notification/ownership contact
    start_date=datetime(2024, 5, 1),        # first schedulable date
    retries=1,                              # one retry on task failure
    retry_delay=timedelta(minutes=5),       # wait between retry attempts
)
dag = DAG(
    'Terminate_Custom_EMR_Clusters',
    default_args=default_args,
    description='Terminate Custom EMR Clusters DAG',
    # 18:30 UTC == 00:00 IST (UTC+5:30) — daily midnight run in IST.
    schedule_interval='30 18 * * *',
    # Without catchup=False, Airflow backfills one run per missed interval
    # since start_date (2024-05-01) the first time the DAG is unpaused —
    # undesirable for a cleanup job that only cares about "now".
    catchup=False,
)
# Task to terminate the EMR clusters
# Single task wiring the cleanup callable into the DAG.
terminate_emr_clusters_task = PythonOperator(
    dag=dag,
    task_id='terminate_emr_clusters_task',
    python_callable=terminate_emr_clusters,
)