The scheduler does not appear to be running. Last heartbeat was received [time not captured in this copy].
The DAGs list may not update, and new tasks will not be scheduled.
import json
from datetime import datetime

from airflow import DAG
from airflow.models import Variable

from template.dag_template import build_dag
from template.utils import hours_ago
# DAG id convention: <cluster name>_<version id>
dag_id = 'exchange-trade-summary_v002'

# Read from Airflow Variables every time this file is parsed. No default_var is
# passed, so both Variables must exist for the DAG file to load.
SSPUI_JOB_EXECUTION_MODE = Variable.get("sspui_job_execution_mode")
AWS_REGION = Variable.get("aws_region")
# SSPUI_JOB_EXECUTION_MODE=0, db full load
# SSPUI_JOB_EXECUTION_MODE=1, load data for airflow execution time and execution time + 1 (default behavior)
# SSPUI_JOB_EXECUTION_MODE=2, load data for airflow execution time (backfill behavior)


def _sspui_report_script_args(job_name, report_level):
    """Build the script-args value for an SSPUI DB-export EMR step.

    The arguments are joined with '#' in this order:
    $ENV, installer version, job name, SSPUI_JOB_EXECUTION_MODE,
    $EXECUTION_DATETIME, $ARTIFACT_BUCKET, script directory, report level,
    AWS_REGION.

    Args:
        job_name: Name of the DB import job passed to the installer script.
        report_level: Report granularity argument (e.g. "deal", "creative").

    Returns:
        A JSON string literal: the joined arguments wrapped in double quotes,
        with any double quote, backslash or control character escaped. The
        value is spliced verbatim into the ``emr_steps`` JSON text. Plain
        hand-written quoting would leave such characters in the Airflow
        Variable values unescaped and yield malformed JSON.
    """
    fields = (
        "$ENV",
        "0.0.2",
        job_name,
        SSPUI_JOB_EXECUTION_MODE,
        "$EXECUTION_DATETIME",
        "$ARTIFACT_BUCKET",
        "scripts/exchange-trade-summary/sspui_reporting",
        report_level,
        AWS_REGION,
    )
    return json.dumps("#".join(str(field) for field in fields))


ExportVioohHourLevelReportToDbV2_script_args = _sspui_report_script_args(
    "db-import-viooh-hourly-ui-report-v2", "deal"
)
ExportVioohHourlyCreativeLevelReportToDb_script_args = _sspui_report_script_args(
    "db-import-viooh-hourly-creative-report", "creative"
)
# EMR step definitions handed to build_dag as JSON text (emr_steps stays a str).
# The steps are described as Python data and serialised with json.dumps, so the
# text is always well-formed JSON and shared values are defined once.
# Tokens such as $ENV, $EXECUTION_DATE, $NEXT_EXECUTION_HOUR and
# $EXECUTION_DATETIME are emitted verbatim; presumably build_dag substitutes
# them per run — confirm in template.dag_template.
_SCRIPT_RUNNER_JAR = "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
# Versioned report-copy script; bump the version here only.
_REPORT_COPY_SCRIPT = "scripts/exchange-trade-summary/1.3.0/job.sh"
# Both SSPUI DB exports run this installer. Their script-args (job name and
# report level) are what distinguish them.
_SSPUI_INSTALLER_SCRIPT = (
    "scripts/exchange-trade-summary/sspui_reporting/"
    "viooh_sspui_hourly_aggr_report_v2_installer.sh"
)
_SPARK_AVRO_JAR = "/usr/lib/spark/external/lib/spark-avro.jar"


def _spark_step(step_name, main_class, artifact, extra_config=(), jars=None):
    """Return the emr_steps entry for a Spark job step.

    Args:
        step_name: Value for "step-name".
        main_class: Fully-qualified Spark main class.
        artifact: Artifact name containing the main class.
        extra_config: Single-key dicts appended to "config-json" after the
            shared ``spark.driver.memory=6g`` entry.
        jars: Value for "jars"; the key is omitted entirely when None.
    """
    step = {
        "step-name": step_name,
        "config-json": [{"spark.driver.memory": "6g"}, *extra_config],
        "main-class": main_class,
        "artifact": artifact,
    }
    if jars is not None:
        step["jars"] = jars
    return step


def _script_step(step_name, script_file, script_args):
    """Return the emr_steps entry for a script-runner step.

    Args:
        step_name: Value for "step-name".
        script_file: Script path passed to the script-runner jar.
        script_args: Single '#'-separated argument string.
    """
    return {
        "step-name": step_name,
        "jar-location": _SCRIPT_RUNNER_JAR,
        "script-file": script_file,
        "script-args": script_args,
    }


