No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

BranchPythonOperator

26/29
Recursos

Aportes 4

Preguntas 0

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime, date

default_args = {
'start_date': datetime(2022, 8, 20),
'end_date': datetime(2022, 8, 25)
}

def _choose(**context):
    if context["logical_date"].date() < date(2022, 8, 23):
        return "finish_22_june"
    return "start_23_june"

with DAG(dag_id="10-branching",
    schedule_interval="@daily",
	default_args=default_args
) as dag:

    branching = BranchPythonOperator(task_id="branch",
	                                 python_callable=_choose)

    finish_22 = BashOperator(task_id="finish_22_june",
	                         bash_command="echo 'Running {{ds}}'")

    start_23 = BashOperator(task_id="start_23_june",
	                        bash_command="echo 'Running {{ds}}'")

    branching >> [finish_22, start_23]

⚠️ Falto una cosa que nos puede pasar y no entender el porque si existiese otra tarea después de start_15_june dependiendo el flujo si estas son skiped estas pueden no ejecutarse y también saltarse para evitar eso debemos usar el parámetro trigger_rule=TriggerRule.NONE_FAILED, El objeto TriggerRule se importa así

from airflow.utils.trigger_rule import TriggerRule

Modelo de ejemplo con un BrachOperator

from airflow import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator

from datetime import datetime


def _choose(**context):

    if context['logical_date'].date() < datetime(2022, 10, 16):
        return 'task2'

    return 'task3'

with DAG(
    dag_id='09-BPO',
    description='Orquestando el dash',
    schedule_interval= '@daily',
    start_date=datetime(2022,10,1),
    end_date= datetime(2022,11,1),
    catchup=True,
    max_active_runs=1, #Los dias se ejecutan con un maximo numero de workers en paralelo
    default_args={
        'depends_on_past':True #las tardes de los dias no se ejecutan en pararelo, las ejecuciones son secuenciales y dependen de la ejecucion de la misma tarea el dia anterior
    }
        ) as dag:


    branching = BranchPythonOperator(
        task_id='Choice',
        python_callable= _choose
    )

    finish_15_10 = BashOperator(task_id='task2',
                    bash_command="sleep 2 && echo 'Task 2'"
    )

    finished_1_11 = BashOperator(task_id='task3',
                    bash_command="sleep 2 && echo 'Task 3'"
    )

    final_t = BashOperator(task_id='task4',
                    bash_command="sleep 2 && echo 'Final Part'",
                    trigger_rule = TriggerRule.NONE_FAILED
    )


    branching >> [finish_15_10, finished_1_11]

    final_t << [finish_15_10, finished_1_11]
El operador `BranchPythonOperator` en Apache Airflow permite la creación de flujos de trabajo dinámicos al seleccionar la rama de ejecución en función de una condición evaluada en tiempo de ejecución. Es ideal cuando necesitas tomar decisiones basadas en datos o lógica dentro de un DAG. ### Funcionamiento del `BranchPythonOperator` 1. **Lógica de Rama**: * Ejecuta una función Python que devuelve el `task_id` de la(s) tarea(s) que debe(n) ejecutarse a continuación. * Las tareas que no sean seleccionadas por el operador serán automáticamente marcadas como **"skip"** (omitidas). 2. **Uso Contextual**: * Diseñado para escenarios donde el flujo de ejecución varía según una condición lógica, como resultados de una tarea previa o valores externos. ### Ejemplo Básico de Uso from airflow import DAG from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.operators.dummy import DummyOperator from datetime import datetime def decidir\_rama(\*\*kwargs): \# Lógica para elegir una rama basada en el contexto condicion = kwargs\['execution\_date'].day % 2 # Ejemplo: días pares o impares if condicion == 0: return 'rama\_par' else: return 'rama\_impar' with DAG( dag\_id='branch\_example', start\_date=datetime(2024, 1, 1), schedule\_interval='@daily', catchup=False, ) as dag: inicio = DummyOperator(task\_id='inicio') branch\_task = BranchPythonOperator( task\_id='branch\_decision', python\_callable=decidir\_rama ) rama\_par = DummyOperator(task\_id='rama\_par') rama\_impar = DummyOperator(task\_id='rama\_impar') fin = DummyOperator(task\_id='fin', trigger\_rule='none\_failed\_min\_one\_success') \# Flujo del DAG inicio >> branch\_task branch\_task >> rama\_par >> fin branch\_task >> rama\_impar >> fin ### Explicación del Código: 1. **Tareas iniciales**: * La tarea `inicio` se ejecuta antes del operador de bifurcación. 2. `BranchPythonOperator`: * La tarea `branch_decision` evalúa una condición y selecciona una rama: * Si el día es par, selecciona `rama_par`. * Si es impar, selecciona `rama_impar`. 3. **Tareas de ramas**: * `rama_par` y `rama_impar` representan las ramas divergentes. 4. **Tarea final**: * La tarea `fin` se ejecuta cuando al menos una de las ramas ha tenido éxito, controlado mediante la regla de disparo `none_failed_min_one_success`. ### Notas Clave: 1. **Devuelve uno o varios** `task_id`**s**: * La función que se usa en `python_callable` puede devolver: * Una cadena para seleccionar una sola tarea. * Una lista de cadenas para seleccionar múltiples tareas. 2. **Integración con XComs**: * El `BranchPythonOperator` puede usar datos de tareas anteriores recuperados mediante XComs para decidir qué rama ejecutar. 3. **Trigger Rule**: * Las tareas posteriores a las ramas deben manejarse con cuidado en términos de reglas de activación (`trigger_rule`), ya que las tareas no seleccionadas serán marcadas como omitidas. ### Ejemplo Avanzado con Datos Dinámicos y XComs def calcular\_rama(\*\*kwargs): \# Obtener un valor de XCom valor = kwargs\['ti'].xcom\_pull(task\_ids='tarea\_inicial') if valor == 'A': return 'rama\_a' else: return 'rama\_b' tarea\_inicial = PythonOperator( task\_id='tarea\_inicial', python\_callable=lambda: 'A' ) branch\_task = BranchPythonOperator( task\_id='branch\_decision', python\_callable=calcular\_rama, provide\_context=True )