You don't have access to this class

Keep learning! Join and start boosting your career

Aprovecha el precio especial y haz tu profesión a prueba de IA

Antes: $249

Currency
$209
Suscríbete

Termina en:

2 Días
22 Hrs
2 Min
11 Seg

Trigger Rules

20/29
Resources

Contributions 7

Questions 0

Sort by:

Want to see more contributions, questions and answers from the community?

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from airflow.utils.trigger_rule import TriggerRule


def myfunction():
    raise Exception


default_args = {}


with DAG(dag_id="6.2-monitoring",
        description="Monitoreando nuestro DAG",
        schedule_interval="@daily",
        start_date=datetime(2022, 1, 1),
        end_date=datetime(2022, 6, 1),
        default_args=default_args,
        max_active_runs=1) as dag:


    t1 = BashOperator(task_id="tarea1",
                    bash_command="sleep 5 && echo 'Primera tarea!'",
                    trigger_rule=TriggerRule.ALL_SUCCESS,
                    retries=2,
                    retry_delay=5,
                    depends_on_past=False)

    t2 = BashOperator(task_id="tarea2",
                    bash_command="sleep 3 && echo 'Segunda tarea!'",
                    retries=2,
                    retry_delay=5,
                    trigger_rule=TriggerRule.ALL_SUCCESS,
                    depends_on_past=True)

    t3 = BashOperator(task_id="tarea3",
                    bash_command="sleep 2 && echo 'Tercera tarea!'",
                    retries=2,
                    retry_delay=5,
                    trigger_rule=TriggerRule.ALWAYS,
                    depends_on_past=True)

    t4 = PythonOperator(task_id="tarea4",
                    python_callable=myfunction,
                    retries=2,
                    retry_delay=5,
                    trigger_rule=TriggerRule.ALL_SUCCESS,
                    depends_on_past=True)

    t5 = BashOperator(task_id="tarea5",
                    bash_command="sleep 2 && echo 'Quinta tarea!'",
                    retries=2,
                    retry_delay=5,
                    depends_on_past=True)


    t1 >> t2 >> t3 >> t4 >> t5

En esta práctica usé.
ALWAYS, NONE_FAILED_OR_SKIPPED, ONE_FAILED y ALL_FAILED.
Para ello creé un pequeño mundo, donde las entidades (tareas) son cuatro casas, un bosque, un hospital, un colegio, una estación de Bomberos, Fiesta y Guardia Nacional.
El problema es que se producen incendios.
• Todos pueden llamar a los bomberos (menos la Guardia Nacional y Fiesta)
o Aquí usé ALWAYS, para tener una entrada constante de datos.
• Las casas llaman a la municipalidad, el bosque, hospital y colegio.
• Los bomberos saltan si una de las tareas que los llaman falla, ONE_FAILED
• La Guardia Nacional se ejecuta, si se queman los bomberos y la municipalidad, se llama a la guardia nacional (ALL_FAILED).
• Si no se quema nada, hay Fiesta (NONE_FAILED_OR_SKIPPED).

Las cuales se relacionan de la siguiente forma.

Este es el Código fuente

#ejemplo de trigger rules

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from airflow.utils.trigger_rule import TriggerRule

def myfunction():
     pass  

def myfunctionIncendio():
     import random
     incendio = random.random()
     print("EL VALOR DEL INCENDIO ES:",incendio)
     if incendio < 0.3:
          return True
     else:
          raise Exception
          #return False     #el -- return FALSE -- lo toma como una salida válida. No sirve
      
default_args = {}

with DAG(dag_id = "06.3-Monitoring_TareaB",
         description          = "Monitoreando nuestro DAG",
         schedule_interval    = "@daily",
         start_date           = datetime(2023, 5, 1),
         end_date             = datetime(2024, 1, 1),
         default_args         = default_args,
         max_active_runs      = 1 
         ) as dag:
    # "depends_on_past" por defecto es False, por lo que pueden surgir los procesos todos los días independientemente. 
    # "depends_on_past" hace que se ejecute solo si el anterior proceso terminó, así se realiza uno a la vez secuencial
    # max_active_runs, se ejecute un modulo, evitando que haya paralelismo, y cuando se termine todo le grupo, se pasa al sgte día.
 

    t1 = PythonOperator(task_id="Pobalcion_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t2 = PythonOperator(task_id="Poblacion_02",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t3 = PythonOperator(task_id="Poblacion_03",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t4 = PythonOperator(task_id="Poblacion_04",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t5 = PythonOperator(task_id="Bosque_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t6 = PythonOperator(task_id="Piscina_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t7 = PythonOperator(task_id="colegio_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t8 = PythonOperator(task_id="Municipalidad",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t9 = PythonOperator(task_id="Fiesta",
                      python_callable=myfunction,
                      trigger_rule  = TriggerRule.NONE_FAILED_OR_SKIPPED, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    
    t10 = PythonOperator(task_id="Bomberos",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ONE_FAILED,
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t100 = PythonOperator(task_id="GuardiaNacional",
                      python_callable=myfunction,
                      trigger_rule  = TriggerRule.ALL_FAILED,
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    

    [t1, t2, t3 , t4] >> t10
    [t1, t2, t3 , t4, t6, t7, t8] >> t9

    [t1, t2, t3, t4 ] >> t5
    [t1, t2, t3, t4 ] >> t6
    [t1, t2, t3, t4 ] >> t7

    [t5,t6,t7] >> t8

    [t8, t10] >> t100

Originalmente el valor para que hubiera un incendio era de 0.7, pero como no se quemaba nada, lo bajé a 0.3
Aún así la municipalidad y los bomberos casi nunca se quemaban por lo que en una oportunidad que la municipalidad si se quemó, aproveché de forzar el dato de los bomberos para que al menos una vez saliera la Guardia Nacional.

1\. \*\*all\_success:\*\* Este trigger se activa cuando todas las tareas antecesoras de una tarea dada han tenido éxito. En otras palabras, la tarea actual se ejecutará si todas las tareas anteriores han finalizado exitosamente. 2\. \*\*all\_failed:\*\* Se activa cuando todas las tareas antecesoras de una tarea dada han fallado. Esto significa que la tarea actual se ejecutará si todas las tareas anteriores han fallado. 3\. \*\*all\_done:\*\* Este trigger se activa cuando todas las tareas antecesoras de una tarea dada han finalizado, independientemente de su estado (éxito, falla, o en espera). 4\. \*\*one\_success:\*\* Se activa cuando al menos una de las tareas antecesoras de una tarea dada ha tenido éxito. La tarea actual se ejecutará si al menos una de las tareas anteriores ha finalizado con éxito. 5\. \*\*one\_failed:\*\* Se activa cuando al menos una de las tareas antecesoras de una tarea dada ha fallado. La tarea actual se ejecutará si al menos una de las tareas anteriores ha fallado. 6\. \*\*none\_failed:\*\* Este trigger es una opción que no viene por defecto en Apache Airflow. Sin embargo, parece ser una combinación lógica de los triggers anteriores. Se activaría cuando ninguna de las tareas antecesoras de una tarea dada haya fallado. La tarea actual se ejecutaría si ninguna de las tareas anteriores ha fallado.

Las reglas de activación de Airflow son fáciles de usar pero, sin embargo, extremadamente poderosas. Le permiten crear canalizaciones de datos más complejas y abordar casos de uso reales. No dude en usarlos para manejar el error de una manera mejor y más confiable que solo con una devolución de llamada.



Tremenda clase
En esta práctica se simulan 3 casas y tres tipos de alarmas , las alarmas se ejecutaran dependiendo dela situación de cada casa a) ALARMA\_INUNDACION : Que se activara si todas las casas se inundan es decir si todas fallan por lo tanto TriggerRule.ALL\_FAILED b) ALRMA\_CAMARA : Simulando una camara de seguridad se activara si hay un ladron en por lo menos 1 casa por lo tanto TriggerRule.ONE\_FAILED c) ALARMA CONTROL : Simulando una alarma que enciende un led verde en caso que todo este bien es decir TriggerRule.NONE\_FAILED ![](https://static.platzi.com/media/user_upload/image-18f371d8-a589-4230-83b8-55b7acd5d282.jpg) ![](https://static.platzi.com/media/user_upload/image-d018f20d-15ac-4e6d-8796-1e20f7e45682.jpg) El codigo es el siguiente:from airflow import DAGfrom airflow.operators.bash import BashOperatorfrom airflow.operators.python import PythonOperatorfrom datetime import datetimefrom airflow.utils.trigger\_rule import TriggerRuleimport random def myFunction():        num = random.uniform (0,0.5)        if num > 0.4:            return True        else:            raise Exception default\_args = {}with DAG(dag\_id="monitoring2", description="Bash Operator DAG", start\_date=datetime(2025, 1, 25),end\_date = datetime(2025,2,25), schedule="@daily",          default\_args=default\_args, max\_active\_runs=1) as dag:            t1 = PythonOperator(task\_id='casa1', python\_callable=myFunction ,                          trigger\_rule = TriggerRule.ALWAYS ,                          retries=2,#Cantidad de intentos                          retry\_delay=2,#tiempo entre cada intento en segundos                          depends\_on\_past = False)            t2 = PythonOperator(task\_id='casa2',python\_callable=myFunction,                          retries=2,#Cantidad de intentos                          retry\_delay=5,#tiempo entre cada intento en segundos                          trigger\_rule = TriggerRule.ALWAYS,                          depends\_on\_past = False)        t3 = PythonOperator(task\_id='casa3', python\_callable=myFunction,                          retries=2,                          retry\_delay=2,                          trigger\_rule = TriggerRule.ALWAYS,                          depends\_on\_past = False)                t4 = BashOperator(task\_id='ALARMA\_INUNDACION', bash\_command='sleep 5 && echo "ALARMA DE INCENDIOS ENCENDIDA"',                          retries=2,retry\_delay=5,trigger\_rule = TriggerRule.ALL\_FAILED,depends\_on\_past = False)                t5 = BashOperator(task\_id='ALARMA\_CAMARA', bash\_command='sleep 5 && echo "ALARMA DE CAMARA ENCENDIDA"',                          retries=2,retry\_delay=5,trigger\_rule = TriggerRule.ONE\_FAILED,depends\_on\_past = False)        t6 = BashOperator(task\_id='ALARMA\_CONTROL', bash\_command='sleep 5 && echo "LED EN VERDE"',                          retries=2,retry\_delay=5,trigger\_rule = TriggerRule.NONE\_FAILED,depends\_on\_past = False)                              \[t1,t2,t3] >> t4       \[t1,t2,t3] >> t5       \[t1,t2,t3] >> t6 ```js from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime from airflow.utils.trigger_rule import TriggerRule import random def myFunction(): num = random.uniform (0,0.5) if num > 0.4: return True else: raise Exception default_args = {} with DAG(dag_id="monitoring2", description="Bash Operator DAG", start_date=datetime(2025, 1, 25),end_date = datetime(2025,2,25), schedule="@daily", default_args=default_args, max_active_runs=1) as dag: t1 = PythonOperator(task_id='casa1', python_callable=myFunction , trigger_rule = TriggerRule.ALWAYS , retries=2,#Cantidad de intentos retry_delay=2,#tiempo entre cada intento en segundos depends_on_past = False) t2 = PythonOperator(task_id='casa2',python_callable=myFunction, retries=2,#Cantidad de intentos retry_delay=5,#tiempo entre cada intento en segundos trigger_rule = TriggerRule.ALWAYS, depends_on_past = False) t3 = PythonOperator(task_id='casa3', python_callable=myFunction, retries=2, retry_delay=2, trigger_rule = TriggerRule.ALWAYS, depends_on_past = False) t4 = BashOperator(task_id='ALARMA_INUNDACION', bash_command='sleep 5 && echo "ALARMA DE INCENDIOS ENCENDIDA"', retries=2,retry_delay=5,trigger_rule = TriggerRule.ALL_FAILED,depends_on_past = False) t5 = BashOperator(task_id='ALARMA_CAMARA', bash_command='sleep 5 && echo "ALARMA DE CAMARA ENCENDIDA"', retries=2,retry_delay=5,trigger_rule = TriggerRule.ONE_FAILED,depends_on_past = False) t6 = BashOperator(task_id='ALARMA_CONTROL', bash_command='sleep 5 && echo "LED EN VERDE"', retries=2,retry_delay=5,trigger_rule = TriggerRule.NONE_FAILED,depends_on_past = False) [t1,t2,t3] >> t4 [t1,t2,t3] >> t5 [t1,t2,t3] >> t6 ```Las tareas se definen de la forma \[t1,t2,t3] >> t4 ya que no causa dependencias una de la otra ya uqe si se pone t1>>t2>>t3 >> t4 al final t4 dependera de la situacion de t3 Espero y les ayude , saludos!