No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

FileSensor

23/29
Recursos

Aportes 9

Preguntas 2

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

Mirando la documentación de airflow encontré lo siguiente:

mode: How the sensor operates. There are two types of modes:
poke: This is the default mode. When using poke, the sensor occupies a worker slot for the entire execution time and sleeps between pokes. This mode is best if you expect a short runtime for the sensor.
reschedule: When using this mode, if the criteria is not met then the sensor releases its worker slot and reschedules the next check for a later time. This mode is best if you expect a long runtime for the sensor, because it is less resource intensive and frees up workers for other tasks.

Es decir que, poke lo utilizaría cuando espero que el sensor se active de forma rápida, ya que tendría al worker asignado y la respuesta sería mucho más veloz. En cambio utilizaría reschedule si se que la respuesta al sensor tardará mas en llegar, en cuyo caso liberaría al worker y lo asignaría mas adelante.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor


with DAG(dag_id="7.3-filesensor",
    description="FileSensor",
    schedule_interval="@daily",
    start_date=datetime(2022, 8, 20),
    end_date=datetime(2022, 8, 25),
    max_active_runs=1
) as dag:

    t1 = BashOperator(task_id="creating_file",
					  bash_command="sleep 10 && touch /tmp/file.txt")

    t2 = FileSensor(task_id="waiting_file",
					  filepath="/tmp/file.txt")

    t3 = BashOperator(task_id="end_task",
					  bash_command="echo 'El fichero ha llegado'")

    t1 >> t2 >> t3

Comparto mi ejemplo imprimiendo el contenido del archivo en la última task ejecutada.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

from datetime import datetime


with DAG(dag_id="filesensor",
         description="FileSensor",
         schedule_interval="@daily",
         start_date=datetime(2022, 9, 20),
         end_date=datetime(2022, 12, 22),
         max_active_runs=1
        ) as dag:

    t1 = BashOperator(task_id="creating_file",
                      bash_command="sleep 10 && echo 'Hi, from my first FileSensor.' > /tmp/file.txt",
                      )
    
    t2 = FileSensor(task_id="waiting_file",
                    filepath="/tmp/file.txt")

    t3 = BashOperator(task_id="end_task",
                      bash_command="echo 'The file has been received, and contains this: ' && cat /tmp/file.txt")
    
    t1 >> t2 >> t3

**Modo "poke"** * **Funcionamiento**: El sensor permanece activo en un bucle de espera, verificando periódicamente si se cumple la condición configurada. * **Ventajas**: * Útil si la espera es corta y la condición se cumple rápidamente. * Garantiza una verificación continua sin interrupciones. * **Desventajas**: * Consume un slot del ejecutor de Airflow durante toda la espera, lo cual puede impactar el rendimiento si hay muchos sensores ejecutándose. * No es eficiente en sistemas con alta concurrencia o en esperas largas. **Modo "reschedule"** * **Funcionamiento**: El sensor se "desprograma" (reschedule) entre las verificaciones, liberando el slot del ejecutor mientras espera para realizar una nueva comprobación. * **Ventajas**: * Más eficiente en términos de recursos, ya que no bloquea slots innecesariamente. * Ideal para sensores que necesitan esperar largos periodos. * **Desventajas**: * Puede ser menos inmediato si se necesita una reacción rápida.
En esta página encontre más información: <https://medium.com/@chanon.krittapholchai/apache-airflow-useful-practices-sensor-operator-ead91b9f3884>

Para los que no hayan encontrado la librería de FileSensor como se muestra en la clase, yo lo pude utilizar de esta manera:

from airflow.contrib.sensors.file_sensor import FileSensor
* **Poke:** Verificaciones frecuentes, ideal para condiciones cambiantes rápidamente. * **Reschedule:** Verificaciones menos frecuentes, ideal para condiciones que pueden tardar más en cumplirse.
El \*\*ExternalTaskSensor\*\* es un operador de Apache Airflow utilizado para sincronizar tareas entre diferentes DAGs (Directed Acyclic Graphs). Su propósito principal es garantizar que una tarea en un DAG no comience hasta que una tarea específica en otro DAG se complete con éxito. \### \*\*Contexto de uso\*\*En proyectos complejos, puede haber dependencias entre DAGs. Por ejemplo: \- Un DAG se encarga de recopilar datos (ETL).- Otro DAG analiza esos datos.- El análisis no debe comenzar hasta que la recopilación haya terminado. En estos casos, el \*\*ExternalTaskSensor\*\* ayuda a coordinar la ejecución entre los DAGs. \### \*\*Características clave\*\*1. \*\*Espera activa\*\*: Este sensor verifica periódicamente el estado de la tarea externa hasta que detecta que se completó con éxito.2. \*\*Condiciones configurables\*\*: Puedes especificar la tarea y el DAG externo, el intervalo de verificación, y el tiempo máximo de espera.3. \*\*Detección de estado\*\*: Solo continúa si la tarea especificada tiene el estado `success` (por defecto). \### \*\*Parámetros principales\*\*- `external\_dag\_id`: El ID del DAG externo.- `external\_task\_id`: El ID de la tarea en el DAG externo que debe completarse.- `execution\_date`: Opcional, para especificar una fecha de ejecución específica.- `timeout`: Tiempo máximo (en segundos) que el sensor espera antes de fallar.- `poke\_interval`: Intervalo (en segundos) entre verificaciones.- `mode`: Puede ser:  - `'poke'` (por defecto): Comprueba periódicamente.  - `'reschedule'`: Optimiza recursos del scheduler. \### \*\*Ejemplo práctico\*\*Imagina que tienes dos DAGs: `dag\_etl` y `dag\_analysis`. El DAG de análisis debe esperar a que el DAG de ETL complete su tarea llamada `extract\_data`. Código para el DAG `dag\_analysis`:```pythonfrom airflow import DAGfrom airflow.sensors.external\_task import ExternalTaskSensorfrom airflow.operators.dummy import DummyOperatorfrom datetime import datetime, timedelta default\_args = {    'owner': 'airflow',    'retries': 1,    'retry\_delay': timedelta(minutes=5),} with DAG(    'dag\_analysis',    default\_args=default\_args,    description='DAG que depende de otro DAG',    schedule\_interval=None,    start\_date=datetime(2024, 1, 1),    catchup=False,) as dag:     wait\_for\_etl = ExternalTaskSensor(        task\_id='wait\_for\_etl',        external\_dag\_id='dag\_etl',  # ID del DAG externo        external\_task\_id='extract\_data',  # ID de la tarea en el DAG externo        poke\_interval=30,  # Revisa cada 30 segundos        timeout=3600,  # Espera hasta 1 hora        mode='poke',  # Usa espera activa    )     start\_analysis = DummyOperator(task\_id='start\_analysis')     wait\_for\_etl >> start\_analysis``` \### \*\*Consideraciones\*\*1. \*\*Ejecución previa:\*\* Asegúrate de que el DAG externo tenga una ejecución previa exitosa.2. \*\*Ciclo de vida del DAG:\*\* Ambos DAGs deben estar habilitados para que el sensor funcione.3. \*\*Uso de recursos:\*\* Usa el modo `reschedule` para reducir el consumo de recursos en el scheduler si el tiempo de espera es largo. \## FileSensor El \*\*FileSensor\*\* es un operador de sensor en Apache Airflow que espera la existencia de un archivo en un directorio específico. Es útil cuando se necesita que un archivo esté presente antes de que una tarea o flujo continúe. \### \*\*Casos de uso\*\*- Procesamiento de datos: Esperar la llegada de un archivo en una carpeta para iniciar su procesamiento.- Integración con sistemas externos: Asegurar que un archivo generado por otro sistema esté disponible antes de continuar. \### \*\*Parámetros principales\*\*- \*\*`filepath`\*\*: Ruta al archivo que el sensor espera. Puede ser una ruta absoluta o relativa.- \*\*`fs\_conn\_id`\*\*: ID de la conexión al sistema de archivos, si es un almacenamiento externo (por ejemplo, S3 o HDFS).- \*\*`poke\_interval`\*\*: Intervalo de tiempo (en segundos) entre cada verificación.- \*\*`timeout`\*\*: Tiempo máximo (en segundos) que el sensor espera antes de marcar un fallo.- \*\*`mode`\*\*: Define cómo espera el sensor:  - `'poke'`: Verifica continuamente (espera activa).  - `'reschedule'`: Reduce el uso de recursos pausando entre verificaciones. \### \*\*Ejemplo básico con un archivo local\*\*En este ejemplo, el sensor espera un archivo llamado `data\_ready.txt` en la carpeta `/tmp`. ```pythonfrom datetime import datetimefrom airflow import DAGfrom airflow.sensors.filesystem import FileSensorfrom airflow.operators.bash import BashOperator default\_args = {    'start\_date': datetime(2024, 1, 1),} with DAG(    dag\_id="file\_sensor\_example",    schedule\_interval="@daily",    default\_args=default\_args,    catchup=False,) as dag:     wait\_for\_file = FileSensor(        task\_id="wait\_for\_file",        filepath="/tmp/data\_ready.txt",        poke\_interval=30,  # Verifica cada 30 segundos        timeout=600,       # Falla si el archivo no aparece en 10 minutos        mode="poke",       # Espera activa    )     process\_file = BashOperator(        task\_id="process\_file",        bash\_command="cat /tmp/data\_ready.txt && echo 'Archivo procesado!'",    )     wait\_for\_file >> process\_file``` \### \*\*Conexión a sistemas externos\*\*Si necesitas monitorear archivos en sistemas como Amazon S3, HDFS o Google Cloud Storage, puedes usar el parámetro `fs\_conn\_id` con una conexión configurada en Airflow. Ejemplo para un archivo en Amazon S3: ```pythonfrom airflow.providers.amazon.aws.sensors.s3 import S3KeySensor with DAG(    dag\_id="s3\_file\_sensor\_example",    schedule\_interval="@daily",    start\_date=datetime(2024, 1, 1),    catchup=False,) as dag:     wait\_for\_s3\_file = S3KeySensor(        task\_id="wait\_for\_s3\_file",        bucket\_name="my-bucket",        bucket\_key="path/to/data\_ready.txt",        aws\_conn\_id="my\_s3\_conn",  # Configurado en Airflow        poke\_interval=60,  # Verifica cada minuto        timeout=3600,      # Tiempo máximo de espera: 1 hora    )``` \### \*\*Consideraciones importantes\*\*1. \*\*Error si el archivo no aparece:\*\*   - Configura un `timeout` adecuado para evitar que el sensor quede en espera indefinida.   - Maneja el error con notificaciones o tareas de limpieza si el archivo no llega. 2\. \*\*Uso eficiente de recursos:\*\*   - Usa `mode="reschedule"` si esperas largos períodos entre verificaciones para reducir la carga del scheduler. 3\. \*\*Pruebas locales:\*\*   - Durante el desarrollo, prueba la creación manual del archivo en el directorio especificado para verificar que el sensor lo detecta correctamente.

Lo logre 😄