_emr_step_list = [
    _spark_step(
        "SupplyDenormalizationJob",
        "com.viooh.smex.supply.denormalization.SupplyDataDenormalization",
        "supply-denormalization",
    ),
    _spark_step(
        "DemandDenormalizationJob",
        "com.viooh.smex.demand.denormalization.DemandDataDenormalization",
        "demand-denormalization",
    ),
    _spark_step(
        "DealSyncDenormalizationJob",
        "com.viooh.smex.dealsync.denormalization.DealSyncDenormalization",
        "dealsync-denormalization",
    ),
    _spark_step(
        "ModerationDenormalizationJob",
        "com.viooh.smex.moderation.denormalization.ModerationDenormalization",
        "moderation-denormalization",
        jars=_SPARK_AVRO_JAR,
    ),
    _spark_step(
        "DealDenormalizationJob",
        "com.viooh.smex.deal.denormalization.DealDenormalization",
        "deal-denormalization",
        jars=_SPARK_AVRO_JAR,
    ),
    _spark_step(
        "ExchangeTradeSummaryJob",
        "com.viooh.smex.trade.summary.ExchangeTradeSummary",
        "exchange-trade-summary",
        extra_config=[
            {"spark.serializer": "org.apache.spark.serializer.KryoSerializer"},
            {"spark.task.maxFailures": "20"},
            # NOTE(review): Spark documents that spark.yarn.maxAppAttempts should
            # not exceed YARN's global max attempts. cluster_args sets
            # yarn.resourcemanager.am.max-attempts to '2', so this 10 is
            # presumably capped at 2 — confirm which limit is intended.
            {"spark.yarn.maxAppAttempts": "10"},
            {"spark.stage.maxConsecutiveAttempts": "20"},
            {"spark.app.config.key": "exchange-trade-summary"},
        ],
    ),
    # Report copies: the SmartExchange reports use $NEXT_EXECUTION_DATE/HOUR,
    # while the Exchange Lite reports use $EXECUTION_DATE/HOUR.
    _script_step(
        "CopySmartExchangeMediaOwnerReport",
        _REPORT_COPY_SCRIPT,
        "$ENV#$NEXT_EXECUTION_DATE#$NEXT_EXECUTION_HOUR#mediaowner-trade-report#trade-summary-reports#v1#csv",
    ),
    _script_step(
        "CopyExchangeLiteMediaOwnerReport",
        _REPORT_COPY_SCRIPT,
        "$ENV#$EXECUTION_DATE#$EXECUTION_HOUR#exchange-lite-mediaowner-trade-report#trade-summary-reports#v1#csv",
    ),
    _script_step(
        "CopySmartExchangeFrameLevelMediaOwnerReport",
        _REPORT_COPY_SCRIPT,
        "$ENV#$NEXT_EXECUTION_DATE#$NEXT_EXECUTION_HOUR#frame-level-mediaowner-trade-report#frame-level-media-owner-reports#v1#csv",
    ),
    _script_step(
        "CopyExchangeLiteFrameLevelMediaOwnerReport",
        _REPORT_COPY_SCRIPT,
        "$ENV#$EXECUTION_DATE#$EXECUTION_HOUR#frame-level-exchange-lite-mediaowner-trade-report#frame-level-media-owner-reports#v1#csv",
    ),
    _script_step(
        "CopySmartExchangeMediaOwnerReportV2",
        _REPORT_COPY_SCRIPT,
        "$ENV#$NEXT_EXECUTION_DATE#$NEXT_EXECUTION_HOUR#mediaowner-trade-report#trade-summary-reports#v2#json",
    ),
    _script_step(
        "CopyExchangeLiteMediaOwnerReportV2",
        _REPORT_COPY_SCRIPT,
        "$ENV#$EXECUTION_DATE#$EXECUTION_HOUR#exchange-lite-mediaowner-trade-report#trade-summary-reports#v2#json",
    ),
    # The *_script_args variables hold JSON string literals (surrounding quotes
    # included), so decode them to recover the raw argument string.
    _script_step(
        "ExportVioohHourLevelReportToDbV2",
        _SSPUI_INSTALLER_SCRIPT,
        json.loads(ExportVioohHourLevelReportToDbV2_script_args),
    ),
    _script_step(
        "ExportVioohHourlyCreativeLevelReportToDb",
        _SSPUI_INSTALLER_SCRIPT,
        json.loads(ExportVioohHourlyCreativeLevelReportToDb_script_args),
    ),
]
emr_steps = json.dumps(_emr_step_list, indent=2)
# Optional cluster-level settings passed to build_dag.
# The master, core and task instance groups are all offered the same m5 sizes.
cluster_args = {
    f"{group}-instance-types": "m5.xlarge,m5.2xlarge,m5.4xlarge"
    for group in ("master", "core", "task")
}
cluster_args["core-instance-capacity"] = 5
cluster_args["task-instance-capacity"] = 0
cluster_args["emr-version"] = "emr-5.35.0"
# yarn-site override: at most 2 ApplicationMaster attempts.
# NOTE(review): ExchangeTradeSummaryJob requests spark.yarn.maxAppAttempts=10,
# which is presumably capped by this value — confirm which limit is intended.
cluster_args["cluster-configurations"] = [
    {
        "Classification": "yarn-site",
        "Properties": {"yarn.resourcemanager.am.max-attempts": "2"},
    }
]
# default_args for the DAG (Airflow passes these on to the DAG's tasks).
# NOTE(review): hours_ago comes from template.utils and presumably returns a
# datetime relative to parse time. Airflow advises against a moving start_date;
# a fixed datetime (datetime is already imported) is the usual choice. This is
# left unchanged because a fixed past date combined with catchup=True and
# depends_on_past could trigger a large backfill.
dag_args = dict(
    owner='data.engineers@viooh.com',
    start_date=hours_ago(1),
    depends_on_past=True,
    wait_for_downstream=True,
)

dag = DAG(
    dag_id,
    default_args=dag_args,
    # Airflow preset (equivalent to cron "0 * * * *"), not a cron string.
    schedule_interval='@hourly',
)

build_dag(
    emr_steps=emr_steps,
    dag=dag,
    cluster_args=cluster_args,
    catchup=True,
)
|