Orquestando un DAG I
Clase 16 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Clase 16 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Héctor Arturo Can Bacab
Marvin Alvarenga
Stanley Melgar
Oscar Camilo Luna Feo
Santiago Zuluaga Saldarriaga
Norberto Iván Tolaba
Gerardo Mayel Fernández Alamilla
Eric Bellet
Royer Guerrero Pinilla
sebastian felipe herrera sanchez
sebastian felipe herrera sanchez
Eric Bellet
José Daniel López Ramírez
Marvin Alvarenga
Eric Bellet
Carlos Plaza
Eric Bellet
Mario Alexander Vargas Celis
Carlos Perilla
Julián Esteban Oliveros Forero
Para aquellas personas a las que solo se ejecuto una vez y no todo el mes como en la clase. Esto es debido a que posiblemente pusieron la fecha de inicio al día actual, ejecutaron y luego lo cambiaron a una fecha más atrás para hacer la simulación de tareas secuenciales por día.
La única forma en la que se me resolvío fue al darle al botón de eliminar, no estoy completamente seguro, pero parece ser que si ya ejecutaste un día, Airflow no va a ejecutar días anteriores, aunque el start_date y end_date esten correctos, sino hasta que se haya limpiado el registro de ese DAG.
Airflow por defecto usa UTC como zona horaria, a menos que se le cambie directamente en el archivo de configuración la zona horaria por defecto a utilizar.
Una forma de manejar zonas horarias es con Python de la siguiente forma:
from datetime import datetime, timedelta, timezone TIMEZONE = timezone(timedelta(hours=-6)) # UTC-6 with DAG(dag_id='orquestation', start_date=datetime(2023, 1, 1, tzinfo=TIMEZONE), end_date=datetime(2023, 2, 1, tzinfo=TIMEZONE), schedule_interval='@daily') as dag: ...
También podría utilizarse la libreria pytz para hacerlo más intuitivo pero puede ser logrado sin instalar nada.
Crontab es una herramienta muy útil de Linux, permite automatizar tareas en tu PC fácilmente.
Por ejemplo, hacer copias de seguridad, actualizar repositorios mediante un pull etc. La imaginación es el límite.
En este repositorio está un script de Python para ordenar los archivos de un directorio en concreto (por ejemplo Downloads, donde siempre va a parar todo si no eres ordenado), genera logs de los archivos que se movieron y hacia donde con fecha y hora. Todo con base en la extensión de los archivos ~
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime with DAG(dag_id="5.1-orquestacion", description="Probando la orquestacion", schedule_interval="@daily", start_date=datetime(2022, 5, 1), end_date=datetime(2023, 6, 1), default_args={"depends_on_past": True}, max_active_runs=1) as dag: t1 = BashOperator(task_id="tarea1", bash_command="sleep 2 && echo 'Tarea 1'") t2 = BashOperator(task_id="tarea2", bash_command="sleep 2 && echo 'Tarea 2'") t3 = BashOperator(task_id="tarea3", bash_command="sleep 2 && echo 'Tarea 3'") t4 = BashOperator(task_id="tarea4", bash_command="sleep 2 && echo 'Tarea 4'") t1 >> t2 >> [t3,t4]
Por qué a el le funciona en la clase sin el uso del default_args={"depends_on_past": True}? En mi caso sí no se especifica ese argumento falla.
Notar que este código está para un año.
No entendi por qué se simuló todo el mes? si lo parámetros dicen un mes pero no transcurrió
¿En qué minuto del video? Según entiendo la pregunta, Airflow ejecuta cuando el final del intervalo ha llegado. Por ejemplo en un intervalo mensual, el mes de Febrero se va a ejecutar cuando ya sea Marzo
Eso pasa porque Airflow detecta si la fecha de inicio es mayor que a la actual entonces el ejecuta todas las tareas que no fueron ejecutadas si te das cuenta el lo esta ejecutando el día 02-08-2022 y el start_date del dag esta dede una fecha menor el 01-06-2022, si deseas evitar ese comportamiento debes usar el parámetro catchup al momento de inicializar el DAG
El argumento depends_on_past=True se aplica a cada tarea individualmente y no al DAG completo. Esto significa que cada tarea esperará a que su ejecución anterior se complete antes de comenzar, pero no necesariamente esperará a que todas las tareas del DAG se completen.
Ejemplo:
tarea1 del día 2 no comenzará hasta que tarea1 del día 1 se haya completado con éxito.
tarea2 del día 2 no comenzará hasta que tarea1 del día 2 se haya completado con éxito.
Tengo una pregunta, ¿Como tengo que organizar mis archivos a la hora de empezar un proyecto en airflow?
Realmente depende del proyecto o empresa en la que estés trabajando. En mi empresa organizamos los dags por carpetas relacionadas a cada proyecto, y el nombre de cada DAG sigue una sintaxis interna
@sebastian, Esto tal vez te ayude:
En ese caso que habían ejecuciones pendientes, cual sería un ejemplo real o más concreto del por qué no quisieramos que se ejecutaran en paralelo y de golpe las pendientes? Gracias.
Normalemente si tienes un proceso que depende del resultado del anterior. Por ejemplo, de los datos que puede generar
Por que se ejecutan varias tareas si el parametro schedule_interval esta dado como "@daily". No deberia ser una sola ejecucion? O es tal vez porque los parametros de fecha estan dados como start_date=1/may/2022 y end_date =1/jun/2022, pero la ejecucion del ejercicio se lo esta realizando el 22/ago/2022. Es decir airflow estaria como ejecutando un conjunto de tareas de dias acumulados, y es por eso que parecia que todo se realizada el mismo dia?
Exacto, por defecto ejecuta todos los días acumulados al mismo tiempo. Si no quieres eso, puedes utilizar el max_active_runs y el depends_on_past
Un DAG (Directed Acyclic Graph) en el contexto de la orquestación de tareas (como Apache Airflow) es una estructura que define la secuencia y las dependencias entre las tareas que se ejecutan como parte de un flujo de trabajo.
Para limpiar también es útil seleccionar en la sección Task Actions las opciones Past y Future, luego clickear en Clear.
Con esto borramos las tareas pasadas y futuras. Recomiendo primero detener la ejecución del DAG.
Usualmente Airflow está en UTC entonces cuando se crea un crontab y quieren que se ejecute en su hora local deben hacer el cambio horario por ejemplo:
Si se quiere ejecutar una tarea a las 7 de Colombia en el schedule interval se deberá poner que se ejecute a las 11