¿Qué son los Xcoms?
Clase 25 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Clase 25 de 29 • Curso de Fundamentos de Apache Airflow
Contenido del curso
Royer Guerrero Pinilla
Mauro Ezequiel Bravo
Julio Sarango
Leonardo Martin Mendez
Manuel CABRERA
Eric Bellet
Federico Martinez
Santiago Ahumada Lozano
Camilo Corredor
Cristian Decaroli
Mario Alexander Vargas Celis
✨ La razón por al cual es bash operator comparte su output es porque por defecto tiene el parametro do_xcom_push en True lo mismo podemos hacer con el python operator pero este pusheara lo que coloquemos en el return de la función
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from airflow.models.xcom import XCom default_args = {"depends_on_past": True} def myfunction(**context): print(int(context["ti"].xcom_pull(task_ids='tarea_2')) - 24) with DAG(dag_id="9-XCom", description="Probando los XCom", schedule_interval="@daily", start_date=datetime(2022, 1, 1), default_args=default_args, max_active_runs=1 ) as dag: t1 = BashOperator(task_id="tarea_1", bash_command="sleep 5 && echo $((3 * 8))") t2 = BashOperator(task_id="tarea_2", bash_command="sleep 3 && echo {{ ti.xcom_pull(task_ids='tarea_1') }}") t3 = PythonOperator(task_id="tarea_3", python_callable=myfunction) t1 >> t2 >> t3
Un escenario de uso de Xcoms es retornar por ejm la cantidad de registros cargados a una base de datos que haría una tarea 1 y tomar esta cantidad para el envío de email de notificación desde una tarea 2
Falta caso de uso de comunicación de tareas ejecutandose en diferentes máquinas, no?
Hola Profe, agradezco su ayuda, deseo realizar un xcom_pul en un sftpoperator pare no logro hacerlo, agradezco su orientacion porfavor.
download_results = SFTPOperator( task_id='download_results', ssh_conn_id='my_remote_server', remote_filepath="""{{ti.xcom_pull(task_ids='sftp_ostral', key='list_file_xml')}}""", local_filepath="""{{ti.xcom_pull(task_ids='sftp_ostral', key='list_file_xml'))}}""", operation='get', )
muchas gracias.
Hola Andres, qué error estás teniendo?
¿Cuándo usar XComs para comunicar?
Utiliza XComs cuando necesites que las tareas compartan información ligera, como IDs, rutas de archivo o resultados pequeños. Son ideales para pasar datos entre operadores que dependen uno del otro en un flujo de trabajo. Evita usarlos para grandes volúmenes de datos, ya que están optimizados para metadatos.
XCom Standard vs Custom XCom
Algo importante a mencionar es que las XComs son pensadas para transportar informacion de bajo storage entre tasks. En un backend XCom estandar, la data es serializada y escrita en la base de datos de metadata de airflow. Pero esta base de datos solo guarda strings referenciando bases de datos, por ejemplo una URI.
Se pueden usar XComs para procesar grandes volumenes de datos?
Sí! Estos se denominan Custom XCom Backends. Los cuales permiten guardar datos entre tareas en cualquier ubicacion y formato. Con ello se elimina la limitacion de tamaño de almacenamiento. Para este casos se suelen usar un sistema externo como AWS S3, Google Cloud Storage o Azure blob storage como ubicacion de los datos.
[2025-03-14T18:40:47.370-0500] {xcom.py:690} ERROR - Object of type Resource is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
Obtuve este error al crear un operador que crea un service google para acceder a un correo electronico. El objeto service que devuelve mi operador no sería JSON serializable. No entiendo por qué? Es decir, no lo puedo compartir con otra tarea?
Haciendo un proyecto personal, he utilizado xcom para obtener el valor de una task (una función de python) que pushea el valor de un json extraído desde una APi y que luego es consumido por otra task (también función de python) que lo utiliza como argumento. Funcionó perfectamente. Si bien no es lo mejor para enviar 'data', este era un pequeño json y logró enviarse correctamente.
En Apache Airflow, los XComs (abreviatura de "Cross Communications") son una herramienta que permite a las tareas intercambiar mensajes o compartir datos entre sí dentro de un DAG. Esencialmente, los XComs son un mecanismo de paso de mensajes utilizado para transferir pequeños fragmentos de datos entre tareas.
Conceptos Clave de los XComs:
dag_id, task_id, execution_date y key.¿Cómo funcionan los XComs?
Poner un valor en XCom: Las tareas pueden enviar datos usando el método xcom_push.from airflow.operators.python import PythonOperator
from airflow import DAG
from datetime import datetime
def push_function(**kwargs): # Empuja un valor al XCom kwargs['ti'].xcom_push(key='my_key', value='Hola, Airflow!')
with DAG(
dag_id='xcom_example',
start_date=datetime(2024, 1, 1),
schedule_interval=None,
) as dag:
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function
)
Aquí, my_key es la clave para identificar el valor enviado al XCom.
Obtener un valor de XCom: Una tarea puede recuperar un valor de XCom usando el método xcom_pull.def pull_function(**kwargs):
# Recupera el valor desde el XCom
value = kwargs['ti'].xcom_pull(key='my_key', task_ids='push_task')
print(f'Valor recibido: {value}')
pull_task = PythonOperator( task_id='pull_task', python_callable=pull_function )
push_task >> pull_task
En este ejemplo, task_ids='push_task' indica de qué tarea obtener el valor.
Uso con Operadores Bash
Los XComs también pueden ser utilizados con operadores como BashOperator, donde el resultado de un comando Bash se almacena automáticamente en el XCom:
from airflow.operators.bash import BashOperator
bash_task = BashOperator( task_id='bash_task', bash_command='echo "Hola desde Bash"', xcom_push=True # Habilita el envío del resultado al XCom )
python_task = PythonOperator( task_id='read_bash_xcom', python_callable=lambda **kwargs: print(kwargs['ti'].xcom_pull(task_ids='bash_task')) )
bash_task >> python_task
Contextos Comunes de Uso
Transferir datos entre tareas:
Gestionar dependencias dinámicas:
Combinación con Jinja Templates:t2 = BashOperator( task_id='dynamic_task', bash_command='echo "Valor: {{ ti.xcom_pull(task_ids="push_task", key="my_key") }}"' )
Buenas Prácticas