# DAG: Terminate_Custom_EMR_Clusters
# Schedule: 30 18 * * *

import boto3
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
import logging


# Function to terminate the EMR clusters
def terminate_emr_clusters():
    """Request termination of every idle Custom EMR cluster.

    Reads the ``aws_region`` Airflow Variable, lists the EMR clusters in that
    region that are in the ``WAITING`` state, and calls
    ``terminate_job_flows`` for each one whose name ends with
    ``-viooh-Custom-EMR-cluster``.

    The listing goes through the boto3 paginator because ``list_clusters``
    returns results one page at a time; a single call would silently ignore
    matching clusters beyond the first page.

    Returns:
        None. Progress is reported through the ``logging`` module only.

    Raises:
        Nothing is caught here: any exception raised by ``Variable.get`` or
        by the boto3 calls propagates to the caller, and clusters later in
        the list are then not processed in that attempt.
    """
    name_suffix = '-viooh-Custom-EMR-cluster'

    # Initialize Boto3 client for EMR
    aws_region = Variable.get("aws_region")
    emr_client = boto3.client('emr', region_name=aws_region)

    logging.info("Retrieving EMR clusters...")

    # Walk every result page and keep the IDs of clusters whose name carries
    # the Custom EMR suffix.
    cluster_ids = []
    paginator = emr_client.get_paginator('list_clusters')
    for page in paginator.paginate(ClusterStates=['WAITING']):
        cluster_ids.extend(
            cluster['Id']
            for cluster in page['Clusters']
            if cluster['Name'].endswith(name_suffix)
        )

    if not cluster_ids:
        logging.info("No clusters found for Custom EMR.")
        return

    logging.info("Found %d clusters to terminate.", len(cluster_ids))

    # Terminate each Custom EMR cluster, one request per cluster so the log
    # shows exactly which ID was being processed if a call fails.
    for cluster_id in cluster_ids:
        logging.info("Terminating cluster with ID: %s", cluster_id)
        emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
        # terminate_job_flows only submits the request; the cluster shuts
        # down asynchronously, so log the request rather than claim success.
        logging.info(
            "Termination requested for cluster with ID %s at %s.",
            cluster_id,
            datetime.utcnow(),
        )

# Defaults applied to the tasks of this DAG: a failed task is retried once,
# five minutes after the failure.
default_args = {
    'owner': 'data.engineers@viooh.com',
    'start_date': datetime(2024, 5, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'Terminate_Custom_EMR_Clusters',
    default_args=default_args,
    description='Terminate Custom EMR Clusters DAG',
    # Daily at 18:30; this is 00:00 IST provided the scheduler evaluates the
    # cron expression in UTC -- NOTE(review): confirm the Airflow timezone.
    schedule_interval='30 18 * * *',
    # start_date is fixed in the past. Without this, enabling or un-pausing
    # the DAG would queue one redundant termination run per missed day.
    catchup=False,
)

# Task to terminate the EMR clusters
terminate_emr_clusters_task = PythonOperator(
    task_id='terminate_emr_clusters_task',
    python_callable=terminate_emr_clusters,
    dag=dag,
)