No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Definiendo dependencias entre tareas

14/29
Recursos

Aportes 16

Preguntas 4

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

también se pueden combinar:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def print_hello():
    return print('hello world with python in airflow')

with DAG(dag_id='dependencias',
         description='DAG con dependencias',
         schedule_interval='@once',
         start_date=datetime(2022, 7, 1)) as dag:

    t1 = PythonOperator(task_id='t1_primer_task_python',
                        python_callable=print_hello)

    t2 = BashOperator(task_id='t2_task_bash',
                      bash_command='echo "Tarea 2"')

    t3 = BashOperator(task_id='t3_task_bash', bash_command='echo "Tarea 3"')

    t4 = BashOperator(task_id='t4_task_bash', bash_command='echo "Tarea 4"')

    t1.set_downstream([t2, t3])
    [t2, t3] >> t4

Ya se siente el poder de Airflow!

![](https://static.platzi.com/media/user_upload/638706563514874149-cee5a1fb-16b8-480f-bd06-4f1814467bc8.jpg)Así se vio mi ejemplo realizado con el material del curso
No me funciono, debi usar un DummyOperator, alguien sabe el por que? Gracias
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def print_hello():
    print("Hello Oscar :)")

with DAG(dag_id="python_operator",
         description="Nuestro primer DAG utilizando Python Operator",
         schedule_interval="@once",
         start_date=datetime(2023, 10, 18)) as dag:
    t1 = PythonOperator(task_id="hello_with_Python",
                        python_callable=print_hello)

con 5 tareas:
–> t3
t1 —> t2 |–>t5
–> t4

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
import time

def prints():
    timestamp = int(time.time())
    # Print the timestamp
    print("Current timestamp:", timestamp)

def prints2():
    time.sleep(3)
    timestamp = int(time.time())
    # Print the timestamp
    print("Current timestamp2:", timestamp)


with DAG(
            dag_id='dependencias',
            description='our first DAG making depencies between taks',
            schedule_interval='@once',
            start_date=datetime(2023,8,3)
        ) as dag:
    
    t1=PythonOperator(task_id='task1',
                      python_callable=prints
                      )
    t2=BashOperator(task_id="task2",
                    bash_command="echo 'Hello platzis people'"
                    )
    t3=PythonOperator(task_id='task3',
                      python_callable=prints2
                      )
    t4=BashOperator(task_id="task4",
                    bash_command="echo 'task4'"
                    )
    t5=BashOperator(task_id="task5",
                    bash_command="echo 'task5'"
                    )
    
    """
              \--> t3
    t1 ---> t2
              \--> t4
    """
    #t1.set_downstream(t2)
    #t2.set_downstream([t3,t4])

    """
              \--> t3
    t1 ---> t2       |-->t5
              \--> t4
    """

    t1 >> t2  >> [t3,t4] >> t5
    

Dependencies are a powerful and popular Airflow feature. In Airflow, your pipelines are defined as Directed Acyclic Graphs (DAGs). Each task is a node in the graph and dependencies are the directed edges that determine how to move through the graph. Because of this, dependencies are key to following data engineering best practices because they help you define flexible pipelines with atomic tasks.

Throughout this guide, the following terms are used to describe task dependencies:

Upstream task: A task that must reach a specified state before a dependent task can run.
Downstream task: A dependent task that cannot run until an upstream task reaches a specified state.

Me gusta mucho este tipo de herramientas, se puede combinar con aws lambda y se puede hacer muchas cosas interesantes

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def print_hello(country, **kwargs):
    print(f'I am processing this {country}')


with DAG(dag_id='dependecias'
        , description='Creando dependencias entre tareas'
        , start_date=datetime(2023, 6, 7)
        , schedule_interval='@once') as dag:


    t1 = PythonOperator(task_id='process_ar'
                        , python_callable=print_hello
                        , op_kwargs={'country':'AR'})

    t2 = PythonOperator(task_id='process_uy'
                        , python_callable=print_hello
                        , op_kwargs={'country':'UY'})

    t3 = BashOperator(task_id='process_mx'
                        , bash_command="echo 'processing MX'")

    t4 = BashOperator(task_id='process_co'
                        , bash_command="echo 'processing CO'")

    t1 >> [t2, t3] >> t4

Un paso más…

Poderoso:

t1 >> [t3, t4] >> t5 >> t7
t2 >> t5
t6 >> [t7, t8] >> t9

~ Por cierto, ya conectamos en LinkedIn? ~

Conectemos en LinkedIn, GitHub, Medium o Redes sociales


B)

