Configuraci贸n de base de datos source y entorno para ETL en Python

6/25

Lectura

隆Hola! En esta clase conocer谩s c贸mo configurar una base de datos con SQL, que ser谩 una de las 3 fuentes para extraer datos en el proyecto de ETL. Las otras dos fuentes son un archivo JSON y otro CSV que conocer谩s en clases posteriores.

...

Reg铆strate o inicia sesi贸n para leer el resto del contenido.

Aportes 29

Preguntas 12

Ordenar por:

驴Quieres ver m谩s aportes, preguntas y respuestas de la comunidad?

Buenas, Si desean y deciden utilizar DataSpell existe la licencia libre por 1 a帽o para estudiantes, aqui esta el enlace para mas info.

Estoy montado, en mi repositorio, un peque帽o tutorial de todas las instalaciones de Postgre SQL.

Me interesa dominar totalmente este tema y adem谩s me estoy divirtiendo un mont贸n, jajaja

Si a alg煤n compa帽ero le puede ser de utilidad, mi repositorio es

https://github.com/ddasilva64/ETLPPT23001esp

Gracias

CREA E IMPORTA TU BASE DE DATOS EN MENOS DE 20 MINUTOS!!

REPOSITORIO

Hola, me la pase d铆as en esta clase, pero decid铆 automatizar todo el proceso para quien me lea en alg煤n momento.

Simplemente, debemos de hacer unos sencillos pasos, y por medio de unos Script en BASH Descarga, divide e inserta sentencias INSERT INTO en cada uno de nuestros archivos. En total, crea 208 archivos, y hacemos la ingesta por medio de otro Script en Bash dentro de nuestro contenedor.

Y pum!! En menos de 20 minutos tenemos los 6 millones de registros!!

Despu茅s de varios intentos de tratar de conectarme a la base de datos con el Notebook y DBeaver me daban errores, lo que me solucion贸 fue cambiar de

-p 5432:5432 

a

-p 5433:5432 

(Es lo mismo si tiene un docker-compose)
Imagino que mi instalaci贸n local de Postgres sin Docker ya estaba ocupando el puerto 5432, entonces utilic茅 el 5433.

seria conveniente actualizar este curso algunas cosas no se pueden replicar tal como se indican aca, intente hacerlo paso a paso y no funciona.

Carlos, gracias por esta clase:

  • He aprendido muchas cosas
  • He visto que, en mi trabajo, DBeaver puede ser una buena herramienta para manejar la BD. La he configurado para Oracle (nuestra BD corporativa) 馃憤
  • Sobre todo, me he divertido much铆simo. 隆Eres genial! 馃槂
    Por cierto, todas las herramientas, que se nombran en esta clase, muestro como utilizarlas en mi repositorio ( dejo la URL, por si a alg煤n compa帽ero/a, le pueda ser de utilidad)

https://github.com/ddasilva64/ETLPPT23001esp/blob/master/extraccion-de-datos.md

Gracias

Comparto esta nota, donde destaca lo bueno y lo malo de DataSpell como herramienta.

Para cargar los datos a la tabla les recomiendo dividan el archivo SQL en varios archivos m谩s peque帽os e ir carg谩ndolos uno a uno, puesto que el script tiene m谩s de 6 millones de filas y esto puede colgar nuestro pc
Ejemplo de un compa帽ero

Buenas tardes, me gustar铆a se agregaran las versiones especificas que se est谩n usando para maximizar la reproducibilidad del curso

al correr el contenedor con la imagen postgres va a tomar la que se encuentre como latest, PERO con el tiempo puede llegar a ser incompatible el c贸digo, seria bueno que le agreguen el tag de la imagen correcta

No pude configurar la Base de Datos, siempre me pasa que no logro entender este tipo de instrucciones y ahi quedo estancada con la parte practica.

Yo lo estoy realizando desde vscode, y lo que hice fue, usar un archivo para dividir el archivo en peque帽os archivos y luego usar otro script de python para cargar todos los archivos, les dejo el Repositorio por si quieren revisar y usarlo, puede ser complicado pero se entiende y realmente me funciono, cargue todos los 6 millones de datos en menos de 10 minutos.

