# Airflow DAG that runs the exchange-trade-summary backfill as steps on an EMR cluster.
import json

from airflow import DAG
from airflow.models import Variable

from template.dag_template import build_dag
from template.utils import hours_ago
# DAG id convention: <cluster name>_<version id>
dag_id = 'exchange-trade-summary-backfill_all_v002'
# Airflow Variables are read at DAG-parse time. SSPUI_JOB_EXECUTION_MODE is not
# referenced elsewhere in this file, but Variable.get without a default raises
# when the variable is unset, so it is kept to preserve parse-time behavior.
SSPUI_JOB_EXECUTION_MODE = Variable.get("sspui_job_execution_mode")
AWS_REGION = Variable.get("aws_region")


def _sspui_report_script_args(report_artifact, mode, region):
    """Build the '#'-delimited argument string for the SSP UI report installer.

    The $ENV, $EXECUTION_DATETIME and $ARTIFACT_BUCKET tokens are emitted
    verbatim; presumably build_dag or the step runtime substitutes them
    (TODO confirm).
    """
    return (
        f"$ENV#0.0.2#{report_artifact}#{mode}#$EXECUTION_DATETIME#$ARTIFACT_BUCKET"
        f"#scripts/exchange-trade-summary/sspui_reporting#{mode}#{region}"
    )


_FRAME_REPORT_SCRIPT_ARGS = _sspui_report_script_args(
    "db-import-viooh-hourly-ui-report-v2", "backfill-frame", AWS_REGION)
_CREATIVE_REPORT_SCRIPT_ARGS = _sspui_report_script_args(
    "db-import-viooh-hourly-creative-report", "backfill-creative", AWS_REGION)

# Original module-level names, kept for backward compatibility with their
# original JSON-quoted values ("\"...\""). The step list below uses the raw
# strings and lets json.dumps do the quoting.
ExportVioohFrameLevelReportToDbV2_backfill_script_args = json.dumps(_FRAME_REPORT_SCRIPT_ARGS)
ExportVioohCreativeLevelReportToDbV2_backfill_script_args = json.dumps(_CREATIVE_REPORT_SCRIPT_ARGS)

# Spark settings shared by the five denormalization steps.
# NOTE(review): spark.yarn.maxAppAttempts=10 is larger than
# yarn.resourcemanager.am.max-attempts=2 set in cluster_args. Spark documents that
# it should not exceed the YARN-wide limit, so 2 is likely the effective value.
_DENORMALIZATION_SPARK_CONFIG = [
    {"spark.driver.memory": "20g"},
    {"spark.driver.cores": "5"},
    {"spark.serializer": "org.apache.spark.serializer.KryoSerializer"},
    {"spark.task.maxFailures": "20"},
    {"spark.yarn.maxAppAttempts": "10"},
    {"spark.stage.maxConsecutiveAttempts": "20"},
]
# Settings for the trade-summary step: smaller driver, no explicit driver cores.
_TRADE_SUMMARY_SPARK_CONFIG = [
    {"spark.driver.memory": "16g"},
    {"spark.serializer": "org.apache.spark.serializer.KryoSerializer"},
    {"spark.task.maxFailures": "20"},
    {"spark.yarn.maxAppAttempts": "10"},
    {"spark.stage.maxConsecutiveAttempts": "20"},
]
_SPARK_AVRO_JAR = "/usr/lib/spark/connector/lib/spark-avro.jar"
_SCRIPT_RUNNER_JAR = "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
_SSPUI_INSTALLER_SCRIPT = (
    "scripts/exchange-trade-summary/sspui_reporting/"
    "viooh_sspui_hourly_aggr_report_v2_installer.sh"
)


def _spark_step(step_name, main_class, artifact, config=None, jars=None):
    """Return one Spark step definition.

    ``config`` defaults to the shared denormalization settings. The "jars" key
    is only emitted when ``jars`` is given, matching steps that had no extra jar.
    """
    step = {
        "step-name": step_name,
        "config-json": _DENORMALIZATION_SPARK_CONFIG if config is None else config,
        "main-class": main_class,
        "group-id": "com/viooh/smex",
        "artifact": artifact,
    }
    if jars is not None:
        step["jars"] = jars
    return step


def _script_step(step_name, script_args):
    """Return one script-runner step that executes the SSP UI report installer."""
    return {
        "step-name": step_name,
        "jar-location": _SCRIPT_RUNNER_JAR,
        "script-file": _SSPUI_INSTALLER_SCRIPT,
        "script-args": script_args,
    }


