import logging
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from template.slack_alert import task_fail_slack_alert
from cn.snowflake.backfill_query_template import IMPORT_SUPPLY_TABLE_SQL, IMPORT_DEMAND_TABLE_SQL, \
    IMPORT_DEAL_TABLE_SQL, IMPORT_DEAL2_TABLE_SQL, IMPORT_DEAL_SYNC_TABLE_SQL, IMPORT_MODERATION_TABLE_SQL, IMPORT_ASSETS_SQL
import calendar
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(filename)s:%(lineno)d [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)
LOG = logging.getLogger()
# airflow variables
cfg = Variable.get("snowflake_config", deserialize_json=True)
# establish Snowflake connection
dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
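# NOTE: this DAG assumes an Airflow connection named "snowflake_conn" and a JSON Variable named
# "snowflake_config" (with the "meta" section referenced below) already exist in the Airflow environment.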
# Build the S3 key pattern covering the ten-day backfill window
def generate_date_pattern(custom_date=None):
    # Use the custom date if provided, otherwise use today's date
    if custom_date:
        today = custom_date
    else:
        today = datetime.today()
    # Calculate the start and end dates of the window
    start_date = today - timedelta(days=10)  # 10 days ago
    end_date = today - timedelta(days=1)     # 1 day ago
    # If the window crosses a month boundary, build a separate pattern for each month
    if start_date.month != end_date.month:
        # Days remaining in the start month
        start_day = start_date.day
        end_day_in_start_month = calendar.monthrange(start_date.year, start_date.month)[1]
        days_in_start_month = list(range(start_day, end_day_in_start_month + 1))
        # Days in the end month
        start_day_in_end_month = 1
        end_day_in_end_month = end_date.day
        days_in_end_month = list(range(start_day_in_end_month, end_day_in_end_month + 1))
        # Create the day alternations for both months
        pattern_start_month = '|'.join(f'{day:02d}' for day in days_in_start_month)
        pattern_end_month = '|'.join(f'{day:02d}' for day in days_in_end_month)
        # Combine the patterns for both months
        date_pattern = (
            f'[0-9]+/{start_date.year}-{start_date.month:02d}-({pattern_start_month})/*/*/.*avro|'
            f'[0-9]+/{end_date.year}-{end_date.month:02d}-({pattern_end_month})/*/*/.*avro'
        )
    else:
        # The whole window falls within a single month
        start_day = start_date.day
        end_day = end_date.day
        # Generate the list of days from start_day to end_day
        day_range = list(range(start_day, end_day + 1))
        # Format the day alternation for the PATTERN option of Snowflake's COPY command
        day_pattern = '|'.join(f'{day:02d}' for day in day_range)
        # Construct the regex matching the date-partitioned directory structure in S3
        date_pattern = f'[0-9]+/{start_date.year}-{start_date.month:02d}-({day_pattern})/*/*/.*avro'
    return date_pattern
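# Illustrative example (assumed run date, not taken from the DAG schedule):
# generate_date_pattern(datetime(2024, 10, 5)) covers 2024-09-25 .. 2024-10-04 and returns
# '[0-9]+/2024-09-(25|26|27|28|29|30)/*/*/.*avro|[0-9]+/2024-10-(01|02|03|04)/*/*/.*avro'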
# Load historical data into the respective Snowflake table from the Kafka source (incremental load)
def load_historical_kf_data(**kwargs):
    # Snowflake ingestion datetime
    snowflake_ingestion_time = datetime.now().replace(second=0, microsecond=0)
    LOG.info('Snowflake ingestion datetime: %s', snowflake_ingestion_time)
    # Generate the date pattern based on the current date
    date_pattern = generate_date_pattern()
    LOG.info('Generated date pattern: %s', date_pattern)
    # Build the Snowflake query from the template passed via op_kwargs
    import_query_template = kwargs['sf_query']
    snowflake_query = import_query_template.format(database_name=kwargs['database_name'],
                                                   schema_name=kwargs['schema_name'],
                                                   table_name=kwargs['table_name'],
                                                   s3_sf_ingestion_time=snowflake_ingestion_time,
                                                   pattern=date_pattern)
    LOG.info('Snowflake copy query: %s', snowflake_query)
    # Execute the Snowflake query; get_first returns the first row of the result set
    result = dwh_hook.get_first(snowflake_query)
    LOG.info('Snowflake copy result: %s', result)
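# The import templates are expected to be Snowflake COPY statements exposing the placeholders
# used in the .format() call above. Illustrative shape only (an assumption -- the real statements
# live in cn.snowflake.backfill_query_template):
#   COPY INTO {database_name}.{schema_name}.{table_name}
#   FROM @<stage>
#   PATTERN = '{pattern}'
#   FILE_FORMAT = (TYPE = AVRO)
# with {s3_sf_ingestion_time} recorded as the ingestion timestamp.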
# Factory that builds a PythonOperator for each import task
def pythonoperator(task_id, task_cfg):
    return PythonOperator(task_id=task_id,
                          python_callable=load_historical_kf_data,
                          op_kwargs=task_cfg,
                          dag=dag,
                          provide_context=True)
# Airflow DAG declaration
default_args = {
    'owner': 'data.engineers@viooh.com',
    'retries': 3,
    'max_retry_delay': timedelta(seconds=30),
    'on_failure_callback': task_fail_slack_alert,
    'start_date': datetime(2024, 9, 27)
}
with DAG(dag_id='backfill_snowflake-reporting-hourly-import_v001',
         default_args=default_args,
         schedule_interval='10 */5 * * *',  # cron expression: minute 10 of every 5th hour
         max_active_runs=1,  # limit to one active backfill run at a time
         catchup=True) as dag:
    (
        DummyOperator(task_id='start')
        >>
        [
            pythonoperator('import_supply_table', {'sf_query': IMPORT_SUPPLY_TABLE_SQL, 'table_name': cfg['meta']['import_supply_table'], 'database_name': cfg['meta']['source_database'], 'schema_name': cfg['meta']['trading_schema']}),
            pythonoperator('import_assets_table', {'sf_query': IMPORT_ASSETS_SQL, 'table_name': 'UAT_ASSET_ASSET', 'database_name': cfg['meta']['source_database'], 'schema_name': 'ASSET_DATA'}),
            pythonoperator('import_demand_table', {'sf_query': IMPORT_DEMAND_TABLE_SQL, 'table_name': cfg['meta']['import_demand_table'], 'database_name': cfg['meta']['source_database'], 'schema_name': cfg['meta']['trading_schema']}),
            pythonoperator('import_deal_table', {'sf_query': IMPORT_DEAL_TABLE_SQL, 'table_name': cfg['meta']['import_deal_table'], 'database_name': cfg['meta']['source_database'], 'schema_name': cfg['meta']['deal_schema']}),
            pythonoperator('import_deal2_table', {'sf_query': IMPORT_DEAL2_TABLE_SQL, 'table_name': cfg['meta']['import_deal2_table'], 'database_name': cfg['meta']['source_database'], 'schema_name': cfg['meta']['deal_schema']}),
            pythonoperator('import_deal_sync_table', {'sf_query': IMPORT_DEAL_SYNC_TABLE_SQL, 'table_name': 'UAT_DEAL_SYNC_V1', 'database_name': 'ANALYTICS_LOAD', 'schema_name': 'DEAL_DATA'}),
            pythonoperator('import_moderation_table', {'sf_query': IMPORT_MODERATION_TABLE_SQL, 'table_name': 'UAT_MODERATION_V1', 'database_name': 'ANALYTICS_LOAD', 'schema_name': 'MODERATION_DATA'})
        ]
        >>
        DummyOperator(task_id='end')
    )