No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Implementando un DAG

11/29
Recursos

Aportes 18

Preguntas 3

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

from airflow import DAG
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from datetime import datetime

with DAG(
    dag_id="dag1",
    description="dag1",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@once",
) as dag1:
    op = EmptyOperator(task_id="dummy")


dag2 = DAG(
    dag_id="dag2",
    description="2dag2",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@once",
)
op2 = EmptyOperator(task_id="dummy2", dag=dag2)


@dag(
    dag_id="dag3",
    description="dag3",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@once",
)
def generate_dag():
    op3 = EmptyOperator(task_id="dummy3")


dag3 = generate_dag()

✨ Lo que falto explicar un poco es porque se llama a la variable al final es decir el t1 de la ultima linea
Esto se llama el dependency tasks que es la manera en la cual creamos el graph. Esto significa que aquí decimos el orden de las tareas, por ejemplo

Podemos tener una serie de tareas que se ejecuten secuencialmente eso lo haríamos así

...
fetch_weather >> fetch_sales >> clean_data >> generate_report

Sin embargo también podríamos querer que las dos primeras se ejecutaran paralelamente para optimizar tiempo

...
[fetch_weather, fetch_sales] >> clean_data >> generate_report

Les dejo este video de Astronomer sobre una extension para VS Code para desarrollar mas rapido con Airflow.

https://www.youtube.com/watch?v=r2vlPeBe2U4

Primer DAG de Airflow 🧪

Esta semana voy a iniciar como Data Engineer en mi nuevo trabajo, creando ETL para extracción financiera y creo que Airflow va a ser mi mejor aliado para la automatización de los mismos.
.
No olviden que tenemos la posibilidad de crear un proyecto increíble y lograr nuestro certificado con el mismo, comencemos a crear juntos 😁 y pedir feedback a los demás.
.

.
Código del Dag

Mi primer DAG

Mi primer DAG

Mi primer dag funcionando. ![](https://static.platzi.com/media/user_upload/image-0993d54c-76ea-4f15-9d30-b271ba21651d.jpg)
Aquí mi primer dag. Hay que considerar que los id no pueden contener espacios. ![](https://static.platzi.com/media/user_upload/image-66577a1f-bf51-4aef-9169-11d069ea5f20.jpg)
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

with DAG(dag_id="miprimerdag",
         description="Nuestro primer DAG",
         start_date=datetime(2023, 10, 18),
         schedule_interval="@once") as dag:
    t1 = DummyOperator(task_id="dummy")
    t1

No pude poner screenshot pero asi quedó el código:

<code> 
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="primerdag",
            description="Nuestro primer DAG",
            start_date=datetime(2022, 12, 1),
            schedule_interval="@once") as dag:
    t1=EmptyOperator(task_id="dummy")
    t1

from airflow import DAG
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Context manager
with DAG(dag_id='dag1',
         schedule_interval='@once',
         description='Nuestro Primer DAG',
         start_date=datetime(2022, 7, 1)) as dag1:
    # Code goes here
    t1 = EmptyOperator(task_id='primer_task_cm', dag=dag1)

# Standard Constructor

dag2 = DAG(dag_id='dag2', description='Nuestro Primer DAG Standard Constructor',
           schedule_interval='@once', start_date=datetime(2022, 7, 1))
t2 = EmptyOperator(task_id='primer_task_sc', dag=dag2)

# Decorator

@dag(dag_id='dag3', description='Nuestro Primer DAG Decorator', schedule_interval='@once', start_date=datetime(2022, 7, 1))
def generate_dag3():
    t3 = EmptyOperator(task_id='primer_task_dec')
dag = generate_dag3()

from airflow import DAG 
from airflow.operators.empty import EmptyOperator
from datetime import datetime


doc_md = """
Primer DAG realizado en el curso de Platzi
"""

default_args = {
    'owner': 'Esteban Encina',
    'start_date': datetime(2022, 1, 1),
    'schedule_interval': '@daily',
    'description': 'Primer DAG'
}

with DAG('first_dag', default_args= default_args, doc_md = doc_md) as dag:
    
    t1 = EmptyOperator(
        task_id = 't1'
    )
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(dag_id="primerdag",
        description="Nuestro primer DAG",
        start_date=datetime(2022, 7, 1),
        schedule_interval="@once") as dag:

    t1 = EmptyOperator(task_id="dummy")

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(
    dag_id="primerdag",
    description="Primer DAG",
    start_date=datetime(2022,7,1),
    schedule_interval="@once"
    ) as dag:
    t1 = EmptyOperator(task_id="dummy")
    t1
Para implementar un DAG (Directed Acyclic Graph) en Apache Airflow, necesitas crear un archivo de Python en la carpeta de `dags` de tu proyecto y definir la estructura del DAG, incluyendo las tareas que deben ejecutarse y sus dependencias. A continuación, te explico los pasos detallados para implementar un DAG en Airflow.
![](https://static.platzi.com/media/user_upload/image-79e2dc06-a816-4be4-9f79-29579a384007.jpg)![](https://static.platzi.com/media/user_upload/image-8dffe984-5636-4017-b936-c7aaf09e1485.jpg)

Si estas en linux o en WSL2 y al intentar crear un archivo en la carpeta dag no te lo permite.

Vas a consola y ejecutas:

echo -e "AIRFLOW_UID=$(id -u)" > .env
docker-compose up airflow-init
docker-compose up -d

Y listo.


~ Por cierto, ya conectamos en LinkedIn? ~

Conectemos en LinkedIn, GitHub, Medium o Redes sociales

from airflow import DAG
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

# Estandar Construnctor
my_dag1 = DAG(
    dag_id="My_first_dag",
    description="This is my first DAG",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@daily",
    catchup=False,
)

op1 = EmptyOperator(task_id="My_first_task", dag=my_dag1)


# Context Manager
with DAG(
    "MY_Second_DAG",
    start_date=datetime(2023, 1, 11),
    schedule_interval="@once",
) as dag2:
    op = EmptyOperator(task_id="My_second_task")

# Decorators
@dag(
    "My_third_DAG",
    start_date=datetime(2023, 1, 11),
    schedule_interval="0 0 * * *",
    catchup=False,
)
def generateDAG():
    op = EmptyOperator(task_id="My_third_task")

my_dag3 = generateDAG()