No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

No se trata de lo que quieres comprar, sino de quién quieres ser. Aprovecha el precio especial.

Antes: $249

Currency
$209

Paga en 4 cuotas sin intereses

Paga en 4 cuotas sin intereses
Suscríbete

Termina en:

14 Días
7 Hrs
2 Min
49 Seg

Custom Operator

15/29
Recursos

Aportes 6

Preguntas 3

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

✨ Un buen truco para evitar que el dag no se ejecute desde la fecha de inicio hasta ahora es usar el parametro catchup=False

LLegue a salvarles la vida perros

CLASE “customoperator”

from airflow import DAG
from datetime import datetime
from hellooperator import HelloOperator

with DAG(dag_id="customoperator_test",
         description="Una dependencia",
         schedule_interval="@once",
         start_date =datetime(2022,12,6)) as dag:


    t1 = HelloOperator(task_id="hello",
                       name="algún nombre")```


HelloOperator

from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):

def __init__(self, name:str, **kwargs):
    super().__init__(**kwargs)

    self.name= name

def execute(self, context):
    print(f"Hola {self.name}")

Me seria de ayuda un custom operator que en mi día a día realice limpieza de datos
En Apache Airflow, un **Custom Operator** permite extender las funcionalidades de los operadores estándar definiendo uno propio. Esto es útil cuando necesitas realizar tareas específicas que no están cubiertas por los operadores existentes. ### Pasos para crear un Custom Operator 1. **Importar las clases necesarias:** * `BaseOperator`: es la clase base para todos los operadores de Airflow. * `apply_defaults`: facilita el manejo de parámetros para el operador. 2. **Definir tu operador personalizado:** Heredas de `BaseOperator` y defines la lógica principal en el método `execute()`. 3. **Registrar parámetros:** Puedes pasar parámetros personalizados al operador y utilizarlos en la ejecución. 4. **Usar el operador en un DAG:** Una vez definido, el operador personalizado se utiliza como cualquier otro operador en un DAG. ### Ejemplo de un Custom Operator Este operador escribe un mensaje personalizado en un archivo de texto. #### Archivo del operador personalizado (`custom_operator.py`): from airflow.models import BaseOperator from airflow.utils.decorators import apply\_defaults class CustomWriteOperator(BaseOperator): @apply\_defaults def \_\_init\_\_(self, file\_path: str, message: str, \*args, \*\*kwargs): super().\_\_init\_\_(\*args, \*\*kwargs) self.file\_path = file\_path self.message = message def execute(self, context): self.log.info("Escribiendo mensaje en el archivo...") with open(self.file\_path, "w") as file: file.write(self.message) self.log.info(f"Mensaje escrito: {self.message}") #### Archivo del DAG (`custom_operator_dag.py`): from airflow import DAG from custom\_operator import CustomWriteOperator # Importar el operador personalizado from datetime import datetime \# Definir el DAG with DAG( dag\_id="custom\_operator\_dag", description="Ejemplo de Custom Operator", start\_date=datetime(2024, 11, 28), schedule\_interval="@once", ) as dag: \# Instancia del operador personalizado escribir\_mensaje = CustomWriteOperator( task\_id="escribir\_mensaje", file\_path="/tmp/mensaje.txt", message="¡Hola desde el operador personalizado!", ) ### Explicación del código 1. **Clase** `CustomWriteOperator`**:** * `__init__`: inicializa los parámetros personalizados (`file_path` y `message`). * `execute`: contiene la lógica principal que se ejecuta cuando el DAG corre. 2. **Archivo del DAG:** * El DAG utiliza el operador personalizado `CustomWriteOperator` para escribir un mensaje en un archivo. 3. **Ejecución:** * Cuando el DAG se ejecuta, el operador crea un archivo en `/tmp/mensaje.txt` y escribe el mensaje proporcionado. ### Pruebas del operador * Asegúrate de que el archivo `custom_operator.py` esté en la carpeta `dags` o en una ruta incluida en el `PYTHONPATH`. * Verifica el registro de logs en la interfaz de Airflow para confirmar la ejecución del operador. ### Aplicaciones de operadores personalizados * Automatización de tareas específicas como consultas API personalizadas. * Procesos únicos de transformación de datos. * Integraciones con herramientas o sistemas no soportados nativamente por Airflow. Esto te permite adaptar Airflow a las necesidades exactas de tus proyectos.
En lo personal utilizó en mi empresa airflow y tengo en mente implementar uno, siempre en mis automatizaciones tenemos que registrar en nuestra base de datos cuando se ejecuta y el nombre del proceso que se ejecuta así que lo puedo utilizar para ese contexto.