Trigger Rules

Clase 20 de 29 • Curso de Fundamentos de Apache Airflow

Clase anteriorSiguiente clase
    rogelio cortez

    rogelio cortez

    student•
    hace 8 meses

    En esta práctica se simulan 3 casas y tres tipos de alarmas , las alarmas se ejecutaran dependiendo dela situación de cada casa

    a) ALARMA_INUNDACION : Que se activara si todas las casas se inundan es decir si todas fallan por lo tanto TriggerRule.ALL_FAILED

    b) ALRMA_CAMARA : Simulando una camara de seguridad se activara si hay un ladron en por lo menos 1 casa por lo tanto TriggerRule.ONE_FAILED

    c) ALARMA CONTROL : Simulando una alarma que enciende un led verde en caso que todo este bien es decir TriggerRule.NONE_FAILED

    El codigo es el siguiente:from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom airflow.operators.python import PythonOperatorfrom datetime import datetimefrom airflow.utils.trigger_rule import TriggerRuleimport random def myFunction():        num = random.uniform (0,0.5)        if num > 0.4:            return True        else:            raise Exception default_args = {}with DAG(dag_id="monitoring2", description="Bash Operator DAG", start_date=datetime(2025, 1, 25),end_date = datetime(2025,2,25), schedule="@daily",          default_args=default_args, max_active_runs=1) as dag:            t1 = PythonOperator(task_id='casa1', python_callable=myFunction ,                          trigger_rule = TriggerRule.ALWAYS ,                          retries=2,#Cantidad de intentos                          retry_delay=2,#tiempo entre cada intento en segundos                          depends_on_past = False)            t2 = PythonOperator(task_id='casa2',python_callable=myFunction,                          retries=2,#Cantidad de intentos                          retry_delay=5,#tiempo entre cada intento en segundos                          trigger_rule = TriggerRule.ALWAYS,                          depends_on_past = False)        t3 = PythonOperator(task_id='casa3', python_callable=myFunction,                          retries=2,                          retry_delay=2,                          trigger_rule = TriggerRule.ALWAYS,                          depends_on_past = False)                t4 = BashOperator(task_id='ALARMA_INUNDACION', bash_command='sleep 5 && echo "ALARMA DE INCENDIOS ENCENDIDA"',                          retries=2,retry_delay=5,trigger_rule = TriggerRule.ALL_FAILED,depends_on_past = False)                t5 = BashOperator(task_id='ALARMA_CAMARA', bash_command='sleep 5 && echo "ALARMA DE CAMARA ENCENDIDA"',                          retries=2,retry_delay=5,trigger_rule = TriggerRule.ONE_FAILED,depends_on_past = False)        t6 = BashOperator(task_id='ALARMA_CONTROL', bash_command='sleep 5 && echo "LED EN VERDE"',                          retries=2,retry_delay=5,trigger_rule = TriggerRule.NONE_FAILED,depends_on_past = False)                              [t1,t2,t3] >> t4       [t1,t2,t3] >> t5       [t1,t2,t3] >> t6

    from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime from airflow.utils.trigger_rule import TriggerRule import random def myFunction(): num = random.uniform (0,0.5) if num > 0.4: return True else: raise Exception default_args = {} with DAG(dag_id="monitoring2", description="Bash Operator DAG", start_date=datetime(2025, 1, 25),end_date = datetime(2025,2,25), schedule="@daily", default_args=default_args, max_active_runs=1) as dag: t1 = PythonOperator(task_id='casa1', python_callable=myFunction , trigger_rule = TriggerRule.ALWAYS , retries=2,#Cantidad de intentos retry_delay=2,#tiempo entre cada intento en segundos depends_on_past = False) t2 = PythonOperator(task_id='casa2',python_callable=myFunction, retries=2,#Cantidad de intentos retry_delay=5,#tiempo entre cada intento en segundos trigger_rule = TriggerRule.ALWAYS, depends_on_past = False) t3 = PythonOperator(task_id='casa3', python_callable=myFunction, retries=2, retry_delay=2, trigger_rule = TriggerRule.ALWAYS, depends_on_past = False) t4 = BashOperator(task_id='ALARMA_INUNDACION', bash_command='sleep 5 && echo "ALARMA DE INCENDIOS ENCENDIDA"', retries=2,retry_delay=5,trigger_rule = TriggerRule.ALL_FAILED,depends_on_past = False) t5 = BashOperator(task_id='ALARMA_CAMARA', bash_command='sleep 5 && echo "ALARMA DE CAMARA ENCENDIDA"', retries=2,retry_delay=5,trigger_rule = TriggerRule.ONE_FAILED,depends_on_past = False) t6 = BashOperator(task_id='ALARMA_CONTROL', bash_command='sleep 5 && echo "LED EN VERDE"', retries=2,retry_delay=5,trigger_rule = TriggerRule.NONE_FAILED,depends_on_past = False) [t1,t2,t3] >> t4 [t1,t2,t3] >> t5 [t1,t2,t3] >> t6 ```Las tareas se definen de la forma \[t1,t2,t3] >> t4 ya que no causa dependencias una de la otra ya uqe si se pone  t1>>t2>>t3 >> t4 al final t4 dependera de la situacion de t3 Espero y les ayude , saludos!
    Gian HM

    Gian HM

    student•
    hace 2 años

    1. **all_success:** Este trigger se activa cuando todas las tareas antecesoras de una tarea dada han tenido éxito. En otras palabras, la tarea actual se ejecutará si todas las tareas anteriores han finalizado exitosamente. 2. **all_failed:** Se activa cuando todas las tareas antecesoras de una tarea dada han fallado. Esto significa que la tarea actual se ejecutará si todas las tareas anteriores han fallado. 3. **all_done:** Este trigger se activa cuando todas las tareas antecesoras de una tarea dada han finalizado, independientemente de su estado (éxito, falla, o en espera). 4. **one_success:** Se activa cuando al menos una de las tareas antecesoras de una tarea dada ha tenido éxito. La tarea actual se ejecutará si al menos una de las tareas anteriores ha finalizado con éxito. 5. **one_failed:** Se activa cuando al menos una de las tareas antecesoras de una tarea dada ha fallado. La tarea actual se ejecutará si al menos una de las tareas anteriores ha fallado. 6. **none_failed:** Este trigger es una opción que no viene por defecto en Apache Airflow. Sin embargo, parece ser una combinación lógica de los triggers anteriores. Se activaría cuando ninguna de las tareas antecesoras de una tarea dada haya fallado. La tarea actual se ejecutaría si ninguna de las tareas anteriores ha fallado.

    Jose Colmenares

    Jose Colmenares

    student•
    hace 2 años

    Tremenda clase

      Eric Bellet

      Eric Bellet

      teacher•
      hace 2 años

      Muchas gracias!

    Bryan Carvajal

    Bryan Carvajal

    student•
    hace 2 años
    2023-09-12 11_56_37-2. 2208_Airflow_Slides.png
    2023-09-12 11_56_50-2. 2208_Airflow_Slides.png
    2023-09-12 11_56_58-2. 2208_Airflow_Slides.png
    Marvin Avila Burgos

    Marvin Avila Burgos

    student•
    hace 2 años

    Las reglas de activación de Airflow son fáciles de usar pero, sin embargo, extremadamente poderosas. Le permiten crear canalizaciones de datos más complejas y abordar casos de uso reales. No dude en usarlos para manejar el error de una manera mejor y más confiable que solo con una devolución de llamada.

    Elitsoft Chile

    Elitsoft Chile

    student•
    hace 2 años

    En esta práctica usé. ALWAYS, NONE_FAILED_OR_SKIPPED, ONE_FAILED y ALL_FAILED. Para ello creé un pequeño mundo, donde las entidades (tareas) son cuatro casas, un bosque, un hospital, un colegio, una estación de Bomberos, Fiesta y Guardia Nacional. El problema es que se producen incendios. • Todos pueden llamar a los bomberos (menos la Guardia Nacional y Fiesta) o Aquí usé ALWAYS, para tener una entrada constante de datos. • Las casas llaman a la municipalidad, el bosque, hospital y colegio. • Los bomberos saltan si una de las tareas que los llaman falla, ONE_FAILED • La Guardia Nacional se ejecuta, si se queman los bomberos y la municipalidad, se llama a la guardia nacional (ALL_FAILED). • Si no se quema nada, hay Fiesta (NONE_FAILED_OR_SKIPPED).

    Las cuales se relacionan de la siguiente forma.

    IMG_008-tarea.jpg

    Este es el Código fuente

    #ejemplo de trigger rules from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime from airflow.utils.trigger_rule import TriggerRule def myfunction(): pass def myfunctionIncendio(): import random incendio = random.random() print("EL VALOR DEL INCENDIO ES:",incendio) if incendio < 0.3: return True else: raise Exception #return False #el -- return FALSE -- lo toma como una salida válida. No sirve default_args = {} with DAG(dag_id = "06.3-Monitoring_TareaB", description = "Monitoreando nuestro DAG", schedule_interval = "@daily", start_date = datetime(2023, 5, 1), end_date = datetime(2024, 1, 1), default_args = default_args, max_active_runs = 1 ) as dag: # "depends_on_past" por defecto es False, por lo que pueden surgir los procesos todos los días independientemente. # "depends_on_past" hace que se ejecute solo si el anterior proceso terminó, así se realiza uno a la vez secuencial # max_active_runs, se ejecute un modulo, evitando que haya paralelismo, y cuando se termine todo le grupo, se pasa al sgte día. t1 = PythonOperator(task_id="Pobalcion_01", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t2 = PythonOperator(task_id="Poblacion_02", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t3 = PythonOperator(task_id="Poblacion_03", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t4 = PythonOperator(task_id="Poblacion_04", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t5 = PythonOperator(task_id="Bosque_01", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t6 = PythonOperator(task_id="Piscina_01", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t7 = PythonOperator(task_id="colegio_01", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t8 = PythonOperator(task_id="Municipalidad", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ALWAYS, retries = 2, retry_delay = 5, depends_on_past = False) t9 = PythonOperator(task_id="Fiesta", python_callable=myfunction, trigger_rule = TriggerRule.NONE_FAILED_OR_SKIPPED, retries = 2, retry_delay = 5, depends_on_past = False) t10 = PythonOperator(task_id="Bomberos", python_callable=myfunctionIncendio, trigger_rule = TriggerRule.ONE_FAILED, retries = 2, retry_delay = 5, depends_on_past = False) t100 = PythonOperator(task_id="GuardiaNacional", python_callable=myfunction, trigger_rule = TriggerRule.ALL_FAILED, retries = 2, retry_delay = 5, depends_on_past = False) [t1, t2, t3 , t4] >> t10 [t1, t2, t3 , t4, t6, t7, t8] >> t9 [t1, t2, t3, t4 ] >> t5 [t1, t2, t3, t4 ] >> t6 [t1, t2, t3, t4 ] >> t7 [t5,t6,t7] >> t8 [t8, t10] >> t100

    Originalmente el valor para que hubiera un incendio era de 0.7, pero como no se quemaba nada, lo bajé a 0.3 Aún así la municipalidad y los bomberos casi nunca se quemaban por lo que en una oportunidad que la municipalidad si se quemó, aproveché de forzar el dato de los bomberos para que al menos una vez saliera la Guardia Nacional.

    IMG_008-tareaB.jpg
    Sebastián Salas

    Sebastián Salas

    student•
    hace 3 años
    from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime from airflow.utils.trigger_rule import TriggerRule def myfunction(): raise Exception default_args = {} with DAG(dag_id="6.2-monitoring", description="Monitoreando nuestro DAG", schedule_interval="@daily", start_date=datetime(2022, 1, 1), end_date=datetime(2022, 6, 1), default_args=default_args, max_active_runs=1) as dag: t1 = BashOperator(task_id="tarea1", bash_command="sleep 5 && echo 'Primera tarea!'", trigger_rule=TriggerRule.ALL_SUCCESS, retries=2, retry_delay=5, depends_on_past=False) t2 = BashOperator(task_id="tarea2", bash_command="sleep 3 && echo 'Segunda tarea!'", retries=2, retry_delay=5, trigger_rule=TriggerRule.ALL_SUCCESS, depends_on_past=True) t3 = BashOperator(task_id="tarea3", bash_command="sleep 2 && echo 'Tercera tarea!'", retries=2, retry_delay=5, trigger_rule=TriggerRule.ALWAYS, depends_on_past=True) t4 = PythonOperator(task_id="tarea4", python_callable=myfunction, retries=2, retry_delay=5, trigger_rule=TriggerRule.ALL_SUCCESS, depends_on_past=True) t5 = BashOperator(task_id="tarea5", bash_command="sleep 2 && echo 'Quinta tarea!'", retries=2, retry_delay=5, depends_on_past=True) t1 >> t2 >> t3 >> t4 >> t5