Convierte tus certificados en títulos universitarios en USA

Antes: $249

Currency
$209

Paga en 4 cuotas sin intereses

Paga en 4 cuotas sin intereses
Suscríbete

Termina en:

19 Días
10 Hrs
4 Min
35 Seg

Configuración de base de datos source y entorno para ETL en Python

6/25

Lectura

¡Hola! En esta clase conocerás cómo configurar una base de datos con SQL, que será una de las 3 fuentes para extraer datos en el proyecto de ETL. Las otras dos fuentes son un archivo JSON y otro CSV que conocerás en clases posteriores.

...

Regístrate o inicia sesión para leer el resto del contenido.

Aportes 31

Preguntas 14

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

Después de varios intentos de tratar de conectarme a la base de datos con el Notebook y DBeaver me daban errores, lo que me solucionó fue cambiar de

-p 5432:5432 

a

-p 5433:5432 

(Es lo mismo si tiene un docker-compose)
Imagino que mi instalación local de Postgres sin Docker ya estaba ocupando el puerto 5432, entonces utilicé el 5433.

seria conveniente actualizar este curso algunas cosas no se pueden replicar tal como se indican aca, intente hacerlo paso a paso y no funciona.

No pude configurar la Base de Datos, siempre me pasa que no logro entender este tipo de instrucciones y ahi quedo estancada con la parte practica.

Comparto esta nota, donde destaca lo bueno y lo malo de DataSpell como herramienta.

Para cargar los datos a la tabla les recomiendo dividan el archivo SQL en varios archivos más pequeños e ir cargándolos uno a uno, puesto que el script tiene más de 6 millones de filas y esto puede colgar nuestro pc
Ejemplo de un compañero

Después de leer mucho y 4 horas de intentos, la solución fue usar el puerto 8000 `sudo docker run -d --name=postgres -p 8000:5432 -v postgres-volume:/var/lib/postgresql/data -e POSTGRES_PASSWORD=mysecretpass postgres`

Buenas, para los que trabajan con un jupyter notebook y postgres en dbeaver les dejo un script que utilice para :

A partir del archivo completo, generar 200 archivos más pequeños (al original lo trabaje como un .txt en vez de .sql cambiándole la extensión)

