Monitoring

Clase 18 de 29 • Curso de Fundamentos de Apache Airflow

Clase anteriorSiguiente clase
    Mario Alexander Vargas Celis

    Mario Alexander Vargas Celis

    student•
    hace un año

    Monitoring en la Orquestación de DAGs

    El monitoreo es una parte crucial para garantizar que tus flujos de trabajo (DAGs) se ejecuten de manera eficiente, manejando fallos y obteniendo visibilidad en tiempo real de su estado. En el contexto de herramientas como Apache Airflow, aquí tienes las mejores prácticas y herramientas para el monitoreo efectivo:

    1. Interfaz Web

    La interfaz web de Airflow es la herramienta principal para el monitoreo visual de DAGs:

    • Vista de DAGs:
      • Observa el estado general de todos los DAGs.
      • Muestra colores para representar el estado de las tareas:
        • Verde: Éxito
        • Rojo: Fallo
        • Amarillo: En ejecución
        • Gris: Sin ejecutar
    • Vista de Gantt:
      • Proporciona un análisis temporal de las tareas ejecutadas.
      • Ayuda a identificar cuellos de botella.
    • Vista de Logs:
      • Para cada tarea, puedes acceder a los registros de ejecución.
      • Ideal para depurar errores o evaluar tiempos de ejecución.

    2. Alertas y Notificaciones

    Configura alertas automáticas para informar sobre fallos o eventos clave:

    • Notificaciones por Correo Electrónico: Configura email_on_failure o email_on_retry en las tareas:default_args = { 'email': ['mario.vargas@example.com'], 'email_on_failure': True, 'email_on_retry': False, }

    • Callbacks Personalizados: Usa on_failure_callback o on_success_callback para realizar acciones específicas, como enviar un mensaje a Slack o registrar errores en un sistema externo:def notificar_error(context): print(f"Tarea fallida: {context['task_instance'].task_id}")

      tarea = PythonOperator( task_id='mi_tarea', python_callable=mi_funcion, on_failure_callback=notificar_error, )

    3. Métricas y Logs Centralizados

    Integra Airflow con sistemas externos para recolectar y visualizar métricas:

    • Prometheus y Grafana:
      • Configura el exportador Prometheus para Airflow.
      • Visualiza métricas como:
        • Número de tareas completadas.
        • Tiempos promedio de ejecución.
        • Tareas fallidas por DAG.
    • Elasticsearch:
      • Centraliza los logs de ejecución para búsquedas y análisis más eficientes.

    4. Manejo de Retries y Fallos

    Supervisa y ajusta las políticas de reintentos en tareas problemáticas:

    • Configurar Retries:tarea = PythonOperator( task_id='mi_tarea', python_callable=mi_funcion, retries=3, retry_delay=timedelta(minutes=5), )

    • Resúmenes de Errores: La interfaz web permite acceder a listas de tareas fallidas para análisis detallado.

    5. Auditorías y Seguimiento Histórico

    Monitorea cómo ha evolucionado el rendimiento de tus DAGs a lo largo del tiempo:

    • Historial de Ejecuciones: Usa la vista "Tree View" o "Graph View" para ver el historial y patrones de fallos o ejecuciones exitosas.
    • Exportar Logs: Guarda los registros para auditorías externas:airflow tasks logs dag_id task_id execution_date > log.txt

    6. Optimización Basada en Monitoreo

    Identifica cuellos de botella y optimiza el rendimiento:

    • Observa tareas que consumen mucho tiempo y evalúa su paralelización.
    • Usa sensores de manera eficiente, evitando bloqueos prolongados.
    • Configura límites de concurrencia y priorización de tareas.

    7. Integración con Herramientas Externas

    • Slack: Notifica fallos directamente a un canal de Slack.
    • PagerDuty: Alerta en caso de errores críticos en tiempo real.
    • AWS CloudWatch (si se ejecuta en AWS): Monitorea recursos y ejecuta acciones automáticas en función del uso.
    David Quintanar Pérez

    David Quintanar Pérez

    student•
    hace 2 años

    ¿Por qué cuando ejecuta el primer clear, ejecuta las tareas de la primera fecha y cuando ejecuta el segundo clear, no solo limpia la ejecución de esa fecha, también las consecutivas?

      Eric Bellet

      Eric Bellet

      teacher•
      hace 2 años

      Porque han empezado nuevas ejecuciones

    Cesar Payro

    Cesar Payro

    student•
    hace 3 años

    no supe que paso con los 'amarillos' que van despues de un fallo? se corrieron o no? uno pensaria que no porque 'dependen' de la tarea anterior no? solo queria confirmar..

      Eric Bellet

      Eric Bellet

      teacher•
      hace 3 años

      Es exactamente eso

    Stanley Melgar

    Stanley Melgar

    student•
    hace 3 años

    Aunque no lo parezca, esta es una de las clases más importantes. De nada sirve saber como construir Operators y orquestar DAG's si cuando algo falla no lo sabemos solventar ~

    Sebastián Salas

    Sebastián Salas

    student•
    hace 3 años
    from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from datetime import datetime def myfunction(): pass with DAG(dag_id="6.1-monitoring", description="Monitoreando nuestro DAG", schedule_interval="@daily", start_date=datetime(2022, 1, 1), end_date=datetime(2022, 2, 1)) as dag: t1 = BashOperator(task_id="tarea1", bash_command="sleep 2 && echo 'Primera tarea!'") t2 = BashOperator(task_id="tarea2", bash_command="sleep 2 && echo 'Segunda tarea!'") t3 = BashOperator(task_id="tarea3", bash_command="sleep 2 && echo 'Tercera tarea!'") t4 = PythonOperator(task_id="tarea4", python_callable=myfunction) t5 = BashOperator(task_id="tarea5", bash_command="sleep 2 && echo 'Quinta tarea!'") t1 >> t2 >> t3 >> t4 >> t5
      Alfonso Andres Zapata Guzman

      Alfonso Andres Zapata Guzman

      student•
      hace 2 años

      Gracias por tus aportes man.


      ~ Que tal Platzinauta, ya conectamos en LinkedIn? ~

      ¡Que estas esperando! Conectemos en LinkedIn, GitHub, Medium o Redes sociales