Royer Guerrero Pinilla
EstudianteJohan Fernando Astudillo
EstudianteCiro Villafraz
EstudianteEric Bellet
ProfesorSantiago García Rincón
EstudianteMario Alexander Vargas Celis
EstudianteCarlos Eduardo Bracho Rosales
Estudiantedavid.parra
EstudianteEric Bellet
ProfesorLuis Rivero
EstudianteJesús Zelaya Contreras
EstudianteJosé Alberto Ortiz Vargas
EstudianteEric Bellet
ProfesorBruno Nicolás Barajas Correa
Estudiante✨ Un buen truco para evitar que el dag no se ejecute desde la fecha de inicio hasta ahora es usar el parametro catchup=False
LLegue a salvarles la vida perros
CLASE "customoperator"
from airflow import DAG from datetime import datetime from hellooperator import HelloOperator with DAG(dag_id="customoperator_test", description="Una dependencia", schedule_interval="@once", start_date =datetime(2022,12,6)) as dag: t1 = HelloOperator(task_id="hello", name="algún nombre")``` HelloOperator
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name:str, **kwargs): super().__init__(**kwargs) self.name= name def execute(self, context): print(f"Hola {self.name}")
En que escenarios sería necesario hacer nuestro propio operator?
Cuando no consigas ninguno ya hecho para lo que necesitas. O necesitas usar algo que es muy nuevo y no existe el operador
Cuando usarlo ?
En Apache Airflow, un Custom Operator permite extender las funcionalidades de los operadores estándar definiendo uno propio. Esto es útil cuando necesitas realizar tareas específicas que no están cubiertas por los operadores existentes.
Pasos para crear un Custom Operator
BaseOperator: es la clase base para todos los operadores de Airflow.apply_defaults: facilita el manejo de parámetros para el operador.BaseOperator y defines la lógica principal en el método execute().Ejemplo de un Custom Operator
Este operador escribe un mensaje personalizado en un archivo de texto.
Archivo del operador personalizado (custom_operator.py):
from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults
class CustomWriteOperator(BaseOperator): @apply_defaults def __init__(self, file_path: str, message: str, *args, **kwargs): super().__init__(*args, **kwargs) self.file_path = file_path self.message = message
def execute(self, context): self.log.info("Escribiendo mensaje en el archivo...") with open(self.file_path, "w") as file: file.write(self.message) self.log.info(f"Mensaje escrito: {self.message}")
Archivo del DAG (custom_operator_dag.py):
from airflow import DAG from custom_operator import CustomWriteOperator # Importar el operador personalizado from datetime import datetime
# Definir el DAG with DAG( dag_id="custom_operator_dag", description="Ejemplo de Custom Operator", start_date=datetime(2024, 11, 28), schedule_interval="@once", ) as dag: # Instancia del operador personalizado escribir_mensaje = CustomWriteOperator( task_id="escribir_mensaje", file_path="/tmp/mensaje.txt", message="¡Hola desde el operador personalizado!", )
Explicación del código
CustomWriteOperator:
__init__: inicializa los parámetros personalizados (file_path y message).execute: contiene la lógica principal que se ejecuta cuando el DAG corre.CustomWriteOperator para escribir un mensaje en un archivo./tmp/mensaje.txt y escribe el mensaje proporcionado.Pruebas del operador
custom_operator.py esté en la carpeta dags o en una ruta incluida en el PYTHONPATH.Aplicaciones de operadores personalizados
Esto te permite adaptar Airflow a las necesidades exactas de tus proyectos.
Creating a custom Operator
Cuándo dices que los tasks_id deben ser únicos, esto es dentro de todo nuestro entorno de airflow o es únicamente dentro del DAG en el que estemos implementando ese task?
Únicamente dentro del DAG. En el caso de todo Airflow, sería el dag_id, que sí tendría que ser único
Claro, aquí tienes tu texto reformateado para que se entienda mejor:
Quise modularizar un poco y encontré en la documentación que los custom operators se pueden almacenar en la carpeta plugins.
En mi caso, creé una carpeta llamada custom_operators y dentro de ella un archivo llamado HelloOperator.py, con la siguiente estructura:
plugins/ custom_operators/ HelloOperator.py ```Al importarlo en el DAG, lo hacía de la siguiente forma: ```js from plugins.custom_operators.HelloOperator import HelloOperator
⚠️ ¡Cuidado con esto! Si importas de esa manera, te encontrarás con el siguiente error:
Broken DAG: [/opt/airflow/dags/4-customoperator.py]
Traceback (most recent call last): File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed File "/opt/airflow/dags/4-customoperator.py", line 6, in <module> from plugins.custom_operators.HelloOperator import HelloOperator ModuleNotFoundError: No module named 'plugins'
✅ La forma correcta de importar es:
from custom_operators.HelloOperator import HelloOperator
Para más detalles, puedes revisar la documentación oficial:
Creating a custom Operator — Airflow Documentation
Me seria de ayuda un custom operator que en mi día a día realice limpieza de datos
¿Existe algun operator de SQ L?
Sí, existen varios dependiendo de la base de datos. Por ejemplo, PostresOperator
En lo personal utilizó en mi empresa airflow y tengo en mente implementar uno, siempre en mis automatizaciones tenemos que registrar en nuestra base de datos cuando se ejecuta y el nombre del proceso que se ejecuta así que lo puedo utilizar para ese contexto.