import boto3
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
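# Promote audience data from LAB to PRD: read bucket/prefix configuration from
# an Airflow Variable, assume an IAM role via STS, and copy the matching S3
# objects from the LAB bucket to the PRD bucket.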
# DAG-specific parameters, read from an Airflow Variable stored as JSON
config = Variable.get("audience_promote_to_prd_conf", deserialize_json=True)
ROLE_ARN = config['ROLE_ARN']
ROLE_SESSION_NAME = config['ROLE_SESSION_NAME']
DATA_PATH_SRC = config['DATA_PATH_SRC']
DATA_PATH_DEST = config['DATA_PATH_DEST']
S3_BUCKET_FROM = config['S3_BUCKET_FROM']
S3_BUCKET_TO = config['S3_BUCKET_TO']
DEFAULT_REGION = config['DEFAULT_REGION']
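# Assume the configured IAM role and build an S3 resource using the temporary STS credentials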
sts_client = boto3.client('sts', region_name=DEFAULT_REGION)
assumed_role_object = sts_client.assume_role(
    RoleArn=ROLE_ARN,
    RoleSessionName=ROLE_SESSION_NAME
)
credentials = assumed_role_object['Credentials']
s3_resource = boto3.resource(
    's3',
    aws_access_key_id=credentials['AccessKeyId'],
    aws_secret_access_key=credentials['SecretAccessKey'],
    aws_session_token=credentials['SessionToken'],
    region_name=DEFAULT_REGION
)
# Default arguments applied to every task in the DAG
dag_args = {
    'owner': 'data.engineers@viooh.com'
}
def copy_lab_audience_to_prd():
    """Copy every object under DATA_PATH_SRC in the LAB bucket to DATA_PATH_DEST in the PRD bucket."""
    source_bucket = s3_resource.Bucket(S3_BUCKET_FROM)
    dest_bucket = s3_resource.Bucket(S3_BUCKET_TO)
    # Grant the destination bucket owner full control of the copied objects
    extra_args = {
        'ACL': 'bucket-owner-full-control'
    }
    for obj in source_bucket.objects.filter(Prefix=DATA_PATH_SRC):
        source = {'Bucket': S3_BUCKET_FROM, 'Key': obj.key}
        # Re-root the object key under the destination prefix
        dest_key = DATA_PATH_DEST + obj.key[len(DATA_PATH_SRC):]
        dest_obj = dest_bucket.Object(dest_key)
        dest_obj.copy(source, ExtraArgs=extra_args)
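# The DAG is unscheduled (schedule_interval=None); it runs only when triggered manually or via the API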
with DAG(
    dag_id='promote_lab_audience_to_prd_v01',
    default_args=dag_args,
    description='A DAG to promote Audience from LAB to PRD',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['audience'],
) as dag:
    promote_lab_audience_to_prd = PythonOperator(
        task_id="promote_lab_audience_to_prd",
        python_callable=copy_lab_audience_to_prd
    )
    promote_lab_audience_to_prd