**SOLUCI脫N ERROR DE CONEXI脫N BASE DE DATO** Compa帽eros, en pocas palabras el error se produce porque el puerto 5432 esta ocupado por otros serivicios de nuestro computador, ya sea otras aplicaciones en segundo plano. Se puede producir por instalaciones pasadas que hayan realizado de postgreSQL que genere que diferentes instancias de posgre.exe esten en ejecuci贸n al mismo tiempo. Deben de liberar el puerto 5432, que otros servicios no lo esten usando o en su defento cambien la configuraci贸n del contenedo ren docker al crearlo, configurando un puerto distinto: ejemplo: `docker run -d --name=postgres -p 5433:5433 -v postgres-volume:/var/lib/postgresql/data -e POSTGRES_PASSWORD=mysecretpass postgres` si desean verificar si su puerto 5432 esta en uso, este comando les ser谩 de utilidad: tasklist | find "postgres" : les ayudar谩 a verificar si existen muchas instancias postgres.exe en ejecuci贸n a la vez. netstat -an | find "5432" : Les permitir谩 ver que tan congestionado esta su puerto 5432 y si es necesario usar otro puerto o detener servicios que ya no deseen usar. Espero que les sea de utilidad.

Buenas, para los que trabajan con un jupyter notebook y postgres en dbeaver les dejo un script que utilice para :

A partir del archivo completo, generar 200 archivos m谩s peque帽os (al original lo trabaje como un .txt en vez de .sql cambi谩ndole la extensi贸n)

