FileSensor
Clase 23 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Clase 23 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Pablo Nicolas Delucchi .
Eric Bellet
Mauro Ezequiel Bravo
Felipe de Jesús Delgado Troncoso
Emilio Nicolás Mendoza Patti
Santiago Ahumada Lozano
Luis Ernesto Domínguez Velásquez
Jonattan Andrez Blanco Baron
Cecilia Gabriela Rodríguez Flores
Oscar Gama
Santiago Ahumada Lozano
Arturo Delgado
Gerardo Mayel Fernández Alamilla
Eric Bellet
Leonardo Carvallo
Eric Bellet
Elián Dashiel Estévez Espinal
Elián Dashiel Estévez Espinal
Mario Alexander Vargas Celis
Mirando la documentación de airflow encontré lo siguiente:
mode: How the sensor operates. There are two types of modes: poke: This is the default mode. When using poke, the sensor occupies a worker slot for the entire execution time and sleeps between pokes. This mode is best if you expect a short runtime for the sensor. reschedule: When using this mode, if the criteria is not met then the sensor releases its worker slot and reschedules the next check for a later time. This mode is best if you expect a long runtime for the sensor, because it is less resource intensive and frees up workers for other tasks.
Es decir que, poke lo utilizaría cuando espero que el sensor se active de forma rápida, ya que tendría al worker asignado y la respuesta sería mucho más veloz. En cambio utilizaría reschedule si se que la respuesta al sensor tardará mas en llegar, en cuyo caso liberaría al worker y lo asignaría mas adelante.
Exactamente Pablo.
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator from airflow.sensors.filesystem import FileSensor with DAG(dag_id="7.3-filesensor", description="FileSensor", schedule_interval="@daily", start_date=datetime(2022, 8, 20), end_date=datetime(2022, 8, 25), max_active_runs=1 ) as dag: t1 = BashOperator(task_id="creating_file", bash_command="sleep 10 && touch /tmp/file.txt") t2 = FileSensor(task_id="waiting_file", filepath="/tmp/file.txt") t3 = BashOperator(task_id="end_task", bash_command="echo 'El fichero ha llegado'") t1 >> t2 >> t3
Comparto mi ejemplo imprimiendo el contenido del archivo en la última task ejecutada.
from airflow import DAG from airflow.operators.bash import BashOperator from airflow.sensors.filesystem import FileSensor from datetime import datetime with DAG(dag_id="filesensor", description="FileSensor", schedule_interval="@daily", start_date=datetime(2022, 9, 20), end_date=datetime(2022, 12, 22), max_active_runs=1 ) as dag: t1 = BashOperator(task_id="creating_file", bash_command="sleep 10 && echo 'Hi, from my first FileSensor.' > /tmp/file.txt", ) t2 = FileSensor(task_id="waiting_file", filepath="/tmp/file.txt") t3 = BashOperator(task_id="end_task", bash_command="echo 'The file has been received, and contains this: ' && cat /tmp/file.txt") t1 >> t2 >> t3
Lo logre :D
Cool!
Modo "poke"
Modo "reschedule"
En esta página encontre más información:
Para los que no hayan encontrado la librería de FileSensor como se muestra en la clase, yo lo pude utilizar de esta manera:
from airflow.contrib.sensors.file_sensor import FileSensor
En producción es util usar un Filesensor?
Si están siguiendo el tutorial en el 2025, al momento de crear la conexión fs_default agreguen { "path": "/"} al Extra fields JSON para que no tengan problemas.
duda tal vez básica y no se precisamente del curso pero con qué extensión diste click para ver los parámetros del sensor?
Con PyCharm es nativo. Con la mac hsces command + click
Que función estaria cumpliendo la programación del dag en donde vive el sensor..? Si yo quiero que ejecute semanal, semanalmente el dag se va a despertar y el sensor va a comenzar a verificar si se cumple la funcion..? Luego se cumple o no, y la semana que viene se vuelve a despertar para hacer poke..? es asi..?
Hay 2 tipos de sensores, poke y reschedule. En el caso de ejecuciones semanales yo usaría el reschedule, que básicamente se le asigna el task a un worker cada X tiempo donde tú puedes definir ese X. Cada ejecución activa el sensor, entonces sí cada semana se despertaría.
Para quien le aparezca el siguiente error:
Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 257, in execute raise e File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 239, in execute poke_return = self.poke(context) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/filesystem.py", line 61, in poke hook = FSHook(self.fs_conn_id) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/hooks/filesystem.py", line 38, in __init__ conn = self.get_connection(conn_id) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/hooks/base.py", line 72, in get_connection conn = Connection.get_connection_from_secrets(conn_id) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/connection.py", line 477, in get_connection_from_secrets raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined") airflow.exceptions.AirflowNotFoundException: The conn_id `fs_default` isn't defined ```1. Configuren la conexion en el panel web de airflow (En Admin > Conections) y agreguen una de tipo File (path):  Entocnes en el python: ```python from airflow import DAG from datetime import datetime from airflow.operators.bash import BashOperator from airflow.sensors.filesystem import FileSensor with DAG(dag_id="7.3.FILE_SENSOR", description="DAG que actua como sensor", schedule_interval="@daily", start_date=datetime(2025, 6, 26), end_date=datetime(2025, 10, 26), max_active_runs=1): FILE = "/tmp/file.txt" t1 = BashOperator(task_id="creation_file", bash_command=f"sleep 3 && touch {FILE}") t2 = FileSensor(task_id="waiting_file", filepath=FILE, fs_conn_id="fs_default", # Por alguna razon me da error de conexion timeout=60) t3 = BashOperator(task_id='end_task', bash_command="echo 'El fichero ha llegado'") t1 >> t2 >> t3
Mas adelante en el video lo explica, solo que al principio "asusta"
El **ExternalTaskSensor** es un operador de Apache Airflow utilizado para sincronizar tareas entre diferentes DAGs (Directed Acyclic Graphs). Su propósito principal es garantizar que una tarea en un DAG no comience hasta que una tarea específica en otro DAG se complete con éxito.
### **Contexto de uso**En proyectos complejos, puede haber dependencias entre DAGs. Por ejemplo:
- Un DAG se encarga de recopilar datos (ETL).- Otro DAG analiza esos datos.- El análisis no debe comenzar hasta que la recopilación haya terminado.
En estos casos, el **ExternalTaskSensor** ayuda a coordinar la ejecución entre los DAGs.
### **Características clave**1. **Espera activa**: Este sensor verifica periódicamente el estado de la tarea externa hasta que detecta que se completó con éxito.2. **Condiciones configurables**: Puedes especificar la tarea y el DAG externo, el intervalo de verificación, y el tiempo máximo de espera.3. **Detección de estado**: Solo continúa si la tarea especificada tiene el estado success (por defecto).
### **Parámetros principales**- external\_dag\_id: El ID del DAG externo.- external\_task\_id: El ID de la tarea en el DAG externo que debe completarse.- execution\_date: Opcional, para especificar una fecha de ejecución específica.- timeout: Tiempo máximo (en segundos) que el sensor espera antes de fallar.- poke\_interval: Intervalo (en segundos) entre verificaciones.- mode: Puede ser: - 'poke' (por defecto): Comprueba periódicamente. - 'reschedule': Optimiza recursos del scheduler.
### **Ejemplo práctico**Imagina que tienes dos DAGs: dag\_etl y dag\_analysis. El DAG de análisis debe esperar a que el DAG de ETL complete su tarea llamada extract\_data.
Código para el DAG dag\_analysis:pythonfrom airflow import DAGfrom airflow.sensors.external\_task import ExternalTaskSensorfrom airflow.operators.dummy import DummyOperatorfrom datetime import datetime, timedelta default\_args = { 'owner': 'airflow', 'retries': 1, 'retry\_delay': timedelta(minutes=5),} with DAG( 'dag\_analysis', default\_args=default\_args, description='DAG que depende de otro DAG', schedule\_interval=None, start\_date=datetime(2024, 1, 1), catchup=False,) as dag: wait\_for\_etl = ExternalTaskSensor( task\_id='wait\_for\_etl', external\_dag\_id='dag\_etl', # ID del DAG externo external\_task\_id='extract\_data', # ID de la tarea en el DAG externo poke\_interval=30, # Revisa cada 30 segundos timeout=3600, # Espera hasta 1 hora mode='poke', # Usa espera activa ) start\_analysis = DummyOperator(task\_id='start\_analysis') wait\_for\_etl >> start\_analysis
### **Consideraciones**1. **Ejecución previa:** Asegúrate de que el DAG externo tenga una ejecución previa exitosa.2. **Ciclo de vida del DAG:** Ambos DAGs deben estar habilitados para que el sensor funcione.3. **Uso de recursos:** Usa el modo reschedule para reducir el consumo de recursos en el scheduler si el tiempo de espera es largo.
## FileSensor
El **FileSensor** es un operador de sensor en Apache Airflow que espera la existencia de un archivo en un directorio específico. Es útil cuando se necesita que un archivo esté presente antes de que una tarea o flujo continúe.
### **Casos de uso**- Procesamiento de datos: Esperar la llegada de un archivo en una carpeta para iniciar su procesamiento.- Integración con sistemas externos: Asegurar que un archivo generado por otro sistema esté disponible antes de continuar.
### **Parámetros principales**- **filepath**: Ruta al archivo que el sensor espera. Puede ser una ruta absoluta o relativa.- **fs\_conn\_id**: ID de la conexión al sistema de archivos, si es un almacenamiento externo (por ejemplo, S3 o HDFS).- **poke\_interval**: Intervalo de tiempo (en segundos) entre cada verificación.- **timeout**: Tiempo máximo (en segundos) que el sensor espera antes de marcar un fallo.- **mode**: Define cómo espera el sensor: - 'poke': Verifica continuamente (espera activa). - 'reschedule': Reduce el uso de recursos pausando entre verificaciones.
### **Ejemplo básico con un archivo local**En este ejemplo, el sensor espera un archivo llamado data\_ready.txt en la carpeta /tmp.
default\_args = { 'start\_date': datetime(2024, 1, 1),} with DAG( dag\_id="file\_sensor\_example", schedule\_interval="@daily", default\_args=default\_args, catchup=False,) as dag: wait\_for\_file = FileSensor( task\_id="wait\_for\_file", filepath="/tmp/data\_ready.txt", poke\_interval=30, # Verifica cada 30 segundos timeout=600, # Falla si el archivo no aparece en 10 minutos mode="poke", # Espera activa ) process\_file = BashOperator( task\_id="process\_file", bash\_command="cat /tmp/data\_ready.txt && echo 'Archivo procesado!'", ) wait\_for\_file >> process\_file``` \### \*\*Conexión a sistemas externos\*\*Si necesitas monitorear archivos en sistemas como Amazon S3, HDFS o Google Cloud Storage, puedes usar el parámetro `fs\_conn\_id` con una conexión configurada en Airflow. Ejemplo para un archivo en Amazon S3: ```pythonfrom airflow.providers.amazon.aws.sensors.s3 import S3KeySensor with DAG( dag\_id="s3\_file\_sensor\_example", schedule\_interval="@daily", start\_date=datetime(2024, 1, 1), catchup=False,) as dag: wait\_for\_s3\_file = S3KeySensor( task\_id="wait\_for\_s3\_file", bucket\_name="my-bucket", bucket\_key="path/to/data\_ready.txt", aws\_conn\_id="my\_s3\_conn", # Configurado en Airflow poke\_interval=60, # Verifica cada minuto timeout=3600, # Tiempo máximo de espera: 1 hora )``` \### \*\*Consideraciones importantes\*\*1. \*\*Error si el archivo no aparece:\*\* - Configura un `timeout` adecuado para evitar que el sensor quede en espera indefinida. - Maneja el error con notificaciones o tareas de limpieza si el archivo no llega. 2\. \*\*Uso eficiente de recursos:\*\* - Usa `mode="reschedule"` si esperas largos períodos entre verificaciones para reducir la carga del scheduler. 3\. \*\*Pruebas locales:\*\* - Durante el desarrollo, prueba la creación manual del archivo en el directorio especificado para verificar que el sensor lo detecta correctamente.