Javier Martinez
EstudianteOrlando Murcia Perdomo
EstudianteRoyer Guerrero Pinilla
EstudianteEric Bellet
ProfesorCamilo Andrés Rodriguez Higuera
EstudianteSantiago Ahumada Lozano
EstudianteJuan Diego Ramirez Baylón
EstudianteEric Bellet
ProfesorEric Bellet
ProfesorNicolás Muriel
EstudianteAlfonso Andres Zapata Guzman
EstudianteEric Bellet
ProfesorJavier Amaya Patricio
EstudianteElitsoft Chile
EstudianteCristian Orlando Rincon Bonilla
EstudianteEric Bellet
ProfesorJosmer Hernández
EstudianteJuan Antonio Aramburo Pasapera
EstudianteOscar Gama
EstudianteFreddy Norberto Montañez Gordillo
EstudianteGerardo Mayel Fernández Alamilla
EstudianteEsteban Eduardo Encina Dos Santos
EstudianteSebastián Salas
EstudianteAndres Insuasty
EstudianteGeisemberg Marcos Zamora Yuave
EstudianteShirley Tatiana Pitta Picon
EstudianteGabriela Andreina García Uzcategui
Estudiantefrom airflow import DAG from airflow.decorators import dag from airflow.operators.empty import EmptyOperator from datetime import datetime with DAG( dag_id="dag1", description="dag1", start_date=datetime(2022, 10, 31), schedule_interval="@once", ) as dag1: op = EmptyOperator(task_id="dummy") dag2 = DAG( dag_id="dag2", description="2dag2", start_date=datetime(2022, 10, 31), schedule_interval="@once", ) op2 = EmptyOperator(task_id="dummy2", dag=dag2) @dag( dag_id="dag3", description="dag3", start_date=datetime(2022, 10, 31), schedule_interval="@once", ) def generate_dag(): op3 = EmptyOperator(task_id="dummy3") dag3 = generate_dag()
Lo hiciste de las 3 formas muchas gracias!
✨ Lo que falto explicar un poco es porque se llama a la variable al final es decir el t1 de la ultima linea
Esto se llama el dependency tasks que es la manera en la cual creamos el graph. Esto significa que aquí decimos el orden de las tareas, por ejemplo
Podemos tener una serie de tareas que se ejecuten secuencialmente eso lo haríamos así
... fetch_weather >> fetch_sales >> clean_data >> generate_report
Sin embargo también podríamos querer que las dos primeras se ejecutaran paralelamente para optimizar tiempo
... [fetch_weather, fetch_sales] >> clean_data >> generate_report
Muy buena aportación! Más adelante en el curso hay una clase para este tema, pero está genial está aportación para los más curiosos
Si están usando Airflow 3.0.6, el parámetro schedule_interval cambió y ahora se usa simplemente schedule.
from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime with DAG( dag_id="PrimerDAG", description="Este es nuestro DAG de prueba", start_date=datetime(2024, 8, 31), schedule="@once" # 👈 antes era schedule_interval ) as dag: t1 = EmptyOperator(task_id="dummy") t1
Gracias]!
¿Cómo se puede solucionar el problema con la importación de librerías? import airflow could not be resolved
Puede ser que tengas este error https://intellij-support.jetbrains.com/hc/en-us/community/posts/4517353685394-Missing-site-packages-in-remote-docker-interpreter básicamente el PyCharm no está detectando las librerías que tienes en docker-compose, a veces puede ser que la versión del PyCharm no es compatible con la de Docker
Otra opción es instalar pip install airflow en tu local si no logras solucionar el error para que puedas ir avanzando
Les dejo este video de Astronomer sobre una extension para VS Code para desarrollar mas rapido con Airflow.
Si estas en linux o en WSL2 y al intentar crear un archivo en la carpeta dag no te lo permite.
Vas a consola y ejecutas:
echo -e "AIRFLOW_UID=$(id -u)" > .env docker-compose up airflow-init docker-compose up -dY listo.
~ Por cierto, ya conectamos en LinkedIn? ~
Conectemos en LinkedIn, GitHub, Medium o Redes sociales
Gracias Alfonso!
Primer DAG de Airflow 🧪
Esta semana voy a iniciar como Data Engineer en mi nuevo trabajo, creando ETL para extracción financiera y creo que Airflow va a ser mi mejor aliado para la automatización de los mismos. . No olviden que tenemos la posibilidad de crear un proyecto increíble y lograr nuestro certificado con el mismo, comencemos a crear juntos 😁 y pedir feedback a los demás. .
. Código del Dag
Mi primer DAG
Mi primer DAG
Excelente Cristian!
Mi primer dag funcionando.
Aquí mi primer dag. Hay que considerar que los id no pueden contener espacios.
from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from datetime import datetime with DAG(dag_id="miprimerdag", description="Nuestro primer DAG", start_date=datetime(2023, 10, 18), schedule_interval="@once") as dag: t1 = DummyOperator(task_id="dummy") t1
No pude poner screenshot pero asi quedó el código:
<code> from datetime import datetime from airflow import DAG from airflow.operators.empty import EmptyOperator with DAG(dag_id="primerdag", description="Nuestro primer DAG", start_date=datetime(2022, 12, 1), schedule_interval="@once") as dag: t1=EmptyOperator(task_id="dummy") t1
from airflow import DAG from airflow.decorators import dag from airflow.operators.empty import EmptyOperator from datetime import datetime # Context manager with DAG(dag_id='dag1', schedule_interval='@once', description='Nuestro Primer DAG', start_date=datetime(2022, 7, 1)) as dag1: # Code goes here t1 = EmptyOperator(task_id='primer_task_cm', dag=dag1) # Standard Constructor dag2 = DAG(dag_id='dag2', description='Nuestro Primer DAG Standard Constructor', schedule_interval='@once', start_date=datetime(2022, 7, 1)) t2 = EmptyOperator(task_id='primer_task_sc', dag=dag2) # Decorator @dag(dag_id='dag3', description='Nuestro Primer DAG Decorator', schedule_interval='@once', start_date=datetime(2022, 7, 1)) def generate_dag3(): t3 = EmptyOperator(task_id='primer_task_dec') dag = generate_dag3()
from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime doc_md = """ Primer DAG realizado en el curso de Platzi """ default_args = { 'owner': 'Esteban Encina', 'start_date': datetime(2022, 1, 1), 'schedule_interval': '@daily', 'description': 'Primer DAG' } with DAG('first_dag', default_args= default_args, doc_md = doc_md) as dag: t1 = EmptyOperator( task_id = 't1' )
from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime with DAG(dag_id="primerdag", description="Nuestro primer DAG", start_date=datetime(2022, 7, 1), schedule_interval="@once") as dag: t1 = EmptyOperator(task_id="dummy")
from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime with DAG( dag_id="primerdag", description="Primer DAG", start_date=datetime(2022,7,1), schedule_interval="@once" ) as dag: t1 = EmptyOperator(task_id="dummy") t1
Mi primer dag: