Tasks y Operators
Clase 6 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Clase 6 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Royer Guerrero Pinilla
Eric Bellet
Víctor Mazo
Lucas Gonzalez
Mario Alexander Vargas Celis
Silvia Veronica Noriega
Jhony Urbano Diaz Quispe
Alfonso Andres Zapata Guzman
Camilo Corredor
🏗️ Architecture
🧰 Tech Stack
Gracias por compartirlo!
En resumen, una task es la acción o el paso que deseamos realizar, mientras que el operator dictamina la forma de realizar dicha acción, así por ejemplo: Deseo mostrar un 'hola mundo' (task), y la forma en que lo podría hacer sería mediante un Bash Operator (valga la redundancia, el operator).
Un DAG está compuesto por tareas:
En Apache Airflow, tasks y operators son conceptos clave para construir y ejecutar flujos de trabajo.
Tasks (Tareas)
Una task es una unidad individual de trabajo dentro de un DAG (Directed Acyclic Graph). Cada tarea representa una operación específica que se ejecuta como parte del flujo de trabajo. Las tareas son instancias de operadores, y juntas conforman las actividades que ocurren en un DAG.
Características de una Task
retries=3success, failed, running, skipped.Operators (Operadores)
Un operator es una plantilla predefinida en Airflow que define lo que hace una tarea. Los operadores proporcionan la lógica para ejecutar una acción específica, como ejecutar un script de Python, interactuar con una API o copiar datos entre bases de datos.
Tipos de Operadores
BashOperator: Ejecuta comandos Bash.PythonOperator: Ejecuta funciones de Python.S3ToGCSOperator: Copia datos de S3 a Google Cloud Storage.MySqlToPostgresOperator: Transfiere datos entre bases de datos.S3KeySensor: Espera a que un archivo específico esté disponible en S3.HttpSensor: Verifica que una URL esté activa.BaseOperator.Ejemplo de Operators en uso
BashOperator
Ejecuta comandos en el sistema operativo:
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime
dag = DAG( 'bash_example', schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False, )
bash_task = BashOperator( task_id='print_date', bash_command='date', dag=dag, )
PythonOperator
Ejecuta funciones en Python:
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime
def my_python_function(): print("Hello from Python!")
dag = DAG( 'python_example', schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False, )
python_task = PythonOperator( task_id='run_python', python_callable=my_python_function, dag=dag, )
Sensor
Espera a que un archivo exista en un sistema S3:
from airflow import DAG from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor from datetime import datetime
dag = DAG( 'sensor_example', schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False, )
s3_sensor = S3KeySensor( task_id='wait_for_file', bucket_name='my-bucket', bucket_key='path/to/file.csv', aws_conn_id='my_aws_connection', dag=dag, )
Relación entre Tasks y Operators
Por ejemplo:
bash_task = BashOperator( task_id='show_date', bash_command='date', dag=dag, )
En este caso:
BashOperator define cómo ejecutar un comando Bash.bash_task es la tarea específica que ejecuta el comando date.Configuración Avanzada de Tasks
Dependencias:task1 >> task2 # task2 se ejecuta después de task1 task3 << task1 # task1 se ejecuta antes de task3
Propiedades comunes:
retry_delay: Tiempo entre reintentos.timeout: Límite de tiempo para completar la tarea.execution_timeout: Tiempo máximo permitido para que la tarea se ejecute.Ejemplo con varias dependencias:start = DummyOperator(task_id='start', dag=dag) process = PythonOperator(task_id='process', python_callable=my_function, dag=dag) end = DummyOperator(task_id='end', dag=dag)
start >> process >> end
Resumen
Èse codigo python me quedo dando vueltas en la cabeza.. nunca lo habia visto asi, hasta ahora... que es eso de callable?
en python puedes pasar una funcion como argumento y es lo que estan haciendo
En python la funciones son pasajeros/ciudadanos de primera clase.
Por el contrario es muy normal pasar funciones a otras funciones, ya sea funciones que creamos nosotros y se la pasamos a funciones de librerias que exportamos. Como funciones exportadas de una libreria a otra funcion exportada de otra libreria distinta. En los cursos de python aca en platzi se trabaja un modulo creando funciones y pasandolas a otras funciones que creamos, viendo alcance del scope, etc, y en cursos de la ruta de ciencia de datos se ve varias veces como pasamos funciones de sklearn a otras funciones de sklearn, o una que es muy representativa, pasar de sklearn a imblearn, no hay que saltarse los cursos de la ruta, al comienzo del curso se indica que se requiere python.
~ Por cierto, ya conectamos en LinkedIn? ~
Conectemos en LinkedIn, GitHub, Medium o Redes sociales
El decorador @task en Apache Airflow se utiliza para marcar funciones como tareas dentro de un DAG (Directed Acyclic Graph). Esto permite que Airflow las trate como tareas independientes en el flujo de trabajo. Al usar @task, se simplifica la creación de tareas, ya que no es necesario definir explícitamente un operador. Además, proporciona una forma más clara y concisa de gestionar la lógica de las tareas en el flujo de trabajo. Es parte de la funcionalidad de "TaskFlow", que facilita la orquestación de procesos en Airflow.