Jonathan Villavicencio Gonzalez
EstudianteElias Dudamel
EstudianteSebastián Salas
EstudianteHéctor Javier Herrera Espínola
EstudianteEric Bellet
ProfesorJohan Fernando Astudillo
EstudianteCecilia Gabriela Rodríguez Flores
EstudianteÁlvaro José Polo Franco
EstudianteMartin Macchia
EstudianteLeidy Johana Alarcon Moya
EstudianteElitsoft Chile
EstudianteWilliam Yesid Cuesta Cardona
EstudianteDaniel Felipe Zuluaga Cárdenas
EstudianteFreddy Norberto Montañez Gordillo
EstudianteGerardo Mayel Fernández Alamilla
EstudianteJavier Ladino
EstudianteGeisemberg Marcos Zamora Yuave
EstudianteCamilo Andrés Rodriguez Higuera
EstudianteJulen Alvaro
EstudianteJulen Alvaro
EstudianteAndres Troaños
EstudianteEric Bellet
ProfesorMarlon Ricra
EstudianteEric Bellet
ProfesorThis is wonderful!
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def print_hello(): print('Hello people from python funtion') with DAG( dag_id='python_operator', description='Fisrt dag using python operator', schedule_interval='@once', start_date=datetime(2022,8,1) ) as dag: t1 = PythonOperator( task_id = 'hello_with_python', python_callable = print_hello ) ```
Me gusta la parte de que se se sigan podiendo reutilizar funciones, dejo ejemplo
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def print_hello(country, **kwargs): print(f'I am processing this {country}') with DAG(dag_id='pythonoperator' , description='Utilizando python operator' , start_date=datetime(2023, 6, 7) , schedule_interval='@once') as dag: t1 = PythonOperator(task_id='process_ar' , python_callable=print_hello , op_kwargs={'country':'AR'}) t2 = PythonOperator(task_id='process_uy' , python_callable=print_hello , op_kwargs={'country':'UY'}) t1 >> t2
Me sale este error: Probé instalar de mil maneras, en todas igual:
No me sale el error en tu pregunta
from airflow import DAG from datetime import datetime from airflow.operators.python import PythonOperator def print_hello(): print("HELLO PAPUS") with DAG(dag_id="pythonoperator", description="Nuestro primer DAG utilizando python ope", schedule_interval="@once", start_date =datetime(2022,12,6)) as dag: t1 = PythonOperator(task_id="hello_with_python", python_callable=print_hello) t1 ```
from airflow import DAGfrom airflow.operators.python import PythonOperatorfrom datetime import datetime def print_hello(): print("Hola de pruebas.") with DAG(dag_id="pythonoperator", description="Nuestro primer DAG utilizando python operator", schedule_interval="@once", start_date=datetime(2022, 8, 1)) as dag: t1 = PythonOperator( task_id="hello_with_python", python_callable=print_hello ) t1
:)
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def print_hello(): return print('hello world with python in airflow') with DAG(dag_id='primer_dag_python', schedule_interval='@once', start_date=datetime(2022, 7, 1)) as dag: t1 = PythonOperator(task_id='primer_task_python', python_callable=print_hello)
El resultado
Hola, me funciona todo pero de una manera muy extraña:
Por un lado no encuentra nada que esté en airflow.operators y no es de extrañar, porque la carpeta "plugins" está vacía. El la carpeta gemela /opt/airflow/plugins dentro de airflow-airflow-webserver-1 tampoco he encontrado nada. Luego tarda minutos en subirse al localhost y en aparecer en la lista de DAGs (igual es que ando algo justo de RAM en local, tengo la VM de docker capada a 4GB que debería de ser suficiente). Lo curioso es que sin embargo el .log aparece:
Alguien tiene alguna explicación de por qué funciona a pesar de que parece que los archivos .py no tienen acceso a airflow.operators?
lo de que iba lento ya lo he solucionado, le faltaba RAM a Docker. He modificado el archivo .wslconfig que se encuentra en la carpeta del usuario de windows y que controla los recursos que se le asignan a Docker. Ahora lo que me va lento es el resto del Sistema Operativo, pero qué le vamos a hacer:
# Settings apply across all Linux distros running on WSL 2 [wsl2] # Limits VM memory to use no more than 5 GB, this can be set as whole numbers using GB or MB memory=5GB # Sets the VM to use two virtual processors processors=2
Lo de que el directorio plugins siga vacío y aún así el programa corra sigo sin explicármelo, todas esas referencias deben de estar en algún lugar en los contenedores de docker pero no las he podido encontrar manualmente, desde luego el directorio '/opt/airflow/plugins' de airflow-airflow-webserver-1 está vacío.
Ejecute ambos dags y ninguno de los dos me da la opción del pequeño cuadro verder y ambos aparecen como success, tampoco puedo ver lo logs
Alguien sabe que debo hacer o que sucede?
Me muestras el código?
Como ejecuto un script de python, que este fuera del archivo del DAG; es decir un archivo externo
Acá el desafío es, ¿cómo conectas tu máquina con Airflow a ese fichero? Hay varias maneras, puedes hacerlo con un SSHExecuteOperator, o montas una API, rtc