No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Trigger Rules

20/29
Recursos

Aportes 6

Preguntas 0

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from airflow.utils.trigger_rule import TriggerRule


def myfunction():
    raise Exception


default_args = {}


with DAG(dag_id="6.2-monitoring",
        description="Monitoreando nuestro DAG",
        schedule_interval="@daily",
        start_date=datetime(2022, 1, 1),
        end_date=datetime(2022, 6, 1),
        default_args=default_args,
        max_active_runs=1) as dag:


    t1 = BashOperator(task_id="tarea1",
                    bash_command="sleep 5 && echo 'Primera tarea!'",
                    trigger_rule=TriggerRule.ALL_SUCCESS,
                    retries=2,
                    retry_delay=5,
                    depends_on_past=False)

    t2 = BashOperator(task_id="tarea2",
                    bash_command="sleep 3 && echo 'Segunda tarea!'",
                    retries=2,
                    retry_delay=5,
                    trigger_rule=TriggerRule.ALL_SUCCESS,
                    depends_on_past=True)

    t3 = BashOperator(task_id="tarea3",
                    bash_command="sleep 2 && echo 'Tercera tarea!'",
                    retries=2,
                    retry_delay=5,
                    trigger_rule=TriggerRule.ALWAYS,
                    depends_on_past=True)

    t4 = PythonOperator(task_id="tarea4",
                    python_callable=myfunction,
                    retries=2,
                    retry_delay=5,
                    trigger_rule=TriggerRule.ALL_SUCCESS,
                    depends_on_past=True)

    t5 = BashOperator(task_id="tarea5",
                    bash_command="sleep 2 && echo 'Quinta tarea!'",
                    retries=2,
                    retry_delay=5,
                    depends_on_past=True)


    t1 >> t2 >> t3 >> t4 >> t5

En esta práctica usé.
ALWAYS, NONE_FAILED_OR_SKIPPED, ONE_FAILED y ALL_FAILED.
Para ello creé un pequeño mundo, donde las entidades (tareas) son cuatro casas, un bosque, un hospital, un colegio, una estación de Bomberos, Fiesta y Guardia Nacional.
El problema es que se producen incendios.
• Todos pueden llamar a los bomberos (menos la Guardia Nacional y Fiesta)
o Aquí usé ALWAYS, para tener una entrada constante de datos.
• Las casas llaman a la municipalidad, el bosque, hospital y colegio.
• Los bomberos saltan si una de las tareas que los llaman falla, ONE_FAILED
• La Guardia Nacional se ejecuta, si se queman los bomberos y la municipalidad, se llama a la guardia nacional (ALL_FAILED).
• Si no se quema nada, hay Fiesta (NONE_FAILED_OR_SKIPPED).

Las cuales se relacionan de la siguiente forma.

Este es el Código fuente

#ejemplo de trigger rules

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from airflow.utils.trigger_rule import TriggerRule

def myfunction():
     pass  

def myfunctionIncendio():
     import random
     incendio = random.random()
     print("EL VALOR DEL INCENDIO ES:",incendio)
     if incendio < 0.3:
          return True
     else:
          raise Exception
          #return False     #el -- return FALSE -- lo toma como una salida válida. No sirve
      
default_args = {}

with DAG(dag_id = "06.3-Monitoring_TareaB",
         description          = "Monitoreando nuestro DAG",
         schedule_interval    = "@daily",
         start_date           = datetime(2023, 5, 1),
         end_date             = datetime(2024, 1, 1),
         default_args         = default_args,
         max_active_runs      = 1 
         ) as dag:
    # "depends_on_past" por defecto es False, por lo que pueden surgir los procesos todos los días independientemente. 
    # "depends_on_past" hace que se ejecute solo si el anterior proceso terminó, así se realiza uno a la vez secuencial
    # max_active_runs, se ejecute un modulo, evitando que haya paralelismo, y cuando se termine todo le grupo, se pasa al sgte día.
 

    t1 = PythonOperator(task_id="Pobalcion_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t2 = PythonOperator(task_id="Poblacion_02",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t3 = PythonOperator(task_id="Poblacion_03",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t4 = PythonOperator(task_id="Poblacion_04",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t5 = PythonOperator(task_id="Bosque_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t6 = PythonOperator(task_id="Piscina_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    t7 = PythonOperator(task_id="colegio_01",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t8 = PythonOperator(task_id="Municipalidad",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ALWAYS, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t9 = PythonOperator(task_id="Fiesta",
                      python_callable=myfunction,
                      trigger_rule  = TriggerRule.NONE_FAILED_OR_SKIPPED, 
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    
    t10 = PythonOperator(task_id="Bomberos",
                      python_callable=myfunctionIncendio,
                      trigger_rule  = TriggerRule.ONE_FAILED,
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    
    t100 = PythonOperator(task_id="GuardiaNacional",
                      python_callable=myfunction,
                      trigger_rule  = TriggerRule.ALL_FAILED,
                      retries      = 2,
                      retry_delay  = 5,
                      depends_on_past = False)
    

    [t1, t2, t3 , t4] >> t10
    [t1, t2, t3 , t4, t6, t7, t8] >> t9

    [t1, t2, t3, t4 ] >> t5
    [t1, t2, t3, t4 ] >> t6
    [t1, t2, t3, t4 ] >> t7

    [t5,t6,t7] >> t8

    [t8, t10] >> t100

Originalmente el valor para que hubiera un incendio era de 0.7, pero como no se quemaba nada, lo bajé a 0.3
Aún así la municipalidad y los bomberos casi nunca se quemaban por lo que en una oportunidad que la municipalidad si se quemó, aproveché de forzar el dato de los bomberos para que al menos una vez saliera la Guardia Nacional.

1\. \*\*all\_success:\*\* Este trigger se activa cuando todas las tareas antecesoras de una tarea dada han tenido éxito. En otras palabras, la tarea actual se ejecutará si todas las tareas anteriores han finalizado exitosamente. 2\. \*\*all\_failed:\*\* Se activa cuando todas las tareas antecesoras de una tarea dada han fallado. Esto significa que la tarea actual se ejecutará si todas las tareas anteriores han fallado. 3\. \*\*all\_done:\*\* Este trigger se activa cuando todas las tareas antecesoras de una tarea dada han finalizado, independientemente de su estado (éxito, falla, o en espera). 4\. \*\*one\_success:\*\* Se activa cuando al menos una de las tareas antecesoras de una tarea dada ha tenido éxito. La tarea actual se ejecutará si al menos una de las tareas anteriores ha finalizado con éxito. 5\. \*\*one\_failed:\*\* Se activa cuando al menos una de las tareas antecesoras de una tarea dada ha fallado. La tarea actual se ejecutará si al menos una de las tareas anteriores ha fallado. 6\. \*\*none\_failed:\*\* Este trigger es una opción que no viene por defecto en Apache Airflow. Sin embargo, parece ser una combinación lógica de los triggers anteriores. Se activaría cuando ninguna de las tareas antecesoras de una tarea dada haya fallado. La tarea actual se ejecutaría si ninguna de las tareas anteriores ha fallado.

Las reglas de activación de Airflow son fáciles de usar pero, sin embargo, extremadamente poderosas. Le permiten crear canalizaciones de datos más complejas y abordar casos de uso reales. No dude en usarlos para manejar el error de una manera mejor y más confiable que solo con una devolución de llamada.



Tremenda clase