FileSensor

Clase 23 de 29 • Curso de Fundamentos de Apache Airflow

Clase anteriorSiguiente clase
    Arturo Delgado

    Arturo Delgado

    student•
    hace 4 meses

    Si están siguiendo el tutorial en el 2025, al momento de crear la conexión fs_default agreguen { "path": "/"} al Extra fields JSON para que no tengan problemas.

    Elián Dashiel Estévez Espinal

    Elián Dashiel Estévez Espinal

    student•
    hace 5 meses

    Para quien le aparezca el siguiente error:

    Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 257, in execute raise e File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 239, in execute poke_return = self.poke(context) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/filesystem.py", line 61, in poke hook = FSHook(self.fs_conn_id) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/hooks/filesystem.py", line 38, in __init__ conn = self.get_connection(conn_id) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/hooks/base.py", line 72, in get_connection conn = Connection.get_connection_from_secrets(conn_id) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/connection.py", line 477, in get_connection_from_secrets raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") airflow.exceptions.AirflowNotFoundException: The conn_id `fs_default` isn't defined ```1. Configuren la conexion en el panel web de airflow (En Admin > Conections) y agreguen una de tipo File (path): ![](https://static.platzi.com/media/user_upload/upload-d4e3407d-e435-49a3-9cbf-6fc070f44d11.png) Entocnes en el python: ```python from airflow import DAG from datetime import datetime from airflow.operators.bash import BashOperator from airflow.sensors.filesystem import FileSensor with DAG(dag_id="7.3.FILE_SENSOR", description="DAG que actua como sensor", schedule_interval="@daily", start_date=datetime(2025, 6, 26), end_date=datetime(2025, 10, 26), max_active_runs=1): FILE = "/tmp/file.txt" t1 = BashOperator(task_id="creation_file", bash_command=f"sleep 3 && touch {FILE}") t2 = FileSensor(task_id="waiting_file", filepath=FILE, fs_conn_id="fs_default", # Por alguna razon me da error de conexion timeout=60) t3 = BashOperator(task_id='end_task', bash_command="echo 'El fichero ha llegado'") t1 >> t2 >> t3
      Elián Dashiel Estévez Espinal

      Elián Dashiel Estévez Espinal

      student•
      hace 5 meses

      Mas adelante en el video lo explica, solo que al principio "asusta"

    Luis Ernesto Domínguez Velásquez

    Luis Ernesto Domínguez Velásquez

    student•
    hace 10 meses
    • Poke: Verificaciones frecuentes, ideal para condiciones cambiantes rápidamente.
    • Reschedule: Verificaciones menos frecuentes, ideal para condiciones que pueden tardar más en cumplirse.
    Mario Alexander Vargas Celis

    Mario Alexander Vargas Celis

    student•
    hace 10 meses

    El **ExternalTaskSensor** es un operador de Apache Airflow utilizado para sincronizar tareas entre diferentes DAGs (Directed Acyclic Graphs). Su propósito principal es garantizar que una tarea en un DAG no comience hasta que una tarea específica en otro DAG se complete con éxito. ### **Contexto de uso**En proyectos complejos, puede haber dependencias entre DAGs. Por ejemplo: - Un DAG se encarga de recopilar datos (ETL).- Otro DAG analiza esos datos.- El análisis no debe comenzar hasta que la recopilación haya terminado. En estos casos, el **ExternalTaskSensor** ayuda a coordinar la ejecución entre los DAGs. ### **Características clave**1. **Espera activa**: Este sensor verifica periódicamente el estado de la tarea externa hasta que detecta que se completó con éxito.2. **Condiciones configurables**: Puedes especificar la tarea y el DAG externo, el intervalo de verificación, y el tiempo máximo de espera.3. **Detección de estado**: Solo continúa si la tarea especificada tiene el estado success (por defecto). ### **Parámetros principales**- external\_dag\_id: El ID del DAG externo.- external\_task\_id: El ID de la tarea en el DAG externo que debe completarse.- execution\_date: Opcional, para especificar una fecha de ejecución específica.- timeout: Tiempo máximo (en segundos) que el sensor espera antes de fallar.- poke\_interval: Intervalo (en segundos) entre verificaciones.- mode: Puede ser:  - 'poke' (por defecto): Comprueba periódicamente.  - 'reschedule': Optimiza recursos del scheduler. ### **Ejemplo práctico**Imagina que tienes dos DAGs: dag\_etl y dag\_analysis. El DAG de análisis debe esperar a que el DAG de ETL complete su tarea llamada extract\_data. Código para el DAG dag\_analysis:pythonfrom airflow import DAGfrom airflow.sensors.external\_task import ExternalTaskSensorfrom airflow.operators.dummy import DummyOperatorfrom datetime import datetime, timedelta default\_args = {    'owner': 'airflow',    'retries': 1,    'retry\_delay': timedelta(minutes=5),} with DAG(    'dag\_analysis',    default\_args=default\_args,    description='DAG que depende de otro DAG',    schedule\_interval=None,    start\_date=datetime(2024, 1, 1),    catchup=False,) as dag:     wait\_for\_etl = ExternalTaskSensor(        task\_id='wait\_for\_etl',        external\_dag\_id='dag\_etl',  # ID del DAG externo        external\_task\_id='extract\_data',  # ID de la tarea en el DAG externo        poke\_interval=30,  # Revisa cada 30 segundos        timeout=3600,  # Espera hasta 1 hora        mode='poke',  # Usa espera activa    )     start\_analysis = DummyOperator(task\_id='start\_analysis')     wait\_for\_etl >> start\_analysis ### **Consideraciones**1. **Ejecución previa:** Asegúrate de que el DAG externo tenga una ejecución previa exitosa.2. **Ciclo de vida del DAG:** Ambos DAGs deben estar habilitados para que el sensor funcione.3. **Uso de recursos:** Usa el modo reschedule para reducir el consumo de recursos en el scheduler si el tiempo de espera es largo. ## FileSensor El **FileSensor** es un operador de sensor en Apache Airflow que espera la existencia de un archivo en un directorio específico. Es útil cuando se necesita que un archivo esté presente antes de que una tarea o flujo continúe. ### **Casos de uso**- Procesamiento de datos: Esperar la llegada de un archivo en una carpeta para iniciar su procesamiento.- Integración con sistemas externos: Asegurar que un archivo generado por otro sistema esté disponible antes de continuar. ### **Parámetros principales**- **filepath**: Ruta al archivo que el sensor espera. Puede ser una ruta absoluta o relativa.- **fs\_conn\_id**: ID de la conexión al sistema de archivos, si es un almacenamiento externo (por ejemplo, S3 o HDFS).- **poke\_interval**: Intervalo de tiempo (en segundos) entre cada verificación.- **timeout**: Tiempo máximo (en segundos) que el sensor espera antes de marcar un fallo.- **mode**: Define cómo espera el sensor:  - 'poke': Verifica continuamente (espera activa).  - 'reschedule': Reduce el uso de recursos pausando entre verificaciones. ### **Ejemplo básico con un archivo local**En este ejemplo, el sensor espera un archivo llamado data\_ready.txt en la carpeta /tmp.

    default\_args = {    'start\_date': datetime(2024, 1, 1),} with DAG(    dag\_id="file\_sensor\_example",    schedule\_interval="@daily",    default\_args=default\_args,    catchup=False,) as dag:     wait\_for\_file = FileSensor(        task\_id="wait\_for\_file",        filepath="/tmp/data\_ready.txt",        poke\_interval=30,  # Verifica cada 30 segundos        timeout=600,       # Falla si el archivo no aparece en 10 minutos        mode="poke",       # Espera activa    )     process\_file = BashOperator(        task\_id="process\_file",        bash\_command="cat /tmp/data\_ready.txt && echo 'Archivo procesado!'",    )     wait\_for\_file >> process\_file``` \### \*\*Conexión a sistemas externos\*\*Si necesitas monitorear archivos en sistemas como Amazon S3, HDFS o Google Cloud Storage, puedes usar el parámetro `fs\_conn\_id` con una conexión configurada en Airflow. Ejemplo para un archivo en Amazon S3: ```pythonfrom airflow.providers.amazon.aws.sensors.s3 import S3KeySensor with DAG(    dag\_id="s3\_file\_sensor\_example",    schedule\_interval="@daily",    start\_date=datetime(2024, 1, 1),    catchup=False,) as dag:     wait\_for\_s3\_file = S3KeySensor(        task\_id="wait\_for\_s3\_file",        bucket\_name="my-bucket",        bucket\_key="path/to/data\_ready.txt",        aws\_conn\_id="my\_s3\_conn",  # Configurado en Airflow        poke\_interval=60,  # Verifica cada minuto        timeout=3600,      # Tiempo máximo de espera: 1 hora    )``` \### \*\*Consideraciones importantes\*\*1. \*\*Error si el archivo no aparece:\*\*   - Configura un `timeout` adecuado para evitar que el sensor quede en espera indefinida.   - Maneja el error con notificaciones o tareas de limpieza si el archivo no llega. 2\. \*\*Uso eficiente de recursos:\*\*   - Usa `mode="reschedule"` si esperas largos períodos entre verificaciones para reducir la carga del scheduler. 3\. \*\*Pruebas locales:\*\*   - Durante el desarrollo, prueba la creación manual del archivo en el directorio especificado para verificar que el sensor lo detecta correctamente.
    Jonattan Andrez Blanco Baron

    Jonattan Andrez Blanco Baron

    student•
    hace un año

    Modo "poke"

    • Funcionamiento: El sensor permanece activo en un bucle de espera, verificando periódicamente si se cumple la condición configurada.
    • Ventajas:
      • Útil si la espera es corta y la condición se cumple rápidamente.
      • Garantiza una verificación continua sin interrupciones.
    • Desventajas:
      • Consume un slot del ejecutor de Airflow durante toda la espera, lo cual puede impactar el rendimiento si hay muchos sensores ejecutándose.
      • No es eficiente en sistemas con alta concurrencia o en esperas largas.

    Modo "reschedule"

    • Funcionamiento: El sensor se "desprograma" (reschedule) entre las verificaciones, liberando el slot del ejecutor mientras espera para realizar una nueva comprobación.
    • Ventajas:
      • Más eficiente en términos de recursos, ya que no bloquea slots innecesariamente.
      • Ideal para sensores que necesitan esperar largos periodos.
    • Desventajas:
      • Puede ser menos inmediato si se necesita una reacción rápida.
    Cecilia Gabriela Rodríguez Flores

    Cecilia Gabriela Rodríguez Flores

    student•
    hace 2 años

    En esta página encontre más información: https://medium.com/@chanon.krittapholchai/apache-airflow-useful-practices-sensor-operator-ead91b9f3884

    Emilio Nicolás Mendoza Patti

    Emilio Nicolás Mendoza Patti

    student•
    hace 2 años

    Lo logre :D

    aiflow3.png
    Oscar Gama

    Oscar Gama

    student•
    hace 2 años

    Para los que no hayan encontrado la librería de FileSensor como se muestra en la clase, yo lo pude utilizar de esta manera:

    from airflow.contrib.sensors.file_sensor import FileSensor
    Felipe de Jesús Delgado Troncoso

    Felipe de Jesús Delgado Troncoso

    student•
    hace 3 años

    Comparto mi ejemplo imprimiendo el contenido del archivo en la última task ejecutada.

    from airflow import DAG from airflow.operators.bash import BashOperator from airflow.sensors.filesystem import FileSensor from datetime import datetime with DAG(dag_id="filesensor", description="FileSensor", schedule_interval="@daily", start_date=datetime(2022, 9, 20), end_date=datetime(2022, 12, 22), max_active_runs=1 ) as dag: t1 = BashOperator(task_id="creating_file", bash_command="sleep 10 && echo 'Hi, from my first FileSensor.' > /tmp/file.txt", ) t2 = FileSensor(task_id="waiting_file", filepath="/tmp/file.txt") t3 = BashOperator(task_id="end_task", bash_command="echo 'The file has been received, and contains this: ' && cat /tmp/file.txt") t1 >> t2 >> t3
    Gerardo Mayel Fernández Alamilla

    Gerardo Mayel Fernández Alamilla

    student•
    hace 3 años

    duda tal vez básica y no se precisamente del curso pero con qué extensión diste click para ver los parámetros del sensor?

      Eric Bellet

      Eric Bellet

      teacher•
      hace 3 años

      Con PyCharm es nativo. Con la mac hsces command + click

    Mauro Ezequiel Bravo

    Mauro Ezequiel Bravo

    student•
    hace 3 años
    from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator from airflow.sensors.filesystem import FileSensor with DAG(dag_id="7.3-filesensor", description="FileSensor", schedule_interval="@daily", start_date=datetime(2022, 8, 20), end_date=datetime(2022, 8, 25), max_active_runs=1 ) as dag: t1 = BashOperator(task_id="creating_file", bash_command="sleep 10 && touch /tmp/file.txt") t2 = FileSensor(task_id="waiting_file", filepath="/tmp/file.txt") t3 = BashOperator(task_id="end_task", bash_command="echo 'El fichero ha llegado'") t1 >> t2 >> t3
    Leonardo Carvallo

    Leonardo Carvallo

    student•
    hace 3 años

    Que función estaria cumpliendo la programación del dag en donde vive el sensor..? Si yo quiero que ejecute semanal, semanalmente el dag se va a despertar y el sensor va a comenzar a verificar si se cumple la funcion..? Luego se cumple o no, y la semana que viene se vuelve a despertar para hacer poke..? es asi..?

      Eric Bellet

      Eric Bellet

      teacher•
      hace 3 años

      Hay 2 tipos de sensores, poke y reschedule. En el caso de ejecuciones semanales yo usaría el reschedule, que básicamente se le asigna el task a un worker cada X tiempo donde tú puedes definir ese X. Cada ejecución activa el sensor, entonces sí cada semana se despertaría.

    Pablo Nicolas Delucchi .

    Pablo Nicolas Delucchi .

    student•
    hace 3 años

    Mirando la documentación de airflow encontré lo siguiente:

    mode: How the sensor operates. There are two types of modes: poke: This is the default mode. When using poke, the sensor occupies a worker slot for the entire execution time and sleeps between pokes. This mode is best if you expect a short runtime for the sensor. reschedule: When using this mode, if the criteria is not met then the sensor releases its worker slot and reschedules the next check for a later time. This mode is best if you expect a long runtime for the sensor, because it is less resource intensive and frees up workers for other tasks.

    Es decir que, poke lo utilizaría cuando espero que el sensor se active de forma rápida, ya que tendría al worker asignado y la respuesta sería mucho más veloz. En cambio utilizaría reschedule si se que la respuesta al sensor tardará mas en llegar, en cuyo caso liberaría al worker y lo asignaría mas adelante.

      Eric Bellet

      Eric Bellet

      teacher•
      hace 3 años

      Exactamente Pablo.