![](https://static.platzi.com/media/user_upload/image-35d6c413-1405-4672-b6c6-906a22c9b0ae.jpg) ![](https://static.platzi.com/media/user_upload/image-94403abd-404e-47bf-b54f-0a38c732c3bb.jpg)
Este son mis tareas programadas: ![](https://static.platzi.com/media/user_upload/imagen-e12fd25d-c800-4a0f-b641-eaefb48f07bc.jpg)
En Apache Airflow, las dependencias entre tareas se definen usando operadores que establecen relaciones de ejecución. Estas dependencias determinan el orden en el que las tareas deben ejecutarse en el DAG. ### Métodos para definir dependencias 1. **Usando el operador** `>>` **(hacia adelante):** Este operador indica que una tarea debe ejecutarse antes de otra.task1 >> task2 # task1 se ejecuta antes de task2 2. **Usando el operador** `<<` **(hacia atrás):** Este operador indica que una tarea debe ejecutarse después de otra.task1 << task2 # task2 se ejecuta antes de task1 3. **Definiendo dependencias múltiples:** Puedes definir dependencias entre varias tareas a la vez:task1 >> \[task2, task3] # task1 se ejecuta antes de task2 y task3 \[task2, task3] >> task4 # task2 y task3 deben completarse antes de ejecutar task4 4. **Usando el método** `.set_downstream()` **y** `.set_upstream()`**:** Estos métodos establecen relaciones explícitas entre tareas.task1.set\_downstream(task2) # Igual a task1 >> task2 task2.set\_upstream(task1) # Igual a task1 >> task2 ### Ejemplo práctico from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime \# Definir el DAG with DAG( dag\_id="dependencias\_dag", description="Definiendo dependencias entre tareas", start\_date=datetime(2024, 11, 28), schedule\_interval="@once", ) as dag: \# Tareas inicio = EmptyOperator(task\_id="inicio") procesar\_datos = EmptyOperator(task\_id="procesar\_datos") generar\_reporte = EmptyOperator(task\_id="generar\_reporte") fin = EmptyOperator(task\_id="fin") \# Definir dependencias inicio >> procesar\_datos >> generar\_reporte >> fin ### Resultado En el ejemplo anterior: 1. La tarea `inicio` debe completarse antes de `procesar_datos`. 2. `procesar_datos` debe completarse antes de `generar_reporte`. 3. Finalmente, `generar_reporte` debe completarse antes de `fin`. ### Visualización Cuando el DAG se carga correctamente, las dependencias se pueden observar en el interfaz de Airflow como un flujo claro entre las tareas. Esto asegura un orden lógico y ejecutable en el proceso.
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def print_hello():
    print("HELLO PAPUS1")

with DAG(dag_id="dependencias_test",
         description="Creando dependendecias parcero",
         schedule_interval="@once",
         start_date =datetime(2022,12,6)) as dag:


    t1 = PythonOperator(task_id="tarea1",
                        python_callable=print_hello)

    t2 = BashOperator(task_id="tarea2",
                      bash_command="echo 'Hello PAPUS2'")       

    t3 = BashOperator(task_id="tarea3",
                      bash_command="echo 'Hello PAPUS3'")  

    t4 = BashOperator(task_id="tarea4",
                      bash_command="echo 'Hello PAPUS4'")

    t1.set_downstream(t2) 
    t2.set_downstream([t3,t4])                                                                                 

    #Other way
    #t1 >> t2 >> [t3,t4]```

Profe sería genial que subieras las imagenes que usaste en el panel de recursos