No tienes acceso a esta clase

隆Contin煤a aprendiendo! 脷nete y comienza a potenciar tu carrera

Implementando un DAG

11/29
Recursos

Aportes 13

Preguntas 2

Ordenar por:

驴Quieres ver m谩s aportes, preguntas y respuestas de la comunidad?

o inicia sesi贸n.

from airflow import DAG
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from datetime import datetime

with DAG(
    dag_id="dag1",
    description="dag1",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@once",
) as dag1:
    op = EmptyOperator(task_id="dummy")


dag2 = DAG(
    dag_id="dag2",
    description="2dag2",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@once",
)
op2 = EmptyOperator(task_id="dummy2", dag=dag2)


@dag(
    dag_id="dag3",
    description="dag3",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@once",
)
def generate_dag():
    op3 = EmptyOperator(task_id="dummy3")


dag3 = generate_dag()

鉁 Lo que falto explicar un poco es porque se llama a la variable al final es decir el t1 de la ultima linea
Esto se llama el dependency tasks que es la manera en la cual creamos el graph. Esto significa que aqu铆 decimos el orden de las tareas, por ejemplo

Podemos tener una serie de tareas que se ejecuten secuencialmente eso lo har铆amos as铆

...
fetch_weather >> fetch_sales >> clean_data >> generate_report

Sin embargo tambi茅n podr铆amos querer que las dos primeras se ejecutaran paralelamente para optimizar tiempo

...
[fetch_weather, fetch_sales] >> clean_data >> generate_report

Les dejo este video de Astronomer sobre una extension para VS Code para desarrollar mas rapido con Airflow.

https://www.youtube.com/watch?v=r2vlPeBe2U4

Mi primer DAG

Mi primer DAG

Primer DAG de Airflow 馃И

Esta semana voy a iniciar como Data Engineer en mi nuevo trabajo, creando ETL para extracci贸n financiera y creo que Airflow va a ser mi mejor aliado para la automatizaci贸n de los mismos.
.
No olviden que tenemos la posibilidad de crear un proyecto incre铆ble y lograr nuestro certificado con el mismo, comencemos a crear juntos 馃榿 y pedir feedback a los dem谩s.
.

.
C贸digo del Dag

No pude poner screenshot pero asi qued贸 el c贸digo:

<code> 
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="primerdag",
            description="Nuestro primer DAG",
            start_date=datetime(2022, 12, 1),
            schedule_interval="@once") as dag:
    t1=EmptyOperator(task_id="dummy")
    t1

from airflow import DAG
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Context manager
with DAG(dag_id='dag1',
         schedule_interval='@once',
         description='Nuestro Primer DAG',
         start_date=datetime(2022, 7, 1)) as dag1:
    # Code goes here
    t1 = EmptyOperator(task_id='primer_task_cm', dag=dag1)

# Standard Constructor

dag2 = DAG(dag_id='dag2', description='Nuestro Primer DAG Standard Constructor',
           schedule_interval='@once', start_date=datetime(2022, 7, 1))
t2 = EmptyOperator(task_id='primer_task_sc', dag=dag2)

# Decorator

@dag(dag_id='dag3', description='Nuestro Primer DAG Decorator', schedule_interval='@once', start_date=datetime(2022, 7, 1))
def generate_dag3():
    t3 = EmptyOperator(task_id='primer_task_dec')
dag = generate_dag3()

from airflow import DAG 
from airflow.operators.empty import EmptyOperator
from datetime import datetime


doc_md = """
Primer DAG realizado en el curso de Platzi
"""

default_args = {
    'owner': 'Esteban Encina',
    'start_date': datetime(2022, 1, 1),
    'schedule_interval': '@daily',
    'description': 'Primer DAG'
}

with DAG('first_dag', default_args= default_args, doc_md = doc_md) as dag:
    
    t1 = EmptyOperator(
        task_id = 't1'
    )
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(dag_id="primerdag",
        description="Nuestro primer DAG",
        start_date=datetime(2022, 7, 1),
        schedule_interval="@once") as dag:

    t1 = EmptyOperator(task_id="dummy")

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(
    dag_id="primerdag",
    description="Primer DAG",
    start_date=datetime(2022,7,1),
    schedule_interval="@once"
    ) as dag:
    t1 = EmptyOperator(task_id="dummy")
    t1

Si estas en linux o en WSL2 y al intentar crear un archivo en la carpeta dag no te lo permite.

Vas a consola y ejecutas:

echo -e "AIRFLOW_UID=$(id -u)" > .env
docker-compose up airflow-init
docker-compose up -d

Y listo.


~ Por cierto, ya conectamos en LinkedIn? ~

Conectemos en LinkedIn, GitHub, Medium o Redes sociales

from airflow import DAG
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

# Estandar Construnctor
my_dag1 = DAG(
    dag_id="My_first_dag",
    description="This is my first DAG",
    start_date=datetime(2022, 10, 31),
    schedule_interval="@daily",
    catchup=False,
)

op1 = EmptyOperator(task_id="My_first_task", dag=my_dag1)


# Context Manager
with DAG(
    "MY_Second_DAG",
    start_date=datetime(2023, 1, 11),
    schedule_interval="@once",
) as dag2:
    op = EmptyOperator(task_id="My_second_task")

# Decorators
@dag(
    "My_third_DAG",
    start_date=datetime(2023, 1, 11),
    schedule_interval="0 0 * * *",
    catchup=False,
)
def generateDAG():
    op = EmptyOperator(task_id="My_third_task")

my_dag3 = generateDAG()