No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Carga de datos en ETL

23/24
Recursos

El proceso de creación de una base de datos y sus tablas es crucial en el manejo de datos, especialmente cuando se trata de proyectos ETL (Extract, Transform, Load). En este ejercicio, hemos usado MySQL junto con Python para demostrar cómo se puede automatizar esta tarea.

Primero, verificamos si la base de datos existe y, si no es así, la creamos. Usamos el siguiente comando SQL:

CREATE DATABASE IF NOT EXISTS Akila;
USE Akila;

Una vez creada la base de datos, procedemos a la creación de las tablas. Por ejemplo, la tabla actor transformado con su llave primaria y campos personalizados:

CREATE TABLE IF NOT EXISTS actor_transformado (
  actor_id INT PRIMARY KEY,
  first_name VARCHAR(50),
  last_name VARCHAR(50),
  description TEXT
);

Esta tabla almacenará transformaciones hechas sobre nuestros datos de actores, especificando campos como first_name y last_name.

¿Cómo ejecutamos las consultas SQL usando Python?

Para garantizar la integridad del proceso, Python ofrece herramientas que facilitan la conexión con bases de datos SQL, como MySQL Connector y SQLAlchemy. Aquí se explica cómo manejar estas operaciones en Python.

  1. Conexión al servidor: Primeramente, establecemos la conexión al servidor de base de datos y creamos un cursor.

    import mysql.connector

    conn = mysql.connector.connect( host="tu_host", user="tu_usuario", password="tu_contraseña", database="Akila" ) cursor = conn.cursor()

  2. Ejecutar las consultas: Tanto para crear la base de datos como las tablas, ejecutamos las consultas con el cursor.

    cursor.execute("CREATE DATABASE IF NOT EXISTS Akila;") cursor.execute("USE Akila;") cursor.execute(""" CREATE TABLE IF NOT EXISTS actor_transformado ( actor_id INT PRIMARY KEY, first_name VARCHAR(50), last_name VARCHAR(50), description TEXT ); """)

  3. Confirmar cambios y cerrar conexión: Después de ejecutar las consultas, confirmamos los cambios y cerramos la conexión.

    conn.commit() cursor.close() conn.close()

¿Cómo validar la carga y transformación de los datos con Python?

Una vez creadas las tablas, la carga de datos se realiza usando Pandas y SQLAlchemy. Este proceso recopila los datos, los transforma según necesidades y los guarda en las tablas SQL.

  1. Conectar y cargar datos: Usamos pandas para leer y transformar datos, luego los cargamos a nuestras tablas con SQLAlchemy.

    from sqlalchemy import create_engine import pandas as pd

    engine = create_engine('mysql+mysqlconnector://tu_usuario:tu_contraseña@tu_host/Akila') df = pd.DataFrame({ 'actor_id': [1, 2], 'first_name': ['John', 'Jane'], 'last_name': ['Doe', 'Doe'], 'description': ['Actor principal', 'Actor secundario'] }) df.to_sql('actor_transformado', engine, if_exists='replace', index=False)

  2. Validar carga de datos: Validamos usando queries SQL para asegurar que los datos estén correctamente cargados.

    query = "SELECT * FROM actor_transformado;" df_result = pd.read_sql(query, engine) print(df_result)

Este proceso muestra cómo utilizar la programación para gestionar bases de datos de manera eficiente. Además, al terminar el ejercicio, es una excelente práctica verificar manualmente los datos en MySQL para garantizar que todo esté en su lugar. ¿Has logrado ejecutar este proyecto? ¿Qué retos enfrentaste? ¡Cuéntanos en los comentarios para que podamos ayudarte! Sigue adelante, cada paso cuenta en tu proceso de aprendizaje.

Aportes 5

Preguntas 0

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

