Cuando pensamos en la ejecución de tareas largas, repetitivas y con un margen de error humano alto, inmediatamente se nos viene a la menta “automatización”, nadie quiere tener que realizar este tipo de tareas una y otra vez, cuando podría delegarlas a unas líneas de código y utilizar el tiempo en algo que genere mayor valor, además, ¿no es más divertido automatizar una tarea que ejecutarla por nuestra cuenta?
Cada vez más personas y compañías están pasando a una modelo de data-driven y en sus tareas diarias empiezan a tomar mayor importancia el desarrollando tuberías de datos o data pipelines. Apache Airflow es un marco de trabajo para la creación de data pipelines orquestados que facilita la ejecución de tareas repetitivas y la integración con múltiples herramientas y plataformas de cloud en un sólo entorno y utilizando python con lenguaje de programación.
A lo largo de este tutorial se estará haciendo referencia a contenido del libro Data Pipelines with Apache Airflow - Bas Harenslak, Julian de Ruiter el cual se encuentra en inglés, por eso las imágenes estarán en inglés.
Una tubería de datos, mejor conocido el airflow como DAG, generalmente consiste en múltiples tareas o acciones que necesitan ser ejecutadas para conseguir el resultado esperado. Por ejemplo, imaginemos que se tiene que actualizar un dashboard sobre las condiciones del clima, nuestro dag se podría distribuir en las siguientes tareas.
Las tareas deberán ejecutarse en ese orden por obvias razones, no se puede limpiar una información que no se ha terminado de obtener ni se puede actualizar sin tener la información lista. Un diagrama a gran escala se vería de la siguiente manera.
Cuando se pasan estas tareas a Airflow, se tendría algo así
Se puede notar que las fechas de dependencia van en una dirección y así es como siempre irán, de debe tener en cuenta que una tarea no se va a ejecutar hasta que sus dependencias sean cumplidas, por lo que una doble dependencia o tener un ciclo en nuestro flujo no sería correcto ya que no se ejecutaría.
Al momento de irse ejecutando las tareas, se puede ir viendo este avance desde la interfaz gráfica de la siguiente manera.
La interfaz gráfica provee otros datos sobre la ejecución de nuestros dags, como el nombre de los flujos de trabajo, la fecha de ejecución o programación y el estado de las tareas que se han ejecutado. Un flujo de trabajo se puede ejecutar desde el icono de “play” y es necesario que esté activado, en el slice a la izquierda del nombre del flujo.
Para irnos directamente al código debemos tener un par de cosas en cuenta:
Para iniciar a crear nuestros dags, nos situaremos dentro de la carpeta de dags donde se creará el archivo dag.py
. Hacemos las importaciones necesarias y declaramos el dag.
from airflow import DAG
from datetime import datetime
with DAG(
dag_id="mi_primer_dag",
start_date=datetime.now(),
schedule_interval=None
) as dag:
Partiendo de este dag, todas las declaraciones de tareas o dependencias se harán dentro de esta sentencia with, es decir, se debe identar el código o también se podría declarar de la siguiente manera y declarar un parámetro dag=dag
en las tareas que se declaren.
dag = DAG(
dag_id="mi_primer_dag",
start_date=datetime.now(),
schedule_interval=None
)
el schedule_interval
de nuestro dag puede ser tan sencillo o complejo como se requiera, a continuación, se muestra las diferentes posibilidades para su declaración.
# en casos muy generales"@daily"# todos los días a las 00:00"@hourly"# cada hora a los 00 minutos# con necesidades más específicas de minutos, horas y/o días"0 15 * * *"# todos los días a las 3"0 15,17 * * *"# todos los días a las 3 y a las 5"0 15-17 * * *"# todos los días, cada hora desde las 3 hasta las 5"0 15 * * SAT"# todos los sábados a las 3"0 15 * * SAT,MON"# todos los sábados y lunes a las 3"0 15 1 * *"# todos los primeros de cada mes a las 3# para casos menos comunes y más particularesimport datetime as dt
dt.timedelta(days=2) # cada dos días
dt.timedelta(hours=5) # cada cinco horas
tener en cuenta que cada posición representa algo muy especifico
para empezar a crear las tareas que tendrá el dag se debe considerar que pueden ser de dos tipos, operadores y sensores. Los primeros ejecutan una acción en cuanto sus dependencias son resueltas, mientras que los sensores se quedan esperando por un evento para poder ejecutarse, se puede decir que es un operador con una dependencia externa la cual revisa con cierta periodicidad para comprobar que se haya cumplido.
Pero a todas estas, ¿Qué es un operador? es tan simple como ejecutar una función, los operadores más comunes serian: PythonOperator
, BashOperator
, BranchPythonOperator
y DummyOperator
.
Al declarar una tarea de nuestro dag se asemeja a mandar a llamar una función con el respectivo nombre del operador o sensor, por ejemplo, así se declararía una tarea con PythonOperator.
from airflow.operators.python_operator import PythonOperator
deffunction_task_1():
...
task1 = PythonOperator(
task_id="task_1", # debe ser único dentro del dag
python_callable=function_task_1, # función que se ejecutará
dag=dag # se identifica el dag al cual pertenece esta tarea
)
al momento de ejecutarse la tarea con id "task_1"
se ejecutará la función function_task_1
, en este punto se podría pensar ¿y si necesito mandarle parámetros a la función? en ese caso se tendría que hacer con otros parámetros del PythonOperator op_args
y op_kwargs
para enviar argumentos o argumentos nombrados respectivamente.
Con este operador es posible ejecutar comandos en la terminal, y así se vería un ejemplo con este operador
from airflow.operators.bash import BashOperator
task2 = BashOperator(
task_id="task_2", # debe ser único dentro del dag
bash_command=(
"mkdir -p /data && "# se crea el directorio data"curl -o /data/events.json "# se descarga en el archivo json,"https:/ /localhost:5000/events"# el resultado de la petición
)
dag=dag # se identifica el dag al cual pertenece esta tarea
)
Este operador es la excepción a la definición de que un operador es ejecutar una acción, ya que en esencia no hace nada, sin embargo, puede ser muy útil como conector entre tareas o demás, se declararía de la siguiente manera
from airflow.operators.dummy import DummyOperator
task3 = DummyOperator(
task_id="task_3",
dag=dag # se identifica el dag al cual pertenece esta tarea
)
La diferencia de un BranchPythonOperator con el PythonOperator es en esencia a nivel de dependencias, tema del que no se ha hablado ¿Cómo declaramos dependencias? ¿Cómo puedo decir que una tarea va antes o después de otra, pues con los signos >>
o <<
Considerando las 3 tareas que se crearon con posterioridad se va a establecer que primero se ejecuta task_1
, luego task_2
y por último task_3
, quedaría así.
task1 >> task2 >> task3
Si por otro lado se quisiera establecer que tanto task_2
y task_3
se podrían ejecutar cuando task_1
termine, se tendría así:
task1 >> [task2, task3]
Y si ahora quisiera que cumpliendo cierta condición se ejecutara solo task_2
o solo task_3
¿cómo sería? es aquí donde entra el BranchPythonOperator, se reescribe task_1
de la siguiente manera
from airflow.operators.python import BranchPythonOperator
deffunction_task_1():if condition_1:
...
return"task_2"elif condition_2:
...
return"task_3"else:
...
return ["task_2", "task_3"]
task1 = PythonOperator(
task_id="task_1", # debe ser único dentro del dag
python_callable=function_task_1, # función que se ejecutará
dag=dag # se identifica el dag al cual pertenece esta tarea
)
task1 >> [task2, task3]
La función debe retornar el id de la tarea que se va a ejecutar o una lista de ids, en la función anterior se tiene que de cumplir una condición 1, la tarea a ejecutarse sería task_2
, en caso de cumplir una condición 2, la tarea a ejecutarse sería task_3
y en caso contrario se ejecutaría tanto task_2
como task_3
.
Esto fue apenas un pequeño abrebocas de lo que se puede hacer con airflow, piensa en ¿Y si no necesariamente necesito que se cumplan todas mis dependencias? ¿Y si quiero ejecutar una tarea en la nube?
Hay una gran variedad de operadores en airflow que aumentan las posibilidades de los flujos que se pueden crear, enviar correo, subir archivos a la nube, ejecutar otro dag de mi proyecto, etc. En la documentación de airflow se puede consultar todos los providers con los que se podría trabajar y las diferentes cosas que se podría hacer.
Y como si fuera poco, todo lo que se pueda programar en python, fácilmente se puede convertir en un custom Operator o un custom Sensor. ¿Qué tal te parece crear tus propios operadores y aportar a la comunidad de airflow?
¡Nunca pares de aprender! 💚
Simplemente excelente, gracias por tomarte el tiempo de compartir este conocimiento
wow amigo excelente informacion, gracias por compartir muy bien explicado si pudieras subir algunos ejemplos de algunos operadores …eres bueno explicando y los colores que colocas para resaltar se hace dinamico y enfoca el flujo del codigo …pasa tu correo amigo…paso el mio [email protected]