No tienes acceso a esta clase

隆Contin煤a aprendiendo! 脷nete y comienza a potenciar tu carrera

Trigger Rules

20/29
Recursos

Aportes 5

Preguntas 0

Ordenar por:

驴Quieres ver m谩s aportes, preguntas y respuestas de la comunidad?

o inicia sesi贸n.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from airflow.utils.trigger_rule import TriggerRule


def myfunction():
    raise Exception


default_args = {}


with DAG(dag_id="6.2-monitoring",
        description="Monitoreando nuestro DAG",
        schedule_interval="@daily",
        start_date=datetime(2022, 1, 1),
        end_date=datetime(2022, 6, 1),
        default_args=default_args,
        max_active_runs=1) as dag:


    t1 = BashOperator(task_id="tarea1",
                    bash_command="sleep 5 && echo 'Primera tarea!'",
                    trigger_rule=TriggerRule.ALL_SUCCESS,
                    retries=2,
                    retry_delay=5,
                    depends_on_past=False)

    t2 = BashOperator(task_id="tarea2",
                    bash_command="sleep 3 && echo 'Segunda tarea!'",
                    retries=2,
                    retry_delay=5,
                    trigger_rule=TriggerRule.ALL_SUCCESS,
                    depends_on_past=True)

    t3 = BashOperator(task_id="tarea3",
                    bash_command="sleep 2 && echo 'Tercera tarea!'",
                    retries=2,
                    retry_delay=5,
                    trigger_rule=TriggerRule.ALWAYS,
                    depends_on_past=True)

    t4 = PythonOperator(task_id="tarea4",
                    python_callable=myfunction,
                    retries=2,
                    retry_delay=5,
                    trigger_rule=TriggerRule.ALL_SUCCESS,
                    depends_on_past=True)

    t5 = BashOperator(task_id="tarea5",
                    bash_command="sleep 2 && echo 'Quinta tarea!'",
                    retries=2,
                    retry_delay=5,
                    depends_on_past=True)


    t1 >> t2 >> t3 >> t4 >> t5

En esta pr谩ctica us茅.
ALWAYS, NONE_FAILED_OR_SKIPPED, ONE_FAILED y ALL_FAILED.
Para ello cre茅 un peque帽o mundo, donde las entidades (tareas) son cuatro casas, un bosque, un hospital, un colegio, una estaci贸n de Bomberos, Fiesta y Guardia Nacional.
El problema es que se producen incendios.
鈥 Todos pueden llamar a los bomberos (menos la Guardia Nacional y Fiesta)
o Aqu铆 us茅 ALWAYS, para tener una entrada constante de datos.
鈥 Las casas llaman a la municipalidad, el bosque, hospital y colegio.
鈥 Los bomberos saltan si una de las tareas que los llaman falla, ONE_FAILED
鈥 La Guardia Nacional se ejecuta, si se queman los bomberos y la municipalidad, se llama a la guardia nacional (ALL_FAILED).
鈥 Si no se quema nada, hay Fiesta (NONE_FAILED_OR_SKIPPED).

Las cuales se relacionan de la siguiente forma.

Este es el C贸digo fuente

#ejemplo de trigger rules

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from airflow.utils.trigger_rule import TriggerRule

def myfunction():
     pass  

def myfunctionIncendio():
     import random
     incendio = random.random()
     print("EL VALOR DEL INCENDIO ES:",incendio)
     if incendio < 0.3:
          return True
     else:
          raise Exception
          #return False     #el -- return FALSE -- lo toma como una salida v谩lida. No sirve
      
default_args = {}

with DAG(dag_id = "06.3-Monitoring_TareaB",
         description          = "Monitoreando nuestro DAG",
         schedule_interval    = "@daily",
         start_date           = datetime(2023, 5, 1),
         end_date             = datetime(2024, 1, 1),
         default_args         = default_args,
         max_active_runs      = 1 
         ) as dag:
    # "depends_on_past" por defecto es False, por lo que pueden surgir los procesos todos los d铆as independientemente. 
    # "depends_on_past" hace que se ejecute solo si el anterior proceso termin贸, as铆 se realiza uno a la vez secuencial
    # max_active_runs, se ejecute un modulo, evitando que haya paralelismo, y cuando se termine todo le grupo, se pasa al sgte d铆a.
 

    t1 = PythonOperator(task_id="Pobalcion_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t2 = PythonOperator(task_id="Poblacion_02",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t3 = PythonOperator(task_id="Poblacion_03",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t4 = PythonOperator(task_id="Poblacion_04",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t5 = PythonOperator(task_id="Bosque_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t6 = PythonOperator(task_id="Piscina_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t7 = PythonOperator(task_id="colegio_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t8 = PythonOperator(task_id="Municipalidad",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t9 = PythonOperator(task_id="Fiesta",
                      python_callable=myfunction,
                      trigger_rule  = TriggerRule.NONE_FAILED_OR_SKIPPED, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    
    t10 = PythonOperator(task_id="Bomberos",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ONE_FAILED,
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t100 = PythonOperator(task_id="GuardiaNacional",
                      python_callable=myfunction,
                      trigger_rule  = TriggerRule.ALL_FAILED,
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    

    [t1, t2, t3 , t4] >> t10
    [t1, t2, t3 , t4, t6, t7, t8] >> t9

    [t1, t2, t3, t4 ] >> t5
    [t1, t2, t3, t4 ] >> t6
    [t1, t2, t3, t4 ] >> t7

    [t5,t6,t7] >> t8

    [t8, t10] >> t100

Originalmente el valor para que hubiera un incendio era de 0.7, pero como no se quemaba nada, lo baj茅 a 0.3
A煤n as铆 la municipalidad y los bomberos casi nunca se quemaban por lo que en una oportunidad que la municipalidad si se quem贸, aprovech茅 de forzar el dato de los bomberos para que al menos una vez saliera la Guardia Nacional.



Tremenda clase

Las reglas de activaci贸n de Airflow son f谩ciles de usar pero, sin embargo, extremadamente poderosas. Le permiten crear canalizaciones de datos m谩s complejas y abordar casos de uso reales. No dude en usarlos para manejar el error de una manera mejor y m谩s confiable que solo con una devoluci贸n de llamada.