Scheduler

Clase 7 de 29 • Curso de Fundamentos de Apache Airflow

Clase anteriorSiguiente clase
    Mario Alexander Vargas Celis

    Mario Alexander Vargas Celis

    student•
    hace un año

    El Scheduler en Apache Airflow es el componente central encargado de gestionar la ejecución de los DAGs (Directed Acyclic Graphs) y las tareas definidas en ellos. Es responsable de planificar, desencadenar y supervisar las tareas según las dependencias y los intervalos de tiempo especificados.

    Funciones principales del Scheduler

    1. Planificación de DAGs:
      • Detecta automáticamente los DAGs disponibles y calcula los intervalos de ejecución para ellos.
      • Identifica las tareas que están listas para ejecutarse según sus dependencias y programación.
    2. Asignación de tareas a los workers:
      • Determina qué tareas deben ejecutarse y las asigna a los workers para su ejecución.
    3. Supervisión del estado de las tareas:
      • Supervisa continuamente el estado de las tareas: queued, running, success, failed, etc.
      • Reintenta tareas en caso de fallos si está configurado.
    4. Ejecuta tareas programadas o manuales:
      • Procesa ejecuciones automáticas basadas en el parámetro schedule_interval del DAG.
      • Maneja ejecuciones manuales iniciadas por usuarios desde la interfaz de usuario o la línea de comandos.

    Flujo de trabajo del Scheduler

    1. Carga de DAGs:
      • El Scheduler analiza los DAGs definidos en los archivos de Python dentro del directorio especificado (dags_folder).
      • Verifica si hay nuevas ejecuciones pendientes basadas en las definiciones de los DAGs.
    2. Determinación de las tareas ejecutables:
      • Evalúa las dependencias entre tareas para determinar cuáles están listas para ejecutarse.
      • Considera las configuraciones como:
        • start_date: Fecha desde la cual debe comenzar a ejecutarse el DAG.
        • schedule_interval: Frecuencia de ejecución del DAG.
        • catchup: Si debe ejecutarse para intervalos pasados o solo para el más reciente.
    3. Cola de tareas:
      • Las tareas listas se colocan en una cola para ser recogidas por los workers disponibles.
    4. Monitoreo continuo:
      • El Scheduler sigue monitoreando los DAGs y tareas para desencadenar nuevas ejecuciones y manejar fallos.

    Configuración del Scheduler

    El Scheduler puede configurarse desde el archivo airflow.cfg bajo la sección [scheduler]. Algunas opciones clave incluyen:

    1. scheduler_heartbeat_sec:
      • Intervalo en segundos en que el Scheduler verifica el estado de los DAGs.
    2. min_file_process_interval:
      • Tiempo mínimo entre análisis de los archivos del directorio de DAGs.
    3. num_runs:
      • Número máximo de ejecuciones antes de reiniciar el proceso del Scheduler.
    4. max_threads:
      • Número de subprocesos que puede utilizar el Scheduler para procesar tareas simultáneamente.
    5. dag_dir_list_interval:
      • Intervalo para buscar cambios en el directorio de DAGs.

    Inicio del Scheduler

    El Scheduler se ejecuta como un servicio continuo que procesa los DAGs y las tareas. Para iniciar el Scheduler, puedes usar el siguiente comando en la línea de comandos:

    airflow scheduler

    Este comando:

    • Comienza a analizar los DAGs en el directorio especificado.
    • Gestiona las tareas programadas en función de sus dependencias y disponibilidad de recursos.

    Ejemplo práctico

    Supongamos que tienes un DAG que ejecuta una tarea diaria para procesar datos. El Scheduler:

    1. Verifica si es necesario ejecutar el DAG en función de su schedule_interval.
    2. Evalúa las dependencias entre tareas en el DAG.
    3. Coloca las tareas listas en la cola.
    4. Asigna las tareas a los workers disponibles para ejecutarlas.

    Código de ejemplo:

    from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime

    dag = DAG( dag_id='daily_processing', schedule_interval='@daily', start_date=datetime(2023, 1, 1), catchup=False, )

    task = BashOperator( task_id='process_data', bash_command='echo "Procesando datos..."', dag=dag, )

    En este caso:

    • El Scheduler verificará diariamente si la tarea process_data debe ejecutarse.
    • Si está lista, la colocará en la cola para que un worker la procese.

    Interacción con otros componentes

    1. Workers:
      • El Scheduler delega la ejecución de tareas a los workers disponibles.
    2. Base de datos metadata:
      • El Scheduler consulta y actualiza continuamente el estado de los DAGs y tareas en la base de datos.
    3. Webserver:
      • Permite a los usuarios monitorear y controlar las ejecuciones desde la interfaz web, interactuando indirectamente con el Scheduler.

    Buenas prácticas para el Scheduler

    1. Optimizar la cantidad de DAGs:
      • Evita tener demasiados DAGs pequeños o con tareas innecesarias que sobrecarguen al Scheduler.
    2. Usar dependencias correctamente:
      • Configura las dependencias de las tareas para evitar bloqueos innecesarios.
    3. Configurar el número adecuado de workers:
      • Asegúrate de que los workers disponibles puedan procesar las tareas programadas eficientemente.
    4. Monitorear el rendimiento del Scheduler:
      • Usa herramientas de monitoreo para verificar si el Scheduler está procesando los DAGs a tiempo y eficientemente.

    Con estas consideraciones, el Scheduler puede manejar flujos de trabajo complejos y escalar según las necesidades del proyecto.

    Camilo Corredor

    Camilo Corredor

    student•
    hace un año

    ¿El operador viene siendo un worker dentro del work flow?

    Sergio Waldemar Garzón Mariño

    Sergio Waldemar Garzón Mariño

    student•
    hace 2 años

    Vengo de trabajar mucho con Control-M, es muy parecido en cuando al core de la aplicación. Airflow cuenta con la ventaja de programar en Python, en cambio, para programar los jobs en Control-M usamos XML o la interfaz gráfica.

    Royer Guerrero Pinilla

    Royer Guerrero Pinilla

    student•
    hace 3 años

    ✨ Existen 2 maneras de definir esto

    1. Usando expresiones CRON
    # ┌─────── minute (0 - 59) # │ ┌────── hour (0 - 23) # │ │ ┌───── day of the month (1 - 31) # │ │ │ ┌───── month (1 - 12) # │ │ │ │ ┌──── day of the week (0 - 6) (Sunday to Saturday; # │ │ │ │ │ 7 is also Sunday on some systems) # * * * * *
    1. Usando las opciones pre-configuradas @once Una y solo una vez @hourly Cada hora al comienzo de la hora @dialy Diariamente a medianoche @weekly Una vez a la semana el domingo @monthly Cada primer dia del mes a medianoche @yearly Cada 1 de Enero a medianoche

    ⚠️ Hay que tener cuidado porque el la vista de grid se organiza por fecha de programación schedule date y no por la fecha de ejecución execution_date lo que puede confundir y se puede llegar a pensar que no corrió

    Mauro Ezequiel Bravo

    Mauro Ezequiel Bravo

    student•
    hace 3 años

    DAG => Flujo de datos TASK => Componentes dentro del DAG que se puede anidar con otros TASKs. Operator => Objeto con el que se crea la TASK. Scheduler => Encargado de ejecutar los DAGs. Cuando ejecuta, en que intevalo de tiempo, etc.