Trigger Rules
Clase 20 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Sebastián Salas
Elitsoft Chile
Gian HM
Marvin Avila Burgos
Bryan Carvajal
rogelio cortez
Jose Colmenares
Eric Bellet
Federico Martinez
from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime from airflow.utils.trigger_rule import TriggerRule def myfunction(): raise Exception default_args = {} with DAG(dag_id="6.2-monitoring", description="Monitoreando nuestro DAG", schedule_interval="@daily", start_date=datetime(2022, 1, 1), end_date=datetime(2022, 6, 1), default_args=default_args, max_active_runs=1) as dag: t1 = BashOperator(task_id="tarea1", bash_command="sleep 5 && echo 'Primera tarea!'", trigger_rule=TriggerRule.ALL_SUCCESS, retries=2, retry_delay=5, depends_on_past=False) t2 = BashOperator(task_id="tarea2", bash_command="sleep 3 && echo 'Segunda tarea!'", retries=2, retry_delay=5, trigger_rule=TriggerRule.ALL_SUCCESS, depends_on_past=True) t3 = BashOperator(task_id="tarea3", bash_command="sleep 2 && echo 'Tercera tarea!'", retries=2, retry_delay=5, trigger_rule=TriggerRule.ALWAYS, depends_on_past=True) t4 = PythonOperator(task_id="tarea4", python_callable=myfunction, retries=2, retry_delay=5, trigger_rule=TriggerRule.ALL_SUCCESS, depends_on_past=True) t5 = BashOperator(task_id="tarea5", bash_command="sleep 2 && echo 'Quinta tarea!'", retries=2, retry_delay=5, depends_on_past=True) t1 >> t2 >> t3 >> t4 >> t5
En esta práctica usé. ALWAYS, NONE_FAILED_OR_SKIPPED, ONE_FAILED y ALL_FAILED. Para ello creé un pequeño mundo, donde las entidades (tareas) son cuatro casas, un bosque, un hospital, un colegio, una estación de Bomberos, Fiesta y Guardia Nacional. El problema es que se producen incendios. • Todos pueden llamar a los bomberos (menos la Guardia Nacional y Fiesta) o Aquí usé ALWAYS, para tener una entrada constante de datos. • Las casas llaman a la municipalidad, el bosque, hospital y colegio. • Los bomberos saltan si una de las tareas que los llaman falla, ONE_FAILED • La Guardia Nacional se ejecuta, si se queman los bomberos y la municipalidad, se llama a la guardia nacional (ALL_FAILED). • Si no se quema nada, hay Fiesta (NONE_FAILED_OR_SKIPPED).
Las cuales se relacionan de la siguiente forma.
Este es el Código fuente
#ejemplo de trigger rules from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime from airflow.utils.trigger_rule import TriggerRule def myfunction(): pass def myfunctionIncendio(): import random incendio = random.random() print("EL VALOR DEL INCENDIO ES:",incendio) if incendio < 0.3: return True else: raise Exception #return False #el -- return FALSE -- lo toma como una salida válida. No sirve default_args = {} with DAG(dag_id = "06.3-Monitoring_TareaB", description = "Monitoreando nuestro DAG", schedule_interval = "@daily", start_date = datetime(2023, 5, 1), end_date = datetime(2024, 1, 1), default_args = default_args, max_active_runs = 1 ) as dag: # "depends_on_past" por defecto es False, por lo que pueden surgir los procesos todos los días independientemente. # "depends_on_past" hace que se ejecute solo si el anterior proceso terminó, así se realiza uno a la vez secuencial # max_active_runs, se ejecute un modulo, evitando que haya paralelismo, y cuando se termine todo le grupo, se pasa al sgte día. t1 = PythonOperator(task_id="Pobalcion_01", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t2 = PythonOperator(task_id="Poblacion_02", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t3 = PythonOperator(task_id="Poblacion_03", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t4 = PythonOperator(task_id="Poblacion_04", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t5 = PythonOperator(task_id="Bosque_01", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t6 = PythonOperator(task_id="Piscina_01", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t7 = PythonOperator(task_id="colegio_01", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t8 = PythonOperator(task_id="Municipalidad", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t9 = PythonOperator(task_id="Fiesta", python_callable=myfunction, trigger_rule = TriggerRule.NONE_FAILED_OR_SKIPPED, retries = 2, retry_delay = 5, depends_on_past = False) t10 = PythonOperator(task_id="Bomberos", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ONE_FAILED, retries = 2, retry_delay = 5, depends_on_past = False) t100 = PythonOperator(task_id="GuardiaNacional", python_callable=myfunction, trigger_rule = TriggerRule.ALL_FAILED, retries = 2, retry_delay = 5, depends_on_past = False) [t1, t2, t3 , t4] >> t10 [t1, t2, t3 , t4, t6, t7, t8] >> t9 [t1, t2, t3, t4 ] >> t5 [t1, t2, t3, t4 ] >> t6 [t1, t2, t3, t4 ] >> t7 [t5,t6,t7] >> t8 [t8, t10] >> t100
Originalmente el valor para que hubiera un incendio era de 0.7, pero como no se quemaba nada, lo bajé a 0.3 Aún así la municipalidad y los bomberos casi nunca se quemaban por lo que en una oportunidad que la municipalidad si se quemó, aproveché de forzar el dato de los bomberos para que al menos una vez saliera la Guardia Nacional.
1. **all_success:** Este trigger se activa cuando todas las tareas antecesoras de una tarea dada han tenido éxito. En otras palabras, la tarea actual se ejecutará si todas las tareas anteriores han finalizado exitosamente. 2. **all_failed:** Se activa cuando todas las tareas antecesoras de una tarea dada han fallado. Esto significa que la tarea actual se ejecutará si todas las tareas anteriores han fallado. 3. **all_done:** Este trigger se activa cuando todas las tareas antecesoras de una tarea dada han finalizado, independientemente de su estado (éxito, falla, o en espera). 4. **one_success:** Se activa cuando al menos una de las tareas antecesoras de una tarea dada ha tenido éxito. La tarea actual se ejecutará si al menos una de las tareas anteriores ha finalizado con éxito. 5. **one_failed:** Se activa cuando al menos una de las tareas antecesoras de una tarea dada ha fallado. La tarea actual se ejecutará si al menos una de las tareas anteriores ha fallado. 6. **none_failed:** Este trigger es una opción que no viene por defecto en Apache Airflow. Sin embargo, parece ser una combinación lógica de los triggers anteriores. Se activaría cuando ninguna de las tareas antecesoras de una tarea dada haya fallado. La tarea actual se ejecutaría si ninguna de las tareas anteriores ha fallado.
Las reglas de activación de Airflow son fáciles de usar pero, sin embargo, extremadamente poderosas. Le permiten crear canalizaciones de datos más complejas y abordar casos de uso reales. No dude en usarlos para manejar el error de una manera mejor y más confiable que solo con una devolución de llamada.
En esta práctica se simulan 3 casas y tres tipos de alarmas , las alarmas se ejecutaran dependiendo dela situación de cada casa
a) ALARMA_INUNDACION : Que se activara si todas las casas se inundan es decir si todas fallan por lo tanto TriggerRule.ALL_FAILED
b) ALRMA_CAMARA : Simulando una camara de seguridad se activara si hay un ladron en por lo menos 1 casa por lo tanto TriggerRule.ONE_FAILED
c) ALARMA CONTROL : Simulando una alarma que enciende un led verde en caso que todo este bien es decir TriggerRule.NONE_FAILED
El codigo es el siguiente:from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom airflow.operators.python import PythonOperatorfrom datetime import datetimefrom airflow.utils.trigger_rule import TriggerRuleimport random def myFunction(): num = random.uniform (0,0.5) if num > 0.4: return True else: raise Exception default_args = {}with DAG(dag_id="monitoring2", description="Bash Operator DAG", start_date=datetime(2025, 1, 25),end_date = datetime(2025,2,25), schedule="@daily", default_args=default_args, max_active_runs=1) as dag: t1 = PythonOperator(task_id='casa1', python_callable=myFunction , trigger_rule = TriggerRule.ALWAYS , retries=2,#Cantidad de intentos retry_delay=2,#tiempo entre cada intento en segundos depends_on_past = False) t2 = PythonOperator(task_id='casa2',python_callable=myFunction, retries=2,#Cantidad de intentos retry_delay=5,#tiempo entre cada intento en segundos trigger_rule = TriggerRule.ALWAYS, depends_on_past = False) t3 = PythonOperator(task_id='casa3', python_callable=myFunction, retries=2, retry_delay=2, trigger_rule = TriggerRule.ALWAYS, depends_on_past = False) t4 = BashOperator(task_id='ALARMA_INUNDACION', bash_command='sleep 5 && echo "ALARMA DE INCENDIOS ENCENDIDA"', retries=2,retry_delay=5,trigger_rule = TriggerRule.ALL_FAILED,depends_on_past = False) t5 = BashOperator(task_id='ALARMA_CAMARA', bash_command='sleep 5 && echo "ALARMA DE CAMARA ENCENDIDA"', retries=2,retry_delay=5,trigger_rule = TriggerRule.ONE_FAILED,depends_on_past = False) t6 = BashOperator(task_id='ALARMA_CONTROL', bash_command='sleep 5 && echo "LED EN VERDE"', retries=2,retry_delay=5,trigger_rule = TriggerRule.NONE_FAILED,depends_on_past = False) [t1,t2,t3] >> t4 [t1,t2,t3] >> t5 [t1,t2,t3] >> t6
from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime from airflow.utils.trigger_rule import TriggerRule import random def myFunction(): num = random.uniform (0,0.5) if num > 0.4: return True else: raise Exception default_args = {} with DAG(dag_id="monitoring2", description="Bash Operator DAG", start_date=datetime(2025, 1, 25),end_date = datetime(2025,2,25), schedule="@daily", default_args=default_args, max_active_runs=1) as dag: t1 = PythonOperator(task_id='casa1', python_callable=myFunction , trigger_rule = TriggerRule.ALWAYS , retries=2,#Cantidad de intentos retry_delay=2,#tiempo entre cada intento en segundos depends_on_past = False) t2 = PythonOperator(task_id='casa2',python_callable=myFunction, retries=2,#Cantidad de intentos retry_delay=5,#tiempo entre cada intento en segundos trigger_rule = TriggerRule.ALWAYS, depends_on_past = False) t3 = PythonOperator(task_id='casa3', python_callable=myFunction, retries=2, retry_delay=2, trigger_rule = TriggerRule.ALWAYS, depends_on_past = False) t4 = BashOperator(task_id='ALARMA_INUNDACION', bash_command='sleep 5 && echo "ALARMA DE INCENDIOS ENCENDIDA"', retries=2,retry_delay=5,trigger_rule = TriggerRule.ALL_FAILED,depends_on_past = False) t5 = BashOperator(task_id='ALARMA_CAMARA', bash_command='sleep 5 && echo "ALARMA DE CAMARA ENCENDIDA"', retries=2,retry_delay=5,trigger_rule = TriggerRule.ONE_FAILED,depends_on_past = False) t6 = BashOperator(task_id='ALARMA_CONTROL', bash_command='sleep 5 && echo "LED EN VERDE"', retries=2,retry_delay=5,trigger_rule = TriggerRule.NONE_FAILED,depends_on_past = False) [t1,t2,t3] >> t4 [t1,t2,t3] >> t5 [t1,t2,t3] >> t6 ```Las tareas se definen de la forma \[t1,t2,t3] >> t4 ya que no causa dependencias una de la otra ya uqe si se pone  t1>>t2>>t3 >> t4 al final t4 dependera de la situacion de t3 Espero y les ayude , saludos!
Tremenda clase
Muchas gracias!
¿Cuándo usar distintas reglas de disparo?
Las reglas de disparo (trigger_rule) permiten definir la lógica de cuándo una tarea debe ejecutarse basándose en el estado de sus dependencias. Por ejemplo, utilizarías all_success si todas las tareas previas son críticas, o one_success si solo una de varias dependencias necesita completarse para continuar.