DAG: booking-processing-job_v002

schedule: 0 */1 * * *


Task Instance: create_emr_steps


Task Instance Details

Dependencies Blocking Task From Getting Scheduled
Dependency: Reason
Previous Dagrun State: depends_on_past is true for this task, but the previous task instance is in the state 'None' which is not a successful state.
Previous Dagrun State: The tasks downstream of the previous task instance haven't completed (and wait_for_downstream is True).
Task Instance State: Task is in the 'None' state which is not a valid state for execution. The task must be cleared in order to be run.
Execution Date: The execution date is 2024-03-08T20:00:00+00:00 but this is before the task's start date 2024-07-04T12:00:00+00:00.
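
Two of the reasons listed above (the depends_on_past check and the execution date falling before the start date) reduce to simple comparisons on values shown on this page. The following is a minimal sketch of that logic in Python. It is a simplified model, not Airflow's own dependency code; the function name `blocking_reasons` and its parameters are hypothetical, and the wait_for_downstream check is left out because it needs the downstream task states, which this page does not show.

```python
from datetime import datetime, timezone


def blocking_reasons(execution_date, start_date, depends_on_past, previous_state):
    """Return human-readable reasons why a task instance would not be scheduled.

    Simplified model of two of the checks reported on this page (hypothetical helper).
    """
    reasons = []
    # depends_on_past: the previous task instance must have finished successfully
    # (assumption: a skipped previous instance is also treated as acceptable).
    if depends_on_past and previous_state not in ("success", "skipped"):
        reasons.append("previous task instance is in state %r" % (previous_state,))
    # A run whose execution date precedes the task's start_date is never scheduled.
    if execution_date < start_date:
        reasons.append("execution date is before the task's start date")
    return reasons


# Values taken from the task instance and task attributes on this page.
execution_date = datetime(2024, 3, 8, 20, 0, tzinfo=timezone.utc)
start_date = datetime(2024, 7, 4, 12, 0, tzinfo=timezone.utc)

reasons = blocking_reasons(execution_date, start_date,
                           depends_on_past=True, previous_state=None)
gap = start_date - execution_date  # how far the run lies before start_date

for reason in reasons:
    print(reason)
print("run precedes start_date by", gap)
```

With the values from this page both checks fire, which suggests that clearing the task alone would not be enough: the run for 2024-03-08T20:00:00+00:00 would still lie before the task's start_date.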
Attribute: python_callable
def get_config(**kwargs):
    """Build the EMR job flow config from the task's keyword arguments."""
    # generate_cluster_config and LOG are defined elsewhere in the DAG module (not shown here).
    config = generate_cluster_config(kwargs)
    job_flow_config = {
        "Name": config["Name"],
        "LogUri": config["LogUri"],
        "ReleaseLabel": config["ReleaseLabel"],
        "Instances": config["Instances"],
        "BootstrapActions": config["BootstrapActions"],
        "Applications": config["Applications"],
        "VisibleToAllUsers": True,
        "Steps": config["Steps"],
        "JobFlowRole": config["JobFlowRole"],
        "ServiceRole": config["ServiceRole"],
        "SecurityConfiguration": config["SecurityConfiguration"],
        "Tags": config["Tags"],
        "Configurations": config["Configurations"],
    }
    LOG.info("job_flow_config: %s", job_flow_config)
    return job_flow_config
Task Instance Attributes
Attribute Value
dag_id booking-processing-job_v002
duration None
end_date None
execution_date 2024-03-08T20:00:00+00:00
executor_config {}
generate_command <function TaskInstance.generate_command at 0x7f7bf685a7b8>
hostname
is_premature False
job_id None
key ('booking-processing-job_v002', 'create_emr_steps', <Pendulum [2024-03-08T20:00:00+00:00]>, 1)
log <Logger airflow.task (INFO)>
log_filepath /opt/airflow/logs/booking-processing-job_v002/create_emr_steps/2024-03-08T20:00:00+00:00.log
log_url https://airflow.devel.viooh.net.cn/admin/airflow/log?execution_date=2024-03-08T20%3A00%3A00%2B00%3A00&task_id=create_emr_steps&dag_id=booking-processing-job_v002
logger <Logger airflow.task (INFO)>
mark_success_url https://airflow.devel.viooh.net.cn/success?task_id=create_emr_steps&dag_id=booking-processing-job_v002&execution_date=2024-03-08T20%3A00%3A00%2B00%3A00&upstream=false&downstream=false
max_tries 4
metadata MetaData(bind=None)
next_try_number 1
operator PythonOperator
pid None
pool default_pool
pool_slots 1
prev_attempted_tries 0
previous_execution_date_success 2024-02-14 23:00:00+00:00
previous_start_date_success None
previous_ti <TaskInstance: booking-processing-job_v002.create_emr_steps 2024-03-08 19:00:00+00:00 [None]>
previous_ti_success <TaskInstance: booking-processing-job_v002.create_emr_steps 2024-02-14 23:00:00+00:00 [success]>
priority_weight 3
queue default
queued_dttm None
raw False
run_as_user None
start_date None
state None
task <Task(PythonOperator): create_emr_steps>
task_id create_emr_steps
test_mode False
try_number 1
unixname airflow
Task Attributes
Attribute Value
dag <DAG: booking-processing-job_v002>
dag_id booking-processing-job_v002
depends_on_past True
deps {<TIDep(Trigger Rule)>, <TIDep(Not Previously Skipped)>, <TIDep(Previous Dagrun State)>, <TIDep(Not In Retry Period)>}
do_xcom_push True
downstream_list [<Task(EmrCreateJobFlowOperator): create_cluster_and_add_emr_steps>]
downstream_task_ids {'create_cluster_and_add_emr_steps'}
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
extra_links []
global_operator_extra_link_dict {}
inlets []
lineage_data None
log <Logger airflow.task.operators (INFO)>
logger <Logger airflow.task.operators (INFO)>
max_retry_delay None
on_failure_callback <function task_fail_slack_alert at 0x7f7be66b8a60>
on_retry_callback None
on_success_callback None
op_args []
op_kwargs {'master-instance-types': 'm5.xlarge,m5.2xlarge', 'core-instance-types': 'm5.xlarge,m5.2xlarge', 'task-instance-types': 'm5.xlarge,m5.2xlarge', 'core-instance-capacity': 3, 'task-instance-capacity': 0, 'ebs-volume-size': '50', 'job-type': 'batch', 'emr-steps': '[\n {\n "step-name": "CampaignExtractor",\n "config-json": [\n {"spark.driver.memory":"9g"}\n ],\n "main-class": "com.viooh.campaignextractor.CampaignExtractorMain",\n "group-id":"com/viooh/campaignextractor",\n "artifact": "campaign-extractor",\n "jars": "/usr/lib/spark/external/lib/spark-avro.jar"\n },\n {\n "step-name": "CampaignProcessingJob",\n "config-json": [\n {"spark.driver.memory":"9g"}\n ],\n "main-class": "com.viooh.CampaignProcessingMain",\n "artifact": "campaign-processing-job"\n },\n {\n "step-name": "CampaignDeltaJob",\n "config-json": [\n {"spark.driver.memory":"9g"}\n ],\n "main-class": "com.viooh.booking.delta.CampaignDeltaMain",\n "artifact": "pandora-campaign-delta",\n "enable-custom-metrics" : "True"\n }\n]', 'cluster-name': 'booking-processing-job', 'dag-id': 'booking-processing-job_v002', 'schedule_interval': '0 */1 * * *', 'trigger_dags': []}
operator_extra_link_dict {}
operator_extra_links ()
outlets []
owner data.engineers@viooh.com
params {}
pool default_pool
pool_slots 1
priority_weight 1
priority_weight_total 3
provide_context True
queue default
resources None
retries 4
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 0 */1 * * *
shallow_copy_attrs ('python_callable', 'op_kwargs')
sla None
start_date 2024-07-04 12:00:00+00:00
subdag None
task_concurrency None
task_id create_emr_steps
task_type PythonOperator
template_ext []
template_fields ('templates_dict', 'op_args', 'op_kwargs')
templates_dict None
trigger_rule all_success
ui_color #ffefeb
ui_fgcolor #000
upstream_list []
upstream_task_ids set()
wait_for_downstream True
weight_rule downstream
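
The 'emr-steps' entry in op_kwargs above is a JSON-encoded string, not a Python list, so whatever consumes it has to decode it first. The sketch below shows one way to decode and summarise it with the standard library. How generate_cluster_config actually handles this value is not shown on this page; the helper name `summarize_steps` is hypothetical.

```python
import json

# Value of op_kwargs['emr-steps'], copied from the task attributes above.
EMR_STEPS_RAW = """[
 {
 "step-name": "CampaignExtractor",
 "config-json": [
 {"spark.driver.memory":"9g"}
 ],
 "main-class": "com.viooh.campaignextractor.CampaignExtractorMain",
 "group-id":"com/viooh/campaignextractor",
 "artifact": "campaign-extractor",
 "jars": "/usr/lib/spark/external/lib/spark-avro.jar"
 },
 {
 "step-name": "CampaignProcessingJob",
 "config-json": [
 {"spark.driver.memory":"9g"}
 ],
 "main-class": "com.viooh.CampaignProcessingMain",
 "artifact": "campaign-processing-job"
 },
 {
 "step-name": "CampaignDeltaJob",
 "config-json": [
 {"spark.driver.memory":"9g"}
 ],
 "main-class": "com.viooh.booking.delta.CampaignDeltaMain",
 "artifact": "pandora-campaign-delta",
 "enable-custom-metrics" : "True"
 }
]"""


def summarize_steps(raw):
    """Decode the JSON string and return (step-name, main-class, artifact) tuples."""
    steps = json.loads(raw)
    return [(s["step-name"], s["main-class"], s["artifact"]) for s in steps]


summary = summarize_steps(EMR_STEPS_RAW)
for name, main_class, artifact in summary:
    print(name, main_class, artifact)

# Optional keys such as "jars" or "group-id" appear only on some steps,
# so they are read with .get() rather than indexing.
jars = [s.get("jars") for s in json.loads(EMR_STEPS_RAW)]
```

Note that "enable-custom-metrics" is the string "True" rather than a JSON boolean, so code reading it would need to compare against the string.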