1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 | from airflow import DAG
from template.dag_template import build_dag
from template.utils import hours_ago
from airflow.models import Variable
from datetime import datetime
# <cluster name>_<version id>
dag_id = "exchange-trade-summary_v002"

# Airflow Variables, read when this module is loaded.
# SSPUI_JOB_EXECUTION_MODE selects what the SSP UI export steps load:
#   0 -> db full load
#   1 -> data for the airflow execution time and execution time + 1
#        (default behavior)
#   2 -> data for the airflow execution time only (backfill behavior)
SSPUI_JOB_EXECUTION_MODE = Variable.get("sspui_job_execution_mode")
AWS_REGION = Variable.get("aws_region")


def _sspui_script_args(import_job, report_level):
    """Return the quoted, '#'-separated argument string for an export step.

    ``import_job`` fills the third field and ``report_level`` the
    second-to-last one; the execution mode and AWS region come from the
    module-level values read from Airflow Variables. The ``$NAME`` tokens
    are emitted verbatim, and the whole value is wrapped in double quotes
    so it can be spliced directly into the step definition string.
    """
    return (
        f'"$ENV#0.0.2#{import_job}#{SSPUI_JOB_EXECUTION_MODE}'
        "#$EXECUTION_DATETIME#$ARTIFACT_BUCKET"
        "#scripts/exchange-trade-summary/sspui_reporting"
        f'#{report_level}#{AWS_REGION}"'
    )


