
Defining dependencies between tasks


They can also be combined:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def print_hello():
    print('hello world with python in airflow')

with DAG(dag_id='dependencias',
         description='DAG con dependencias',
         schedule_interval='@once',
         start_date=datetime(2022, 7, 1)) as dag:

    t1 = PythonOperator(task_id='t1_primer_task_python',
                        python_callable=print_hello)

    t2 = BashOperator(task_id='t2_task_bash',
                      bash_command='echo "Tarea 2"')

    t3 = BashOperator(task_id='t3_task_bash', bash_command='echo "Tarea 3"')

    t4 = BashOperator(task_id='t4_task_bash', bash_command='echo "Tarea 4"')

    t1.set_downstream([t2, t3])
    [t2, t3] >> t4

You can already feel the power of Airflow!

It didn't work for me; I had to use a DummyOperator. Does anyone know why? Thanks
from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # legacy path; in Airflow 2 use airflow.operators.python
from datetime import datetime

def print_hello():
    print("Hello Oscar :)")

with DAG(dag_id="python_operator",
         description="Nuestro primer DAG utilizando Python Operator",
         schedule_interval="@once",
         start_date=datetime(2023, 10, 18)) as dag:
    t1 = PythonOperator(task_id="hello_with_Python",
                        python_callable=print_hello)

with 5 tasks:

             /--> t3 \
t1 ---> t2 --         --> t5
             \--> t4 /

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
import time

def prints():
    timestamp = int(time.time())
    # Print the timestamp
    print("Current timestamp:", timestamp)

def prints2():
    time.sleep(3)
    timestamp = int(time.time())
    # Print the timestamp
    print("Current timestamp2:", timestamp)


with DAG(
            dag_id='dependencias',
            description='our first DAG making dependencies between tasks',
            schedule_interval='@once',
            start_date=datetime(2023,8,3)
        ) as dag:
    
    t1=PythonOperator(task_id='task1',
                      python_callable=prints
                      )
    t2=BashOperator(task_id="task2",
                    bash_command="echo 'Hello platzis people'"
                    )
    t3=PythonOperator(task_id='task3',
                      python_callable=prints2
                      )
    t4=BashOperator(task_id="task4",
                    bash_command="echo 'task4'"
                    )
    t5=BashOperator(task_id="task5",
                    bash_command="echo 'task5'"
                    )
    
    """
              \--> t3
    t1 ---> t2
              \--> t4
    """
    #t1.set_downstream(t2)
    #t2.set_downstream([t3,t4])

    """
              \--> t3
    t1 ---> t2       |-->t5
              \--> t4
    """

    t1 >> t2  >> [t3,t4] >> t5
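The fan-out/fan-in above (`t1 >> t2 >> [t3, t4] >> t5`) can be traced with a pure-Python topological sort. This is only a sketch of the run order Airflow derives from those edges, not Airflow's actual scheduler:

```python
from collections import deque

# The diamond from the snippet above, written as plain edges.
edges = [("t1", "t2"), ("t2", "t3"), ("t2", "t4"), ("t3", "t5"), ("t4", "t5")]
tasks = {"t1", "t2", "t3", "t4", "t5"}

# Kahn's algorithm: a task becomes runnable once every upstream task is done.
indegree = {t: 0 for t in tasks}
downstream = {t: [] for t in tasks}
for up, down in edges:
    indegree[down] += 1
    downstream[up].append(down)

ready = deque(sorted(t for t in tasks if indegree[t] == 0))
order = []
while ready:
    task = ready.popleft()
    order.append(task)
    for nxt in sorted(downstream[task]):
        indegree[nxt] -= 1
        if indegree[nxt] == 0:
            ready.append(nxt)

print(order)  # ['t1', 't2', 't3', 't4', 't5']
```

Note that t3 and t4 have no edge between them, so Airflow is free to run them in parallel; only t5 must wait for both.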
    

Dependencies are a powerful and popular Airflow feature. In Airflow, your pipelines are defined as Directed Acyclic Graphs (DAGs). Each task is a node in the graph and dependencies are the directed edges that determine how to move through the graph. Because of this, dependencies are key to following data engineering best practices because they help you define flexible pipelines with atomic tasks.

Throughout this guide, the following terms are used to describe task dependencies:

Upstream task: A task that must reach a specified state before a dependent task can run.
Downstream task: A dependent task that cannot run until an upstream task reaches a specified state.
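Those two terms map directly onto the bitshift syntax used in the snippets above. As a minimal pure-Python sketch (not Airflow's real classes) of how `>>` and `set_downstream` record upstream/downstream links:

```python
# Minimal sketch of Airflow-style dependency wiring -- NOT Airflow's
# implementation, just the idea behind t1 >> [t2, t3] >> t4.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # tasks that must finish before this one
        self.downstream = set()  # tasks that wait on this one

    def set_downstream(self, others):
        # Accept a single task or a list, as Airflow does.
        for other in others if isinstance(others, list) else [others]:
            self.downstream.add(other)
            other.upstream.add(self)

    def __rshift__(self, other):    # t1 >> t2  or  t1 >> [t2, t3]
        self.set_downstream(other)
        return other                # returned so chains keep flowing left to right

    def __rrshift__(self, others):  # [t2, t3] >> t4
        for other in others:
            other.set_downstream(self)
        return self

t1, t2, t3, t4 = (Task(i) for i in ("t1", "t2", "t3", "t4"))
t1 >> [t2, t3] >> t4                # fan out, then fan in
print(sorted(t.task_id for t in t4.upstream))  # ['t2', 't3']
```

The same graph could be built with `t1.set_downstream([t2, t3])` followed by `[t2, t3] >> t4`, exactly as in the first snippet of this thread; the operators are just sugar over the same links.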

I really like this kind of tool; it can be combined with AWS Lambda to do many interesting things.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

def print_hello(country, **kwargs):
    print(f'I am processing this {country}')


with DAG(dag_id='dependencias'
        , description='Creando dependencias entre tareas'
        , start_date=datetime(2023, 6, 7)
        , schedule_interval='@once') as dag:


    t1 = PythonOperator(task_id='process_ar'
                        , python_callable=print_hello
                        , op_kwargs={'country':'AR'})

    t2 = PythonOperator(task_id='process_uy'
                        , python_callable=print_hello
                        , op_kwargs={'country':'UY'})

    t3 = BashOperator(task_id='process_mx'
                        , bash_command="echo 'processing MX'")

    t4 = BashOperator(task_id='process_co'
                        , bash_command="echo 'processing CO'")

    t1 >> [t2, t3] >> t4

One more step…

Powerful:

t1 >> [t3, t4] >> t5 >> t7
t2 >> t5
t6 >> [t7, t8] >> t9



B)

from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def print_hello():
    print("HELLO PAPUS1")

with DAG(dag_id="dependencias_test",
         description="Creando dependencias parcero",
         schedule_interval="@once",
         start_date =datetime(2022,12,6)) as dag:


    t1 = PythonOperator(task_id="tarea1",
                        python_callable=print_hello)

    t2 = BashOperator(task_id="tarea2",
                      bash_command="echo 'Hello PAPUS2'")       

    t3 = BashOperator(task_id="tarea3",
                      bash_command="echo 'Hello PAPUS3'")  

    t4 = BashOperator(task_id="tarea4",
                      bash_command="echo 'Hello PAPUS4'")

    t1.set_downstream(t2) 
    t2.set_downstream([t3,t4])                                                                                 

    #Other way
    #t1 >> t2 >> [t3, t4]

Professor, it would be great if you uploaded the images you used to the resources panel.