Custom Operator

Clase 15 de 29 • Curso de Fundamentos de Apache Airflow

Clase anteriorSiguiente clase
    Luis Rivero

    Luis Rivero

    student•
    hace 8 meses

    Claro, aquí tienes tu texto reformateado para que se entienda mejor:

    Quise modularizar un poco y encontré en la documentación que los custom operators se pueden almacenar en la carpeta plugins.

    En mi caso, creé una carpeta llamada custom_operators y dentro de ella un archivo llamado HelloOperator.py, con la siguiente estructura:

    plugins/ custom_operators/ HelloOperator.py ```Al importarlo en el DAG, lo hacía de la siguiente forma: ```js from plugins.custom_operators.HelloOperator import HelloOperator

    ⚠️ ¡Cuidado con esto! Si importas de esa manera, te encontrarás con el siguiente error:

    Broken DAG: [/opt/airflow/dags/4-customoperator.py]

    Traceback (most recent call last): File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed File "/opt/airflow/dags/4-customoperator.py", line 6, in <module> from plugins.custom_operators.HelloOperator import HelloOperator ModuleNotFoundError: No module named 'plugins'

    ✅ La forma correcta de importar es:

    from custom_operators.HelloOperator import HelloOperator

    Para más detalles, puedes revisar la documentación oficial:

    Creating a custom Operator — Airflow Documentation

    Mario Alexander Vargas Celis

    Mario Alexander Vargas Celis

    student•
    hace un año

    En Apache Airflow, un Custom Operator permite extender las funcionalidades de los operadores estándar definiendo uno propio. Esto es útil cuando necesitas realizar tareas específicas que no están cubiertas por los operadores existentes.

    Pasos para crear un Custom Operator

    1. Importar las clases necesarias:
      • BaseOperator: es la clase base para todos los operadores de Airflow.
      • apply_defaults: facilita el manejo de parámetros para el operador.
    2. Definir tu operador personalizado: Heredas de BaseOperator y defines la lógica principal en el método execute().
    3. Registrar parámetros: Puedes pasar parámetros personalizados al operador y utilizarlos en la ejecución.
    4. Usar el operador en un DAG: Una vez definido, el operador personalizado se utiliza como cualquier otro operador en un DAG.

    Ejemplo de un Custom Operator

    Este operador escribe un mensaje personalizado en un archivo de texto.

    Archivo del operador personalizado (custom_operator.py):

    from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults

    class CustomWriteOperator(BaseOperator): @apply_defaults def __init__(self, file_path: str, message: str, *args, **kwargs): super().__init__(*args, **kwargs) self.file_path = file_path self.message = message

    def execute(self, context): self.log.info("Escribiendo mensaje en el archivo...") with open(self.file_path, "w") as file: file.write(self.message) self.log.info(f"Mensaje escrito: {self.message}")

    Archivo del DAG (custom_operator_dag.py):

    from airflow import DAG from custom_operator import CustomWriteOperator # Importar el operador personalizado from datetime import datetime

    # Definir el DAG with DAG( dag_id="custom_operator_dag", description="Ejemplo de Custom Operator", start_date=datetime(2024, 11, 28), schedule_interval="@once", ) as dag: # Instancia del operador personalizado escribir_mensaje = CustomWriteOperator( task_id="escribir_mensaje", file_path="/tmp/mensaje.txt", message="¡Hola desde el operador personalizado!", )

    Explicación del código

    1. Clase CustomWriteOperator:
      • __init__: inicializa los parámetros personalizados (file_path y message).
      • execute: contiene la lógica principal que se ejecuta cuando el DAG corre.
    2. Archivo del DAG:
      • El DAG utiliza el operador personalizado CustomWriteOperator para escribir un mensaje en un archivo.
    3. Ejecución:
      • Cuando el DAG se ejecuta, el operador crea un archivo en /tmp/mensaje.txt y escribe el mensaje proporcionado.

    Pruebas del operador

    • Asegúrate de que el archivo custom_operator.py esté en la carpeta dags o en una ruta incluida en el PYTHONPATH.
    • Verifica el registro de logs en la interfaz de Airflow para confirmar la ejecución del operador.

    Aplicaciones de operadores personalizados

    • Automatización de tareas específicas como consultas API personalizadas.
    • Procesos únicos de transformación de datos.
    • Integraciones con herramientas o sistemas no soportados nativamente por Airflow.

    Esto te permite adaptar Airflow a las necesidades exactas de tus proyectos.

    Bruno Nicolás Barajas Correa

    Bruno Nicolás Barajas Correa

    student•
    hace un año

    En lo personal utilizó en mi empresa airflow y tengo en mente implementar uno, siempre en mis automatizaciones tenemos que registrar en nuestra base de datos cuando se ejecuta y el nombre del proceso que se ejecuta así que lo puedo utilizar para ese contexto.

    Jesús Zelaya Contreras

    Jesús Zelaya Contreras

    student•
    hace 2 años

    Me seria de ayuda un custom operator que en mi día a día realice limpieza de datos

    Carlos Eduardo Bracho Rosales

    Carlos Eduardo Bracho Rosales

    student•
    hace 2 años

    Creating a custom Operator

    david.parra

    david.parra

    student•
    hace 3 años

    Cuándo dices que los tasks_id deben ser únicos, esto es dentro de todo nuestro entorno de airflow o es únicamente dentro del DAG en el que estemos implementando ese task?

      Eric Bellet

      Eric Bellet

      teacher•
      hace 3 años

      Únicamente dentro del DAG. En el caso de todo Airflow, sería el dag_id, que sí tendría que ser único

    Ciro Villafraz

    Ciro Villafraz

    student•
    hace 3 años

    En que escenarios sería necesario hacer nuestro propio operator?

      Eric Bellet

      Eric Bellet

      teacher•
      hace 3 años

      Cuando no consigas ninguno ya hecho para lo que necesitas. O necesitas usar algo que es muy nuevo y no existe el operador

      Santiago García Rincón

      Santiago García Rincón

      student•
      hace 18 días

      Cuando usarlo ?

      • Misma lógica repetida en varios DAGs.
      • Integración con un servicio interno (manejo de errores/retentos estandar).
      • Validaciones/transformaciones de negocio que quieres estandarizar.

    Johan Fernando Astudillo

    Johan Fernando Astudillo

    student•
    hace 3 años

    LLegue a salvarles la vida perros

    CLASE "customoperator"

    from airflow import DAG from datetime import datetime from hellooperator import HelloOperator with DAG(dag_id="customoperator_test", description="Una dependencia", schedule_interval="@once", start_date =datetime(2022,12,6)) as dag: t1 = HelloOperator(task_id="hello", name="algún nombre")``` HelloOperator

    from airflow.models.baseoperator import BaseOperator

    class HelloOperator(BaseOperator):

    def __init__(self, name:str, **kwargs): super().__init__(**kwargs) self.name= name def execute(self, context): print(f"Hola {self.name}")
    José Alberto Ortiz Vargas

    José Alberto Ortiz Vargas

    student•
    hace 3 años

    ¿Existe algun operator de SQ L?

      Eric Bellet

      Eric Bellet

      teacher•
      hace 3 años

      Sí, existen varios dependiendo de la base de datos. Por ejemplo, PostresOperator

    Royer Guerrero Pinilla

    Royer Guerrero Pinilla

    student•
    hace 3 años

    ✨ Un buen truco para evitar que el dag no se ejecute desde la fecha de inicio hasta ahora es usar el parametro catchup=False

    Screenshot 2022-11-07 at 12.02.01 PM.png