No tienes acceso a esta clase

¬°Contin√ļa aprendiendo! √önete y comienza a potenciar tu carrera

Aprende Inglés, Programación, AI, Ciberseguridad y más a precio especial.

Antes: $249

Currency
$209
Suscríbete

Termina en:

2 Días
20 Hrs
57 Min
15 Seg

FileSensor

23/29
Recursos

Aportes 6

Preguntas 2

Ordenar por:

¬ŅQuieres ver m√°s aportes, preguntas y respuestas de la comunidad?

Mirando la documentación de airflow encontré lo siguiente:

mode: How the sensor operates. There are two types of modes:
poke: This is the default mode. When using poke, the sensor occupies a worker slot for the entire execution time and sleeps between pokes. This mode is best if you expect a short runtime for the sensor.
reschedule: When using this mode, if the criteria is not met then the sensor releases its worker slot and reschedules the next check for a later time. This mode is best if you expect a long runtime for the sensor, because it is less resource intensive and frees up workers for other tasks.

Es decir que, poke lo utilizaría cuando espero que el sensor se active de forma rápida, ya que tendría al worker asignado y la respuesta sería mucho más veloz. En cambio utilizaría reschedule si se que la respuesta al sensor tardará mas en llegar, en cuyo caso liberaría al worker y lo asignaría mas adelante.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor


with DAG(dag_id="7.3-filesensor",
    description="FileSensor",
    schedule_interval="@daily",
    start_date=datetime(2022, 8, 20),
    end_date=datetime(2022, 8, 25),
    max_active_runs=1
) as dag:

    t1 = BashOperator(task_id="creating_file",
					  bash_command="sleep 10 && touch /tmp/file.txt")

    t2 = FileSensor(task_id="waiting_file",
					  filepath="/tmp/file.txt")

    t3 = BashOperator(task_id="end_task",
					  bash_command="echo 'El fichero ha llegado'")

    t1 >> t2 >> t3

Comparto mi ejemplo imprimiendo el contenido del archivo en la √ļltima task ejecutada.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor

from datetime import datetime


with DAG(dag_id="filesensor",
         description="FileSensor",
         schedule_interval="@daily",
         start_date=datetime(2022, 9, 20),
         end_date=datetime(2022, 12, 22),
         max_active_runs=1
        ) as dag:

    t1 = BashOperator(task_id="creating_file",
                      bash_command="sleep 10 && echo 'Hi, from my first FileSensor.' > /tmp/file.txt",
                      )
    
    t2 = FileSensor(task_id="waiting_file",
                    filepath="/tmp/file.txt")

    t3 = BashOperator(task_id="end_task",
                      bash_command="echo 'The file has been received, and contains this: ' && cat /tmp/file.txt")
    
    t1 >> t2 >> t3

Para los que no hayan encontrado la librería de FileSensor como se muestra en la clase, yo lo pude utilizar de esta manera:

from airflow.contrib.sensors.file_sensor import FileSensor
En esta página encontre más información: <https://medium.com/@chanon.krittapholchai/apache-airflow-useful-practices-sensor-operator-ead91b9f3884>

Lo logre ūüėĄ