# Ordered step list, serialized to a JSON string (the same type as before).
# json.dumps handles all quoting, so a region value containing '"' or '\' can
# no longer produce malformed JSON.
# NOTE(review): whitespace differs from the old hand-written literal; this
# assumes build_dag parses emr_steps as JSON. Confirm in template.dag_template.
# Both report exports used to share the step name
# "ExportVioohHourLevelReportToDbV2Backfill". The creative-level export now has
# its own name so the two steps can be told apart.
emr_steps = json.dumps(
    [
        _spark_step(
            "SupplyDenormalizationJobBackfill",
            "com.viooh.smex.supply.denormalization.SupplyDataDenormalizationBackfill",
            "supply-denormalization",
            jars=_SPARK_AVRO_JAR,
        ),
        _spark_step(
            "DemandDenormalizationJobBackfill",
            "com.viooh.smex.demand.denormalization.DemandDataDenormalizationBackfill",
            "demand-denormalization",
            jars=_SPARK_AVRO_JAR,
        ),
        _spark_step(
            "DealsyncDenormalizationJobBackfill",
            "com.viooh.smex.dealsync.denormalization.DealSyncDenormalizationBackfill",
            "dealsync-denormalization",
        ),
        _spark_step(
            "ModerationDenormalizationJobBackfill",
            "com.viooh.smex.moderation.denormalization.ModerationDenormalizationBackfill",
            "moderation-denormalization",
            jars=_SPARK_AVRO_JAR,
        ),
        _spark_step(
            "DealDenormalizationJobBackfill",
            "com.viooh.smex.deal.denormalization.DealDenormalizationBackfill",
            "deal-denormalization",
            jars=_SPARK_AVRO_JAR,
        ),
        _spark_step(
            "ExchangeTradeSummaryJobBackfill",
            "com.viooh.smex.trade.summary.ExchangeTradeSummaryBackfill",
            "exchange-trade-summary",
            config=_TRADE_SUMMARY_SPARK_CONFIG,
        ),
        _script_step("ExportVioohHourLevelReportToDbV2Backfill", _FRAME_REPORT_SCRIPT_ARGS),
        _script_step("ExportVioohCreativeLevelReportToDbV2Backfill", _CREATIVE_REPORT_SCRIPT_ARGS),
    ],
    indent=2,
)
# Cluster-level parameters (optional) handed to build_dag: instance types,
# capacities, EBS volume size, EMR release and EMR configuration classifications.
_M5_INSTANCE_TYPES = "m5.2xlarge,m5.4xlarge"


def _corretto11_java_home(classification):
    """Return an EMR configuration entry exporting JAVA_HOME as Amazon Corretto 11.

    Each call builds fresh dicts, so the returned entries never share state.
    """
    export_block = {
        'Classification': 'export',
        'Configurations': [],
        'Properties': {'JAVA_HOME': '/usr/lib/jvm/java-11-amazon-corretto.x86_64'},
    }
    return {
        'Classification': classification,
        'Configurations': [export_block],
        'Properties': {},
    }


cluster_args = {
    "master-instance-types": _M5_INSTANCE_TYPES,
    "core-instance-types": _M5_INSTANCE_TYPES,
    "task-instance-types": _M5_INSTANCE_TYPES,
    "core-instance-capacity": 20,
    "task-instance-capacity": 0,
    "ebs-volume-size": "200",
    "emr-version": "emr-6.14.0",
    "cluster-configurations": [
        # Cap YARN ApplicationMaster attempts cluster-wide at 2.
        {
            'Classification': 'yarn-site',
            'Properties': {'yarn.resourcemanager.am.max-attempts': '2'},
        },
        # Pin both Hadoop and Spark to the Corretto 11 JVM.
        _corretto11_java_home('hadoop-env'),
        _corretto11_java_home('spark-env'),
    ],
}
# DAG-level default arguments, applied to the DAG's tasks via default_args.
# NOTE(review): hours_ago(1) presumably yields a time relative to DAG-parse time,
# so start_date moves on every parse. That is harmless while schedule_interval is
# None, but pin a fixed date if a schedule is ever added.
dag_args = {
    'owner': 'data.engineers@viooh.com',
    'start_date': hours_ago(1),
}
dag = DAG(
    dag_id,
    schedule_interval=None,  # no schedule: the DAG only runs when triggered
    default_args=dag_args,
)
# Hand the steps and cluster settings to the shared template builder
# (template.dag_template). catchup=True is forwarded unchanged; what it does is
# decided by build_dag.
build_dag(emr_steps=emr_steps, dag=dag, cluster_args=cluster_args, catchup=True)