DAG: primary-audience-global-cassandra_v003

schedule: None


primary-audience-global-cassandra_v003

Toggle wrap
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
import json

from airflow import DAG
from airflow.models import Variable
from airflow.utils.dates import days_ago

from template.dag_template import build_dag
from template.utils import set_config_variable_in_emr_steps

# DAG specific parameters, read from an Airflow Variable holding JSON
# (deserialize_json=True decodes it before we index into it by key).
config = Variable.get("primary_audience_global_cassandra_conf", deserialize_json=True)
COUNTRY, MEDIA_OWNER, AUD_VERSION = (
    config[key] for key in ('COUNTRY', 'MEDIA_OWNER', 'AUD_VERSION')
)

# EMR cluster name derived from the media owner and audience version,
# e.g. "primary-audience-global-cassandra-<MEDIA_OWNER>-<AUD_VERSION>".
cluster_name = '-'.join(('primary-audience-global-cassandra', MEDIA_OWNER, AUD_VERSION))
dag_id = 'primary-audience-global-cassandra_v003'

# Base Cassandra keyspace name: a few countries (be, es, dk, nl) get a
# per-country keyspace; every other country is keyed by media owner.
# The value is also stored in config so it is available to
# set_config_variable_in_emr_steps along with the other config entries.
KEYSPACE = (
    COUNTRY.lower()
    if COUNTRY.lower() in {'be', 'es', 'dk', 'nl'}
    else MEDIA_OWNER.lower()
)
config['KEYSPACE'] = KEYSPACE

# EMR step definition. Every variant runs a single "RegenerateRoute" Spark step;
# variants differ only in the job config file, the Spark settings passed in
# "config-json" and the PySpark entry point. Tokens of the form "$NAME" are left
# in place for set_config_variable_in_emr_steps(emr_steps, config) to fill in.
_CASSANDRA_CONNECTOR_PACKAGE = "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0"

# Cassandra keyspaces that do not follow the "<KEYSPACE>_audience" convention,
# keyed by (lower-cased COUNTRY, lower-cased MEDIA_OWNER).
_KEYSPACE_OVERRIDES = {
    ('hk', 'jcdecaux'): 'jcdecaux_hk_airport_audience',
    ('china', 'jcdecaux'): 'cn_audience',
}


def _regenerate_route_steps(config_file, config_json, py_script):
    """Return an EMR step list containing one RegenerateRoute step.

    :param config_file: job config path passed as "config-file".
    :param config_json: list of single-entry dicts of Spark settings.
    :param py_script: PySpark entry point passed as "py-script".
    """
    return [{
        "step-name": "RegenerateRoute",
        "config-file": config_file,
        "config-json": config_json,
        "artifact": "RegenerateRoute",
        "packages": _CASSANDRA_CONNECTOR_PACKAGE,
        "py-script": py_script,
    }]


def _audience_loader_config(keyspace):
    """Return the "config-json" settings for primary_audience_loader_cassandra.py.

    :param keyspace: literal Cassandra keyspace name to load the audience into.
    """
    return [
        # %date% is not a "$NAME" token (presumably expanded by the Spark job
        # itself -- TODO confirm).
        {"spark.app.primary.audience.base.path":
            "s3a://$AUDIENCE_BUCKET/$COUNTRY/$MEDIA_OWNER/$AUD_VERSION/raw/%date%/dynamic/"},
        {"spark.app.primary.audience.date": "$AUD_UPLOAD_DATE"},
        {"spark.app.primary.audience.cassandra.keyspace": keyspace},
        {"spark.app.primary.audience.version": "$AUD_VERSION"},
    ]


if COUNTRY.lower() == 'uk':
    # UK uses its own job config and the route_regenerate_v1 script.
    _steps = _regenerate_route_steps(
        "configs/primary-audience-uk-cassandra/1.0.0/job.json",
        [
            {"spark.app.primary.audience.cassandra.keyspace": "uk_audience"},
            {"spark.app.route.regenerate.date": "$AUD_UPLOAD_DATE"},
            {"spark.app.route.regenerate.version": "$AUD_VERSION"},
        ],
        "com/viooh/routegen/route_regenerate_v1.py",
    )
else:
    # Build the keyspace name here rather than writing "$KEYSPACE_audience":
    # under "$identifier" placeholder rules (e.g. string.Template) that token
    # names a variable "KEYSPACE_audience", which is not present in config.
    _audience_keyspace = _KEYSPACE_OVERRIDES.get(
        (COUNTRY.lower(), MEDIA_OWNER.lower()), KEYSPACE + '_audience'
    )
    _steps = _regenerate_route_steps(
        "configs/primary-audience-global-cassandra/1.0.0/job.json",
        _audience_loader_config(_audience_keyspace),
        "com/viooh/routegen/primary_audience_loader_cassandra.py",
    )

# emr_steps is handed on as JSON text; serialise the structure built above.
emr_steps = json.dumps(_steps, indent=4)

# Cluster level parameters (optional), handed to build_dag.
# "audience-config-file" names the same Airflow Variable the config was read from.
# NOTE(review): the *-instance-types values look like comma-separated lists of
# allowed EC2 types per node group -- confirm against build_dag.
_m5_instance_types = "m5.2xlarge,m5.4xlarge"
cluster_args = {
    "cluster-name": cluster_name,
    "audience-config-file": "primary_audience_global_cassandra_conf",
    "master-instance-types": _m5_instance_types,
    "core-instance-types": _m5_instance_types,
    "task-instance-types": _m5_instance_types,
    "core-instance-capacity": 5,
    "task-instance-capacity": 0,
    "emr-version": "emr-5.21.0",
}

# Substitute config values into the "$NAME" tokens of emr_steps
# (behaviour assumed from the helper's name -- confirm in template.utils).
emr_steps = set_config_variable_in_emr_steps(emr_steps, config)

# Default arguments applied to every task of the DAG.
dag_args = dict(
    owner='data.engineers@viooh.com',
    start_date=days_ago(1),
)

# schedule_interval=None: no schedule, the DAG only runs when triggered.
dag = DAG(
    dag_id=dag_id,
    default_args=dag_args,
    schedule_interval=None,
)

build_dag(emr_steps=emr_steps, dag=dag, cluster_args=cluster_args)