No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Curso de Fundamentos de Spark para Big Data

Curso de Fundamentos de Spark para Big Data

Óscar Gutiérrez Castillo

Óscar Gutiérrez Castillo

Particionamiento de RDDs y DataFrames en Spark con PySpark

24/25
Recursos

¿Cómo particionar RDDs y DataFrames en Spark?

La partición de datos es una técnica eficaz para optimizar el procesamiento en Spark, ya que distribuye las cargas de trabajo y mejora la performance. En esta guía, aprenderás cómo particionar tanto RDDs como DataFrames, aprovechando las bondades de Spark para el manejo de grandes volúmenes de datos.

¿Cómo comenzar una sesión de Spark en Jupyter?

Para iniciar una sesión de Spark en Jupyter, es necesario importar los módulos de PySpark correctos. Aquí te mostramos cómo configurarla:

from pyspark.sql import SparkSession

# Configuración de la sesión de Spark
spark = SparkSession.builder \
    .appName('particionado') \
    .master('local[5]') \
    .getOrCreate()

Este código inicia un contexto Spark local con 5 núcleos, configurando así las particiones por defecto.

¿Cómo verificar el número de particiones?

Una vez que tienes un DataFrame, es importante saber cuántas particiones fueron creadas por defecto. Puedes hacerlo fácilmente:

# Creación de un DataFrame de prueba
dataframe_prueba = spark.range(0, 20)

# Verificación del número de particiones
particiones = dataframe_prueba.rdd.getNumPartitions()
print(f"El número de particiones es: {particiones}")  # Por defecto será 5

¿Cómo especificar el número de particiones manualmente?

Cuando crees un RDD, puedes definir explícitamente el número de particiones. Aquí te mostramos cómo:

from pyspark import SparkContext

# Creación del contexto Spark
sc = SparkContext.getOrCreate()

# Definición del conjunto de datos y partición
datos = sc.parallelize(range(0, 20), numSlices=10)

# Verificación del número de particiones
particiones_manual = datos.getNumPartitions()
print(f"Las particiones definidas son: {particiones_manual}")  # Resultado esperado: 10

¿Cómo cargar archivos en RDDs con particiones?

Para trabajar visualmente con archivos, podrías crear un RDD a partir de un archivo y particionarlo:

# Creación de RDD desde archivo
archivo_rdd = sc.textFile('/ruta/al/archivo/deporte.csv', minPartitions=10)

# Verificación
print(f"Particiones del archivo: {archivo_rdd.getNumPartitions()}")

¿Cómo guardar RDDs particionados?

Cuando guardas RDDs particionados en disco, Spark almacena estos en una carpeta con múltiples archivos:

# Guardar RDD particionado
archivo_rdd.saveAsTextFile('/ruta/destino/nueva_carpeta')

Asegúrate de tener permisos adecuados para escribir las particiones.

¿Cómo cargar archivos particionados?

Cargar archivos es sencillo, pero es importante consolidar las particiones apropiadamente para transformarlas en DataFrames:

# Recargamos los archivos de una carpeta
rdd_recargado = sc.textFile('/ruta/destino/nueva_carpeta/*')

# Compresión y mapeo de RDDs
lista_valores = rdd_recargado.map(lambda x: x.split(',')).collect()

Después de recargar los archivos, Spark puede realizar operaciones adicionales, tales como transformaciones basadas en llaves y valores.

¿Por qué elegir métodos sencillos para particionar?

Elegir la forma más simple para cargar materiales de archivos en Spark no solo acelera procesos, sino también minimiza la complejidad. La claridad es clave para mantener un código eficiente y manejable. Encontrarás muchas rutas para manejar partituras; busca la que mejor se adapte a tus necesidades y compártelas con la comunidad para enriquecer el conocimiento común.

¡Continúa explorando y aprovechando las capacidades de Spark para lograr tus metas en big data!

Aportes 10

Preguntas 2

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

sería bueno que este curso se actualizara para trabajar spark completamente en la nube, tomando archivos desde azure, aws o google cloud, convirtiendo los datos con spark desde una herramienta como databricks y subiendo todo este contenido a un storage también en la nube

Una pregunta Oscar, me surge una duda al particionar los datos, ¿Qué criterio debo tomar en cuenta a la hora de realizar una correcta partición de datos?

Usé este modo de lectura que a mí parecer es el más simple

rdd_schema = StructType([
    StructField("deporte_id", IntegerType(), False),
    StructField("deporte", StringType(), False)
])

rdd = spark.read.csv(path+"/rddDesdeArchivo",rdd_schema)
rdd.take(2)

En realidad lo que se tiene que extraer son todas las particiones en un lista, se puede hacer con os o cualquier otra herramienta, lo importante es que todas las particiones estén en la lista. Dejo la solución mia

from pathlib import Path

path = Path("salida")
files = [i for i in path.iterdir() if str(i).split("/")[1][0] == "p"]
val = list(map(lambda x: str(x.absolute()), files))
val.sort()
spark.sparkContext.textFile(','.join(val), 10).map(lambda l: l.split(",")).take(5)

Aqui muestro los resultados

[['deporte_id', 'deporte'],
 ['1', 'Basketball'],
 ['2', 'Judo'],
 ['3', 'Football'],
 ['4', 'Tug-Of-War']]


Quise simplificar el ejemplo de fredy
NOTA: p* indica que solo va a tomar todos los archivos que inicien con p por ende se omite el archivo _success

import glob
path = glob.glob("TuRuta/p* ")
path.sort()
rdd3 = spark.sparkContext.textFile(','.join(path),10).map(lambda l: l.split(','))
rdd3.take(5)

Yo estaba buscando, sistemas de almacenamiento de datos y llegué aquí.

Dejo como aporte el notebook de la clase. Gracias <https://drive.google.com/file/d/1lGNPJ_KEd8gTa1aM8Ih0xceqseiMflQq/view?usp=sharing>
solo para especificar que .master("local\[5]") no es para hacer un particionado en los datos , "local\[5]" significa que Spark usará 5 núcleos del procesador para ejecutar tareas en paralelo.
Les comparto otra forma en la que también se pueden obtener los nombres de los archivos que nos interesan: ```python spark_particionado = SparkSession.builder.appName("Particionado").master("local[5]").getOrCreate() spark_particionado.sparkContext.wholeTextFiles("/mnt/d/Adrian/Cursos/PySpark/Inputs/testsaveas/*").map(lambda x: x[0::2][0]).collect() ```spark\_particionado.sparkContext.wholeTextFiles("/mnt/d/Adrian/Cursos/PySpark/Inputs/testsaveas/\*").map(lambda x: x\[0::2]\[0]).collect()
En mi caso no necesite utilizar wholeTextFile(), solamente con textFile() y la wild card \* fue suficiente. Muy sencillo!!! `rdd = spark.sparkContext.textFile('my_path/*')`