Automatización del Pipeline

Clase 36 de 38Curso de Ingeniería de Datos con Python

Contenido del curso

Web scraping

Pandas

Resumen

Logra una automatización ETL confiable en Python orquestando extracción, transformación y carga con un solo archivo: pipeline.py. Con una estructura clara de proyecto, logging informativo y subprocess para ejecutar comandos de terminal, el flujo queda integrado de punta a punta: desde el web scraping hasta la base de datos.

¿Cómo se estructura el proyecto ETL en Python?

La base es organizar todo en un solo proyecto. Así, cada etapa es independiente pero coordinada por un metaprograma. Esta organización reduce errores y facilita la ejecución secuencial.

  • Carpeta extract: scraper con main.py, y config.yml con selectores CSS y periódicos en el punto de entrada.
  • Carpeta transform: receta para limpiar datos y estandarizar campos.
  • Carpeta load: scripts para cargar datos a la base de datos.
  • Archivo pipeline.py: orquestador que automatiza los tres pasos.

Convenciones de nombres que facilitan el flujo:

  • En extract se generan archivos por sitio con nombre que incluye UID + fecha.
  • Para transform, el archivo se renombra usando guion bajo para que el programa identifique el news site UID.
  • Al finalizar transform se crea un CLEAN_nombre.csv y se mueve a load renombrado como UID.csv.

¿Cómo automatizar el flujo con pipeline.py y subprocess?

El objetivo es unificar scripts independientes en un flujo único, manteniendo visibilidad con logging y control con subprocess. La idea: iterar sobre una lista de sitios, correr cada etapa en su carpeta y mover/limpiar archivos en cada paso.

  • Configurar logging: logging.basicConfig con nivel info para ver el progreso.
  • Importar subprocess: permite ejecutar comandos como si estuviéramos en la terminal.
  • Crear logger: referencia por nombre del archivo para mensajes claros.
  • Definir news_sites_uids: lista de UIDs de periódicos a procesar.
  • Definir el punto de entrada: llamar a main, que coordina extract, transform y load.

Fragmentos esenciales:

import logging, subprocess logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) news_sites_uids = ['uid1', 'uid2'] def main(): extract() transform() load()

El patrón de ejecución de cada etapa usa subprocess.run con el current working directory según la carpeta correspondiente:

subprocess.run(['python', 'main.py', '--uid', uid], cwd='extract')

¿Qué hacen los pasos extract, transform y load?

Cada paso comunica al usuario lo que ocurre y ejecuta acciones puntuales. Se itera sobre cada news site UID para garantizar orden y trazabilidad. Al final, queda un flujo limpio y reproducible.

¿Qué ocurre en extract?

  • Mostrar inicio: mensaje de "starting extract process" con logging.
  • Iterar UIDs: correr python main.py del scraper por cada sitio con subprocess.run.
  • Mover resultados: usar find para localizar archivos que inician con el UID y moverlos a transform.
  • Renombrar archivos: aplicar patrón con guion bajo que espera transform.
  • Definir cwd: ejecutar en la carpeta extract para evitar rutas rotas.
  • Nota clave: aprender terminal es esencial, se utilizan comandos como find y mv.

Ejemplo de movimiento con terminal dentro de subprocess:

find . -name "{uid}*" -exec mv {} ../transform/{uid}_data.csv \;

¿Cómo se procesa en transform?

  • Mostrar inicio: "starting transform process" con logging.
  • Variables de archivo: definir Dirty Data File Name (archivo sucio con guion bajo) y Clean Data File Name (prefijo CLEAN).
  • Ejecutar limpieza: python main.py en la carpeta transform para producir el archivo limpio.
  • Eliminar residuos: borrar el archivo sucio con rm.
  • Preparar carga: mover el limpio a load renombrado como UID.csv.
  • Definir cwd: ejecutar todo en transform para mantener rutas consistentes.

Patrones mencionados:

  • Dirty Data File Name: uid_archivo.csv con guion bajo para que transform identifique el UID.
  • Clean Data File Name: CLEAN_uid_archivo.csv que luego se convertirá en uid.csv.

¿Cómo se realiza el load?

  • Mostrar inicio: "starting load process" con logging.
  • Determinar archivo de entrada: clean_data_file_name = f"{uid}.csv".
  • Cargar a la base: ejecutar python main.py en la carpeta load.
  • Limpieza final: eliminar el CSV una vez cargado con rm.

Detalles útiles del orquestador:

  • Se usa subprocess.run para cada comando por UID.
  • Se aprovecha el current working directory según la etapa: extract, transform o load.
  • Se estandariza el nombre final: UID.csv para simplificar la carga.

Al terminar, el metaprograma integra todo el proceso: web scraping → limpieza → carga. Si usas la línea de comando con fluidez, incluso puedes convertir pipeline.py en ejecutable; de lo contrario, basta con correr: python pipeline.py.

¿Con qué periódicos vas a trabajar y qué hallazgos obtuviste? comparte tus resultados y dudas en los comentarios.