Recorro cada archivo .txt y lo env铆o a postgres (hay que eliminar la 煤ltima 鈥,鈥 que queda al final de cada archivo como consecuencia de la partici贸n del archivo original

Consideraciones:
*La creaci贸n de la tabla la ejecut茅 directamente desde postgres
*Las sentencias : 鈥淐REATE 鈥tc鈥 y "insert into public.trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name)
values " Las elimin茅 del .txt original para que queden solamente los valores a insertar, luego agrego esa sentencia en cada iteraci贸n del loop

import psycopg2  ( previa !pip install psycopg2)
# Establecer los detalles de conexi贸n a la base de datos
host = 'localhost'
database = 'postgres'
port = 5432 (siempre que el puerto queest茅 usando pos sea 5432, chequear por las dudas, de ser otro, hay que cambiarlo)
username = 'postgres'
password = 'xxxxxxxxx' (reemplazar por la constrase帽a de cada uno

# Establecer la ruta del archivo completo
sql_file_path = 'postgres_public_trades.txt' (cambi茅 la extensi贸n .sql por .txt y no defino el path del archivo porque est谩 guardado enla misma carpeta que mi .ipynb)

# Leer el contenido del archivo completo
with open(sql_file_path, 'r') asfile:
    sql_statements = file.readlines()

# Calcular la cantidad del铆neas que va a tener cada archivo
total_files = 200
lines_per_file = len(sql_statements) // total_files

# Crear los archivos de partes
for i inrange(total_files):
    start_index = i * lines_per_file
    end_index = start_index + lines_per_file

    # Obtener el nombre del archivo de parte
    file_name = f"parte_{i + 1}.txt"

    # Escribir las l铆neas correspondientes en el archivo de parte
    with open(file_name, "w") as output_file:
        output_file.writelines(sql_statements[start_index:end_index])
## Hasta ac谩 lo que hice fue particionar el archivo original cono 6.000.000 de registros, en 200 archivos m谩s peque帽os con unos 30.000 registros en cada uno, aproximadamente. Ahora voy a enviar cada archivo a la base:

# Construir la sentencia SQL de inserci贸n
insert_statement = "insert into public.trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name) values "

# Iterar sobre los archivos desde parte_1.txt hasta parte_200.txt
for i in range(1, 201):
    # Ruta del archivo actual
    file_path = f"parte_{i}.txt"

    # Leer los valores a insertar desde el archivo
    with open(file_path, 'r') asfile:
        values_to_insert = file.read()

    # Eliminar la 煤ltima coma dela 煤ltima l铆nea
    values_to_insert = values_to_insert.strip()
    values_to_insert = values_to_insert.rstrip(',')

    # Conectarse a la base de datos
    conn = psycopg2.connect(host=host, database=database, port=port, user=username, password=password)
    cur = conn.cursor()

    # Ejecutar la inserci贸nde los valores enla base de datos
    cur.execute(insert_statement + values_to_insert)

    # Confirmar los cambios y cerrar la conexi贸n a la base de datos
    conn.commit()
    cur.close()
    conn.close()
Luego de muchos intentos logre la conexi贸n con DBeaver, cambiando el puerto al 5433, ya que el 5432 ya estaba en uso en mi equipo local. B谩sicamente, por si les es de utilidad, deben crear la imagen en Docker as铆: ```js docker run -d --name=postgres -p 5433:5432 -v postgres-volume:/var/lib/postgresql/data -e POSTGRES_PASSWORD=mysecretpass postgres ```Luego en DBEaver tambi茅n deben indicarle que el puerto es el 5433.

Si no pueden crear 鈥淣ew Workspace Directory鈥, revisen este video: https://www.youtube.com/watch?v=Ax9jXToHsIo&t=112s

Quien vaya por la opci贸n de postgress + dbveaver dejo una soluci贸n para dividir la base de datos con Python, este txt de 6.2Millones de filas que nos proporcionan en el curso.
yo particularmente lo ejecut茅 desde Jupyter notebook y es sencillo, 3 pasos.
1 crear una copia del txt con el que vamos a trabajar
2 eliminar la primera parte del c贸digo SQL , es decir dejar en nuestra copia todo lo que va despues del
insert into public.trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name)
values

ya que esto lo vamos a usar por cada carga que hagamos de cada porci贸n que obtengamos del txt.
3 nuestra copia de 6.2M solo de los valores a insertar, colocar el nombre el el c贸digo python y ejecutar (recordar que nuestr txt debe estar en la misma locaci贸n/ubicaci贸n que nuestro notebook.
Cada uno puede modificar la cantidad de filas por porci贸n, yo las hice de 600mil para obtener 10 partes
splitLen = 621636 # nro que corresponde a la cantidad de filas por cada parte que queremos serparar
outputBase = 鈥榩arte鈥 # parte.1.txt, parte.2.txt, etc.

leemos nuestro archivo txt

input = open(鈥榩ostgres_public_trades_completo.txt鈥, 鈥榬鈥).read().split(鈥橽n鈥)

at = 1
for lines in range(0, len(input), splitLen):
# creamos una lista con las porciones
outputData = input[lines:lines+splitLen]

# abre el archivo de salida que denominamos anteriormente como "parte" 
# lo escribimos con las lineas seleccionadas y se cierra el archivo con nuevo nombre + nro de archivo
output = open(outputBase + str(at) + '.txt', 'w')
output.write('\n'.join(outputData))
output.close()

# Incrementa el contador
at += 1

Hola, adjunto c贸digo para ejecutar los archivos ya divididos, los separ茅 de a 30.000 fueron 208 documentos los cuales se demoraron 8 minutos en insertar los 6 millones de registros.
Recuerden crear un pipenv para las dependencias

import os
import psycopg2
import time

def get_conn():
    conn = psycopg2.connect(database="postgres",
                        host="localhost",
                        user="postgres",
                        password="rootpts",
                        port="5432")
    
    if conn.status == 1:
        cursor = conn.cursor()
        return cursor, conn


def start_script():
    cursor, conn = get_conn()
    print('Starting script...')
    entries = os.listdir('txt/')
    for entry in entries:
        time.sleep(1)
        print(f'Reading {entry}...')
        with open(f'txt/{entry}', 'r') as f:
            data = f.read()
            new_data = data.replace('\n', '')
            without_comma = ''.join(new_data.rsplit(',', 1))
            cursor.execute(f"insert into trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name) VALUES {without_comma}")
            conn.commit()
            print(f'Inserted {entry} into database!')


if __name__ == '__main__':
    start_script()

Finalmente lo logre con DataSpell, me fue complicado crear los ambientes y me toco leer para solucionarlo鈥

Platzi no es solo ver videos, gracias a los comentarios de esta clase y la comunidad desbloque este logro jeje

Postgres usando un archivo docker-compose

version: "3"
services:
  postgres:
    image: postgres
    container_name: postgres
    ports:
      - "5432:5432"
    volumes:
      - postgres-volume:/var/lib/postgresql/data
    environment:
      - POSTGRES_PASSWORD=mysecretpass
volumes:
  postgres-volume:

Para ejecutar el script use DBeaver

Inserta los datos en menos de 3 minutos (dependiendo de tu pc)

He desarrollado un programa para insertar grandes vol煤menes de datos en PostgreSQL de manera r谩pida y eficiente. Surgi贸 como soluci贸n a los problemas de rendimiento que enfrentamos al insertar mas de 6 millones de registros.

Me encantar铆a recibir su feedback para mejorar este proyecto.

Repositorio GitHub

En mi caso me arrojaba error al momento de realizar el test de connection en DataSpell, inform谩ndome que habia un error en el puerto. La soluci贸n: Simplemente abr铆 docker seleccione la base de datos y le di play y volver a testear la coneccion en DataSpell y ya. ![]()
Estar铆a bueno que subieran los archivos ya divididos, un compa帽ero anterior comparti贸 un repositorio con scripts para dividirlo, no funciona de inmediato pero con unos ajustes se logra.
# 鈿 Quick Fix para FATAL鈥 password authentication failed for鈥 ```txt docker exec -it postgres psql --dbname=postgres --username=postgres --command="ALTER USER postgres PASSWORD 'mysecretpass'" ``` Esta instrucci贸n ejecuta un comando para actualizar la contrase帽a por defecto a la que se utiliza en el curso. Cr茅ditos y m谩s informaci贸n: <https://stackoverflow.com/questions/55038942/fatal-password-authentication-failed-for-user-postgres-postgresql-11-with-pg>
# 馃悕 Instale Miniconda (Conda pero lite) en Ubuntu * Cree una carpeta `miniconda3` en su carpeta de usuario (con la virgulilla, `~`, en este caso), descargue el script de instalaci贸n de Miniconda (ser铆a como un lite de Conda), ejec煤telo y, finalmente, elim铆nelo. Simplemente, copie y pegue lo siguiente en su l铆nea de comandos. 鈿 ```txt mkdir -p ~/miniconda3 wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3 rm -rf ~/miniconda3/miniconda.sh ``` * 鈱 Una vez instalado, para que las configuraciones nuevas se incluyan autom谩ticamente al abrir la l铆nea de comandos (solo aplica a Bash). ```txt ~/miniconda3/bin/conda init bash ``` # Ahora, instale DataSpell con un solo comando. 猸愶笍 ```txt sudo snap install dataspell --classic ```
Con este queda en menos de 5 minutos ```js import re, psycopg2 from pathlib import Path # Variables # Conecci贸n postgres db_config = { "host": "localhost", "port": "5432", "database": "postgres", "user": "postgres", "password": "mysecretpass" } conn = psycopg2.connect(**db_config) cur = conn.cursor() # Directorio para los fragmentos p = Path('frag_sql') p.mkdir(exist_ok=True) print("Iniciando fragmentaci贸n y carga") with open('postgres_public_trades.sql', 'r') as op: # Obtener n煤mero de filas for count, line in enumerate(op): pass filas_tot = count + 1 partition = 20000 repeticiones = (filas_tot) // partition final = (filas_tot) % partition with open('postgres_public_trades.sql', 'r') as op: for i in range(repeticiones): # archivos a crear if i == 0: # Primer bloque (ya tiene 'insert into') chunck = re.sub(',$', ';', ''.join([next(op) for _ in range(partition)])) elif i == repeticiones - 1: # 脷ltimo bloque (menos repeticiones) chunck = re.sub(',$', ';', ''.join([next(op) for _ in range(final)])) chunck = f'''insert into public.trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name) values {chunck}''' else: chunck = re.sub(',$', ';', ''.join([next(op) for _ in range(partition)])) chunck = f'''insert into public.trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name) values {chunck}''' # Crear archivos sql with open(Path.joinpath(p, f'frag_{i+1}.sql'), 'w') as sql: sql.write(chunck) # Correr la consulta SQL try: cur.execute(chunck) except psycopg2.Error as e: print(f'frag_{i+1}.sql No se carg贸 correctamente') # Confirmar la transacci贸n y cerrar la conexi贸n conn.commit() conn.close() print("Fragmentos creados y cargados") ```
Espero esto les sirva, yo no pude utilizar ninguna de las sugerencia lo que hice fue instalar docker y crear la base como dice en el paso a paso, luego cree dentro de dbeaver con la sentencia de create la base de datos y modifique el data set para que se vea de la siguiente forma, le quite los tab del inicio: ![](https://static.platzi.com/media/user_upload/image-da0a2758-d576-40e6-807f-b4b2545f667d.jpg) luego ejecute el siguiente codigo de python y se demoro alrededor de 5 min y todo funciono perfecto : ```python import psycopg2 from psycopg2.extras import execute_values from tqdm import tqdm # Function to get the connection and cursor def get_conn(): conn = psycopg2.connect(database="postgres", host="localhost", user="postgres", password="mysecretpass", port="5432") cursor = conn.cursor() return cursor, conn # Function to insert data in bulk def insert_data_in_bulk(data): cursor, conn = get_conn() insert_statement = f"INSERT INTO trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name) VALUES {data};" cursor.execute(insert_statement) conn.commit() conn.close() # Read data from the SQL file with open('postgres_public_tradesv2.sql', 'r') as file: data = file.read() def split_into_batches(data, num_items): data_list = data.split(',\n') batches = [',\n'.join(data_list[i:i + num_items]) for i in range(0, len(data_list), num_items)] return batches num_items = 10000 # Specify the number of items per batch batches = split_into_batches(data, num_items) # Call the function for each batch with a progress bar for i, batch in enumerate(tqdm(batches, desc="Inserting data")): insert_data_in_bulk(batch) ```import psycopg2 from psycopg2.extras import execute\_values from tqdm import tqdm \# Function to get the connection and cursor def get\_conn(): conn = psycopg2.connect(database="postgres", host="localhost", user="postgres", password="mysecretpass", port="5432") cursor = conn.cursor() return cursor, conn \# Function to insert data in bulk def insert\_data\_in\_bulk(data): cursor, conn = get\_conn() insert\_statement = f"INSERT INTO trades (country\_code, year, comm\_code, flow, trade\_usd, kg, quantity, quantity\_name) VALUES {data};" cursor.execute(insert\_statement) conn.commit() conn.close() \# Read data from the SQL file with open('postgres\_public\_tradesv2.sql', 'r') as file: data = file.read() def split\_into\_batches(data, num\_items): data\_list = data.split(',\n') batches = \[',\n'.join(data\_list\[i:i + num\_items]) for i in range(0, len(data\_list), num\_items)] return batches num\_items = 10000 # Specify the number of items per batch batches = split\_into\_batches(data, num\_items) \# Call the function for each batch with a progress bar for i, batch in enumerate(tqdm(batches, desc="Inserting data")): insert\_data\_in\_bulk(batch)

Carga autom谩tica de datos por partes

Cuando intent茅 ingresar los datos directamente (Creaci贸n de tabla + 1 INSERT con 6216353 registros directamente) con el archivo 鈥減ostgres_public_trades.sql鈥 mi computador b谩sicamente casi explota. Con base en lo anterior, decid铆 realizar un c贸digo (notebook, script) en Python que particiona el archivo 鈥減ostgres_public_trades.sql鈥 para poder realizar las inserciones por partes y en ejecutarlas en serie (partici贸n por partici贸n).

Instrucciones:

  1. Crea tu base de datos con el nombre que desees <DB_name>
  2. Crea una carpeta con el nombre que desees (ese mismo nombre se lo debes asignar a la variable <folder_name>) y en una ruta de tu preferencia (esa misma ruta se la debes asignar a la variable <parent_dir>), para este ejemplo:
folder_name = "splits"
parent_dir = "C:/Users/HP/Documents/python/002_ETL/"
  1. El c贸digo est谩 hecho para realizar 20 particiones de la sentencia INSERT y 1 partici贸n para la creaci贸n de la tabla. En caso de que quieras cambiar el n煤mero de particiones debes cambiar el valor de la variable <n_splits> (Ten en cuenta que el n煤mero total de particiones es <n_splits+1>).

  2. Guarda el archivo 鈥減ostgres_public_trades.sql鈥 dentro de la carpeta creada en el paso 1 y Descarga el c贸digo como notebook o script seg煤n tu preferencia (Te recomiendo el notebook si tienes VScode con las extensiones instaladas o cualquier otro IDE que te permita correrlo, el script fue generado a partir del NoteBook) y realiza los cambios necesarios para la ejecuci贸n (asignaci贸n de folder_name y parent_dir).


  3. Debes abrir tu IDE (en este caso VScode) o editor de c贸digo enlazado con la ruta de la carpeta que creaste y ejecuta el notebook/script (en mi computador, el tiempo de ejecuci贸n aproximado fue de 2min para n_splits=20). Luego de ejecutar el c贸digo, autom谩ticamente se crea la carpeta splits y el archivo splits.sql dentro de la carpeta creada en el paso 1.



  4. Desde la terminal que est茅s usando para postgres (ya sea desde DataSpell, bash (terminal ubuntu), cmd, PowerShell, psql shell, PSQL Tool de PgAdmin, etc), ejecuta el comando \i seguido de un espacio y la ruta completa para ejecutar el archivo splits.sql (el tiempo de ejecuci贸n aproximado fue de 5min). Debes asegurarte de estar conectado en la base de datos (que ya creaste) con el comando \c <DB_name>.

# General Form
\i <parent_dir><folder_name>/splits.sql

# Sample
\i C:/Users/HP/Documents/python/002_ETL/splits.sql



Nota: En mi caso, el tiempo de ejecuci贸n (splits con python + carga de datos) total pas贸 de memoria insuficiente a 7min aproximadamente con el mismo computador 馃憖. Espero haberte podido ayudar 鉁

Error: FATAL: role "postgres" does not exist

  • Siguiendo todos los pasos el usuario user=postgres no funcion贸.
  • Al parecer mi m谩quina lo creo con el nombre de usuario de la m谩quina, el cual -para estar seguro- lo puedes obtener en la terminal ejecutando:
> whoami
platzi_user
  • Y funcion贸 el test de conexi贸n: