from airflow import DAG
from template.dag_template import build_dag
from airflow.utils.dates import days_ago
from template.utils import set_config_variable_in_emr_steps
from airflow.models import Variable
from datetime import datetime
import logging
# Root-logger setup: INFO and above go to a stream handler (stderr by
# default), each record tagged with timestamp, source file and line number.
# NOTE: logging.basicConfig does nothing when the root logger already has
# handlers installed, so this only takes effect if nothing configured
# logging earlier in the process.
_LOG_FORMAT = "%(asctime)s %(filename)s:%(lineno)d [%(levelname)s] %(message)s"
_console_handler = logging.StreamHandler()
logging.basicConfig(
    format=_LOG_FORMAT,
    level=logging.INFO,
    handlers=[_console_handler],
)
# Module-wide logger; getLogger() without a name returns the root logger.
LOG = logging.getLogger()
# Dag id
dag_id = 'primary-audience-global-masterdb_v003'

# DAG specific parameters
# Job settings come from a JSON Airflow Variable. The resulting dict is
# extended in place below and later passed to
# set_config_variable_in_emr_steps together with the step template.
config = Variable.get("primary_audience_global_masterdb_conf", deserialize_json=True)
COUNTRY, MEDIA_OWNER, AUD_VERSION, AUD_UPLOAD_DATE = (
    config[key]
    for key in ("COUNTRY", "MEDIA_OWNER", "AUD_VERSION", "AUD_UPLOAD_DATE")
)

# Environment name comes from its own Variable and is stored lower-cased.
ENV = Variable.get('env')
config['ENV'] = ENV.lower()

# masterdb job conditions
# "00000000" is the sentinel for "no upload date given": fall back to the
# current local (naive) timestamp. Any other value is used as-is.
# NOTE(review): the fallback is a 14-digit YYYYmmddHHMMSS string while the
# sentinel is 8 characters long; confirm the job accepts both formats.
COMPUTE_DATE = (
    datetime.now().strftime("%Y%m%d%H%M%S")
    if AUD_UPLOAD_DATE == "00000000"
    else AUD_UPLOAD_DATE
)
LOG.info("Computation date equal " + COMPUTE_DATE)
config["COMPUTATION_DATE"] = COMPUTE_DATE

# Output bucket: the "prd" environment has a dedicated bucket, every other
# environment shares the default one.
AUDIENCE_BUCKET = (
    "global-cn-audiences-output-prd"
    if config['ENV'] == "prd"
    else "global-cn-audiences-output"
)
config["AUDIENCE_BUCKET"] = AUDIENCE_BUCKET
# Cluster configurations
EMR_VERSION = "emr-6.0.0"
NUM_CORE_NODES = 1
NUM_TASK_NODES = 0
# Cluster name = fixed prefix + "<media owner>-<audience version>".
CLUSTER_NAME = 'primary-audience-global-masterdb-' + '-'.join((MEDIA_OWNER, AUD_VERSION))
# JSON template for the EMR steps of this DAG: a single "SaveToDataBase" step
# that runs jobs/save_to_sql.py. The $NAME tokens are placeholders; the
# template is passed to set_config_variable_in_emr_steps together with
# `config` further down, which presumably replaces each token with the
# config entry of the same name (confirm in template.utils).
# NOTE(review): $AUD_DATABASE is never read or assigned in this file, so it
# has to be supplied by the Airflow Variable itself — TODO confirm.
masterdb_steps = """[
{
"step-name": "SaveToDataBase",
"py-script": "jobs/save_to_sql.py",
"config-file": "configs/primary-audience-global-masterdb-$ENV/1.0.0/job.json",
"config-json": [
{"spark.app.audience.country":"$COUNTRY"},
{"spark.app.audience.media.owner":"$MEDIA_OWNER"},
{"spark.app.audience.version":"$AUD_VERSION"},
{"spark.app.audience.output.bucket":"s3a://$AUDIENCE_BUCKET"},
{"spark.app.audience.computation.date":"$COMPUTATION_DATE"},
{"spark.app.audience.database":"$AUD_DATABASE"},
{"spark.executor.memoryOverhead":"2500"}
],
"artifact": "AudiencesPipeline",
"packages": "mysql:mysql-connector-java:5.1.46"
}
]"""
# cluster level parameters (optional)
# Core and task nodes are given the same comma-separated list of candidate
# instance types, so the value is spelled out once.
_WORKER_INSTANCE_TYPES = "m5.2xlarge,m5.4xlarge"
cluster_args = {
    "cluster-name": CLUSTER_NAME,
    # Name of the Airflow Variable that holds this job's configuration.
    "audience-config-file": "primary_audience_global_masterdb_conf",
    "master-instance-types": "m5.xlarge,m5.2xlarge",
    "core-instance-types": _WORKER_INSTANCE_TYPES,
    "task-instance-types": _WORKER_INSTANCE_TYPES,
    "core-instance-capacity": NUM_CORE_NODES,
    "task-instance-capacity": NUM_TASK_NODES,
    "emr-version": EMR_VERSION,
    "audience-load-bootstrap": True,
}
# set config variables in emr-steps
emr_steps = set_config_variable_in_emr_steps(masterdb_steps, config)

# dag parameter
dag_args = dict(
    owner='data.engineers@viooh.com',
    start_date=days_ago(1),
)

# schedule_interval=None means the DAG has no schedule of its own and runs
# only when triggered.
dag = DAG(
    dag_id=dag_id,
    default_args=dag_args,
    schedule_interval=None,
)

# Hand the DAG, the resolved steps and the cluster settings to the shared
# template, which populates the DAG.
build_dag(dag=dag, emr_steps=emr_steps, cluster_args=cluster_args)