DAG: promote_lab_audience_to_prd_v01

schedule: None


promote_lab_audience_to_prd_v01

Toggle wrap
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import boto3

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable

# Name of the Airflow Variable (stored as JSON) that holds this DAG's settings.
CONFIG_VARIABLE = "audience_promote_to_prd_conf"

# dag parameter
dag_args = {
    'owner': 'data.engineers@viooh.com'
}


def _load_config():
    """Return the DAG settings read from the ``CONFIG_VARIABLE`` Airflow Variable.

    Called from the task rather than at import time so that parsing this file
    does not query the Airflow metadata database.
    """
    return Variable.get(CONFIG_VARIABLE, deserialize_json=True)


def _assumed_role_s3_resource(role_arn, role_session_name, region):
    """Assume ``role_arn`` through STS and return a boto3 S3 resource for it.

    The resource is built from the temporary credentials returned by
    ``assume_role``, so it should be created shortly before it is used.
    """
    sts_client = boto3.client('sts', region_name=region)
    assumed_role = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=role_session_name,
    )
    credentials = assumed_role['Credentials']
    return boto3.resource(
        's3',
        aws_access_key_id=credentials['AccessKeyId'],
        aws_secret_access_key=credentials['SecretAccessKey'],
        aws_session_token=credentials['SessionToken'],
        region_name=region,
    )


def copy_lab_audience_to_prd():
    """Copy every object under the source prefix to the destination prefix.

    Settings come from the ``audience_promote_to_prd_conf`` Airflow Variable:
    ``ROLE_ARN``, ``ROLE_SESSION_NAME``, ``DEFAULT_REGION``,
    ``S3_BUCKET_FROM``, ``S3_BUCKET_TO``, ``DATA_PATH_SRC`` and
    ``DATA_PATH_DEST``. Each key found under ``DATA_PATH_SRC`` in the source
    bucket is copied to the destination bucket with that leading prefix
    replaced by ``DATA_PATH_DEST``.

    Raises:
        KeyError: if a required setting is missing from the Variable.
    """
    config = _load_config()

    # Read every setting up front so a missing key fails before any AWS call.
    role_arn = config['ROLE_ARN']
    role_session_name = config['ROLE_SESSION_NAME']
    region = config['DEFAULT_REGION']
    bucket_from = config['S3_BUCKET_FROM']
    bucket_to = config['S3_BUCKET_TO']
    src_prefix = config['DATA_PATH_SRC']
    dest_prefix = config['DATA_PATH_DEST']

    # The role is assumed here, at task run time, so importing the DAG file
    # never calls AWS and the temporary credentials are fresh for the copy.
    s3_resource = _assumed_role_s3_resource(role_arn, role_session_name, region)
    source_bucket = s3_resource.Bucket(bucket_from)
    dest_bucket = s3_resource.Bucket(bucket_to)

    # Canned ACL applied to every copied object.
    extra_args = {
        'ACL': 'bucket-owner-full-control'
    }
    prefix_len = len(src_prefix)
    for obj in source_bucket.objects.filter(Prefix=src_prefix):
        source = {'Bucket': bucket_from, 'Key': obj.key}
        # Swap the source prefix for the destination prefix, keep the rest.
        dest_key = dest_prefix + obj.key[prefix_len:]
        dest_bucket.Object(dest_key).copy(source, ExtraArgs=extra_args)

# DAG with no schedule (schedule_interval=None); its single task runs the
# LAB -> PRD audience copy.
dag = DAG(
    dag_id='promote_lab_audience_to_prd_v01',
    default_args=dag_args,
    description='A DAG to promote Audience from LAB to PRD',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['audience'],
)

# Only task in the DAG; attached explicitly through ``dag=``.
promote_lab_audience_to_prd = PythonOperator(
    task_id="promote_lab_audience_to_prd",
    python_callable=copy_lab_audience_to_prd,
    dag=dag,
)