Operaciones avanzadas con RDDs en Spark: Uniones y Joins
Resumen
¿Cómo operar con múltiples RDDs para obtener información significativa?
Operar con RDDs (Resilient Distributed Datasets) es una habilidad esencial para quienes trabajan con grandes volúmenes de datos. En esta lección, te mostramos cómo integrar y manipular varios RDDs para obtener datos significativos. Al finalizar este proceso, tendrás una comprensión más profunda de cómo gestionar estos datos de manera efectiva en un entorno distribuido.
¿Cómo importar y visualizar RDDs en Jupyter?
Comenzamos importando RDDs desde archivos CSV usando Jupyter. Para esto, asegúrate de tener los archivos CSV en tu carpeta de trabajo. Recuerda que los archivos pueden tener o no encabezados. Por ello, importamos dos archivos: uno con encabezado ("deportista.csv") y otro sin ("deportista2.csv").
La operación union es crucial cuando necesitas fusionar datos de dos RDDs. Spark maneja automáticamente las duplicidades al ejecutar una union. Aquí te mostramos cómo hacerlo:
Para validar la operación, puedes utilizar el conteo:
conteo = deportista_olimpico_unido.count()
Un conteo exitoso indica que los archivos tienen la calidad mínima necesaria.
¿Cómo realizar joins con RDDs?
Hacer un join entre RDDs te permite combinar información de diferentes fuentes. Por ejemplo, al unir deportistas con equipos olímpicos, primero debes identificar las claves comunes:
# Seleccionar la clave y el resto de las columnas necesariasdeportista_clave_valor = deportista_olimpico_RDD.map(lambda l:(l[-1], l))equipos_olimpicos_clave_valor = equipos_olimpicos_RDD.map(lambda m:(m[0], m[-1]))# Realizar el joinjoin_resultado = deportista_clave_valor.join(equipos_olimpicos_clave_valor)
¿Cómo utilizar muestras para verificar un join?
La función takeSample te permite tomar una muestra aleatoria de tus datos para revisar la calidad del join:
muestra = join_resultado.takeSample(False,6,42)
¿Cómo cargar y filtrar un RDD de resultados?
Al trabajar con datos como resultados de juegos olímpicos, puedes estar interesado solo en aquellos jugadores que ganaron medallas. Para esto, carga y filtra los resultados:
Al operar con grandes volúmenes de datos, es clave utilizar técnicas eficientes. A medida que desarrolles tus habilidades, explora el uso de más funciones de Spark para optimizar tus análisis. ¡No olvides seguir practicando y buscando nuevos retos para consolidar tus conocimientos!
Dado el join que se acaba de realizar entre deportistaOlimpicoRDD y equiposOlimpicosRDD, hay que cambiar el valor llave (el cual es equipo_id) por deportista_id ya que éste valor es quien relaciona la tabla Deportista con la tabla Resultados.
Solo después de haber realizado el cambio de valor, ya se puede hacer el join. El cambio de llaves se hace simplemente ordenando la información con un map().
(deportistaOlimpicoRDD.map(lambda l :[l[-1],l[:-1]]) # Se selecciona la última columna del RDD(equipo_id) que es valor eje y el resto de contenidos
.join(equiposOlimpicosRDD.map(lambda l :[l[0],l[2]])) # Se selecciona solo el id que es el valor eje y la sigla del país
.map(lambda l :(l[1][0][0],(l[0],l[1][0][1:],l[1][1]))) # Pone deportista_id como valor eje
.join(resultadoGanador.map(lambda l :[l[2],l[1]])) # Toma deportista_id y medalla
.takeSample(False,6,25))
mi output es:
[('54832',(('982',['Carlos Jimnez Snchez','1','24','201','100'],'ESP'),'Silver')),('135219',(('944',['Urka olnir','2','22','173','64'],'SLO'),'Bronze')),('106724',(('705',['Adrie Ard Schenk','1','19','190','90'],'NED'),'Gold')),('112317',(('1096',['Justus Ketchum Smith','1','26','0','0'],'USA'),'Gold')),('124518',(('705',['Daniel Daan van Dijk','1','21','0','0'],'NED'),'Gold')),('42668',(('1096',['Joseph Tilford Lee Joe Greene','1','25','183','70'],'USA'),'Bronze'))]
Así yo le dí respuesta al reto para traernos los valores de todos los campos:
Tener en cuenta que la función top() NO muestra los primeros valores, en realidad muestra los valores más altos. Es decir si tengo un arreglo con los valores [1,2,3,4,5] y uso top(2) me mostrará el valor 5 y 4. Si mi arreglo es de strings me mostrará primero las letras en orden alfabético seguido de los números. Ejemplo: si mi RDD es [‘a’,‘b’,‘4’,‘5’] y uso top(3) me mostrará [‘b’,‘a’,‘5’]..
Sé que es un curso muy viejo y tal vez nadie vea mi respuesta. Pero creo que es muy diferente a la que los demás han hecho y creo que es mucho más fácil de entender.
[('54832',(('982',['Carlos Jimnez Snchez','1','24','201','100'],'ESP'),'Silver')),('135219',(('944',['Urka olnir','2','22','173','64'],'SLO'),'Bronze')),('106724',(('705',['Adrie Ard Schenk','1','19','190','90'],'NED'),'Gold')),('112317',(('1096',['Justus Ketchum Smith','1','26','0','0'],'USA'),'Gold')),('124518',(('705',['Daniel Daan van Dijk','1','21','0','0'],'NED'),'Gold')),('42668',(('1096',['Joseph Tilford Lee Joe Greene','1','25','183','70'],'USA'),'Bronze'))]
No entendí lo de la semilla. con takeSample()
Debido a que takeSample toma una muestra aleatoria, se requiere ingresar una 'semilla' para que se calcule una muestra. Haz la prueba ingresando diferentes semillas.
Otra forma de entender la semilla es que al ocuparla la computadora escoge valores "semi aleatorios" de forma que si yo ocupo la misma semilla que tú, a ambas nos van a regresar los mismos valores "aleatorios"
Mi salida agregando las medallas y el Id del evento a al join ya generado,
deportistaOlimpicoRDD.map(lambda l : [l[-1], l[:-1]]).join(equiposOlimpocosRDD.map(lambda x : [x[0], x[2]])).join(resultadoGanador.map( lambda r : [r [-1], r[1]])).take(2)
No entendí por qué quería hacer union de los dos csv's. La explicación de que uno de los dos no tiene headers no es suficiente. ¿Supongo que en el otro hay más información necesaria?
por que al hacer el Join llave-valor la llave se separa del arreglo del resto de valores, por ejemplo ahora si vemos la longitud vemos que se reduce a 1(0,1) como si tuviera únicamente 2 columnas, esto se puede evitar para mantener la misma "estructura" o cómo se puede separar nuevamente ?
Me costo mucho entender al 100% la sintaxis y armarlo por mi cuenta, bendita IA y sus explicaciones para dumbs
deportistaPaises = deportistaOlimpicosRDD.map(lambda l : [l[-1], l[:-1]]) \ .join(equiposOlimpicosRDD.map(lambda x: [x[0], x[2]]))
deportistaPaises.take(1)
reto = deportistaPaises.map(lambda l : [l[1][0][0], l[:]]) \ .join(resultadoGanador.map(lambda x : [x[0],x[1:]]))
reto.take(1)
Agrego mi aporte explicando primero que yo, lo que quería por encima de todo, era trabajar con el id del deportista como clave, parece obvio pero no lo es tanto
Hice el join entre lo que ya se había hecho, deportistaOlimpicoRDD y el resultadoGanadorRDD, trayendo todos los campos de resultadoGanadorRDD, usando deportista_id como el punto de la unión.
[('74148',(['SWE','Erik Malmberg'],['147771','Bronze','9','280'])),('36026',(['DEN','Hans Marius Fogh'],['70752','Silver','23','96'])),('18232',(['USA','Richard John Rick Carey'],['35396','Gold','35','150'])),('72276',(['SWE','Kristina Anna Maria Lundberg'],['144017','Silver','46','303'])),('46524',(['CAN','Gavin Hassett'],['91794','Silver','41','399']))]
Que tal bro, que bueno no ser el unico haciendo este curso en estas fechas.
~ Que tal Platzinauta, ya conectamos en LinkedIn? ~
¡Que estas esperando! Conectemos en LinkedIn, GitHub, Medium o Redes sociales
# Unión de resultadoGanador con equipos y deportistasdeportistaEquipo = deportistaOlimpicoRDD.map(lambda l :[l[-1], l[:-1]]).join(equiposOlimpicosRDD.map(lambda x :[x[0], x[2]]))deportistaEquipo.take(5)
deportistaEquipo.map(lambda x :[x[1][0][0], x[:]]).join(resultadoGanador.map(lambda y :[y[2], y[:-1]])).take(5)