No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

¿Qué son los Xcoms?

25/29
Recursos

Aportes 5

Preguntas 1

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

✨ La razón por al cual es bash operator comparte su output es porque por defecto tiene el parametro do_xcom_push en True lo mismo podemos hacer con el python operator pero este pusheara lo que coloquemos en el return de la función

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.models.xcom import XCom

default_args = {"depends_on_past": True}

def myfunction(**context):
    print(int(context["ti"].xcom_pull(task_ids='tarea_2')) - 24)

with DAG(dag_id="9-XCom",
    description="Probando los XCom",
    schedule_interval="@daily",
    start_date=datetime(2022, 1, 1),
	default_args=default_args,
    max_active_runs=1
) as dag:

    t1 = BashOperator(task_id="tarea_1",
					  bash_command="sleep 5 && echo $((3 * 8))")

    t2 = BashOperator(task_id="tarea_2",
					  bash_command="sleep 3 && echo {{ ti.xcom_pull(task_ids='tarea_1') }}")

    t3 = PythonOperator(task_id="tarea_3", 
                        python_callable=myfunction)

    t1 >> t2 >> t3

Un escenario de uso de Xcoms es retornar por ejm la cantidad de registros cargados a una base de datos que haría una tarea 1 y tomar esta cantidad para el envío de email de notificación desde una tarea 2

Falta caso de uso de comunicación de tareas ejecutandose en diferentes máquinas, no?
En Apache Airflow, los **XComs** (abreviatura de *"Cross Communications"*) son una herramienta que permite a las tareas intercambiar mensajes o compartir datos entre sí dentro de un DAG. Esencialmente, los XComs son un mecanismo de paso de mensajes utilizado para transferir pequeños fragmentos de datos entre tareas. ### Conceptos Clave de los XComs: 1. **Almacenamiento**: Los XComs se almacenan en la base de datos de Airflow. 2. **Tamaño**: Están pensados para transmitir datos pequeños, no para manejar grandes volúmenes de información. 3. **Contexto**: Usan un contexto de ejecución que incluye atributos como `dag_id`, `task_id`, `execution_date` y `key`. ### ¿Cómo funcionan los XComs? 1. **Poner un valor en XCom**: Las tareas pueden enviar datos usando el método `xcom_push`.from airflow.operators.python import PythonOperator from airflow import DAG from datetime import datetime def push\_function(\*\*kwargs): \# Empuja un valor al XCom kwargs\['ti'].xcom\_push(key='my\_key', value='Hola, Airflow!') with DAG( dag\_id='xcom\_example', start\_date=datetime(2024, 1, 1), schedule\_interval=None, ) as dag: push\_task = PythonOperator( task\_id='push\_task', python\_callable=push\_function ) Aquí, `my_key` es la clave para identificar el valor enviado al XCom. 2. **Obtener un valor de XCom**: Una tarea puede recuperar un valor de XCom usando el método `xcom_pull`.def pull\_function(\*\*kwargs): \# Recupera el valor desde el XCom value = kwargs\['ti'].xcom\_pull(key='my\_key', task\_ids='push\_task') print(f'Valor recibido: {value}') pull\_task = PythonOperator( task\_id='pull\_task', python\_callable=pull\_function ) push\_task >> pull\_task En este ejemplo, `task_ids='push_task'` indica de qué tarea obtener el valor. ### Uso con Operadores Bash Los XComs también pueden ser utilizados con operadores como `BashOperator`, donde el resultado de un comando Bash se almacena automáticamente en el XCom: from airflow.operators.bash import BashOperator bash\_task = BashOperator( task\_id='bash\_task', bash\_command='echo "Hola desde Bash"', xcom\_push=True # Habilita el envío del resultado al XCom ) python\_task = PythonOperator( task\_id='read\_bash\_xcom', python\_callable=lambda \*\*kwargs: print(kwargs\['ti'].xcom\_pull(task\_ids='bash\_task')) ) bash\_task >> python\_task ### Contextos Comunes de Uso 1. **Transferir datos entre tareas**: * Por ejemplo, una tarea puede descargar datos y otra procesarlos. 2. **Gestionar dependencias dinámicas**: * Usar datos de una tarea anterior para modificar el comportamiento de una tarea posterior. 3. **Combinación con Jinja Templates**:t2 = BashOperator( task\_id='dynamic\_task', bash\_command='echo "Valor: {{ ti.xcom\_pull(task\_ids="push\_task", key="my\_key") }}"' ) * Los XComs pueden ser utilizados directamente en plantillas Jinja para pasar valores dinámicos a operadores. ### Buenas Prácticas 1. **Evita XComs para datos grandes**: * Los XComs no están diseñados para manejar archivos o grandes volúmenes de datos. Usa sistemas externos como S3 o bases de datos para esto. 2. **Uso explícito de claves**: * Asigna claves descriptivas a los XComs para mantener claridad en los datos transferidos. 3. **Limpieza de datos antiguos**: * Los XComs permanecen en la base de datos de Airflow hasta que se eliminan manualmente. Usa estrategias de limpieza para evitar que se acumulen.