No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Orquestando un DAG II

17/29
Recursos

Aportes 10

Preguntas 8

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

Solo quiero decir GRACIAS! por el excelente uso del zoom en este curso, enfocando en lo que hay que enfocarse. Además de que en otros cursos es imposible verlo en el celular porque terminas viendo una pantalla negra con unas letritas moviendose, aquí sin ningun problema.

La cosa se pone un poco más confusa si no tienes claro en que franja horaria esta siendo planificada jaja, porque tienes tu hora local y la hora del servidor donde tengas Aiflow si corre en la nube, lo más recomendable es hacerlo en hora UTC.

Con respecto a los intervalos, efectivamente me tomo un tiempo entenderlo, por ejempo en mi trabajo los dag procesan los datos a dia cumplido, es decir, si mi próxima ejecucion esta programada para el 24/10/22 a las 00:00:00, en la realidad esto va a ejecutar cuando termine ese día, es decir el 25/10/22 00:00:00, justamente para ingestar los datos con todo el comportamiento que tuvo durante el día 24

)

Libro: Ruiter-Data Pipelines with Apache Airflow

Primer ejemplo

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(dag_id="5.2-orquestation",
         description="Probando la orquestacion",
         schedule_interval="0 7 * * 1",
         start_date=datetime(2022, 1, 1),
         end_date=datetime(2022, 6, 1)) as dag:

    t1 = EmptyOperator(task_id = "tarea1")

    t2 = EmptyOperator(task_id = "tarea2")

    t3 = EmptyOperator(task_id = "tarea3")

    t4 = EmptyOperator(task_id = "tarea4")

    t1 >> t2 >> t3 >> t4

Segundo ejemplo

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(dag_id="5.3-orquestation",
         description="Probando la orquestacion",
         schedule_interval="@monthly",
         start_date=datetime(2022, 1, 1),
         end_date=datetime(2022, 6, 1)) as dag:

    t1 = EmptyOperator(task_id = "tarea1")

    t2 = EmptyOperator(task_id = "tarea2")

    t3 = EmptyOperator(task_id = "tarea3")

    t4 = EmptyOperator(task_id = "tarea4")

    t1 >> t2 >> t3 >> t4

Mi practica,

Le puse algunas pausas a los procesos,

y este es el código


from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from datetime import datetime



with DAG(dag_id = "5.4-orquestation-tarea",
         description          = "Probando la orquestacion 3",
         schedule_interval    = "0 0 * * *", #"se ejecuta cada dia a las 00:00 horas
         start_date           = datetime(2022, 5, 1),
         end_date             = datetime(2023, 5, 30),
         default_args         = {"depends_on_past": True},
         #max_active_runs      = 1 
         ) as dag:
    # "depends_on_past" por defecto es False, por lo que pueden surgir los procesos todos los días independientemente. 
    # "depends_on_past" hace que se ejecute solo si el anterior proceso terminó, así se realiza uno a la vez secuencial
    # max_active_runs, se ejecute un modulo, evitado que haya paralelismo, y cuando se termine todo le grupo, se pasa al sgte día.
 
 
    t1 = EmptyOperator(task_id="tarea1")
    t2 = BashOperator(task_id="tarea2", bash_command="sleep 2 && echo 'tarea2'")
    t3 = EmptyOperator(task_id="tarea3")
    t4 = EmptyOperator(task_id="tarea4")
    t5 = BashOperator(task_id="tarea5", bash_command="sleep 2 && echo 'tarea5'")
    t6 = EmptyOperator(task_id="tarea6")
    t7 = BashOperator(task_id="tarea7", bash_command="sleep 3 && echo 'tarea7'")
    t8 = BashOperator(task_id="tarea8", bash_command="sleep 2 && echo 'tarea8'")
    
    t1 >> [ t2, t3, t4] 
    t3 >> [t5, t6]
    [t4, t5] >> t7
    [t4, t7] >> t8 