Recorro cada archivo .txt y lo envío a postgres (hay que eliminar la última “,” que queda al final de cada archivo como consecuencia de la partición del archivo original

Consideraciones:
*La creación de la tabla la ejecuté directamente desde postgres
*Las sentencias : “CREATE …etc” y "insert into public.trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name)
values " Las eliminé del .txt original para que queden solamente los valores a insertar, luego agrego esa sentencia en cada iteración del loop

import psycopg2  ( previa !pip install psycopg2)
# Establecer los detalles de conexión a la base de datos
host = 'localhost'
database = 'postgres'
port = 5432 (siempre que el puerto queesté usando pos sea 5432, chequear por las dudas, de ser otro, hay que cambiarlo)
username = 'postgres'
password = 'xxxxxxxxx' (reemplazar por la constraseña de cada uno

# Establecer la ruta del archivo completo
sql_file_path = 'postgres_public_trades.txt' (cambié la extensión .sql por .txt y no defino el path del archivo porque está guardado enla misma carpeta que mi .ipynb)

# Leer el contenido del archivo completo
with open(sql_file_path, 'r') asfile:
    sql_statements = file.readlines()

# Calcular la cantidad delíneas que va a tener cada archivo
total_files = 200
lines_per_file = len(sql_statements) // total_files

# Crear los archivos de partes
for i inrange(total_files):
    start_index = i * lines_per_file
    end_index = start_index + lines_per_file

    # Obtener el nombre del archivo de parte
    file_name = f"parte_{i + 1}.txt"

    # Escribir las líneas correspondientes en el archivo de parte
    with open(file_name, "w") as output_file:
        output_file.writelines(sql_statements[start_index:end_index])
## Hasta acá lo que hice fue particionar el archivo original cono 6.000.000 de registros, en 200 archivos más pequeños con unos 30.000 registros en cada uno, aproximadamente. Ahora voy a enviar cada archivo a la base:

# Construir la sentencia SQL de inserción
insert_statement = "insert into public.trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name) values "

# Iterar sobre los archivos desde parte_1.txt hasta parte_200.txt
for i in range(1, 201):
    # Ruta del archivo actual
    file_path = f"parte_{i}.txt"

    # Leer los valores a insertar desde el archivo
    with open(file_path, 'r') asfile:
        values_to_insert = file.read()

    # Eliminar la última coma dela última línea
    values_to_insert = values_to_insert.strip()
    values_to_insert = values_to_insert.rstrip(',')

    # Conectarse a la base de datos
    conn = psycopg2.connect(host=host, database=database, port=port, user=username, password=password)
    cur = conn.cursor()

    # Ejecutar la inserciónde los valores enla base de datos
    cur.execute(insert_statement + values_to_insert)

    # Confirmar los cambios y cerrar la conexión a la base de datos
    conn.commit()
    cur.close()
    conn.close()

Buenas tardes, me gustaría se agregaran las versiones especificas que se están usando para maximizar la reproducibilidad del curso

al correr el contenedor con la imagen postgres va a tomar la que se encuentre como latest, PERO con el tiempo puede llegar a ser incompatible el código, seria bueno que le agreguen el tag de la imagen correcta

Yo lo estoy realizando desde vscode, y lo que hice fue, usar un archivo para dividir el archivo en pequeños archivos y luego usar otro script de python para cargar todos los archivos, les dejo el Repositorio por si quieren revisar y usarlo, puede ser complicado pero se entiende y realmente me funciono, cargue todos los 6 millones de datos en menos de 10 minutos.

**SOLUCIÓN ERROR DE CONEXIÓN BASE DE DATO** Compañeros, en pocas palabras el error se produce porque el puerto 5432 esta ocupado por otros serivicios de nuestro computador, ya sea otras aplicaciones en segundo plano. Se puede producir por instalaciones pasadas que hayan realizado de postgreSQL que genere que diferentes instancias de posgre.exe esten en ejecución al mismo tiempo. Deben de liberar el puerto 5432, que otros servicios no lo esten usando o en su defento cambien la configuración del contenedo ren docker al crearlo, configurando un puerto distinto: ejemplo: `docker run -d --name=postgres -p 5433:5433 -v postgres-volume:/var/lib/postgresql/data -e POSTGRES_PASSWORD=mysecretpass postgres` si desean verificar si su puerto 5432 esta en uso, este comando les será de utilidad: tasklist | find "postgres" : les ayudará a verificar si existen muchas instancias postgres.exe en ejecución a la vez. netstat -an | find "5432" : Les permitirá ver que tan congestionado esta su puerto 5432 y si es necesario usar otro puerto o detener servicios que ya no deseen usar. Espero que les sea de utilidad.
Como yo ya tenía mi postgres instalado tuve varios problemas, pero como quería seguir el curso al pie de la letra me toco hace un cambio en el puerto para poder crear mi imagen de postgress sudo docker run -d --name=postgres -p 5433:5432 -v postgres-volume:/var/lib/postgresql/data -e POSTGRES\_PASSWORD=mysecretpass postgres
Con este queda en menos de 5 minutos ```js import re, psycopg2 from pathlib import Path # Variables # Conección postgres db_config = { "host": "localhost", "port": "5432", "database": "postgres", "user": "postgres", "password": "mysecretpass" } conn = psycopg2.connect(**db_config) cur = conn.cursor() # Directorio para los fragmentos p = Path('frag_sql') p.mkdir(exist_ok=True) print("Iniciando fragmentación y carga") with open('postgres_public_trades.sql', 'r') as op: # Obtener número de filas for count, line in enumerate(op): pass filas_tot = count + 1 partition = 20000 repeticiones = (filas_tot) // partition final = (filas_tot) % partition with open('postgres_public_trades.sql', 'r') as op: for i in range(repeticiones): # archivos a crear if i == 0: # Primer bloque (ya tiene 'insert into') chunck = re.sub(',$', ';', ''.join([next(op) for _ in range(partition)])) elif i == repeticiones - 1: # Último bloque (menos repeticiones) chunck = re.sub(',$', ';', ''.join([next(op) for _ in range(final)])) chunck = f'''insert into public.trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name) values {chunck}''' else: chunck = re.sub(',$', ';', ''.join([next(op) for _ in range(partition)])) chunck = f'''insert into public.trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name) values {chunck}''' # Crear archivos sql with open(Path.joinpath(p, f'frag_{i+1}.sql'), 'w') as sql: sql.write(chunck) # Correr la consulta SQL try: cur.execute(chunck) except psycopg2.Error as e: print(f'frag_{i+1}.sql No se cargó correctamente') # Confirmar la transacción y cerrar la conexión conn.commit() conn.close() print("Fragmentos creados y cargados") ```

Inserta los datos en menos de 3 minutos (dependiendo de tu pc)

He desarrollado un programa para insertar grandes volúmenes de datos en PostgreSQL de manera rápida y eficiente. Surgió como solución a los problemas de rendimiento que enfrentamos al insertar mas de 6 millones de registros.

Me encantaría recibir su feedback para mejorar este proyecto.

Repositorio GitHub

Luego de muchos intentos logre la conexión con DBeaver, cambiando el puerto al 5433, ya que el 5432 ya estaba en uso en mi equipo local. Básicamente, por si les es de utilidad, deben crear la imagen en Docker así: ```js docker run -d --name=postgres -p 5433:5432 -v postgres-volume:/var/lib/postgresql/data -e POSTGRES_PASSWORD=mysecretpass postgres ```Luego en DBEaver también deben indicarle que el puerto es el 5433.

Si no pueden crear “New Workspace Directory”, revisen este video: https://www.youtube.com/watch?v=Ax9jXToHsIo&t=112s

Quien vaya por la opción de postgress + dbveaver dejo una solución para dividir la base de datos con Python, este txt de 6.2Millones de filas que nos proporcionan en el curso.
yo particularmente lo ejecuté desde Jupyter notebook y es sencillo, 3 pasos.
1 crear una copia del txt con el que vamos a trabajar
2 eliminar la primera parte del código SQL , es decir dejar en nuestra copia todo lo que va despues del
insert into public.trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name)
values

ya que esto lo vamos a usar por cada carga que hagamos de cada porción que obtengamos del txt.
3 nuestra copia de 6.2M solo de los valores a insertar, colocar el nombre el el código python y ejecutar (recordar que nuestr txt debe estar en la misma locación/ubicación que nuestro notebook.
Cada uno puede modificar la cantidad de filas por porción, yo las hice de 600mil para obtener 10 partes
splitLen = 621636 # nro que corresponde a la cantidad de filas por cada parte que queremos serparar
outputBase = ‘parte’ # parte.1.txt, parte.2.txt, etc.

leemos nuestro archivo txt

input = open(‘postgres_public_trades_completo.txt’, ‘r’).read().split(’\n’)

at = 1
for lines in range(0, len(input), splitLen):
# creamos una lista con las porciones
outputData = input[lines:lines+splitLen]

# abre el archivo de salida que denominamos anteriormente como "parte" 
# lo escribimos con las lineas seleccionadas y se cierra el archivo con nuevo nombre + nro de archivo
output = open(outputBase + str(at) + '.txt', 'w')
output.write('\n'.join(outputData))
output.close()

# Incrementa el contador
at += 1

Hola, adjunto código para ejecutar los archivos ya divididos, los separé de a 30.000 fueron 208 documentos los cuales se demoraron 8 minutos en insertar los 6 millones de registros.
Recuerden crear un pipenv para las dependencias

import os
import psycopg2
import time

def get_conn():
    conn = psycopg2.connect(database="postgres",
                        host="localhost",
                        user="postgres",
                        password="rootpts",
                        port="5432")
    
    if conn.status == 1:
        cursor = conn.cursor()
        return cursor, conn


def start_script():
    cursor, conn = get_conn()
    print('Starting script...')
    entries = os.listdir('txt/')
    for entry in entries:
        time.sleep(1)
        print(f'Reading {entry}...')
        with open(f'txt/{entry}', 'r') as f:
            data = f.read()
            new_data = data.replace('\n', '')
            without_comma = ''.join(new_data.rsplit(',', 1))
            cursor.execute(f"insert into trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name) VALUES {without_comma}")
            conn.commit()
            print(f'Inserted {entry} into database!')


if __name__ == '__main__':
    start_script()

Finalmente lo logre con DataSpell, me fue complicado crear los ambientes y me toco leer para solucionarlo…

Platzi no es solo ver videos, gracias a los comentarios de esta clase y la comunidad desbloque este logro jeje

Postgres usando un archivo docker-compose

version: "3"
services:
  postgres:
    image: postgres
    container_name: postgres
    ports:
      - "5432:5432"
    volumes:
      - postgres-volume:/var/lib/postgresql/data
    environment:
      - POSTGRES_PASSWORD=mysecretpass
volumes:
  postgres-volume:

Para ejecutar el script use DBeaver

Tuve un problema al instalar Anaconda en windows. El problema era que el navigator y el anaconda prompt no abría. Lo que pasaba era que no podía ejecutar un script básico para su arranque porque mi nombre de usuario tenia un "espacio" lo cual causa conflicto. La solución es destinar la descarga en una ruta que no tenga la carpeta del nombre de usuario en mi caso la puse en C:/anaconda3
Yo instale anaconda y dataSpell en mi disco local D:/ porque tenia poco almacenamiento en mi disco local C:/ debido a que es un M.2 de 256GB, el entorno root me funcionaba sin problemas, pero aunque podia crear los entornos nuevos, no podia ejecutarlos bien, tenia detalles, entonces viendo esta solucion pude: <https://stackoverflow.com/questions/60789886/error-failed-to-create-temp-directory-c-users-user-appdata-local-temp-conda> modificar: `@set TEMP=C:/temp` `@set TMP=C:/temp`
He visto que en esta clase y la anterior hemos tenido muchos problemas con este IDE porque no estamos familiarizados. En mi caso, siempre hacía mis proyectos desde la terminal, desde el entorno virtual hasta la conexión con el servidor, así qué, sí, es distinto. Pero no te preocupes, te voy a dar unos tips para que se vuelva más sencillo. El puerto que debes agregar debe ser el 8000, así que en el código queda: ''' `sudo docker run -d --name=postgres -p 8000:5432 -v postgres-volume:/var/lib/postgresql/data -e POSTGRES_PASSWORD=mysecretpass postgres` además de esto, crea primero un entorno virtual desde Anaconda Navigator, esto sucede porque si lo creamos desde ubuntu nos va a presentar fallas (4 horas teniendo errores). Después de esto ahí si configuras tu entorno con DataSpell. De allí, deberás instalar las librerías manualmente, Pandas - sqlalchemy, psycopg2, etc... Por último, agrega una carpeta nueva llamada src, allí pon los archivps .json y .csv para que los puedas usar sin ningún problema
Hola! no he podido resolver este error al momento de cargar los datos. Alguien puede ayudar? `Connection refused to host: 127.0.0.1; nested exception is:` `java.net.ConnectException: Connection refused.`
En mi caso me arrojaba error al momento de realizar el test de connection en DataSpell, informándome que habia un error en el puerto. La solución: Simplemente abrí docker seleccione la base de datos y le di play y volver a testear la coneccion en DataSpell y ya. ![]()
Estaría bueno que subieran los archivos ya divididos, un compañero anterior compartió un repositorio con scripts para dividirlo, no funciona de inmediato pero con unos ajustes se logra.
# ⚡ Quick Fix para FATAL… password authentication failed for… ```txt docker exec -it postgres psql --dbname=postgres --username=postgres --command="ALTER USER postgres PASSWORD 'mysecretpass'" ``` Esta instrucción ejecuta un comando para actualizar la contraseña por defecto a la que se utiliza en el curso. Créditos y más información: <https://stackoverflow.com/questions/55038942/fatal-password-authentication-failed-for-user-postgres-postgresql-11-with-pg>
# 🐍 Instale Miniconda (Conda pero lite) en Ubuntu * Cree una carpeta `miniconda3` en su carpeta de usuario (con la virgulilla, `~`, en este caso), descargue el script de instalación de Miniconda (sería como un lite de Conda), ejecútelo y, finalmente, elimínelo. Simplemente, copie y pegue lo siguiente en su línea de comandos. ⚡ ```txt mkdir -p ~/miniconda3 wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda3/miniconda.sh bash ~/miniconda3/miniconda.sh -b -u -p ~/miniconda3 rm -rf ~/miniconda3/miniconda.sh ``` * ⌛ Una vez instalado, para que las configuraciones nuevas se incluyan automáticamente al abrir la línea de comandos (solo aplica a Bash). ```txt ~/miniconda3/bin/conda init bash ``` # Ahora, instale DataSpell con un solo comando. ⭐️ ```txt sudo snap install dataspell --classic ```
Espero esto les sirva, yo no pude utilizar ninguna de las sugerencia lo que hice fue instalar docker y crear la base como dice en el paso a paso, luego cree dentro de dbeaver con la sentencia de create la base de datos y modifique el data set para que se vea de la siguiente forma, le quite los tab del inicio: ![](https://static.platzi.com/media/user_upload/image-da0a2758-d576-40e6-807f-b4b2545f667d.jpg) luego ejecute el siguiente codigo de python y se demoro alrededor de 5 min y todo funciono perfecto : ```python import psycopg2 from psycopg2.extras import execute_values from tqdm import tqdm # Function to get the connection and cursor def get_conn(): conn = psycopg2.connect(database="postgres", host="localhost", user="postgres", password="mysecretpass", port="5432") cursor = conn.cursor() return cursor, conn # Function to insert data in bulk def insert_data_in_bulk(data): cursor, conn = get_conn() insert_statement = f"INSERT INTO trades (country_code, year, comm_code, flow, trade_usd, kg, quantity, quantity_name) VALUES {data};" cursor.execute(insert_statement) conn.commit() conn.close() # Read data from the SQL file with open('postgres_public_tradesv2.sql', 'r') as file: data = file.read() def split_into_batches(data, num_items): data_list = data.split(',\n') batches = [',\n'.join(data_list[i:i + num_items]) for i in range(0, len(data_list), num_items)] return batches num_items = 10000 # Specify the number of items per batch batches = split_into_batches(data, num_items) # Call the function for each batch with a progress bar for i, batch in enumerate(tqdm(batches, desc="Inserting data")): insert_data_in_bulk(batch) ```import psycopg2 from psycopg2.extras import execute\_values from tqdm import tqdm \# Function to get the connection and cursor def get\_conn(): conn = psycopg2.connect(database="postgres", host="localhost", user="postgres", password="mysecretpass", port="5432") cursor = conn.cursor() return cursor, conn \# Function to insert data in bulk def insert\_data\_in\_bulk(data): cursor, conn = get\_conn() insert\_statement = f"INSERT INTO trades (country\_code, year, comm\_code, flow, trade\_usd, kg, quantity, quantity\_name) VALUES {data};" cursor.execute(insert\_statement) conn.commit() conn.close() \# Read data from the SQL file with open('postgres\_public\_tradesv2.sql', 'r') as file: data = file.read() def split\_into\_batches(data, num\_items): data\_list = data.split(',\n') batches = \[',\n'.join(data\_list\[i:i + num\_items]) for i in range(0, len(data\_list), num\_items)] return batches num\_items = 10000 # Specify the number of items per batch batches = split\_into\_batches(data, num\_items) \# Call the function for each batch with a progress bar for i, batch in enumerate(tqdm(batches, desc="Inserting data")): insert\_data\_in\_bulk(batch)

Carga automática de datos por partes

Cuando intenté ingresar los datos directamente (Creación de tabla + 1 INSERT con 6216353 registros directamente) con el archivo “postgres_public_trades.sql” mi computador básicamente casi explota. Con base en lo anterior, decidí realizar un código (notebook, script) en Python que particiona el archivo “postgres_public_trades.sql” para poder realizar las inserciones por partes y en ejecutarlas en serie (partición por partición).

Instrucciones:

  1. Crea tu base de datos con el nombre que desees <DB_name>
  2. Crea una carpeta con el nombre que desees (ese mismo nombre se lo debes asignar a la variable <folder_name>) y en una ruta de tu preferencia (esa misma ruta se la debes asignar a la variable <parent_dir>), para este ejemplo:
folder_name = "splits"
parent_dir = "C:/Users/HP/Documents/python/002_ETL/"
  1. El código está hecho para realizar 20 particiones de la sentencia INSERT y 1 partición para la creación de la tabla. En caso de que quieras cambiar el número de particiones debes cambiar el valor de la variable <n_splits> (Ten en cuenta que el número total de particiones es <n_splits+1>).

  2. Guarda el archivo “postgres_public_trades.sql” dentro de la carpeta creada en el paso 1 y Descarga el código como notebook o script según tu preferencia (Te recomiendo el notebook si tienes VScode con las extensiones instaladas o cualquier otro IDE que te permita correrlo, el script fue generado a partir del NoteBook) y realiza los cambios necesarios para la ejecución (asignación de folder_name y parent_dir).


  3. Debes abrir tu IDE (en este caso VScode) o editor de código enlazado con la ruta de la carpeta que creaste y ejecuta el notebook/script (en mi computador, el tiempo de ejecución aproximado fue de 2min para n_splits=20). Luego de ejecutar el código, automáticamente se crea la carpeta splits y el archivo splits.sql dentro de la carpeta creada en el paso 1.



  4. Desde la terminal que estés usando para postgres (ya sea desde DataSpell, bash (terminal ubuntu), cmd, PowerShell, psql shell, PSQL Tool de PgAdmin, etc), ejecuta el comando \i seguido de un espacio y la ruta completa para ejecutar el archivo splits.sql (el tiempo de ejecución aproximado fue de 5min). Debes asegurarte de estar conectado en la base de datos (que ya creaste) con el comando \c <DB_name>.

# General Form
\i <parent_dir><folder_name>/splits.sql

# Sample
\i C:/Users/HP/Documents/python/002_ETL/splits.sql



Nota: En mi caso, el tiempo de ejecución (splits con python + carga de datos) total pasó de memoria insuficiente a 7min aproximadamente con el mismo computador 👀. Espero haberte podido ayudar ✌

Error: FATAL: role "postgres" does not exist

  • Siguiendo todos los pasos el usuario user=postgres no funcionó.
  • Al parecer mi máquina lo creo con el nombre de usuario de la máquina, el cual -para estar seguro- lo puedes obtener en la terminal ejecutando:
> whoami
platzi_user
  • Y funcionó el test de conexión: