DAG: primary-audience-global-ind_v003

schedule: None


from airflow import DAG
from template.dag_template import build_dag
from airflow.utils.dates import days_ago
from template.utils import set_config_variable_in_emr_steps
from airflow.models import Variable
from datetime import datetime
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(filename)s:%(lineno)d [%(levelname)s]  %(message)s",
    handlers=[
        logging.StreamHandler()
    ])

LOG = logging.getLogger(__name__)

# DAG(Job) specific parameters
dag_id = 'primary-audience-global-ind_v003'
ENVIRONMENT = "uat"

config = Variable.get("primary_audience_global_ind_conf", deserialize_json=True)
MEDIA_OWNER = config["MEDIA_OWNER"]
COUNTRY = config["COUNTRY"]
AUD_VERSION = config["AUD_VERSION"]
BUCKET_NAME = 's3a://viooh-datashare-jcd-' + COUNTRY.lower()
BUCKET_OVERRIDE = config["BUCKET_OVERRIDE"]
COMPUTATION_DATE = config["COMPUTATION_DATE"]
config['ENVIRONMENT'] = ENVIRONMENT
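
# Illustrative only: the "primary_audience_global_ind_conf" Airflow Variable is
# assumed to be a JSON object shaped roughly like the following (the actual
# keys/values are maintained in the Airflow Variables store, and the example
# values below are hypothetical):
#
#   {
#     "MEDIA_OWNER": "jcd",
#     "COUNTRY": "IND",
#     "AUD_VERSION": "",
#     "BUCKET_OVERRIDE": "",
#     "COMPUTATION_DATE": "00000000",
#     "INTEGRITY_JOB_FLAG": true,
#     "ETL_JOB_FLAG": true,
#     "SAVE_TO_DB_FLAG": true
#   }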


# primary-audience-global job checks
if BUCKET_OVERRIDE:
    BUCKET_NAME = BUCKET_OVERRIDE
else:
    LOG.info("Bucket not overridden, using - " + BUCKET_NAME)
config['BUCKET_NAME'] = BUCKET_NAME

if COMPUTATION_DATE == "00000000":
    COMPUTATION_DATE = datetime.now().strftime("%Y%m%d%H%M%S")
    LOG.info("Computation date defaulted to " + COMPUTATION_DATE)
config["COMPUTATION_DATE"] = COMPUTATION_DATE

if not AUD_VERSION:
    AUD_VERSION = datetime.now().strftime("%y%j")
    LOG.info("Audience version defaulted to " + AUD_VERSION)
else:
    LOG.info("User provided version " + AUD_VERSION)
config["AUD_VERSION"] = AUD_VERSION

# Cluster configurations
CLUSTER_NAME = 'primary-audience-global-' + MEDIA_OWNER + '-' + AUD_VERSION
EMR_VERSION = "emr-6.0.0"
NUM_CORE_NODES = 1
NUM_TASK_NODES = 0

step_integrity = """{
        "step-name": "PrimaryAudienceGlobal-integrity",
        "py-script": "jobs/integrity_job.py",
        "config-json": [
          {"spark.app.audience.country":"$COUNTRY"},
          {"spark.app.audience.media.owner":"$MEDIA_OWNER"},
          {"spark.app.audience.version":"$AUD_VERSION"},
          {"spark.app.audience.bucket.name":"$BUCKET_NAME"},
          {"spark.app.audience.digital.file":"$DIGITAL_FILE_TO_PROCESS"},
          {"spark.app.audience.static.file":"$STATIC_FILE_TO_PROCESS"},
          {"spark.app.audience.demographic.file":"$DEMOGRAPHIC_FILE_TO_PROCESS"},
          {"spark.app.audience.input.bucket":"s3://global-cn-audiences-input"},
          {"spark.app.audience.output.bucket":"s3://global-cn-audiences-output"},
          {"spark.app.audience.folder":"$FOLDER"},
          {"spark.app.audience.computation.date":"$COMPUTATION_DATE"},
          {"spark.executor.memoryOverhead":"2500"}
        ],
        "artifact": "AudiencesPipeline"
    }"""

step_etl = """{
        "step-name": "PrimaryAudienceGlobal-load",
        "py-script": "jobs/etl_job.py",
        "config-file": "configs/primary-audience-global/1.0.0/job.json",
        "config-json": [
          {"spark.app.audience.country":"$COUNTRY"},
          {"spark.app.audience.media.owner":"$MEDIA_OWNER"},
          {"spark.app.audience.version":"$AUD_VERSION"},
          {"spark.app.audience.digital.file":"$DIGITAL_FILE_TO_PROCESS"},
          {"spark.app.audience.static.file":"$STATIC_FILE_TO_PROCESS"},
          {"spark.app.audience.folder":"$FOLDER"},
          {"spark.app.audience.computation.date":"$COMPUTATION_DATE"},
          {"spark.executor.memoryOverhead":"2500"}
        ],
        "artifact": "AudiencesPipeline"
      }"""

step_save_to_db = """{
        "step-name": "SaveToDataBase",
        "py-script": "jobs/save_to_sql.py",
        "config-file": "configs/primary-audience-global-masterdb-$ENVIRONMENT/1.0.0/job.json",
        "config-json": [
          {"spark.app.audience.country":"$COUNTRY"},
          {"spark.app.audience.media.owner":"$MEDIA_OWNER"},
          {"spark.app.audience.version":"$AUD_VERSION"},
          {"spark.app.audience.output.bucket":"s3://global-cn-audiences-output"},
          {"spark.app.audience.computation.date":"$COMPUTATION_DATE"},
          {"spark.app.audience.database":"$AUD_DATABASE"},
          {"spark.executor.memoryOverhead":"2500"}
        ],
        "artifact": "AudiencesPipeline",
        "packages": "mysql:mysql-connector-java:5.1.46"
      }"""
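
# Note: the $NAME tokens in the step definitions above (e.g. $COUNTRY,
# $AUD_VERSION, $COMPUTATION_DATE) are not resolved here; they are substituted
# from the `config` dict further down via set_config_variable_in_emr_steps.
# Placeholders such as $DIGITAL_FILE_TO_PROCESS, $STATIC_FILE_TO_PROCESS,
# $DEMOGRAPHIC_FILE_TO_PROCESS, $FOLDER and $AUD_DATABASE are therefore
# presumed to be provided by the "primary_audience_global_ind_conf" Variable.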

INTEGRITY_JOB_FLAG = bool(config["INTEGRITY_JOB_FLAG"])
ETL_JOB_FLAG = bool(config["ETL_JOB_FLAG"])
SAVE_TO_DB_FLAG = bool(config["SAVE_TO_DB_FLAG"])

# Assemble the EMR steps to run from the enabled job flags.
enabled_steps = []
if INTEGRITY_JOB_FLAG:
    enabled_steps.append(step_integrity)
if ETL_JOB_FLAG:
    enabled_steps.append(step_etl)
if SAVE_TO_DB_FLAG:
    enabled_steps.append(step_save_to_db)

if not enabled_steps:
    LOG.error("No job steps enabled - please check the job configurations...")

global_job_steps = "[" + ",".join(enabled_steps) + "]"


# cluster level parameters (optional)
cluster_args = {
    "cluster-name": CLUSTER_NAME,
    "audience-config-file": "primary_audience_global_ind_conf",
    "master-instance-types": "m5.2xlarge,m5.4xlarge",
    "core-instance-types": "m5.2xlarge,m5.4xlarge",
    "task-instance-types": "m5.2xlarge,m5.4xlarge",
    "core-instance-capacity": NUM_CORE_NODES,
    "task-instance-capacity": NUM_TASK_NODES,
    "emr-version": EMR_VERSION,
    "audience-load-bootstrap": True
}

# set config variables in emr-steps
emr_steps = set_config_variable_in_emr_steps(global_job_steps, config)
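# set_config_variable_in_emr_steps is a template utility; a minimal sketch of
# the substitution it is assumed to perform (not the actual implementation):
#
#   from string import Template
#   emr_steps = Template(global_job_steps).safe_substitute(config)
#
# i.e. every $KEY placeholder in the step JSON above is replaced with the
# matching value from the config dict.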

# dag parameter
dag_args = {
    'owner': 'data.engineers@viooh.com',
    'start_date': days_ago(1)
}

dag = DAG(
    dag_id,
    schedule_interval=None,  # no schedule; triggered manually
    default_args=dag_args)

build_dag(emr_steps=emr_steps, dag=dag, cluster_args=cluster_args)
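
# With schedule_interval=None this DAG only runs when triggered manually,
# either from the Airflow UI or via the CLI (Airflow 2.x), e.g.:
#
#   airflow dags trigger primary-audience-global-ind_v003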