import pendulum

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

from template.viooh.publisher.publishers import PublisherDataS3Sink

default_args = {
    'owner': 'data.engineers@viooh.com',
    'start_date': pendulum.today('UTC').add(days=-1),
    'retries': 3
}

publisher_config = Variable.get(key="publisher_config", deserialize_json=True)
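
# Illustrative shape of the publisher_config Variable above. The keys are
# inferred from run_publisher_sync below; the example values are assumptions:
#
#     {
#         "api_url": "https://publisher-api.example.com/publishers",
#         "s3_bucket": "example-publisher-bucket",
#         "env": "uat",
#         "s3_key": "publishers/publisher_data.parquet"
#     }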

with DAG(
    dag_id='publisher-s3-sync',
    default_args=default_args,
    description='DAG to fetch publisher data and sync it to an S3 location',
    schedule_interval='@hourly',
    # Without this, the scheduler would backfill every hourly run between
    # start_date (yesterday) and now when the DAG is first enabled.
    catchup=False,
    tags=['publisher-sync', 'data-eng']
) as dag:

    def run_publisher_sync(config):
        # Read the API URL and S3 coordinates from the publisher_config Variable.
        api_url = config["api_url"]
        s3_bucket = config["s3_bucket"]
        env = config["env"]
        # Prefix the key with the environment, so the final object lands at
        # <bucket>/<env>/publishers/publisher_data.parquet (e.g. env="uat").
        s3_key = env + '/' + config["s3_key"]
        PublisherDataS3Sink().execute(api_url, s3_bucket, s3_key)

    publisher_sync_task = PythonOperator(
        task_id="publisher_sync_task",
        python_callable=run_publisher_sync,
        op_args=[publisher_config]
    )
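
    # To exercise the task once locally, outside the scheduler (assumes an
    # Airflow 2.x install with the publisher_config Variable set; the date is
    # an arbitrary example):
    #
    #     airflow tasks test publisher-s3-sync publisher_sync_task 2024-01-01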

    # Single task, so there are no dependencies to declare.
    publisher_sync_task
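
# For orientation only: a minimal sketch of what PublisherDataS3Sink.execute
# might do. This is an assumption -- the real implementation lives in
# template.viooh.publisher.publishers, and the requests/pandas/s3fs usage
# below is illustrative, not the actual code:
#
#     import pandas as pd
#     import requests
#
#     class PublisherDataS3Sink:
#         def execute(self, api_url: str, s3_bucket: str, s3_key: str) -> None:
#             # Fetch the publisher records from the API...
#             records = requests.get(api_url, timeout=60).json()
#             # ...and write them to S3 as a single parquet object
#             # (pandas resolves "s3://" paths via s3fs).
#             pd.DataFrame(records).to_parquet(
#                 f"s3://{s3_bucket}/{s3_key}", index=False
#             )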