No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

BranchPythonOperator

26/29
Recursos

Aportes 3

Preguntas 0

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

⚠️ Falto una cosa que nos puede pasar y no entender el porque si existiese otra tarea después de start_15_june dependiendo el flujo si estas son skiped estas pueden no ejecutarse y también saltarse para evitar eso debemos usar el parámetro trigger_rule=TriggerRule.NONE_FAILED, El objeto TriggerRule se importa así

from airflow.utils.trigger_rule import TriggerRule
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime, date

default_args = {
'start_date': datetime(2022, 8, 20),
'end_date': datetime(2022, 8, 25)
}

def _choose(**context):
    if context["logical_date"].date() < date(2022, 8, 23):
        return "finish_22_june"
    return "start_23_june"

with DAG(dag_id="10-branching",
    schedule_interval="@daily",
	default_args=default_args
) as dag:

    branching = BranchPythonOperator(task_id="branch",
	                                 python_callable=_choose)

    finish_22 = BashOperator(task_id="finish_22_june",
	                         bash_command="echo 'Running {{ds}}'")

    start_23 = BashOperator(task_id="start_23_june",
	                        bash_command="echo 'Running {{ds}}'")

    branching >> [finish_22, start_23]

Modelo de ejemplo con un BrachOperator

from airflow import DAG
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator

from datetime import datetime


def _choose(**context):

    if context['logical_date'].date() < datetime(2022, 10, 16):
        return 'task2'

    return 'task3'

with DAG(
    dag_id='09-BPO',
    description='Orquestando el dash',
    schedule_interval= '@daily',
    start_date=datetime(2022,10,1),
    end_date= datetime(2022,11,1),
    catchup=True,
    max_active_runs=1, #Los dias se ejecutan con un maximo numero de workers en paralelo
    default_args={
        'depends_on_past':True #las tardes de los dias no se ejecutan en pararelo, las ejecuciones son secuenciales y dependen de la ejecucion de la misma tarea el dia anterior
    }
        ) as dag:


    branching = BranchPythonOperator(
        task_id='Choice',
        python_callable= _choose
    )

    finish_15_10 = BashOperator(task_id='task2',
                    bash_command="sleep 2 && echo 'Task 2'"
    )

    finished_1_11 = BashOperator(task_id='task3',
                    bash_command="sleep 2 && echo 'Task 3'"
    )

    final_t = BashOperator(task_id='task4',
                    bash_command="sleep 2 && echo 'Final Part'",
                    trigger_rule = TriggerRule.NONE_FAILED
    )


    branching >> [finish_15_10, finished_1_11]

    final_t << [finish_15_10, finished_1_11]