DAG: primary-audience-global-masterdb_v003

schedule: None


from airflow import DAG
from template.dag_template import build_dag
from airflow.utils.dates import days_ago
from template.utils import set_config_variable_in_emr_steps
from airflow.models import Variable
from datetime import datetime
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(filename)s:%(lineno)d [%(levelname)s]  %(message)s",
    handlers=[
        logging.StreamHandler()
    ])

LOG = logging.getLogger()

# DAG id
dag_id = 'primary-audience-global-masterdb_v003'

# DAG specific parameters
config = Variable.get("primary_audience_global_masterdb_conf", deserialize_json=True)
COUNTRY = config["COUNTRY"]
MEDIA_OWNER = config["MEDIA_OWNER"]
AUD_VERSION = config["AUD_VERSION"]
AUD_UPLOAD_DATE = config["AUD_UPLOAD_DATE"]
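
# Illustrative shape of the "primary_audience_global_masterdb_conf"
# Variable (hypothetical values; the real entries, including the
# AUD_DATABASE key referenced by the step template below, live in the
# Airflow Variable store):
#
#   {
#       "COUNTRY": "cn",
#       "MEDIA_OWNER": "jcdecaux",
#       "AUD_VERSION": "v003",
#       "AUD_UPLOAD_DATE": "00000000",
#       "AUD_DATABASE": "audiences"
#   }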

ENV = Variable.get('env')
config['ENV'] = ENV.lower()

# masterdb job conditions: "00000000" is a sentinel meaning "no explicit
# upload date", in which case the current timestamp is used instead
if AUD_UPLOAD_DATE == "00000000":
    COMPUTE_DATE = datetime.now().strftime("%Y%m%d%H%M%S")
else:
    COMPUTE_DATE = AUD_UPLOAD_DATE
LOG.info("Computation date: %s", COMPUTE_DATE)
config["COMPUTATION_DATE"] = COMPUTE_DATE

AUDIENCE_BUCKET = "global-cn-audiences-output"

if config['ENV'] == "prd":
    AUDIENCE_BUCKET = "global-cn-audiences-output-prd"
config["AUDIENCE_BUCKET"] = AUDIENCE_BUCKET


# Cluster configuration
CLUSTER_NAME = 'primary-audience-global-masterdb-' + MEDIA_OWNER + '-' + AUD_VERSION
EMR_VERSION = "emr-6.0.0"
NUM_CORE_NODES = 1
NUM_TASK_NODES = 0  # no task nodes; the core node handles the single step

masterdb_steps = """[
      {
        "step-name": "SaveToDataBase",
        "py-script": "jobs/save_to_sql.py",
        "config-file": "configs/primary-audience-global-masterdb-$ENV/1.0.0/job.json",
        "config-json": [
          {"spark.app.audience.country":"$COUNTRY"},
          {"spark.app.audience.media.owner":"$MEDIA_OWNER"},
          {"spark.app.audience.version":"$AUD_VERSION"},
          {"spark.app.audience.output.bucket":"s3a://$AUDIENCE_BUCKET"},
          {"spark.app.audience.computation.date":"$COMPUTATION_DATE"},
          {"spark.app.audience.database":"$AUD_DATABASE"},
          {"spark.executor.memoryOverhead":"2500"}
        ],
        "artifact": "AudiencesPipeline",
        "packages": "mysql:mysql-connector-java:5.1.46"
      }
]"""


# cluster-level parameters (optional); the comma-separated instance types
# presumably let the provisioner fall back to alternative instance sizes
cluster_args = {
    "cluster-name": CLUSTER_NAME,
    "audience-config-file": "primary_audience_global_masterdb_conf",
    "master-instance-types": "m5.xlarge,m5.2xlarge",
    "core-instance-types": "m5.2xlarge,m5.4xlarge",
    "task-instance-types": "m5.2xlarge,m5.4xlarge",
    "core-instance-capacity": NUM_CORE_NODES,
    "task-instance-capacity": NUM_TASK_NODES,
    "emr-version": EMR_VERSION,
    "audience-load-bootstrap": True
}

# set config variables in emr-steps
emr_steps = set_config_variable_in_emr_steps(masterdb_steps, config)
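
# set_config_variable_in_emr_steps lives in template.utils and is not shown
# here; it is assumed to substitute each $KEY token in the step template with
# the matching config["KEY"] value. A hypothetical stand-in, for illustration
# only:
#
#   def set_config_variable_in_emr_steps(steps_template, conf):
#       for key, value in conf.items():
#           steps_template = steps_template.replace("$" + key, str(value))
#       return steps_template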

# DAG default arguments
dag_args = {
    'owner': 'data.engineers@viooh.com',
    'start_date': days_ago(1)
}

dag = DAG(
    dag_id,
    schedule_interval=None,  # no schedule; the DAG is triggered manually
    default_args=dag_args)

build_dag(emr_steps=emr_steps, dag=dag, cluster_args=cluster_args)
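
# With no schedule_interval, this DAG only runs when triggered explicitly,
# e.g. via the Airflow 2.x CLI (command assumed, not taken from the source):
#
#   airflow dags trigger primary-audience-global-masterdb_v003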