Proyecto terminado: ![](https://static.platzi.com/media/user_upload/image-335bd8f5-5ce6-430a-96d1-f31a9b5102f9.jpg)
Para cargar datos en un proceso ETL utilizando Python en un Jupyter Notebook, puedes seguir este ejemplo: 1. **Conexión a la base de datos**: ```python from sqlalchemy import create_engine engine = create_engine('mysql+pymysql://user:password@host/dbname') ``` 2. **Carga de datos**: ```python import pandas as pd # Leer datos desde un archivo CSV df = pd.read_csv('datos.csv') # Cargar datos a la tabla en MySQL df.to_sql('nombre_tabla', con=engine, if_exists='replace', index=False) ``` 3. **Validación**: ```python # Consultar datos result = pd.read_sql('SELECT * FROM nombre_tabla', con=engine) print(result) ``` Asegúrate de tener instaladas las librerías necesarias: `pandas`, `sqlalchemy`, y `pymysql`.
En clase, se trabajó en el proceso de carga de datos utilizando Python y SQL. Aquí tienes un resumen de los scripts clave: 1. **Creación de la base de datos y tablas**: ```sql CREATE DATABASE IF NOT EXISTS SaquilaEtl; USE SaquilaEtl; CREATE TABLE actor_transformado ( actor_id INT PRIMARY KEY, first_name VARCHAR(50), last_name VARCHAR(50) ); CREATE TABLE last_name ( last_name VARCHAR(50) PRIMARY KEY, total_actors INT ); ``` 2. **Conexión y carga de datos en Python**: ```python import pandas as pd from sqlalchemy import create_engine engine = create_engine('mysql+pymysql://user:password@host/SaquilaEtl') df_actors.to_sql('actor_transformado', con=engine, if_exists='replace') df_last_name.to_sql('last_name', con=engine, if_exists='replace') ``` 3. **Validación de datos**: ```python with engine.connect() as conn: result = conn.execute("SELECT * FROM actor_transformado") for row in result: print(row) ``` Este resumen captura los scripts esenciales utilizados para la carga y validación de datos.
En el contexto de un proceso ETL (Extracción, Transformación y Carga), la etapa de **Carga de Datos** se refiere al paso final en el que los datos procesados y transformados se insertan en su destino final. Este destino puede ser un almacén de datos (data warehouse), una base de datos, o incluso un archivo. ## **1. Métodos Comunes de Carga de Datos** ### **a. Carga Completa** Se carga todo el conjunto de datos en el destino, sobrescribiendo cualquier dato existente. Este método se usa generalmente cuando: * Se realiza la carga inicial del sistema. * Los datos cambian completamente en cada ciclo de ETL. **Ejemplo:** import pandas as pd from sqlalchemy import create\_engine \# Crear una conexión a la base de datos engine = create\_engine("mysql+pymysql://usuario:contraseña@localhost/base\_de\_datos") \# Cargar todo el DataFrame df.to\_sql("tabla\_destino", con=engine, if\_exists="replace", index=False) ### **b. Carga Incremental** Solo se cargan los datos nuevos o actualizados desde la última carga. Es más eficiente y adecuado para sistemas con grandes volúmenes de datos. **Pasos comunes:** 1. Identificar registros nuevos o modificados (por ejemplo, con una columna `timestamp` o un identificador único). 2. Insertar solo esos registros en el destino. **Ejemplo:** \# Seleccionar registros modificados después de una fecha específica nuevos\_datos = df\[df\['fecha\_modificacion'] > ultima\_fecha\_carga] \# Agregar los datos nuevos a la tabla existente nuevos\_datos.to\_sql("tabla\_destino", con=engine, if\_exists="append", index=False) ## **2. Estrategias de Carga por Tipo de Destino** ### **a. Almacén de Datos (Data Warehouse)** Los datos procesados se cargan en un esquema optimizado para análisis, como un esquema estrella o copo de nieve. Herramientas como Apache Airflow y Talend son útiles para este tipo de cargas. ### **b. Bases de Datos Relacionales** 1. **Carga directa con Python:** Utilizando bibliotecas como `SQLAlchemy` o `pymysql`:import pymysql connection = pymysql.connect( host="localhost", user="usuario", password="contraseña", database="base\_de\_datos" ) cursor = connection.cursor() for \_, row in df.iterrows(): query = f""" INSERT INTO tabla (col1, col2, col3) VALUES ({row\['col1']}, {row\['col2']}, {row\['col3']}) """ cursor.execute(query) connection.commit() cursor.close() 2. **Validación antes de la carga:** * Asegúrate de que los datos cumplan con las restricciones de la base de datos. * Evita duplicados si es necesario. ### **c. Archivos** 1. **Formato CSV:**df.to\_csv("archivo\_salida.csv", index=False) 2. **Formato Excel:**df.to\_excel("archivo\_salida.xlsx", index=False) 3. **JSON:**df.to\_json("archivo\_salida.json", orient="records", lines=True) ## **3. Buenas Prácticas en la Carga de Datos** ### **a. Uso de Transacciones** * Garantiza que la carga sea atómica y se pueda revertir en caso de error. with engine.begin() as connection: connection.execute("INSERT INTO tabla ...") ### **b. Control de Errores** Registra y maneja los errores durante la carga: try: df.to\_sql("tabla\_destino", con=engine, if\_exists="append", index=False) except Exception as e: print(f"Error durante la carga: {e}") ### **c. Monitorización** * Usa herramientas de registro (`logging`) para rastrear el progreso. * Implementa métricas para validar la carga (e.g., número de registros cargados). ## **4. Código Completo de Ejemplo** import pandas as pd from sqlalchemy import create\_engine \# Configuración de la base de datos engine = create\_engine("mysql+pymysql://usuario:contraseña@localhost/base\_de\_datos") \# Extracción de datos (simulada) df = pd.read\_csv("datos.csv") \# Transformación de datos (simulada) df\['total'] = df\['cantidad'] \* df\['precio\_unitario'] \# Carga de datos try: df.to\_sql("ventas", con=engine, if\_exists="replace", index=False) print("Datos cargados exitosamente.") except Exception as e: print(f"Error durante la carga: {e}") ## **5. Conclusión** La etapa de **carga** en un proceso ETL debe ser eficiente, segura y confiable para garantizar la integridad y disponibilidad de los datos en el destino final. La elección entre carga completa o incremental, junto con el uso de buenas prácticas, determinará el éxito de este proceso.
Proyecto culminado con éxito, aunque luego de la transformación haciendo la validación en mysql tenía registros con filas completas NULL y tuve hacer una transformación más al df\_filtered antes de ejecutar la consulta de carga de datos. ```js df_filtered.dropna(inplace=True) ```De resto, genial.