DAG: exchange-trade-summary-backfill_all_v002

schedule: None


from airflow import DAG

from template.dag_template import build_dag
from airflow.models import Variable
from template.utils import hours_ago

# <cluster name>_<version id>
dag_id = 'exchange-trade-summary-backfill_all_v002'
SSPUI_JOB_EXECUTION_MODE = Variable.get("sspui_job_execution_mode")
AWS_REGION = Variable.get("aws_region")
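# Note: Airflow Variables are read at DAG parse time. SSPUI_JOB_EXECUTION_MODE is
# fetched here but not referenced below; it is assumed to be consumed elsewhere by
# the shared template.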

ExportVioohFrameLevelReportToDbV2_backfill_script_args = f"\"$ENV#0.0.2#db-import-viooh-hourly-ui-report-v2#backfill-frame#$EXECUTION_DATETIME#$ARTIFACT_BUCKET#scripts/exchange-trade-summary/sspui_reporting#backfill-frame#{AWS_REGION}\""
ExportVioohCreativeLevelReportToDbV2_backfill_script_args = f"\"$ENV#0.0.2#db-import-viooh-hourly-creative-report#backfill-creative#$EXECUTION_DATETIME#$ARTIFACT_BUCKET#scripts/exchange-trade-summary/sspui_reporting#backfill-creative#{AWS_REGION}\""
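# The single quoted script argument is assumed to be split on '#' by the installer
# script. The positional fields appear to be: environment, artifact version, DB
# import job name, backfill mode, execution datetime, artifact bucket, S3 script
# path, report type, and AWS region. $ENV, $EXECUTION_DATETIME and $ARTIFACT_BUCKET
# look like placeholders substituted at step-submission time.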


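# EMR step definitions handed to build_dag as a JSON string. Each entry either
# submits a Spark job (main-class, group-id and artifact identify the jar,
# config-json carries per-step Spark overrides) or runs a shell script through
# EMR's script-runner.jar (jar-location, script-file, script-args). Field names
# follow what the shared dag_template expects, as inferred from this file.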
emr_steps = """[
    {
        "step-name": "SupplyDenormalizationJobBackfill",
            "config-json": [
                {"spark.driver.memory":"20g"},
                {"spark.driver.cores":"5"},
                {"spark.serializer":"org.apache.spark.serializer.KryoSerializer"},
                {"spark.task.maxFailures":"20"},
                {"spark.yarn.maxAppAttempts":"10"},
                {"spark.stage.maxConsecutiveAttempts":"20"}
    ],
        "main-class": "com.viooh.smex.supply.denormalization.SupplyDataDenormalizationBackfill",
        "group-id":"com/viooh/smex",
        "artifact": "supply-denormalization",
        "jars": "/usr/lib/spark/connector/lib/spark-avro.jar"
    },
    {
        "step-name": "DemandDenormalizationJobBackfill",
            "config-json": [
                {"spark.driver.memory":"20g"},
                {"spark.driver.cores":"5"},
                {"spark.serializer":"org.apache.spark.serializer.KryoSerializer"},
                {"spark.task.maxFailures":"20"},
                {"spark.yarn.maxAppAttempts":"10"},
                {"spark.stage.maxConsecutiveAttempts":"20"}
    ],
        "main-class": "com.viooh.smex.demand.denormalization.DemandDataDenormalizationBackfill",
        "group-id":"com/viooh/smex",
        "artifact": "demand-denormalization",
        "jars": "/usr/lib/spark/connector/lib/spark-avro.jar"
    },
    {
        "step-name": "DealsyncDenormalizationJobBackfill",
        "config-json": [
            {"spark.driver.memory":"20g"},
            {"spark.driver.cores":"5"},
            {"spark.serializer":"org.apache.spark.serializer.KryoSerializer"},
            {"spark.task.maxFailures":"20"},
            {"spark.yarn.maxAppAttempts":"10"},
            {"spark.stage.maxConsecutiveAttempts":"20"}
        ],
        "main-class": "com.viooh.smex.dealsync.denormalization.DealSyncDenormalizationBackfill",
        "group-id":"com/viooh/smex",
        "artifact": "dealsync-denormalization"
    },
    {
        "step-name": "ModerationDenormalizationJobBackfill",
            "config-json": [
                {"spark.driver.memory":"20g"},
                {"spark.driver.cores":"5"},
                {"spark.serializer":"org.apache.spark.serializer.KryoSerializer"},
                {"spark.task.maxFailures":"20"},
                {"spark.yarn.maxAppAttempts":"10"},
                {"spark.stage.maxConsecutiveAttempts":"20"}
    ],
        "main-class": "com.viooh.smex.moderation.denormalization.ModerationDenormalizationBackfill",
        "group-id":"com/viooh/smex",
        "artifact": "moderation-denormalization",
        "jars": "/usr/lib/spark/connector/lib/spark-avro.jar"
    },
    {
        "step-name": "DealDenormalizationJobBackfill",
            "config-json": [
                {"spark.driver.memory":"20g"},
                {"spark.driver.cores":"5"},
                {"spark.serializer":"org.apache.spark.serializer.KryoSerializer"},
                {"spark.task.maxFailures":"20"},
                {"spark.yarn.maxAppAttempts":"10"},
                {"spark.stage.maxConsecutiveAttempts":"20"}
    ],
        "main-class": "com.viooh.smex.deal.denormalization.DealDenormalizationBackfill",
        "group-id":"com/viooh/smex",
        "artifact": "deal-denormalization",
        "jars": "/usr/lib/spark/connector/lib/spark-avro.jar"
    },
    {
        "step-name": "ExchangeTradeSummaryJobBackfill",
            "config-json": [
                 {"spark.driver.memory":"16g"},
                 {"spark.serializer":"org.apache.spark.serializer.KryoSerializer"},
                 {"spark.task.maxFailures":"20"},
                 {"spark.yarn.maxAppAttempts":"10"},
                 {"spark.stage.maxConsecutiveAttempts":"20"}
             ],
        "main-class": "com.viooh.smex.trade.summary.ExchangeTradeSummaryBackfill",
        "group-id":"com/viooh/smex",
        "artifact": "exchange-trade-summary"
    },
    {
        "step-name": "ExportVioohHourLevelReportToDbV2Backfill",
        "jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        "script-file":"scripts/exchange-trade-summary/sspui_reporting/viooh_sspui_hourly_aggr_report_v2_installer.sh",
        "script-args": """ + ExportVioohFrameLevelReportToDbV2_backfill_script_args + """
    },
    {
        "step-name": "ExportVioohHourLevelReportToDbV2Backfill",
        "jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        "script-file":"scripts/exchange-trade-summary/sspui_reporting/viooh_sspui_hourly_aggr_report_v2_installer.sh",
        "script-args": """ + ExportVioohCreativeLevelReportToDbV2_backfill_script_args + """
    }
]"""

# cluster level parameters (optional)
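# The hadoop-env and spark-env exports below point JAVA_HOME at Amazon Corretto 11,
# presumably because the Spark jobs target Java 11 on this EMR 6.14 release.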
cluster_args = {
    "master-instance-types": "m5.2xlarge,m5.4xlarge",
    "core-instance-types": "m5.2xlarge,m5.4xlarge",
    "task-instance-types": "m5.2xlarge,m5.4xlarge",
    "core-instance-capacity": 20,
    "task-instance-capacity": 0,
    "ebs-volume-size": "200",
    "emr-version": "emr-6.14.0",
    "cluster-configurations": [
        {
            'Classification': 'yarn-site',
            'Properties': {
                'yarn.resourcemanager.am.max-attempts': '2'
            }
        },
        {
            'Classification': 'hadoop-env',
            'Configurations': [
                {
                    'Classification': 'export',
                    'Configurations': [],
                    'Properties': {
                        'JAVA_HOME': '/usr/lib/jvm/java-11-amazon-corretto.x86_64'
                    }
                }
            ],
            'Properties': {}
        },
        {
            'Classification': 'spark-env',
            'Configurations': [
                {
                    'Classification': 'export',
                    'Configurations': [],
                    'Properties': {
                        'JAVA_HOME': '/usr/lib/jvm/java-11-amazon-corretto.x86_64'
                    }
                }
            ],
            'Properties': {}
        }
    ]
}

# DAG default arguments
dag_args = {
    'owner': 'data.engineers@viooh.com',
    'start_date': hours_ago(1)
}

dag = DAG(
    dag_id,
    schedule_interval=None,  # no cron schedule; backfill runs are triggered manually
    default_args=dag_args)

build_dag(emr_steps=emr_steps, dag=dag, cluster_args=cluster_args, catchup=True)
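
# With schedule_interval=None, runs are started manually, e.g. via the Airflow CLI
# (assuming a standard deployment):
#   airflow dags trigger exchange-trade-summary-backfill_all_v002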