No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Implementando un DAG

11/29
Recursos

Aportes 13

Preguntas 2

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

o inicia sesión.

from airflow import DAG
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from datetime import datetime

with DAG(
    dag_id="dag1",
    description="dag1",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@once",
) as dag1:
    op = EmptyOperator(task_id="dummy")


dag2 = DAG(
    dag_id="dag2",
    description="2dag2",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@once",
)
op2 = EmptyOperator(task_id="dummy2", dag=dag2)


@dag(
    dag_id="dag3",
    description="dag3",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@once",
)
def generate_dag():
    op3 = EmptyOperator(task_id="dummy3")


dag3 = generate_dag()

✨ Lo que falto explicar un poco es porque se llama a la variable al final es decir el t1 de la ultima linea
Esto se llama el dependency tasks que es la manera en la cual creamos el graph. Esto significa que aquí decimos el orden de las tareas, por ejemplo

Podemos tener una serie de tareas que se ejecuten secuencialmente eso lo haríamos así

...
fetch_weather >> fetch_sales >> clean_data >> generate_report

Sin embargo también podríamos querer que las dos primeras se ejecutaran paralelamente para optimizar tiempo

...
[fetch_weather, fetch_sales] >> clean_data >> generate_report

Les dejo este video de Astronomer sobre una extension para VS Code para desarrollar mas rapido con Airflow.

https://www.youtube.com/watch?v=r2vlPeBe2U4

Mi primer DAG

Mi primer DAG

Primer DAG de Airflow 🧪

Esta semana voy a iniciar como Data Engineer en mi nuevo trabajo, creando ETL para extracción financiera y creo que Airflow va a ser mi mejor aliado para la automatización de los mismos.
.
No olviden que tenemos la posibilidad de crear un proyecto increíble y lograr nuestro certificado con el mismo, comencemos a crear juntos 😁 y pedir feedback a los demás.
.

.
Código del Dag

No pude poner screenshot pero asi quedó el código:

<code> 
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="primerdag",
            description="Nuestro primer DAG",
            start_date=datetime(2022, 12, 1),
            schedule_interval="@once") as dag:
    t1=EmptyOperator(task_id="dummy")
    t1

from airflow import DAG
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Context manager
with DAG(dag_id='dag1',
         schedule_interval='@once',
         description='Nuestro Primer DAG',
         start_date=datetime(2022, 7, 1)) as dag1:
    # Code goes here
    t1 = EmptyOperator(task_id='primer_task_cm', dag=dag1)

# Standard Constructor

dag2 = DAG(dag_id='dag2', description='Nuestro Primer DAG Standard Constructor',
           schedule_interval='@once', start_date=datetime(2022, 7, 1))
t2 = EmptyOperator(task_id='primer_task_sc', dag=dag2)

# Decorator

@dag(dag_id='dag3', description='Nuestro Primer DAG Decorator', schedule_interval='@once', start_date=datetime(2022, 7, 1))
def generate_dag3():
    t3 = EmptyOperator(task_id='primer_task_dec')
dag = generate_dag3()

from airflow import DAG 
from airflow.operators.empty import EmptyOperator
from datetime import datetime


doc_md = """
Primer DAG realizado en el curso de Platzi
"""

default_args = {
    'owner': 'Esteban Encina',
    'start_date': datetime(2022, 1, 1),
    'schedule_interval': '@daily',
    'description': 'Primer DAG'
}

with DAG('first_dag', default_args= default_args, doc_md = doc_md) as dag:
    
    t1 = EmptyOperator(
        task_id = 't1'
    )
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(dag_id="primerdag",
        description="Nuestro primer DAG",
        start_date=datetime(2022, 7, 1),
        schedule_interval="@once") as dag:

    t1 = EmptyOperator(task_id="dummy")

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(
    dag_id="primerdag",
    description="Primer DAG",
    start_date=datetime(2022,7,1),
    schedule_interval="@once"
    ) as dag:
    t1 = EmptyOperator(task_id="dummy")
    t1

Si estas en linux o en WSL2 y al intentar crear un archivo en la carpeta dag no te lo permite.

Vas a consola y ejecutas:

echo -e "AIRFLOW_UID=$(id -u)" > .env
docker-compose up airflow-init
docker-compose up -d

Y listo.


~ Por cierto, ya conectamos en LinkedIn? ~

Conectemos en LinkedIn, GitHub, Medium o Redes sociales

from airflow import DAG
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

# Estandar Construnctor
my_dag1 = DAG(
    dag_id="My_first_dag",
    description="This is my first DAG",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@daily",
    catchup=False,
)

op1 = EmptyOperator(task_id="My_first_task", dag=my_dag1)


# Context Manager
with DAG(
    "MY_Second_DAG",
    start_date=datetime(2023, 1, 11),
    schedule_interval="@once",
) as dag2:
    op = EmptyOperator(task_id="My_second_task")

# Decorators
@dag(
    "My_third_DAG",
    start_date=datetime(2023, 1, 11),
    schedule_interval="0 0 * * *",
    catchup=False,
)
def generateDAG():
    op = EmptyOperator(task_id="My_third_task")

my_dag3 = generateDAG()