No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Convierte tus certificados en títulos universitarios en USA

Antes: $249

Currency
$209

Paga en 4 cuotas sin intereses

Paga en 4 cuotas sin intereses
Suscríbete

Termina en:

18 Días
9 Hrs
17 Min
54 Seg

FileSensor

23/29
Recursos

Aportes 6

Preguntas 2

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

Mirando la documentación de airflow encontré lo siguiente:

mode: How the sensor operates. There are two types of modes:
poke: This is the default mode. When using poke, the sensor occupies a worker slot for the entire execution time and sleeps between pokes. This mode is best if you expect a short runtime for the sensor.
reschedule: When using this mode, if the criteria is not met then the sensor releases its worker slot and reschedules the next check for a later time. This mode is best if you expect a long runtime for the sensor, because it is less resource intensive and frees up workers for other tasks.

Es decir que, poke lo utilizaría cuando espero que el sensor se active de forma rápida, ya que tendría al worker asignado y la respuesta sería mucho más veloz. En cambio utilizaría reschedule si se que la respuesta al sensor tardará mas en llegar, en cuyo caso liberaría al worker y lo asignaría mas adelante.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor


with DAG(dag_id="7.3-filesensor",
    description="FileSensor",
    schedule_interval="@daily",
    start_date=datetime(2022, 8, 20),
    end_date=datetime(2022, 8, 25),
    max_active_runs=1
) as dag:

    t1 = BashOperator(task_id="creating_file",
					  bash_command="sleep 10 && touch /tmp/file.txt")

    t2 = FileSensor(task_id="waiting_file",
					  filepath="/tmp/file.txt")

    t3 = BashOperator(task_id="end_task",
					  bash_command="echo 'El fichero ha llegado'")

    t1 >> t2 >> t3

Comparto mi ejemplo imprimiendo el contenido del archivo en la última task ejecutada.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

from datetime import datetime


with DAG(dag_id="filesensor",
         description="FileSensor",
         schedule_interval="@daily",
         start_date=datetime(2022, 9, 20),
         end_date=datetime(2022, 12, 22),
         max_active_runs=1
        ) as dag:

    t1 = BashOperator(task_id="creating_file",
                      bash_command="sleep 10 && echo 'Hi, from my first FileSensor.' > /tmp/file.txt",
                      )
    
    t2 = FileSensor(task_id="waiting_file",
                    filepath="/tmp/file.txt")

    t3 = BashOperator(task_id="end_task",
                      bash_command="echo 'The file has been received, and contains this: ' && cat /tmp/file.txt")
    
    t1 >> t2 >> t3

En esta página encontre más información: <https://medium.com/@chanon.krittapholchai/apache-airflow-useful-practices-sensor-operator-ead91b9f3884>

Para los que no hayan encontrado la librería de FileSensor como se muestra en la clase, yo lo pude utilizar de esta manera:

from airflow.contrib.sensors.file_sensor import FileSensor

Lo logre 😄