No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Scheduler

7/29
Recursos

Aportes 5

Preguntas 0

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

DAG => Flujo de datos
TASK => Componentes dentro del DAG que se puede anidar con otros TASKs.
Operator => Objeto con el que se crea la TASK.
Scheduler => Encargado de ejecutar los DAGs. Cuando ejecuta, en que intevalo de tiempo, etc.

✨ Existen 2 maneras de definir esto

  1. Usando expresiones CRON
# ┌─────── minute (0 - 59)
# │ ┌────── hour (0 - 23)
# │ │ ┌───── day of the month (1 - 31)
# │ │ │ ┌───── month (1 - 12)
# │ │ │ │ ┌──── day of the week (0 - 6) (Sunday to Saturday;
# │ │ │ │ │      7 is also Sunday on some systems)
# * * * * *
  1. Usando las opciones pre-configuradas
    @once Una y solo una vez
    @hourly Cada hora al comienzo de la hora
    @dialy Diariamente a medianoche
    @weekly Una vez a la semana el domingo
    @monthly Cada primer dia del mes a medianoche
    @yearly Cada 1 de Enero a medianoche

⚠️ Hay que tener cuidado porque el la vista de grid se organiza por fecha de programación schedule date y no por la fecha de ejecución execution_date lo que puede confundir y se puede llegar a pensar que no corrió

Vengo de trabajar mucho con Control-M, es muy parecido en cuando al core de la aplicación. Airflow cuenta con la ventaja de programar en Python, en cambio, para programar los jobs en Control-M usamos XML o la interfaz gráfica.

El **Scheduler** en **Apache Airflow** es el componente central encargado de gestionar la ejecución de los DAGs (Directed Acyclic Graphs) y las tareas definidas en ellos. Es responsable de planificar, desencadenar y supervisar las tareas según las dependencias y los intervalos de tiempo especificados. ## **Funciones principales del Scheduler** 1. **Planificación de DAGs**: * Detecta automáticamente los DAGs disponibles y calcula los intervalos de ejecución para ellos. * Identifica las tareas que están listas para ejecutarse según sus dependencias y programación. 2. **Asignación de tareas a los workers**: * Determina qué tareas deben ejecutarse y las asigna a los **workers** para su ejecución. 3. **Supervisión del estado de las tareas**: * Supervisa continuamente el estado de las tareas: `queued`, `running`, `success`, `failed`, etc. * Reintenta tareas en caso de fallos si está configurado. 4. **Ejecuta tareas programadas o manuales**: * Procesa ejecuciones automáticas basadas en el parámetro `schedule_interval` del DAG. * Maneja ejecuciones manuales iniciadas por usuarios desde la interfaz de usuario o la línea de comandos. ## **Flujo de trabajo del Scheduler** 1. **Carga de DAGs**: * El Scheduler analiza los DAGs definidos en los archivos de Python dentro del directorio especificado (`dags_folder`). * Verifica si hay nuevas ejecuciones pendientes basadas en las definiciones de los DAGs. 2. **Determinación de las tareas ejecutables**: * Evalúa las dependencias entre tareas para determinar cuáles están listas para ejecutarse. * Considera las configuraciones como: * `start_date`: Fecha desde la cual debe comenzar a ejecutarse el DAG. * `schedule_interval`: Frecuencia de ejecución del DAG. * `catchup`: Si debe ejecutarse para intervalos pasados o solo para el más reciente. 3. **Cola de tareas**: * Las tareas listas se colocan en una cola para ser recogidas por los workers disponibles. 4. **Monitoreo continuo**: * El Scheduler sigue monitoreando los DAGs y tareas para desencadenar nuevas ejecuciones y manejar fallos. ## **Configuración del Scheduler** El Scheduler puede configurarse desde el archivo `airflow.cfg` bajo la sección `[scheduler]`. Algunas opciones clave incluyen: 1. `scheduler_heartbeat_sec`: * Intervalo en segundos en que el Scheduler verifica el estado de los DAGs. 2. `min_file_process_interval`: * Tiempo mínimo entre análisis de los archivos del directorio de DAGs. 3. `num_runs`: * Número máximo de ejecuciones antes de reiniciar el proceso del Scheduler. 4. `max_threads`: * Número de subprocesos que puede utilizar el Scheduler para procesar tareas simultáneamente. 5. `dag_dir_list_interval`: * Intervalo para buscar cambios en el directorio de DAGs. ## **Inicio del Scheduler** El Scheduler se ejecuta como un servicio continuo que procesa los DAGs y las tareas. Para iniciar el Scheduler, puedes usar el siguiente comando en la línea de comandos: airflow scheduler Este comando: * Comienza a analizar los DAGs en el directorio especificado. * Gestiona las tareas programadas en función de sus dependencias y disponibilidad de recursos. ## **Ejemplo práctico** Supongamos que tienes un DAG que ejecuta una tarea diaria para procesar datos. El Scheduler: 1. Verifica si es necesario ejecutar el DAG en función de su `schedule_interval`. 2. Evalúa las dependencias entre tareas en el DAG. 3. Coloca las tareas listas en la cola. 4. Asigna las tareas a los workers disponibles para ejecutarlas. ### Código de ejemplo: from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime dag = DAG( dag\_id='daily\_processing', schedule\_interval='@daily', start\_date=datetime(2023, 1, 1), catchup=False, ) task = BashOperator( task\_id='process\_data', bash\_command='echo "Procesando datos..."', dag=dag, ) En este caso: * El Scheduler verificará diariamente si la tarea `process_data` debe ejecutarse. * Si está lista, la colocará en la cola para que un worker la procese. ## **Interacción con otros componentes** 1. **Workers**: * El Scheduler delega la ejecución de tareas a los workers disponibles. 2. **Base de datos metadata**: * El Scheduler consulta y actualiza continuamente el estado de los DAGs y tareas en la base de datos. 3. **Webserver**: * Permite a los usuarios monitorear y controlar las ejecuciones desde la interfaz web, interactuando indirectamente con el Scheduler. ## **Buenas prácticas para el Scheduler** 1. **Optimizar la cantidad de DAGs**: * Evita tener demasiados DAGs pequeños o con tareas innecesarias que sobrecarguen al Scheduler. 2. **Usar dependencias correctamente**: * Configura las dependencias de las tareas para evitar bloqueos innecesarios. 3. **Configurar el número adecuado de workers**: * Asegúrate de que los workers disponibles puedan procesar las tareas programadas eficientemente. 4. **Monitorear el rendimiento del Scheduler**: * Usa herramientas de monitoreo para verificar si el Scheduler está procesando los DAGs a tiempo y eficientemente. Con estas consideraciones, el Scheduler puede manejar flujos de trabajo complejos y escalar según las necesidades del proyecto.
¿El operador viene siendo un worker dentro del work flow?