Orquestando un DAG II
Clase 17 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Clase 17 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Jose Luis Resendiz Gutierrez
Eric Bellet
Leonardo Carvallo
Alfredo Olmedo
Leonardo Carvallo
Eric Bellet
Sabina Ixchel Garcia Mendoza
Eric Bellet
Oscar Camilo Luna Feo
Andres Insuasty
Eric Bellet
Elitsoft Chile
Jose Alejandro Rivillas
Eric Bellet
Matías Nicolás Sosa
Eric Bellet
Mario Alexander Vargas Celis
Pablo Ponte
Eric Bellet
Norberto Iván Tolaba
Royer Guerrero Pinilla
Luis Rivero
Eric Bellet
Emilio Nicolás Mendoza Patti
Eric Bellet
Jordi Trigo
Eric Bellet
Daniel Olave
Daniel ruiz
Eric Bellet
Alvaro Isaías Acuña
Eric Bellet
Solo quiero decir GRACIAS! por el excelente uso del zoom en este curso, enfocando en lo que hay que enfocarse. Además de que en otros cursos es imposible verlo en el celular porque terminas viendo una pantalla negra con unas letritas moviendose, aquí sin ningun problema.
Muchas gracias Jose, me alegra que puedas ver los vídeos sin dificultades
La cosa se pone un poco más confusa si no tienes claro en que franja horaria esta siendo planificada jaja, porque tienes tu hora local y la hora del servidor donde tengas Aiflow si corre en la nube, lo más recomendable es hacerlo en hora UTC.
Eso se arregla en configuracion, desde la planificacion de quienes son tus clientes etc
Con respecto a los intervalos, efectivamente me tomo un tiempo entenderlo, por ejempo en mi trabajo los dag procesan los datos a dia cumplido, es decir, si mi próxima ejecucion esta programada para el 24/10/22 a las 00:00:00, en la realidad esto va a ejecutar cuando termine ese día, es decir el 25/10/22 00:00:00, justamente para ingestar los datos con todo el comportamiento que tuvo durante el día 24
Exactamente, una buena manera de entenderlo también es con el scheduler interval @monthly, donde por ejemplo la ejecución del 1 de Febrero no se ejecuta hasta que sea el 1 de Marzo
Libro: Ruiter-Data Pipelines with Apache Airflow
Gracias por compartirlo ☺️
Primer ejemplo
from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime with DAG(dag_id="5.2-orquestation", description="Probando la orquestacion", schedule_interval="0 7 * * 1", start_date=datetime(2022, 1, 1), end_date=datetime(2022, 6, 1)) as dag: t1 = EmptyOperator(task_id = "tarea1") t2 = EmptyOperator(task_id = "tarea2") t3 = EmptyOperator(task_id = "tarea3") t4 = EmptyOperator(task_id = "tarea4") t1 >> t2 >> t3 >> t4
Segundo ejemplo
from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime with DAG(dag_id="5.3-orquestation", description="Probando la orquestacion", schedule_interval="@monthly", start_date=datetime(2022, 1, 1), end_date=datetime(2022, 6, 1)) as dag: t1 = EmptyOperator(task_id = "tarea1") t2 = EmptyOperator(task_id = "tarea2") t3 = EmptyOperator(task_id = "tarea3") t4 = EmptyOperator(task_id = "tarea4") t1 >> t2 >> t3 >> t4
yo hago un dag con fecha de enero. eso hace que se ponga al día con todos los meses que no ha corrido. pero no quiero que de un golpe haga todas las ejecuciones hasta el mes actual. Este ha sido un problema también para ustedes??
Hola Andres, es un comportamiento normal. Airflow por default ejecuta todas las ejecuciones previas a la fecha actual al mismo tiempo. Si quieres que se ejecuten una a una, tienes que usar la opción max_active_runs=1 y el depends_on_past=True. En los vídeos de orquestración explico unos ejemplos
Mi practica,
y este es el código
from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.operators.bash import BashOperator from datetime import datetime with DAG(dag_id = "5.4-orquestation-tarea", description = "Probando la orquestacion 3", schedule_interval = "0 0 * * *", #"se ejecuta cada dia a las 00:00 horas start_date = datetime(2022, 5, 1), end_date = datetime(2023, 5, 30), default_args = {"depends_on_past": True}, #max_active_runs = 1 ) as dag: # "depends_on_past" por defecto es False, por lo que pueden surgir los procesos todos los días independientemente. # "depends_on_past" hace que se ejecute solo si el anterior proceso terminó, así se realiza uno a la vez secuencial # max_active_runs, se ejecute un modulo, evitado que haya paralelismo, y cuando se termine todo le grupo, se pasa al sgte día. t1 = EmptyOperator(task_id="tarea1") t2 = BashOperator(task_id="tarea2", bash_command="sleep 2 && echo 'tarea2'") t3 = EmptyOperator(task_id="tarea3") t4 = EmptyOperator(task_id="tarea4") t5 = BashOperator(task_id="tarea5", bash_command="sleep 2 && echo 'tarea5'") t6 = EmptyOperator(task_id="tarea6") t7 = BashOperator(task_id="tarea7", bash_command="sleep 3 && echo 'tarea7'") t8 = BashOperator(task_id="tarea8", bash_command="sleep 2 && echo 'tarea8'") t1 >> [ t2, t3, t4] t3 >> [t5, t6] [t4, t5] >> t7 [t4, t7] >> t8
Cuando hago el ejemplo que acá se usa con cron, en vez de los preset, no se me corren las tareas planeadas pasadas. ¿Es nomal?
Sí es normal, eso depende de como hagas la limpieza de las tareas, y como tengas configurado del DAG
no sé si entiendo bien pero estás simulando como que pasan varios días? cómo ponés a andar la dag y se ejecutan varias veces como si fueran días o meses distintos?
Sí, cada ejecución puede ser en el intervalo de tiempo que quieras, diario, horario, mensual, etc
En la segunda etapa de "Orquestando un DAG", profundizamos en conceptos avanzados y optimizaciones para manejar tareas más complejas. Aquí exploraremos técnicas clave para escalar, depurar, y mejorar la eficiencia en el diseño y la ejecución de DAGs.
1. Definiendo Dependencias Complejas
A medida que tu flujo de trabajo crece, es posible que necesites manejar múltiples dependencias entre tareas:
Dependencias Lineales:tarea_1 >> tarea_2 >> tarea_3
Dependencias Ramificadas:[tarea_1, tarea_2] >> tarea_3 tarea_3 >> [tarea_4, tarea_5]
Configuración Dinámica de Dependencias: Si las tareas dependen de un número variable de entradas:for i in range(5): previous_task >> PythonOperator( task_id=f'tarea_{i}', python_callable=funcion_dinamica, )
2. Uso de Sensores
Los sensores son operadores especiales que esperan un evento o condición antes de continuar. Por ejemplo, esperar a que un archivo se cree:
from airflow.sensors.filesystem import FileSensor
esperar_archivo = FileSensor( task_id='esperar_archivo', filepath='/ruta/al/archivo', poke_interval=30, # Verifica cada 30 segundos timeout=600, # Expira después de 10 minutos )
3. Paralelismo y Pools
Para flujos de trabajo grandes, el paralelismo optimiza el uso de recursos:
Configurar concurrency del DAG: Limita el número máximo de tareas simultáneas en un DAG.with DAG(
'dag_con_paralelismo',
concurrency=10, # Máximo de 10 tareas a la vez
...
)
Usar Pools: Agrupa tareas para compartir recursos específicos:airflow pools set pool_name 5 "Descripción del pool" Luego, asigna el pool en las tareas:tarea_optimizada = PythonOperator( task_id='tarea_optimizada', python_callable=mi_funcion, pool='pool_name', )
4. Manejo de Errores y Retries
Es importante configurar estrategias de manejo de errores para mantener la robustez del DAG:
default_args = { 'retries': 3, # Reintenta 3 veces 'retry_delay': timedelta(minutes=5), # Espera 5 minutos entre reintentos 'on_failure_callback': mi_funcion_de_notificacion, }
Además, puedes especificar una tarea en particular que debe ejecutarse en caso de fallos:
tarea_fallida >> tarea_notificar_fallo
5. Integración con APIs y Scripts Externos
Es común ejecutar scripts o interactuar con APIs externas desde un DAG. Por ejemplo, usando BashOperator o HttpSensor:
Ejecutar un Script Bash:from airflow.operators.bash import BashOperator
tarea_bash = BashOperator( task_id='ejecutar_script', bash_command='python3 /ruta/a/mi_script.py', )
Esperar una Respuesta de API:from airflow.sensors.http import HttpSensor
esperar_api = HttpSensor( task_id='esperar_api', http_conn_id='mi_api', endpoint='/status', response_check=lambda response: response.status_code == 200, )
6. Depuración Avanzada
Para depurar errores en tareas o DAGs complejos:
Ver Logs Detallados: Usa la interfaz de Airflow o la CLI:airflow tasks logs dag_id task_id execution_date
Ejecutar Tareas en Modo Local:airflow tasks test dag_id task_id execution_date
7. Prácticas de Diseño Escalable
execution_date para manejar tareas dependientes del tiempo.Cómo haces para que los DAGs se te carguen tan rápido? cada vez que agrego uno nuevo o lo elimino para limpiar las corridas, demora al menos un minuto en aparecer.
Tengo la configuración basica del compose que indica 10s para esto creo, pero demora mucho mas.
Has probado reiniciar el servicio de Airflow?
Eso mismo me pregunté, estoy en WSL-ubuntu y me tarda así, y Airflow me consume mucha CPU y memoria,tengo 12Gb y a veces consume 7Gb, más el navegador, etc. casi ocupa toda la memoria. Vi que profe a veces ejecuta 2 DAG y tranqui, qué PC tendrá o es por qué no utiliza Windows
En caso quiera realizar un procesamiento de datos pasados, hay alguna variable que me permita saber la fecha de ejecución del dag?.
Por ejemplo, hoy es 29 de Marzo del 2025. Quiero correr un data pipeline desde el 1 de Enero de 2025. Este recibe la fecha y la utiliza para saber que datos extraer del data source y luego procesarlo. Si estoy ejecutando un 29 de Marzo ¿Cómo le digo al data pipeline que debe ir iterando desde el 1 de Enero (día por día)?
Tienes que definir un start_date 1 de Enero, en un interval @daily. La variable logical_date te indica la fecha de la ejecución
Me surge la siguiente duda, tengo el codigo configurado desde las mismas fechas que en video, pero veo que no se ejecuta desde enero a junio sino desde febrero a julio, algun crack que me explique porque ?
Hola Emilio, fijate que dice Date Interval End 2022-02-01.
El Date Interval Start sería 2022-01-01 y el Data Interval End 2022-02-01. Airflow trabaja con intervalos de tiempo.
Una duda. Al crear el DAG exactamente igual a de Eric siempre que lo ejecuto desde Airflow se salta siempre la ejecución del primer lunes que le toca. Qué puede ser?
Normalemente el cron puede ser lo que cause esto
Excelente y valiosa la explicación final de este capitulo. Lo entendí claro y preciso, ya quedó agregado a mis apuntes jejej
existe alguna forma para q en mi visaul me reconozca los atributos de los objetos y se autocimplete y que no me aparezca como si no tuviera nada importado?
Sí, puedes instalar la librería de Airflow en tu local
Como puedo conectarme a una base de datos SQL server?
Puedes hacerlo de esta manera https://airflow.apache.org/docs/apache-airflow-providers-microsoft-mssql/stable/connections/mssql.html