Mauro Ezequiel Bravo
EstudianteRoyer Guerrero Pinilla
EstudianteCésar Pérez
EstudianteMario Alexander Vargas Celis
Estudiantefrom airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import BranchPythonOperator from datetime import datetime, date default_args = { 'start_date': datetime(2022, 8, 20), 'end_date': datetime(2022, 8, 25) } def _choose(**context): if context["logical_date"].date() < date(2022, 8, 23): return "finish_22_june" return "start_23_june" with DAG(dag_id="10-branching", schedule_interval="@daily", default_args=default_args ) as dag: branching = BranchPythonOperator(task_id="branch", python_callable=_choose) finish_22 = BashOperator(task_id="finish_22_june", bash_command="echo 'Running {{ds}}'") start_23 = BashOperator(task_id="start_23_june", bash_command="echo 'Running {{ds}}'") branching >> [finish_22, start_23]
⚠️ Falto una cosa que nos puede pasar y no entender el porque si existiese otra tarea después de start_15_june dependiendo el flujo si estas son skiped estas pueden no ejecutarse y también saltarse para evitar eso debemos usar el parámetro trigger_rule=TriggerRule.NONE_FAILED, El objeto TriggerRule se importa así
from airflow.utils.trigger_rule import TriggerRule
Modelo de ejemplo con un BrachOperator
from airflow import DAG from airflow.utils.trigger_rule import TriggerRule from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator, BranchPythonOperator from datetime import datetime def _choose(**context): if context['logical_date'].date() < datetime(2022, 10, 16): return 'task2' return 'task3' with DAG( dag_id='09-BPO', description='Orquestando el dash', schedule_interval= '@daily', start_date=datetime(2022,10,1), end_date= datetime(2022,11,1), catchup=True, max_active_runs=1, #Los dias se ejecutan con un maximo numero de workers en paralelo default_args={ 'depends_on_past':True #las tardes de los dias no se ejecutan en pararelo, las ejecuciones son secuenciales y dependen de la ejecucion de la misma tarea el dia anterior } ) as dag: branching = BranchPythonOperator( task_id='Choice', python_callable= _choose ) finish_15_10 = BashOperator(task_id='task2', bash_command="sleep 2 && echo 'Task 2'" ) finished_1_11 = BashOperator(task_id='task3', bash_command="sleep 2 && echo 'Task 3'" ) final_t = BashOperator(task_id='task4', bash_command="sleep 2 && echo 'Final Part'", trigger_rule = TriggerRule.NONE_FAILED ) branching >> [finish_15_10, finished_1_11] final_t << [finish_15_10, finished_1_11]
El operador BranchPythonOperator en Apache Airflow permite la creación de flujos de trabajo dinámicos al seleccionar la rama de ejecución en función de una condición evaluada en tiempo de ejecución. Es ideal cuando necesitas tomar decisiones basadas en datos o lógica dentro de un DAG.
Funcionamiento del BranchPythonOperator
task_id de la(s) tarea(s) que debe(n) ejecutarse a continuación.Ejemplo Básico de Uso
from airflow import DAG from airflow.operators.python import BranchPythonOperator, PythonOperator from airflow.operators.dummy import DummyOperator from datetime import datetime
def decidir_rama(**kwargs): # Lógica para elegir una rama basada en el contexto condicion = kwargs['execution_date'].day % 2 # Ejemplo: días pares o impares if condicion == 0: return 'rama_par' else: return 'rama_impar'
with DAG( dag_id='branch_example', start_date=datetime(2024, 1, 1), schedule_interval='@daily', catchup=False, ) as dag: inicio = DummyOperator(task_id='inicio')
branch_task = BranchPythonOperator( task_id='branch_decision', python_callable=decidir_rama )
rama_par = DummyOperator(task_id='rama_par') rama_impar = DummyOperator(task_id='rama_impar')
fin = DummyOperator(task_id='fin', trigger_rule='none_failed_min_one_success')
# Flujo del DAG inicio >> branch_task branch_task >> rama_par >> fin branch_task >> rama_impar >> fin
Explicación del Código:
inicio se ejecuta antes del operador de bifurcación.BranchPythonOperator:
branch_decision evalúa una condición y selecciona una rama:
rama_par.rama_impar.rama_par y rama_impar representan las ramas divergentes.fin se ejecuta cuando al menos una de las ramas ha tenido éxito, controlado mediante la regla de disparo none_failed_min_one_success.Notas Clave:
task_ids:
python_callable puede devolver:
BranchPythonOperator puede usar datos de tareas anteriores recuperados mediante XComs para decidir qué rama ejecutar.trigger_rule), ya que las tareas no seleccionadas serán marcadas como omitidas.Ejemplo Avanzado con Datos Dinámicos y XComs
def calcular_rama(**kwargs): # Obtener un valor de XCom valor = kwargs['ti'].xcom_pull(task_ids='tarea_inicial') if valor == 'A': return 'rama_a' else: return 'rama_b'
tarea_inicial = PythonOperator( task_id='tarea_inicial', python_callable=lambda: 'A' )
branch_task = BranchPythonOperator( task_id='branch_decision', python_callable=calcular_rama, provide_context=True )