Cómo haces para que los DAGs se te carguen tan rápido? cada vez que agrego uno nuevo o lo elimino para limpiar las corridas, demora al menos un minuto en aparecer. Tengo la configuración basica del compose que indica 10s para esto creo, pero demora mucho mas.
Excelente y valiosa la explicación final de este capitulo. Lo entendí claro y preciso, ya quedó agregado a mis apuntes jejej
En la segunda etapa de "Orquestando un DAG", profundizamos en conceptos avanzados y optimizaciones para manejar tareas más complejas. Aquí exploraremos técnicas clave para escalar, depurar, y mejorar la eficiencia en el diseño y la ejecución de DAGs. ### **1. Definiendo Dependencias Complejas** A medida que tu flujo de trabajo crece, es posible que necesites manejar múltiples dependencias entre tareas: * **Dependencias Lineales**:tarea\_1 >> tarea\_2 >> tarea\_3 * **Dependencias Ramificadas**:\[tarea\_1, tarea\_2] >> tarea\_3 tarea\_3 >> \[tarea\_4, tarea\_5] * **Configuración Dinámica de Dependencias**: Si las tareas dependen de un número variable de entradas:for i in range(5): previous\_task >> PythonOperator( task\_id=f'tarea\_{i}', python\_callable=funcion\_dinamica, ) ### **2. Uso de Sensores** Los sensores son operadores especiales que esperan un evento o condición antes de continuar. Por ejemplo, esperar a que un archivo se cree: from airflow.sensors.filesystem import FileSensor esperar\_archivo = FileSensor( task\_id='esperar\_archivo', filepath='/ruta/al/archivo', poke\_interval=30, # Verifica cada 30 segundos timeout=600, # Expira después de 10 minutos ) ### **3. Paralelismo y Pools** Para flujos de trabajo grandes, el paralelismo optimiza el uso de recursos: * **Configurar** `concurrency` **del DAG**: Limita el número máximo de tareas simultáneas en un DAG.with DAG( 'dag\_con\_paralelismo', concurrency=10, # Máximo de 10 tareas a la vez ... ) * **Usar Pools**: Agrupa tareas para compartir recursos específicos:airflow pools set pool\_name 5 "Descripción del pool" Luego, asigna el pool en las tareas:tarea\_optimizada = PythonOperator( task\_id='tarea\_optimizada', python\_callable=mi\_funcion, pool='pool\_name', ) ### **4. Manejo de Errores y Retries** Es importante configurar estrategias de manejo de errores para mantener la robustez del DAG: default\_args = { 'retries': 3, # Reintenta 3 veces 'retry\_delay': timedelta(minutes=5), # Espera 5 minutos entre reintentos 'on\_failure\_callback': mi\_funcion\_de\_notificacion, } Además, puedes especificar una tarea en particular que debe ejecutarse en caso de fallos: tarea\_fallida >> tarea\_notificar\_fallo ### **5. Integración con APIs y Scripts Externos** Es común ejecutar scripts o interactuar con APIs externas desde un DAG. Por ejemplo, usando `BashOperator` o `HttpSensor`: * **Ejecutar un Script Bash**:from airflow.operators.bash import BashOperator tarea\_bash = BashOperator( task\_id='ejecutar\_script', bash\_command='python3 /ruta/a/mi\_script.py', ) * **Esperar una Respuesta de API**:from airflow.sensors.http import HttpSensor esperar\_api = HttpSensor( task\_id='esperar\_api', http\_conn\_id='mi\_api', endpoint='/status', response\_check=lambda response: response.status\_code == 200, ) ### **6. Depuración Avanzada** Para depurar errores en tareas o DAGs complejos: * **Ver Logs Detallados**: Usa la interfaz de Airflow o la CLI:airflow tasks logs dag\_id task\_id execution\_date * **Ejecutar Tareas en Modo Local**:airflow tasks test dag\_id task\_id execution\_date ### **7. Prácticas de Diseño Escalable** * Divide DAGs grandes en DAGs más pequeños, vinculados mediante **ExternalTaskSensor**. * Usa **temporalidad dinámica** con el parámetro `execution_date` para manejar tareas dependientes del tiempo. * Emplea variables o conexiones definidas en Airflow para parametrizar tareas.