DAG: backfill_snowflake-reporting-hourly-import_v001

schedule: 10 */5 * * * (minute 10 past every fifth hour: 00:10, 05:10, 10:10, 15:10, 20:10)


import calendar
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook  # requires apache-airflow-providers-snowflake

from template.slack_alert import task_fail_slack_alert
from cn.snowflake.backfill_query_template import (
    IMPORT_SUPPLY_TABLE_SQL,
    IMPORT_DEMAND_TABLE_SQL,
    IMPORT_DEAL_TABLE_SQL,
    IMPORT_DEAL2_TABLE_SQL,
    IMPORT_DEAL_SYNC_TABLE_SQL,
    IMPORT_MODERATION_TABLE_SQL,
    IMPORT_ASSETS_SQL,
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(filename)s:%(lineno)d [%(levelname)s]  %(message)s",
    handlers=[logging.StreamHandler()]
)

LOG = logging.getLogger()

# airflow variables
cfg = Variable.get("snowflake_config", deserialize_json=True)
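# The Variable is expected to deserialize into a nested dict; the keys below are inferred
# from the task definitions at the bottom of this file (values are placeholders):
#   {"meta": {"source_database": "...", "trading_schema": "...", "deal_schema": "...",
#             "import_supply_table": "...", "import_demand_table": "...",
#             "import_deal_table": "...", "import_deal2_table": "..."}}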

# establish Snowflake connection
dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")

# Build the Snowflake COPY PATTERN regex covering the trailing 10-day window of S3 Avro paths
def generate_date_pattern(custom_date=None):
    # Use the custom date if provided, otherwise use today's date
    if custom_date:
        today = custom_date
    else:
        today = datetime.today()

    # Calculate start and end dates
    start_date = today - timedelta(days=10)  # 10 days ago
    end_date = today - timedelta(days=1)     # 1 day ago

    # Ensure that start and end are in the same month, otherwise handle the boundary
    if start_date.month != end_date.month:
        # Handle the month transition by creating two separate patterns for different months
        start_day = start_date.day
        end_day_in_start_month = calendar.monthrange(start_date.year, start_date.month)[1]
        days_in_start_month = list(range(start_day, end_day_in_start_month + 1))

        # Handle days in the current month
        start_day_in_end_month = 1
        end_day_in_end_month = end_date.day
        days_in_end_month = list(range(start_day_in_end_month, end_day_in_end_month + 1))

        # Create patterns for both months
        pattern_start_month = '|'.join(f'{day:02d}' for day in days_in_start_month)
        pattern_end_month = '|'.join(f'{day:02d}' for day in days_in_end_month)

        # Combine the patterns for both months
        date_pattern = (
            f'[0-9]+/{start_date.year}-{start_date.month:02d}-({pattern_start_month})/*/*/.*avro|'
            f'[0-9]+/{end_date.year}-{end_date.month:02d}-({pattern_end_month})/*/*/.*avro'
        )
    else:
        # Days are within the same month
        start_day = start_date.day
        end_day = end_date.day

        # Generate the list of days from start_day to end_day
        day_range = list(range(start_day, end_day + 1))

        # Format the pattern for Snowflake's COPY command
        day_pattern = '|'.join(f'{day:02d}' for day in day_range)

        # Construct the regex pattern for matching the directory structure in S3
        date_pattern = f'[0-9]+/{start_date.year}-{start_date.month:02d}-({day_pattern})/*/*/.*avro'

    return date_pattern
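
# Example (illustration only): for a run on 2024-10-05 the window is 2024-09-25 .. 2024-10-04,
# which crosses a month boundary, so
#   generate_date_pattern(datetime(2024, 10, 5))
# returns
#   [0-9]+/2024-09-(25|26|27|28|29|30)/*/*/.*avro|[0-9]+/2024-10-(01|02|03|04)/*/*/.*avro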


# Load historical data into the respective Snowflake table from the Kafka source (incremental load)
def load_historical_kf_data(**kwargs):
    # Snowflake Ingestion datetime
    snowflake_ingestion_time = datetime.now().replace(second=0, microsecond=0)
    LOG.info('Snowflake ingestion datetime: %s', snowflake_ingestion_time)

    # Generate the date pattern for the trailing 10-day window (based on today's date)
    date_pattern = generate_date_pattern()

    LOG.info('Generated date pattern: %s', date_pattern)

    # Snowflake query formation
    import_query_template = kwargs['sf_query']
    snowflake_query = import_query_template.format(database_name=kwargs['database_name'],
                                                   schema_name=kwargs['schema_name'],
                                                   table_name=kwargs['table_name'],
                                                   s3_sf_ingestion_time=snowflake_ingestion_time,
                                                   pattern=date_pattern)
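
    # The actual SQL lives in cn.snowflake.backfill_query_template; for orientation only,
    # each template is assumed to be a Snowflake COPY statement along these lines
    # (the stage name here is hypothetical):
    #   COPY INTO {database_name}.{schema_name}.{table_name}
    #   FROM @<external_stage>
    #   PATTERN = '{pattern}'
    #   -- with '{s3_sf_ingestion_time}' recorded as the load timestamp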

    LOG.info('Snowflake copy query: %s', snowflake_query)

    # Execute the Snowflake COPY query; get_first returns the first row of the result set
    result = dwh_hook.get_first(snowflake_query)
    LOG.info('Snowflake copy result: %s', result)


# Task factory: wrap load_historical_kf_data in a PythonOperator for a given table config
def pythonoperator(task_id, task_cfg):
    return PythonOperator(task_id=task_id,
                          python_callable=load_historical_kf_data,
                          op_kwargs=task_cfg,
                          dag=dag)


# Airflow DAG declaration
default_args = {
    'owner': 'data.engineers@viooh.com',
    'retries': 3,
    'retry_delay': timedelta(seconds=30),
    'on_failure_callback': task_fail_slack_alert,
    'start_date': datetime(2024, 9, 27)
}

with DAG(dag_id='backfill_snowflake-reporting-hourly-import_v001',
         default_args=default_args,
         schedule_interval='10 */5 * * *',   # minute 10 past every fifth hour
         max_active_runs=1,                  # only one active DAG run at a time
         catchup=True) as dag:
    (
            EmptyOperator(task_id='start')
            >>
            [
                pythonoperator('import_supply_table', {'sf_query': IMPORT_SUPPLY_TABLE_SQL, 'table_name': cfg['meta']['import_supply_table'], 'database_name': cfg['meta']['source_database'], 'schema_name': cfg['meta']['trading_schema']}),
                pythonoperator('import_assets_table', {'sf_query': IMPORT_ASSETS_SQL, 'table_name': 'UAT_ASSET_ASSET', 'database_name': cfg['meta']['source_database'], 'schema_name': 'ASSET_DATA'}),
                pythonoperator('import_demand_table', {'sf_query': IMPORT_DEMAND_TABLE_SQL, 'table_name': cfg['meta']['import_demand_table'], 'database_name': cfg['meta']['source_database'], 'schema_name': cfg['meta']['trading_schema']}),
                pythonoperator('import_deal_table', {'sf_query': IMPORT_DEAL_TABLE_SQL, 'table_name': cfg['meta']['import_deal_table'], 'database_name': cfg['meta']['source_database'], 'schema_name': cfg['meta']['deal_schema']}),
                pythonoperator('import_deal2_table', {'sf_query': IMPORT_DEAL2_TABLE_SQL, 'table_name': cfg['meta']['import_deal2_table'], 'database_name': cfg['meta']['source_database'], 'schema_name': cfg['meta']['deal_schema']}),
                pythonoperator('import_deal_sync_table', {'sf_query': IMPORT_DEAL_SYNC_TABLE_SQL, 'table_name': 'UAT_DEAL_SYNC_V1', 'database_name': 'ANALYTICS_LOAD', 'schema_name': 'DEAL_DATA'}),
                pythonoperator('import_moderation_table', {'sf_query': IMPORT_MODERATION_TABLE_SQL, 'table_name': 'UAT_MODERATION_V1', 'database_name': 'ANALYTICS_LOAD', 'schema_name': 'MODERATION_DATA'})
            ]
            >>
            EmptyOperator(task_id='end')
    )