ExportVioohHourLevelReportToDbV2_script_args = _sspui_script_args(
    "db-import-viooh-hourly-ui-report-v2", "deal"
)
ExportVioohHourlyCreativeLevelReportToDb_script_args = _sspui_script_args(
    "db-import-viooh-hourly-creative-report", "creative"
)
# Step definitions handed to build_dag() as one JSON-formatted string.
# Two shapes of step appear below:
#   - steps described by "main-class" / "artifact", with Spark properties in
#     "config-json" and an optional "jars" entry;
#   - script steps described by "jar-location" / "script-file" / "script-args".
# The last two steps splice in ExportVioohHourLevelReportToDbV2_script_args and
# ExportVioohHourlyCreativeLevelReportToDb_script_args, which are already
# wrapped in double quotes and carry values read from Airflow Variables.
# NOTE(review): the $ENV / $EXECUTION_* / $NEXT_EXECUTION_* tokens are
# presumably substituted later by the DAG template -- confirm in
# template.dag_template.
# NOTE(review): both Export* steps point at the same installer script-file and
# differ only in script-args -- confirm that is intended.
emr_steps = """[
{
"step-name": "SupplyDenormalizationJob",
"config-json": [
{"spark.driver.memory":"6g"}
],
"main-class": "com.viooh.smex.supply.denormalization.SupplyDataDenormalization",
"artifact": "supply-denormalization"
},
{
"step-name": "DemandDenormalizationJob",
"config-json": [
{"spark.driver.memory":"6g"}
],
"main-class": "com.viooh.smex.demand.denormalization.DemandDataDenormalization",
"artifact": "demand-denormalization"
},
{
"step-name": "DealSyncDenormalizationJob",
"config-json": [
{"spark.driver.memory":"6g"}
],
"main-class": "com.viooh.smex.dealsync.denormalization.DealSyncDenormalization",
"artifact": "dealsync-denormalization"
},
{
"step-name": "ModerationDenormalizationJob",
"config-json": [
{"spark.driver.memory":"6g"}
],
"main-class": "com.viooh.smex.moderation.denormalization.ModerationDenormalization",
"artifact": "moderation-denormalization",
"jars": "/usr/lib/spark/external/lib/spark-avro.jar"
},
{
"step-name": "DealDenormalizationJob",
"config-json": [
{"spark.driver.memory":"6g"}
],
"main-class": "com.viooh.smex.deal.denormalization.DealDenormalization",
"artifact": "deal-denormalization",
"jars": "/usr/lib/spark/external/lib/spark-avro.jar"
},
{
"step-name": "ExchangeTradeSummaryJob",
"config-json": [
{"spark.driver.memory":"6g"},
{"spark.serializer":"org.apache.spark.serializer.KryoSerializer"},
{"spark.task.maxFailures":"20"},
{"spark.yarn.maxAppAttempts":"10"},
{"spark.stage.maxConsecutiveAttempts":"20"},
{"spark.app.config.key":"exchange-trade-summary"}
],
"main-class": "com.viooh.smex.trade.summary.ExchangeTradeSummary",
"artifact": "exchange-trade-summary"
},
{
"step-name": "CopySmartExchangeMediaOwnerReport",
"jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
"script-args": "$ENV#$NEXT_EXECUTION_DATE#$NEXT_EXECUTION_HOUR#mediaowner-trade-report#trade-summary-reports#v1#csv"
},
{
"step-name": "CopyExchangeLiteMediaOwnerReport",
"jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
"script-args": "$ENV#$EXECUTION_DATE#$EXECUTION_HOUR#exchange-lite-mediaowner-trade-report#trade-summary-reports#v1#csv"
},
{
"step-name": "CopySmartExchangeFrameLevelMediaOwnerReport",
"jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
"script-args": "$ENV#$NEXT_EXECUTION_DATE#$NEXT_EXECUTION_HOUR#frame-level-mediaowner-trade-report#frame-level-media-owner-reports#v1#csv"
},
{
"step-name": "CopyExchangeLiteFrameLevelMediaOwnerReport",
"jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
"script-args": "$ENV#$EXECUTION_DATE#$EXECUTION_HOUR#frame-level-exchange-lite-mediaowner-trade-report#frame-level-media-owner-reports#v1#csv"
},
{
"step-name": "CopySmartExchangeMediaOwnerReportV2",
"jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
"script-args": "$ENV#$NEXT_EXECUTION_DATE#$NEXT_EXECUTION_HOUR#mediaowner-trade-report#trade-summary-reports#v2#json"
},
{
"step-name": "CopyExchangeLiteMediaOwnerReportV2",
"jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
"script-args": "$ENV#$EXECUTION_DATE#$EXECUTION_HOUR#exchange-lite-mediaowner-trade-report#trade-summary-reports#v2#json"
},
{
"step-name": "ExportVioohHourLevelReportToDbV2",
"jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"script-file":"scripts/exchange-trade-summary/sspui_reporting/viooh_sspui_hourly_aggr_report_v2_installer.sh",
"script-args": """ + ExportVioohHourLevelReportToDbV2_script_args + """
},
{
"step-name": "ExportVioohHourlyCreativeLevelReportToDb",
"jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"script-file":"scripts/exchange-trade-summary/sspui_reporting/viooh_sspui_hourly_aggr_report_v2_installer.sh",
"script-args": """ + ExportVioohHourlyCreativeLevelReportToDb_script_args + """
}
]"""
# Optional cluster-level parameters, passed to build_dag() as cluster_args.
# The master, core and task groups all use the same instance-type list.
_INSTANCE_TYPES = "m5.xlarge,m5.2xlarge,m5.4xlarge"

# yarn-site override applied through the cluster configurations.
_YARN_SITE_CONFIG = {
    "Classification": "yarn-site",
    "Properties": {"yarn.resourcemanager.am.max-attempts": "2"},
}

cluster_args = {
    "master-instance-types": _INSTANCE_TYPES,
    "core-instance-types": _INSTANCE_TYPES,
    "task-instance-types": _INSTANCE_TYPES,
    "core-instance-capacity": 5,
    "task-instance-capacity": 0,
    "emr-version": "emr-5.35.0",
    "cluster-configurations": [_YARN_SITE_CONFIG],
}
# Default arguments handed to the DAG.
# NOTE(review): hours_ago(1) is re-evaluated every time this file is loaded;
# Airflow's documentation advises against dynamic start_date values --
# confirm this is intended.
dag_args = {
    "owner": "data.engineers@viooh.com",
    "start_date": hours_ago(1),
    # Airflow options that tie each run to the outcome of the previous run.
    "depends_on_past": True,
    "wait_for_downstream": True,
}

# Scheduled with the "@hourly" preset in place of a cron expression.
dag = DAG(
    dag_id=dag_id,
    default_args=dag_args,
    schedule_interval="@hourly",
)

# Hand the step definitions and cluster settings to the shared DAG template.
build_dag(
    dag=dag,
    emr_steps=emr_steps,
    cluster_args=cluster_args,
    catchup=True,
)
|