DAG: exchange-trade-summary_v002

schedule: @hourly


exchange-trade-summary_v002

Toggle wrap
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from airflow import DAG

from template.dag_template import build_dag
from template.utils import hours_ago
from airflow.models import Variable
from datetime import datetime

# <cluster name>_<version id>
dag_id = 'exchange-trade-summary_v002'
SSPUI_JOB_EXECUTION_MODE = Variable.get("sspui_job_execution_mode")
AWS_REGION=Variable.get("aws_region")

# SSPUI_JOB_EXECUTION_MODE=0, db full load
# SSPUI_JOB_EXECUTION_MODE=1, load data for airflow execution time and execution time + 1 (default behavior)
# SSPUI_JOB_EXECUTION_MODE=2, load data for airflow execution time (backfill behavior)

ExportVioohHourLevelReportToDbV2_script_args = f"\"$ENV#0.0.2#db-import-viooh-hourly-ui-report-v2#{SSPUI_JOB_EXECUTION_MODE}#$EXECUTION_DATETIME#$ARTIFACT_BUCKET#scripts/exchange-trade-summary/sspui_reporting#deal#{AWS_REGION}\""
ExportVioohHourlyCreativeLevelReportToDb_script_args = f"\"$ENV#0.0.2#db-import-viooh-hourly-creative-report#{SSPUI_JOB_EXECUTION_MODE}#$EXECUTION_DATETIME#$ARTIFACT_BUCKET#scripts/exchange-trade-summary/sspui_reporting#creative#{AWS_REGION}\""

emr_steps = """[
      {
        "step-name": "SupplyDenormalizationJob",
        "config-json": [
          {"spark.driver.memory":"6g"}
        ],
        "main-class": "com.viooh.smex.supply.denormalization.SupplyDataDenormalization",
        "artifact": "supply-denormalization"
      },
      {
        "step-name": "DemandDenormalizationJob",
        "config-json": [
          {"spark.driver.memory":"6g"}
        ],
        "main-class": "com.viooh.smex.demand.denormalization.DemandDataDenormalization",
        "artifact": "demand-denormalization"
      },
      {
        "step-name": "DealSyncDenormalizationJob",
        "config-json": [
          {"spark.driver.memory":"6g"}
        ],
        "main-class": "com.viooh.smex.dealsync.denormalization.DealSyncDenormalization",
        "artifact": "dealsync-denormalization"
      },
      {
        "step-name": "ModerationDenormalizationJob",
        "config-json": [
          {"spark.driver.memory":"6g"}
        ],
        "main-class": "com.viooh.smex.moderation.denormalization.ModerationDenormalization",
        "artifact": "moderation-denormalization",
        "jars": "/usr/lib/spark/external/lib/spark-avro.jar"
      },
      {
        "step-name": "DealDenormalizationJob",
        "config-json": [
          {"spark.driver.memory":"6g"}
        ],
        "main-class": "com.viooh.smex.deal.denormalization.DealDenormalization",
        "artifact": "deal-denormalization",
        "jars": "/usr/lib/spark/external/lib/spark-avro.jar"
      },
      {
        "step-name": "ExchangeTradeSummaryJob",
        "config-json": [
          {"spark.driver.memory":"6g"},
          {"spark.serializer":"org.apache.spark.serializer.KryoSerializer"},
          {"spark.task.maxFailures":"20"},
          {"spark.yarn.maxAppAttempts":"10"},
          {"spark.stage.maxConsecutiveAttempts":"20"},
          {"spark.app.config.key":"exchange-trade-summary"}
        ],
        "main-class": "com.viooh.smex.trade.summary.ExchangeTradeSummary",
        "artifact": "exchange-trade-summary"
      },
      {
        "step-name": "CopySmartExchangeMediaOwnerReport",
        "jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        "script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
        "script-args": "$ENV#$NEXT_EXECUTION_DATE#$NEXT_EXECUTION_HOUR#mediaowner-trade-report#trade-summary-reports#v1#csv"
      },
      {
        "step-name": "CopyExchangeLiteMediaOwnerReport",
        "jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        "script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
        "script-args": "$ENV#$EXECUTION_DATE#$EXECUTION_HOUR#exchange-lite-mediaowner-trade-report#trade-summary-reports#v1#csv"
      },
      {
        "step-name": "CopySmartExchangeFrameLevelMediaOwnerReport",
        "jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        "script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
        "script-args": "$ENV#$NEXT_EXECUTION_DATE#$NEXT_EXECUTION_HOUR#frame-level-mediaowner-trade-report#frame-level-media-owner-reports#v1#csv"
      },
      {
        "step-name": "CopyExchangeLiteFrameLevelMediaOwnerReport",
        "jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        "script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
        "script-args": "$ENV#$EXECUTION_DATE#$EXECUTION_HOUR#frame-level-exchange-lite-mediaowner-trade-report#frame-level-media-owner-reports#v1#csv"
      },
            {
        "step-name": "CopySmartExchangeMediaOwnerReportV2",
        "jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        "script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
        "script-args": "$ENV#$NEXT_EXECUTION_DATE#$NEXT_EXECUTION_HOUR#mediaowner-trade-report#trade-summary-reports#v2#json"
      },
      {
        "step-name": "CopyExchangeLiteMediaOwnerReportV2",
        "jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        "script-file":"scripts/exchange-trade-summary/1.3.0/job.sh",
        "script-args": "$ENV#$EXECUTION_DATE#$EXECUTION_HOUR#exchange-lite-mediaowner-trade-report#trade-summary-reports#v2#json"
      },
      {
        "step-name": "ExportVioohHourLevelReportToDbV2",
        "jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        "script-file":"scripts/exchange-trade-summary/sspui_reporting/viooh_sspui_hourly_aggr_report_v2_installer.sh",
        "script-args": """ + ExportVioohHourLevelReportToDbV2_script_args + """
      },
      {
        "step-name": "ExportVioohHourlyCreativeLevelReportToDb",
        "jar-location":"s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        "script-file":"scripts/exchange-trade-summary/sspui_reporting/viooh_sspui_hourly_aggr_report_v2_installer.sh",
        "script-args": """ + ExportVioohHourlyCreativeLevelReportToDb_script_args + """
      }
]"""

# cluster level parameters (optional)
cluster_args = {
    "master-instance-types": "m5.xlarge,m5.2xlarge,m5.4xlarge",
    "core-instance-types": "m5.xlarge,m5.2xlarge,m5.4xlarge",
    "task-instance-types": "m5.xlarge,m5.2xlarge,m5.4xlarge",
    "core-instance-capacity": 5,
    "task-instance-capacity": 0,
    "emr-version": "emr-5.35.0",
    "cluster-configurations": [
        {
            'Classification': 'yarn-site',
            'Properties': {
                'yarn.resourcemanager.am.max-attempts': '2'
            }
        }
    ]
}

# dag parameter
dag_args = {
    'owner': 'data.engineers@viooh.com',
    'start_date':hours_ago(1),
    'depends_on_past': True,
    'wait_for_downstream': True
}

dag = DAG(
    dag_id,
    schedule_interval='@hourly',  # cron expression
    default_args=dag_args)

build_dag(emr_steps=emr_steps, dag=dag, cluster_args=cluster_args,catchup=True)