DAG: publisher-s3-sync

schedule: @hourly


from airflow import DAG
import pendulum
from template.viooh.publisher.publishers import PublisherDataS3Sink
from airflow.models import Variable
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data.engineers@viooh.com',
    'start_date': pendulum.today('UTC').add(days=-1),
    'retries': 3
}

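# Sync settings (API URL, bucket, environment, key) live in a single JSON Airflow Variable,
# read at DAG parse time and passed to the task via op_args.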
publisher_config = Variable.get(key="publisher_config", deserialize_json=True)

with DAG(
        dag_id='publisher-s3-sync',
        default_args=default_args,
        description='DAG to fetch publisher data and sync it to an S3 location',
        schedule_interval='@hourly',
        tags=['publisher-sync', 'data-eng']
) as dag:

    def run_publisher_sync(config):
        # Pull the API URL and S3 destination from the publisher_config Variable passed in via op_args
        api_url = config["api_url"]
        s3_bucket = config["s3_bucket"]
        env = config["env"]
        # Resulting object path: <s3_bucket>/<env>/<s3_key>, e.g. <bucket>/uat/publishers/publisher_data.parquet
        s3_key = env + '/' + config["s3_key"]
        PublisherDataS3Sink().execute(api_url, s3_bucket, s3_key)

    publisher_sync_task = PythonOperator(
        task_id="publisher_sync_task",
        python_callable=run_publisher_sync,
        op_args=[publisher_config]
    )

    publisher_sync_task
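
# For reference: a minimal sketch of the config shape run_publisher_sync expects.
# The values below are illustrative placeholders, not the deployment's actual settings.
sample_config = {
    "api_url": "https://example.com/publishers",    # publisher API endpoint (placeholder)
    "s3_bucket": "my-publisher-bucket",              # target bucket (placeholder)
    "env": "uat",                                    # environment prefix
    "s3_key": "publishers/publisher_data.parquet",   # key appended under <env>/
}
# run_publisher_sync(sample_config) would write the fetched publisher data to
# s3://my-publisher-bucket/uat/publishers/publisher_data.parquet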