No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Convierte tus certificados en títulos universitarios en USA

Antes: $249

Currency
$209

Paga en 4 cuotas sin intereses

Paga en 4 cuotas sin intereses
Suscríbete

Termina en:

18 Días
9 Hrs
20 Min
50 Seg

Tasks y Operators

6/29
Recursos

Aportes 5

Preguntas 0

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

🏗️ Architecture

🧰 Tech Stack

  • 🌿 Celery - Airflow Workers
  • 🍄 Redis - Queue
  • 💽 SQLite - Airflow Metastore (Really se puede cambiar por cualquier base de datos relacional)
En resumen, una task es la acción o el paso que deseamos realizar, mientras que el operator dictamina la forma de realizar dicha acción, así por ejemplo: Deseo mostrar un 'hola mundo' (task), y la forma en que lo podría hacer sería mediante un Bash Operator (valga la redundancia, el operator).

Un DAG está compuesto por tareas:

  • Operators.
  • Sensors.
  • TaskFlow-decorated@task.
    Entre los operators podemos utilizar( entre otros) BashOperators o PythonOperators, algunas veces se pueden utilizar ambos

Èse codigo python me quedo dando vueltas en la cabeza… nunca lo habia visto asi, hasta ahora… que es eso de callable?

En **Apache Airflow**, **tasks** y **operators** son conceptos clave para construir y ejecutar flujos de trabajo. ## **Tasks (Tareas)** Una **task** es una unidad individual de trabajo dentro de un DAG (*Directed Acyclic Graph*). Cada tarea representa una operación específica que se ejecuta como parte del flujo de trabajo. Las tareas son instancias de operadores, y juntas conforman las actividades que ocurren en un DAG. ### **Características de una Task** 1. **Independencia**: Cada tarea es independiente y realiza una acción específica. 2. **Dependencias**: Las tareas pueden depender unas de otras para garantizar que se ejecuten en el orden correcto. 3. **Configuración de reintentos**: * Puedes configurar cuántas veces intentará ejecutarse una tarea si falla. * Ejemplo: `retries=3` 4. **Estado de ejecución**: * Los estados posibles incluyen: `success`, `failed`, `running`, `skipped`. ## **Operators (Operadores)** Un **operator** es una plantilla predefinida en Airflow que define lo que hace una tarea. Los operadores proporcionan la lógica para ejecutar una acción específica, como ejecutar un script de Python, interactuar con una API o copiar datos entre bases de datos. ### **Tipos de Operadores** 1. **Operadores de acción**: * Ejecutan una acción concreta, como un script o comando. * Ejemplos: * `BashOperator`: Ejecuta comandos Bash. * `PythonOperator`: Ejecuta funciones de Python. 2. **Operadores de transferencia**: * Manejan transferencias de datos entre sistemas. * Ejemplos: * `S3ToGCSOperator`: Copia datos de S3 a Google Cloud Storage. * `MySqlToPostgresOperator`: Transfiere datos entre bases de datos. 3. **Operadores de sensores**: * Esperan a que ocurra un evento antes de continuar. * Ejemplos: * `S3KeySensor`: Espera a que un archivo específico esté disponible en S3. * `HttpSensor`: Verifica que una URL esté activa. 4. **Operadores personalizados**: * Puedes crear operadores personalizados mediante herencia de clases base como `BaseOperator`. ### **Ejemplo de Operators en uso** #### **BashOperator** Ejecuta comandos en el sistema operativo: from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime dag = DAG( 'bash\_example', schedule\_interval='@daily', start\_date=datetime(2023, 1, 1), catchup=False, ) bash\_task = BashOperator( task\_id='print\_date', bash\_command='date', dag=dag, ) #### **PythonOperator** Ejecuta funciones en Python: from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def my\_python\_function(): print("Hello from Python!") dag = DAG( 'python\_example', schedule\_interval='@daily', start\_date=datetime(2023, 1, 1), catchup=False, ) python\_task = PythonOperator( task\_id='run\_python', python\_callable=my\_python\_function, dag=dag, ) #### **Sensor** Espera a que un archivo exista en un sistema S3: from airflow import DAG from airflow.providers.amazon.aws.sensors.s3\_key import S3KeySensor from datetime import datetime dag = DAG( 'sensor\_example', schedule\_interval='@daily', start\_date=datetime(2023, 1, 1), catchup=False, ) s3\_sensor = S3KeySensor( task\_id='wait\_for\_file', bucket\_name='my-bucket', bucket\_key='path/to/file.csv', aws\_conn\_id='my\_aws\_connection', dag=dag, ) ## **Relación entre Tasks y Operators** * Una **task** es una instancia de un **operator**. * Un **operator** define la lógica de ejecución (qué hace). * Una **task** utiliza esa lógica y la aplica en un DAG específico (cómo y cuándo lo hace). Por ejemplo: bash\_task = BashOperator( task\_id='show\_date', bash\_command='date', dag=dag, ) En este caso: * `BashOperator` define cómo ejecutar un comando Bash. * `bash_task` es la tarea específica que ejecuta el comando `date`. ## **Configuración Avanzada de Tasks** 1. **Dependencias**:task1 >> task2 # task2 se ejecuta después de task1 task3 << task1 # task1 se ejecuta antes de task3 * Se configuran para controlar el orden de ejecución. 2. **Propiedades comunes**: * `retry_delay`: Tiempo entre reintentos. * `timeout`: Límite de tiempo para completar la tarea. * `execution_timeout`: Tiempo máximo permitido para que la tarea se ejecute. 3. **Ejemplo con varias dependencias**:start = DummyOperator(task\_id='start', dag=dag) process = PythonOperator(task\_id='process', python\_callable=my\_function, dag=dag) end = DummyOperator(task\_id='end', dag=dag) start >> process >> end ### **Resumen** * Las **tasks** son las actividades que ejecuta un DAG. * Los **operators** son plantillas predefinidas que implementan la lógica de ejecución de las tareas. * Juntos, permiten crear flujos de trabajo dinámicos, reutilizables y escalables en Airflow.