Implementando un DAG
Clase 11 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Clase 11 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Javier Martinez
Orlando Murcia Perdomo
Royer Guerrero Pinilla
Eric Bellet
Juan Diego Ramirez Baylón
Eric Bellet
Eric Bellet
Camilo Andrés Rodriguez Higuera
Santiago Ahumada Lozano
Nicolás Muriel
Alfonso Andres Zapata Guzman
Eric Bellet
Javier Amaya Patricio
Elitsoft Chile
Cristian Orlando Rincon Bonilla
Eric Bellet
Josmer Hernández
Juan Antonio Aramburo Pasapera
Oscar Gama
Freddy Norberto Montañez Gordillo
Gerardo Mayel Fernández Alamilla
Esteban Eduardo Encina Dos Santos
Sebastián Salas
Andres Insuasty
Shirley Tatiana Pitta Picon
Gabriela Andreina García Uzcategui
Oscar Gama
Eric Bellet
from airflow import DAG from airflow.decorators import dag from airflow.operators.empty import EmptyOperator from datetime import datetime with DAG( dag_id="dag1", description="dag1", start_date=datetime(2022, 10, 31), schedule_interval="@once", ) as dag1: op = EmptyOperator(task_id="dummy") dag2 = DAG( dag_id="dag2", description="2dag2", start_date=datetime(2022, 10, 31), schedule_interval="@once", ) op2 = EmptyOperator(task_id="dummy2", dag=dag2) @dag( dag_id="dag3", description="dag3", start_date=datetime(2022, 10, 31), schedule_interval="@once", ) def generate_dag(): op3 = EmptyOperator(task_id="dummy3") dag3 = generate_dag()
Lo hiciste de las 3 formas muchas gracias!
✨ Lo que falto explicar un poco es porque se llama a la variable al final es decir el t1 de la ultima linea
Esto se llama el dependency tasks que es la manera en la cual creamos el graph. Esto significa que aquí decimos el orden de las tareas, por ejemplo
Podemos tener una serie de tareas que se ejecuten secuencialmente eso lo haríamos así
... fetch_weather >> fetch_sales >> clean_data >> generate_report
Sin embargo también podríamos querer que las dos primeras se ejecutaran paralelamente para optimizar tiempo
... [fetch_weather, fetch_sales] >> clean_data >> generate_report
Muy buena aportación! Más adelante en el curso hay una clase para este tema, pero está genial está aportación para los más curiosos
¿Cómo se puede solucionar el problema con la importación de librerías? import airflow could not be resolved
Puede ser que tengas este error https://intellij-support.jetbrains.com/hc/en-us/community/posts/4517353685394-Missing-site-packages-in-remote-docker-interpreter básicamente el PyCharm no está detectando las librerías que tienes en docker-compose, a veces puede ser que la versión del PyCharm no es compatible con la de Docker
Otra opción es instalar pip install airflow en tu local si no logras solucionar el error para que puedas ir avanzando
Si están usando Airflow 3.0.6, el parámetro schedule_interval cambió y ahora se usa simplemente schedule.
from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime with DAG( dag_id="PrimerDAG", description="Este es nuestro DAG de prueba", start_date=datetime(2024, 8, 31), schedule="@once" # 👈 antes era schedule_interval ) as dag: t1 = EmptyOperator(task_id="dummy") t1
Gracias]!
Les dejo este video de Astronomer sobre una extension para VS Code para desarrollar mas rapido con Airflow.
Si estas en linux o en WSL2 y al intentar crear un archivo en la carpeta dag no te lo permite.
Vas a consola y ejecutas:
echo -e "AIRFLOW_UID=$(id -u)" > .env docker-compose up airflow-init docker-compose up -dY listo.
~ Por cierto, ya conectamos en LinkedIn? ~
Conectemos en LinkedIn, GitHub, Medium o Redes sociales
Gracias Alfonso!
Primer DAG de Airflow 🧪
Esta semana voy a iniciar como Data Engineer en mi nuevo trabajo, creando ETL para extracción financiera y creo que Airflow va a ser mi mejor aliado para la automatización de los mismos. . No olviden que tenemos la posibilidad de crear un proyecto increíble y lograr nuestro certificado con el mismo, comencemos a crear juntos 😁 y pedir feedback a los demás. .
. Código del Dag
Mi primer DAG
Mi primer DAG
Excelente Cristian!
Mi primer dag funcionando.
Aquí mi primer dag. Hay que considerar que los id no pueden contener espacios.
from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from datetime import datetime with DAG(dag_id="miprimerdag", description="Nuestro primer DAG", start_date=datetime(2023, 10, 18), schedule_interval="@once") as dag: t1 = DummyOperator(task_id="dummy") t1
No pude poner screenshot pero asi quedó el código:
<code> from datetime import datetime from airflow import DAG from airflow.operators.empty import EmptyOperator with DAG(dag_id="primerdag", description="Nuestro primer DAG", start_date=datetime(2022, 12, 1), schedule_interval="@once") as dag: t1=EmptyOperator(task_id="dummy") t1
from airflow import DAG from airflow.decorators import dag from airflow.operators.empty import EmptyOperator from datetime import datetime # Context manager with DAG(dag_id='dag1', schedule_interval='@once', description='Nuestro Primer DAG', start_date=datetime(2022, 7, 1)) as dag1: # Code goes here t1 = EmptyOperator(task_id='primer_task_cm', dag=dag1) # Standard Constructor dag2 = DAG(dag_id='dag2', description='Nuestro Primer DAG Standard Constructor', schedule_interval='@once', start_date=datetime(2022, 7, 1)) t2 = EmptyOperator(task_id='primer_task_sc', dag=dag2) # Decorator @dag(dag_id='dag3', description='Nuestro Primer DAG Decorator', schedule_interval='@once', start_date=datetime(2022, 7, 1)) def generate_dag3(): t3 = EmptyOperator(task_id='primer_task_dec') dag = generate_dag3()
from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime doc_md = """ Primer DAG realizado en el curso de Platzi """ default_args = { 'owner': 'Esteban Encina', 'start_date': datetime(2022, 1, 1), 'schedule_interval': '@daily', 'description': 'Primer DAG' } with DAG('first_dag', default_args= default_args, doc_md = doc_md) as dag: t1 = EmptyOperator( task_id = 't1' )
from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime with DAG(dag_id="primerdag", description="Nuestro primer DAG", start_date=datetime(2022, 7, 1), schedule_interval="@once") as dag: t1 = EmptyOperator(task_id="dummy")
from airflow import DAG from airflow.operators.empty import EmptyOperator from datetime import datetime with DAG( dag_id="primerdag", description="Primer DAG", start_date=datetime(2022,7,1), schedule_interval="@once" ) as dag: t1 = EmptyOperator(task_id="dummy") t1
Mi primer dag:
Alguien me puede indicar como instalar esta librería? from airflow.operators.empty import EmptyOperator
Ya con la instalación de Airflow deberías tenerlo. Seguramente tienes problemas con la version