No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Orquestando un DAG I

16/29
Recursos

Aportes 6

Preguntas 4

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(dag_id="5.1-orquestacion",
         description="Probando la orquestacion",
         schedule_interval="@daily",
         start_date=datetime(2022, 5, 1),
         end_date=datetime(2023, 6, 1),
         default_args={"depends_on_past": True},
         max_active_runs=1) as dag:

    t1 = BashOperator(task_id="tarea1",
                      bash_command="sleep 2 && echo 'Tarea 1'")

    t2 = BashOperator(task_id="tarea2",
                      bash_command="sleep 2 && echo 'Tarea 2'")

    t3 = BashOperator(task_id="tarea3",
                      bash_command="sleep 2 && echo 'Tarea 3'")

    t4 = BashOperator(task_id="tarea4",
                      bash_command="sleep 2 && echo 'Tarea 4'")

    t1 >> t2 >> [t3,t4]

Para aquellas personas a las que solo se ejecuto una vez y no todo el mes como en la clase. Esto es debido a que posiblemente pusieron la fecha de inicio al día actual, ejecutaron y luego lo cambiaron a una fecha más atrás para hacer la simulación de tareas secuenciales por día.

La única forma en la que se me resolvío fue al darle al botón de eliminar, no estoy completamente seguro, pero parece ser que si ya ejecutaste un día, Airflow no va a ejecutar días anteriores, aunque el start_date y end_date esten correctos, sino hasta que se haya limpiado el registro de ese DAG.

Crontab es una herramienta muy útil de Linux, permite automatizar tareas en tu PC fácilmente.
Por ejemplo, hacer copias de seguridad, actualizar repositorios mediante un pull etc. La imaginación es el límite.
En este repositorio está un script de Python para ordenar los archivos de un directorio en concreto (por ejemplo Downloads, donde siempre va a parar todo si no eres ordenado), genera logs de los archivos que se movieron y hacia donde con fecha y hora. Todo con base en la extensión de los archivos ~

Airflow por defecto usa UTC como zona horaria, a menos que se le cambie directamente en el archivo de configuración la zona horaria por defecto a utilizar.

Una forma de manejar zonas horarias es con Python de la siguiente forma:

from datetime import datetime, timedelta, timezone

TIMEZONE = timezone(timedelta(hours=-6))  # UTC-6

with DAG(dag_id='orquestation',
         start_date=datetime(2023, 1, 1, tzinfo=TIMEZONE),
         end_date=datetime(2023, 2, 1, tzinfo=TIMEZONE),
         schedule_interval='@daily') as dag:

	...

También podría utilizarse la libreria pytz para hacerlo más intuitivo pero puede ser logrado sin instalar nada.

Para limpiar también es útil seleccionar en la sección Task Actions las opciones Past y Future, luego clickear en Clear.

Con esto borramos las tareas pasadas y futuras. Recomiendo primero detener la ejecución del DAG.

Usualmente Airflow está en UTC entonces cuando se crea un crontab y quieren que se ejecute en su hora local deben hacer el cambio horario por ejemplo:

Si se quiere ejecutar una tarea a las 7 de Colombia en el schedule interval se deberá poner que se ejecute a las 11