from airflow import DAG
from airflow.utils.dates import days_ago
from template.dag_template import build_dag
dag_id = 'exchange-dsp-reporting_v002'
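# EMR steps, executed in order: the DSPExchangeTrade Spark job (resolved from the
# com/viooh/smex dsp-exchange-trade artifact), then a script-runner step that runs
# job.sh to copy the DSP reports.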
emr_steps = """[
{
"step-name": "DSPExchangeTradeJob",
"main-class": "com.viooh.smex.dsp.DSPExchangeTrade",
"group-id":"com/viooh/smex",
"artifact": "dsp-exchange-trade"
},
{
"step-name": "CopyDSPReports",
"jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"script-file":"scripts/exchange-dsp-reporting/1.0.0/job.sh",
"script-args": "--environment $ENV --reportdate $EXECUTION_DATE --region cn --skipFailure"
}
]"""
# cluster-level parameters (optional)
cluster_args = {
    "master-instance-types": "m5.xlarge,m5.2xlarge,m5.4xlarge",
    "core-instance-types": "m5.xlarge,m5.2xlarge,m5.4xlarge",
    "task-instance-types": "m5.xlarge,m5.2xlarge,m5.4xlarge",
    "core-instance-capacity": 5,
    "task-instance-capacity": 0,
    "emr-version": "emr-6.14.0",
"cluster-configurations": [
{
'Classification': 'hadoop-env',
'Configurations': [
{
'Classification': 'export',
'Configurations': [],
'Properties': {
'JAVA_HOME': '/usr/lib/jvm/java-11-amazon-corretto.x86_64'
}
}
],
'Properties': {}
},
{
'Classification': 'spark-env',
'Configurations': [
{
'Classification': 'export',
'Configurations': [],
'Properties': {
'JAVA_HOME': '/usr/lib/jvm/java-11-amazon-corretto.x86_64'
}
}
],
'Properties': {}
}
]
}
# DAG default arguments
dag_args = {
    'owner': 'data.engineers@viooh.com',
    'start_date': days_ago(1)
}
dag = DAG(
    dag_id,
    schedule_interval='0 4 * * *',  # cron expression: run daily at 04:00 (UTC by default)
    default_args=dag_args)
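# build_dag (from the shared dag_template) attaches the EMR cluster and the steps defined above to this DAG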
build_dag(emr_steps=emr_steps, dag=dag, cluster_args=cluster_args)