No tienes acceso a esta clase

¬°Contin√ļa aprendiendo! √önete y comienza a potenciar tu carrera

Quedan menos de 24 hrs para aprender Inglés, AI y más a precio especial.

Antes: $249

Currency
$209
Suscríbete

Termina en:

0 Días
0 Hrs
50 Min
0 Seg

Orquestando un DAG II

17/29
Recursos

Aportes 8

Preguntas 6

Ordenar por:

¬ŅQuieres ver m√°s aportes, preguntas y respuestas de la comunidad?

Solo quiero decir GRACIAS! por el excelente uso del zoom en este curso, enfocando en lo que hay que enfocarse. Además de que en otros cursos es imposible verlo en el celular porque terminas viendo una pantalla negra con unas letritas moviendose, aquí sin ningun problema.

La cosa se pone un poco m√°s confusa si no tienes claro en que franja horaria esta siendo planificada jaja, porque tienes tu hora local y la hora del servidor donde tengas Aiflow si corre en la nube, lo m√°s recomendable es hacerlo en hora UTC.

Con respecto a los intervalos, efectivamente me tomo un tiempo entenderlo, por ejempo en mi trabajo los dag procesan los datos a dia cumplido, es decir, si mi próxima ejecucion esta programada para el 24/10/22 a las 00:00:00, en la realidad esto va a ejecutar cuando termine ese día, es decir el 25/10/22 00:00:00, justamente para ingestar los datos con todo el comportamiento que tuvo durante el día 24

Mi practica,

Le puse algunas pausas a los procesos,

y este es el código


from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from datetime import datetime



with DAG(dag_id = "5.4-orquestation-tarea",
         description          = "Probando la orquestacion 3",
         schedule_interval    = "0 0 * * *", #"se ejecuta cada dia a las 00:00 horas
         start_date           = datetime(2022, 5, 1),
         end_date             = datetime(2023, 5, 30),
         default_args         = {"depends_on_past": True},
         #max_active_runs      = 1 
         ) as dag:
    # "depends_on_past" por defecto es False, por lo que pueden surgir los procesos todos los días independientemente. 
    # "depends_on_past" hace que se ejecute solo si el anterior proceso terminó, así se realiza uno a la vez secuencial
    # max_active_runs, se ejecute un modulo, evitado que haya paralelismo, y cuando se termine todo le grupo, se pasa al sgte día.
 
 
    t1 = EmptyOperator(task_id="tarea1")
    t2 = BashOperator(task_id="tarea2", bash_command="sleep 2 && echo 'tarea2'")
    t3 = EmptyOperator(task_id="tarea3")
    t4 = EmptyOperator(task_id="tarea4")
    t5 = BashOperator(task_id="tarea5", bash_command="sleep 2 && echo 'tarea5'")
    t6 = EmptyOperator(task_id="tarea6")
    t7 = BashOperator(task_id="tarea7", bash_command="sleep 3 && echo 'tarea7'")
    t8 = BashOperator(task_id="tarea8", bash_command="sleep 2 && echo 'tarea8'")
    
    t1 >> [ t2, t3, t4] 
    t3 >> [t5, t6]
    [t4, t5] >> t7
    [t4, t7] >> t8 

)

Libro: Ruiter-Data Pipelines with Apache Airflow

Primer ejemplo

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(dag_id="5.2-orquestation",
         description="Probando la orquestacion",
         schedule_interval="0 7 * * 1",
         start_date=datetime(2022, 1, 1),
         end_date=datetime(2022, 6, 1)) as dag:

    t1 = EmptyOperator(task_id = "tarea1")

    t2 = EmptyOperator(task_id = "tarea2")

    t3 = EmptyOperator(task_id = "tarea3")

    t4 = EmptyOperator(task_id = "tarea4")

    t1 >> t2 >> t3 >> t4

Segundo ejemplo

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(dag_id="5.3-orquestation",
         description="Probando la orquestacion",
         schedule_interval="@monthly",
         start_date=datetime(2022, 1, 1),
         end_date=datetime(2022, 6, 1)) as dag:

    t1 = EmptyOperator(task_id = "tarea1")

    t2 = EmptyOperator(task_id = "tarea2")

    t3 = EmptyOperator(task_id = "tarea3")

    t4 = EmptyOperator(task_id = "tarea4")

    t1 >> t2 >> t3 >> t4
Cómo haces para que los DAGs se te carguen tan rápido? cada vez que agrego uno nuevo o lo elimino para limpiar las corridas, demora al menos un minuto en aparecer. Tengo la configuración basica del compose que indica 10s para esto creo, pero demora mucho mas.