No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Monitoring

18/29
Recursos

Aportes 4

Preguntas 2

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime


def myfunction():
    pass

with DAG(dag_id="6.1-monitoring",
        description="Monitoreando nuestro DAG",
        schedule_interval="@daily",
        start_date=datetime(2022, 1, 1),
        end_date=datetime(2022, 2, 1)) as dag:


    t1 = BashOperator(task_id="tarea1",
                    bash_command="sleep 2 && echo 'Primera tarea!'")

    t2 = BashOperator(task_id="tarea2",
                    bash_command="sleep 2 && echo 'Segunda tarea!'")

    t3 = BashOperator(task_id="tarea3",
                    bash_command="sleep 2 && echo 'Tercera tarea!'")

    t4 = PythonOperator(task_id="tarea4",
                    python_callable=myfunction)

    t5 = BashOperator(task_id="tarea5",
                    bash_command="sleep 2 && echo 'Quinta tarea!'")


    t1 >> t2 >> t3 >> t4 >> t5

Aunque no lo parezca, esta es una de las clases más importantes. De nada sirve saber como construir Operators y orquestar DAG's si cuando algo falla no lo sabemos solventar ~

### **Monitoring en la Orquestación de DAGs** El monitoreo es una parte crucial para garantizar que tus flujos de trabajo (DAGs) se ejecuten de manera eficiente, manejando fallos y obteniendo visibilidad en tiempo real de su estado. En el contexto de herramientas como Apache Airflow, aquí tienes las mejores prácticas y herramientas para el monitoreo efectivo: ### **1. Interfaz Web** La interfaz web de Airflow es la herramienta principal para el monitoreo visual de DAGs: * **Vista de DAGs**: * Observa el estado general de todos los DAGs. * Muestra colores para representar el estado de las tareas: * Verde: Éxito * Rojo: Fallo * Amarillo: En ejecución * Gris: Sin ejecutar * **Vista de Gantt**: * Proporciona un análisis temporal de las tareas ejecutadas. * Ayuda a identificar cuellos de botella. * **Vista de Logs**: * Para cada tarea, puedes acceder a los registros de ejecución. * Ideal para depurar errores o evaluar tiempos de ejecución. ### **2. Alertas y Notificaciones** Configura alertas automáticas para informar sobre fallos o eventos clave: * **Notificaciones por Correo Electrónico**: Configura `email_on_failure` o `email_on_retry` en las tareas:default\_args = { 'email': \['[email protected]'], 'email\_on\_failure': True, 'email\_on\_retry': False, } * **Callbacks Personalizados**: Usa `on_failure_callback` o `on_success_callback` para realizar acciones específicas, como enviar un mensaje a Slack o registrar errores en un sistema externo:def notificar\_error(context): print(f"Tarea fallida: {context\['task\_instance'].task\_id}") tarea = PythonOperator( task\_id='mi\_tarea', python\_callable=mi\_funcion, on\_failure\_callback=notificar\_error, ) ### **3. Métricas y Logs Centralizados** Integra Airflow con sistemas externos para recolectar y visualizar métricas: * **Prometheus y Grafana**: * Configura el **exportador Prometheus** para Airflow. * Visualiza métricas como: * Número de tareas completadas. * Tiempos promedio de ejecución. * Tareas fallidas por DAG. * **Elasticsearch**: * Centraliza los logs de ejecución para búsquedas y análisis más eficientes. ### **4. Manejo de Retries y Fallos** Supervisa y ajusta las políticas de reintentos en tareas problemáticas: * **Configurar Retries**:tarea = PythonOperator( task\_id='mi\_tarea', python\_callable=mi\_funcion, retries=3, retry\_delay=timedelta(minutes=5), ) * **Resúmenes de Errores**: La interfaz web permite acceder a listas de tareas fallidas para análisis detallado. ### **5. Auditorías y Seguimiento Histórico** Monitorea cómo ha evolucionado el rendimiento de tus DAGs a lo largo del tiempo: * **Historial de Ejecuciones**: Usa la vista "Tree View" o "Graph View" para ver el historial y patrones de fallos o ejecuciones exitosas. * **Exportar Logs**: Guarda los registros para auditorías externas:airflow tasks logs dag\_id task\_id execution\_date > log.txt ### **6. Optimización Basada en Monitoreo** Identifica cuellos de botella y optimiza el rendimiento: * Observa tareas que consumen mucho tiempo y evalúa su paralelización. * Usa sensores de manera eficiente, evitando bloqueos prolongados. * Configura límites de concurrencia y priorización de tareas. ### **7. Integración con Herramientas Externas** * **Slack**: Notifica fallos directamente a un canal de Slack. * **PagerDuty**: Alerta en caso de errores críticos en tiempo real. * **AWS CloudWatch** (si se ejecuta en AWS): Monitorea recursos y ejecuta acciones automáticas en función del uso.
¿Por qué cuando ejecuta el primer clear, ejecuta las tareas de la primera fecha y cuando ejecuta el segundo clear, no solo limpia la ejecución de esa fecha, también las consecutivas?