You don't have access to this class

Keep learning! Join and start boosting your career

Aprovecha el precio especial y haz tu profesión a prueba de IA

Antes: $249

Currency
$209
Suscríbete

Termina en:

0 Días
7 Hrs
47 Min
49 Seg

Tasks y Operators

6/29
Resources

Contributions 6

Questions 0

Sort by:

Want to see more contributions, questions and answers from the community?

🏗️ Architecture

🧰 Tech Stack

  • 🌿 Celery - Airflow Workers
  • 🍄 Redis - Queue
  • 💽 SQLite - Airflow Metastore (Really se puede cambiar por cualquier base de datos relacional)
En resumen, una task es la acción o el paso que deseamos realizar, mientras que el operator dictamina la forma de realizar dicha acción, así por ejemplo: Deseo mostrar un 'hola mundo' (task), y la forma en que lo podría hacer sería mediante un Bash Operator (valga la redundancia, el operator).

Un DAG está compuesto por tareas:

  • Operators.
  • Sensors.
  • TaskFlow-decorated@task.
    Entre los operators podemos utilizar( entre otros) BashOperators o PythonOperators, algunas veces se pueden utilizar ambos

Èse codigo python me quedo dando vueltas en la cabeza… nunca lo habia visto asi, hasta ahora… que es eso de callable?

El decorador `@task` en Apache Airflow se utiliza para marcar funciones como tareas dentro de un DAG (Directed Acyclic Graph). Esto permite que Airflow las trate como tareas independientes en el flujo de trabajo. Al usar `@task`, se simplifica la creación de tareas, ya que no es necesario definir explícitamente un operador. Además, proporciona una forma más clara y concisa de gestionar la lógica de las tareas en el flujo de trabajo. Es parte de la funcionalidad de "TaskFlow", que facilita la orquestación de procesos en Airflow.
En **Apache Airflow**, **tasks** y **operators** son conceptos clave para construir y ejecutar flujos de trabajo. ## **Tasks (Tareas)** Una **task** es una unidad individual de trabajo dentro de un DAG (*Directed Acyclic Graph*). Cada tarea representa una operación específica que se ejecuta como parte del flujo de trabajo. Las tareas son instancias de operadores, y juntas conforman las actividades que ocurren en un DAG. ### **Características de una Task** 1. **Independencia**: Cada tarea es independiente y realiza una acción específica. 2. **Dependencias**: Las tareas pueden depender unas de otras para garantizar que se ejecuten en el orden correcto. 3. **Configuración de reintentos**: * Puedes configurar cuántas veces intentará ejecutarse una tarea si falla. * Ejemplo: `retries=3` 4. **Estado de ejecución**: * Los estados posibles incluyen: `success`, `failed`, `running`, `skipped`. ## **Operators (Operadores)** Un **operator** es una plantilla predefinida en Airflow que define lo que hace una tarea. Los operadores proporcionan la lógica para ejecutar una acción específica, como ejecutar un script de Python, interactuar con una API o copiar datos entre bases de datos. ### **Tipos de Operadores** 1. **Operadores de acción**: * Ejecutan una acción concreta, como un script o comando. * Ejemplos: * `BashOperator`: Ejecuta comandos Bash. * `PythonOperator`: Ejecuta funciones de Python. 2. **Operadores de transferencia**: * Manejan transferencias de datos entre sistemas. * Ejemplos: * `S3ToGCSOperator`: Copia datos de S3 a Google Cloud Storage. * `MySqlToPostgresOperator`: Transfiere datos entre bases de datos. 3. **Operadores de sensores**: * Esperan a que ocurra un evento antes de continuar. * Ejemplos: * `S3KeySensor`: Espera a que un archivo específico esté disponible en S3. * `HttpSensor`: Verifica que una URL esté activa. 4. **Operadores personalizados**: * Puedes crear operadores personalizados mediante herencia de clases base como `BaseOperator`. ### **Ejemplo de Operators en uso** #### **BashOperator** Ejecuta comandos en el sistema operativo: from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime dag = DAG( 'bash\_example', schedule\_interval='@daily', start\_date=datetime(2023, 1, 1), catchup=False, ) bash\_task = BashOperator( task\_id='print\_date', bash\_command='date', dag=dag, ) #### **PythonOperator** Ejecuta funciones en Python: from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def my\_python\_function(): print("Hello from Python!") dag = DAG( 'python\_example', schedule\_interval='@daily', start\_date=datetime(2023, 1, 1), catchup=False, ) python\_task = PythonOperator( task\_id='run\_python', python\_callable=my\_python\_function, dag=dag, ) #### **Sensor** Espera a que un archivo exista en un sistema S3: from airflow import DAG from airflow.providers.amazon.aws.sensors.s3\_key import S3KeySensor from datetime import datetime dag = DAG( 'sensor\_example', schedule\_interval='@daily', start\_date=datetime(2023, 1, 1), catchup=False, ) s3\_sensor = S3KeySensor( task\_id='wait\_for\_file', bucket\_name='my-bucket', bucket\_key='path/to/file.csv', aws\_conn\_id='my\_aws\_connection', dag=dag, ) ## **Relación entre Tasks y Operators** * Una **task** es una instancia de un **operator**. * Un **operator** define la lógica de ejecución (qué hace). * Una **task** utiliza esa lógica y la aplica en un DAG específico (cómo y cuándo lo hace). Por ejemplo: bash\_task = BashOperator( task\_id='show\_date', bash\_command='date', dag=dag, ) En este caso: * `BashOperator` define cómo ejecutar un comando Bash. * `bash_task` es la tarea específica que ejecuta el comando `date`. ## **Configuración Avanzada de Tasks** 1. **Dependencias**:task1 >> task2 # task2 se ejecuta después de task1 task3 << task1 # task1 se ejecuta antes de task3 * Se configuran para controlar el orden de ejecución. 2. **Propiedades comunes**: * `retry_delay`: Tiempo entre reintentos. * `timeout`: Límite de tiempo para completar la tarea. * `execution_timeout`: Tiempo máximo permitido para que la tarea se ejecute. 3. **Ejemplo con varias dependencias**:start = DummyOperator(task\_id='start', dag=dag) process = PythonOperator(task\_id='process', python\_callable=my\_function, dag=dag) end = DummyOperator(task\_id='end', dag=dag) start >> process >> end ### **Resumen** * Las **tasks** son las actividades que ejecuta un DAG. * Los **operators** son plantillas predefinidas que implementan la lógica de ejecución de las tareas. * Juntos, permiten crear flujos de trabajo dinámicos, reutilizables y escalables en Airflow.