Python Operator
Clase 13 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Clase 13 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Jonathan Villavicencio Gonzalez
Elias Dudamel
Sebastián Salas
Héctor Javier Herrera Espínola
Eric Bellet
Johan Fernando Astudillo
Cecilia Gabriela Rodríguez Flores
Álvaro José Polo Franco
Martin Macchia
Leidy Johana Alarcon Moya
Elitsoft Chile
William Yesid Cuesta Cardona
Daniel Felipe Zuluaga Cárdenas
Freddy Norberto Montañez Gordillo
Gerardo Mayel Fernández Alamilla
Camilo Andrés Rodriguez Higuera
Julen Alvaro
Julen Alvaro
Andres Troaños
Eric Bellet
Marlon Ricra
Eric Bellet
Jose Daniel Velasquez H
rogelio cortez
This is wonderful!
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def print_hello(): print('Hello people from python funtion') with DAG( dag_id='python_operator', description='Fisrt dag using python operator', schedule_interval='@once', start_date=datetime(2022,8,1) ) as dag: t1 = PythonOperator( task_id = 'hello_with_python', python_callable = print_hello ) ```
Me gusta la parte de que se se sigan podiendo reutilizar funciones, dejo ejemplo
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def print_hello(country, **kwargs): print(f'I am processing this {country}') with DAG(dag_id='pythonoperator' , description='Utilizando python operator' , start_date=datetime(2023, 6, 7) , schedule_interval='@once') as dag: t1 = PythonOperator(task_id='process_ar' , python_callable=print_hello , op_kwargs={'country':'AR'}) t2 = PythonOperator(task_id='process_uy' , python_callable=print_hello , op_kwargs={'country':'UY'}) t1 >> t2
Me sale este error: Probé instalar de mil maneras, en todas igual:
No me sale el error en tu pregunta
from airflow import DAG from datetime import datetime from airflow.operators.python import PythonOperator def print_hello(): print("HELLO PAPUS") with DAG(dag_id="pythonoperator", description="Nuestro primer DAG utilizando python ope", schedule_interval="@once", start_date =datetime(2022,12,6)) as dag: t1 = PythonOperator(task_id="hello_with_python", python_callable=print_hello) t1 ```
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetime def print_hello(): print("Hola de pruebas.") with DAG(dag_id="pythonoperator", description="Nuestro primer DAG utilizando python operator", schedule_interval="@once", start_date=datetime(2022, 8, 1)) as dag: t1 = PythonOperator( task_id="hello_with_python", python_callable=print_hello ) t1
:)
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def print_hello(): return print('hello world with python in airflow') with DAG(dag_id='primer_dag_python', schedule_interval='@once', start_date=datetime(2022, 7, 1)) as dag: t1 = PythonOperator(task_id='primer_task_python', python_callable=print_hello)
El resultado
Hola, me funciona todo pero de una manera muy extraña:
Por un lado no encuentra nada que esté en airflow.operators y no es de extrañar, porque la carpeta "plugins" está vacía. El la carpeta gemela /opt/airflow/plugins dentro de airflow-airflow-webserver-1 tampoco he encontrado nada. Luego tarda minutos en subirse al localhost y en aparecer en la lista de DAGs (igual es que ando algo justo de RAM en local, tengo la VM de docker capada a 4GB que debería de ser suficiente). Lo curioso es que sin embargo el .log aparece:
Alguien tiene alguna explicación de por qué funciona a pesar de que parece que los archivos .py no tienen acceso a airflow.operators?
lo de que iba lento ya lo he solucionado, le faltaba RAM a Docker. He modificado el archivo .wslconfig que se encuentra en la carpeta del usuario de windows y que controla los recursos que se le asignan a Docker. Ahora lo que me va lento es el resto del Sistema Operativo, pero qué le vamos a hacer:
# Settings apply across all Linux distros running on WSL 2 [wsl2] # Limits VM memory to use no more than 5 GB, this can be set as whole numbers using GB or MB memory=5GB # Sets the VM to use two virtual processors processors=2
Lo de que el directorio plugins siga vacío y aún así el programa corra sigo sin explicármelo, todas esas referencias deben de estar en algún lugar en los contenedores de docker pero no las he podido encontrar manualmente, desde luego el directorio '/opt/airflow/plugins' de airflow-airflow-webserver-1 está vacío.
Ejecute ambos dags y ninguno de los dos me da la opción del pequeño cuadro verder y ambos aparecen como success, tampoco puedo ver lo logs
Alguien sabe que debo hacer o que sucede?
Me muestras el código?
Como ejecuto un script de python, que este fuera del archivo del DAG; es decir un archivo externo
Acá el desafío es, ¿cómo conectas tu máquina con Airflow a ese fichero? Hay varias maneras, puedes hacerlo con un SSHExecuteOperator, o montas una API, rtc
from airflow import DAGfrom datetime import datetimefrom airflow.operators.python import PythonOperator
#If you need access to Airflow context variables (e.g., ds, run_id), you must include **kwargs.def hello(country): #**kwaargs): print(f"Hello {country}") with DAG (dag_id="pythonOperator", description="Python Operator DAG", start_date=datetime(2025, 3, 27), schedule="@once") as dag: t1 = PythonOperator(task_id='taskPython', python_callable=hello,op_args=["Mexico"]) t2 = PythonOperator(task_id='taskPython2', python_callable=hello,op_kwargs={'countrty':"France"}) # op_args = you can pass a list of arguments to the function. if you have more than one argument you must pass the arguments in order in a list# op_kwargs = you can pass a dictionary where the keys are the name of the argumnet ant the values are the values you want to pass. if you have more than one argument you can pass them in any order.