¿Cómo trabajar con archivos y esquemas en Spark SQL?
Para triunfar en el manejo de datos, es esencial comprender cómo leer y manipular información adecuadamente. En el contexto de Spark SQL, aprender a importar y definir esquemas es el primer paso crucial. Comencemos cargando un archivo persona.data, al cual aplicamos un esquema de metadatos. Esto nos permite especificar claramente los tipos de datos para cada campo, como ID, nombre o correo electrónico.
Este enfoque asegura que puedas manejar los datos adecuadamente, reduciendo riesgos de errores al definir claramente los tipos.
¿Cómo crear y manejar una vista temporal?
Para mejorar el manejo de consultas SQL en Spark, es esencial crear una vista temporal de los datos. Esto nos permite interactuar con los datos como si fueran una tabla en SQL. Crear una vista es sencillo:
Una vez creada, puedes ejecutar consultas SQL directamente sobre esta vista:
result = spark.sql("SELECT * FROM dfPersonaView WHERE edad > 30")result.show(10)
Esto simplifica la aplicación de filtros y condiciones, maximizando la eficiencia en tu análisis de datos.
¿Cómo optimizar consultas largas en Spark SQL?
Las consultas extensas pueden complicarse y volverse difíciles de manejar. Aquí destacamos el uso de triple comillas para simplificar y estructurar consultas largas, haciéndolas más legibles:
query ="""
SELECT id, nombre, correo
FROM dfPersonaView
WHERE edad > 30 AND salario > 5000
"""dfResult = spark.sql(query)dfResult.show(10)
Este método mejora la claridad del código, facilitando el entendimiento y mantenimiento de tus consultas.
¿Qué ventajas tiene la parametrización en consultas SQL?
Incorporar parámetros en tus consultas añade dinamismo y flexibilidad, permitiendo cambios sin modificar la lógica SQL:
paramEdad =30paramSalario =5000query =f"""
SELECT id, nombre, correo
FROM dfPersonaView
WHERE edad > {paramEdad} AND salario > {paramSalario}"""dfResult = spark.sql(query)dfResult.show(10)
La parametrización permite una gestión más eficiente y adaptable de tus consultas, agilizando los procesos de modificación y mantenimiento.
¿Cómo practicar y aplicar estos conceptos?
Nada refuerza el aprendizaje más que la práctica continua. Aquí te presentamos tres ejercicios que te permitirán aplicar lo aprendido:
Seleccionar nombre y edad de personas mayores de 28 años.
Calcular la edad promedio en el DataFrame.
Extraer nombre, edad y dirección de personas con direcciones registradas.
Desafíate a resolver estos ejercicios utilizando las técnicas presentadas y fortalece tus habilidades en Spark SQL. ¡Sigue explorando, aprendiendo y dominando este vasto mundo de la gestión de datos!
Estuve revisando. Para el ejercicio 2 el promedio es erróneo (Incluso en el notebook de resolución). Este arroja 27.66666 pero el correcto es 27.3333. Esto es porque aunque se agregaron los nuevos valores a la variable df, esta no se actualizó en la vista, originando no solamente el error en el punto 2 sino también en el punto 3.
Asi lo corregí (actualizando la vista [líneas 12 y 13]):
De hecho, respecto a dicho punto 3 utilicé el right join para tomar todos los datos de las direcciones y traer los que crucen con las personas. A continuación dejo segundo screenshot:
Esto también se podría ejecutar como
SELECTp.nombre,p.edad,p.direccionFROM personas p
FULLOUTERJOIN direcciones d
ON p.id= d.idWHERE d.direccionesISNOTNULL```Ya que toma todos los datos de ambas tablas y filtra según lo que requerimos(que la direccion no sea nula).Espero que les pueda servir compañeros.Buen dia :)
Gracias Christian :) lo validamos. Un saludo!
Gracias, me fue útil tu explicación
¡Entendido! Si estás trabajando en un laboratorio de PySpark SQL, seguramente estás explorando cómo usar las capacidades SQL en PySpark para consultas, análisis y manipulación de datos.
Parte 2: Ejemplo estructurado para PySpark SQL
A continuación, te guiaré a través de una estructura típica para trabajar con PySpark SQL en un laboratorio:
1. Crear o cargar un DataFrame
Cargamos datos desde un archivo CSV o creamos un DataFrame manualmente.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Crear una SparkSession
spark = SparkSession.builder.appName("PySpark SQL Lab").getOrCreate()
Registrar una tabla temporal para ejecutar consultas SQL.
# Registrar el DataFrame como una tabla temporal
df.createOrReplaceTempView("employees")
3. Ejecutar consultas SQL
Usar Spark SQL para ejecutar consultas sobre los datos.
# Ejemplo 1: Seleccionar todos los registros
result = spark.sql("SELECT * FROM employees")
result.show()
# Ejemplo 2: Filtrar empleados con salario mayor a 5000
result = spark.sql("SELECT * FROM employees WHERE salary > 5000")
result.show()
# Ejemplo 3: Agregar una columna calculada (bonus)
result = spark.sql("SELECT *, salary * 0.1 AS bonus FROM employees")
result.show()
# Ejemplo 4: Calcular la suma total de salarios
result = spark.sql("SELECT SUM(salary) AS total_salary FROM employees")
result.show()
4. Realizar transformaciones adicionales
Combinar operaciones SQL y funciones PySpark.
# Agregar una columna 'bonus' al DataFrame original y calcular el total
df = df.withColumn("bonus", col("salary") * 0.1)
df = df.withColumn("total", col("salary") + col("bonus"))
# Mostrar el DataFrame actualizado
df.show()
5. Guardar los resultados
Guardar el resultado de las consultas o transformaciones en diferentes formatos.
# Guardar el DataFrame actualizado como un archivo CSV
df.write.csv("output/employees_with_bonus", header=True)
Ejercicio 2
from pyspark.sql import functions as f
#Ejercicio 2 - Resolucion(df.agg(f.avg("edad")).show())# Completar
En Python utilizo df.info() para poder ver rápidamente si tengo nulos en cada una de las columnas del df. Hay algunas función en Pyspark que me permita hacer esta revisión?
Hola Paula, sí, en PySpark puedes utilizar la función df.printSchema() para obtener información sobre el esquema del DataFrame, incluyendo los tipos de datos y si las columnas permiten valores nulos o no. Además, también puedes usar df.describe() para obtener estadísticas descriptivas de las columnas numéricas, lo que podría ayudarte a identificar posibles valores nulos en esas columnas si ves que el recuento de valores no es consistente.
Sin embargo, para verificar específicamente la presencia de valores nulos en cada columna, puedes usar el método isNull() junto con la función df.select() para contar los valores nulos en cada columna.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum
null_counts = df.select(*(spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns))
Mostrar el resultado
null_counts.show()
Ahi va un ejemplo :) Un saludo!
Tengo una duda, cómo funciona en memoria lo que acabamos de hacer veo que primero creamos un object de spray y luego en otro que emula una tabla SQL, esos dos objetos están en la RAM de la instancia? es común hacer esto en un conjunto de datos masivo?
Hola Gerardo, si para SPARKSQL lo necesitas de esa manera.
Aqui mis respuestas:#Ejercicio 1 - Resolucionparam_edad = 28spark.sql(f"""SELECT nombre, edad FROM personas WHERE edad > {param_edad} """).show()#Ejercicio 2 - Resolucion#Ejercicio 3 - Resolucion
spark.sql("""SELECT p.nombre, p.edad, d.direccion FROM personas p JOIN direcciones d ON p.id = d.id""").show()
df.agg({'edad': 'avg'}).show()
#Ejercicio 1 - Resolucionparam_edad =28spark.sql(f"""SELECT nombre, edad
FROM personas
WHERE
edad > {param_edad} """).show()``````python
#Ejercicio 2 - Resoluciondf.agg({'edad':'avg'}).show()``````js
#Ejercicio 3 - Resolucionspark.sql("""SELECT p.nombre, p.edad, d.direccion
FROM personas p
JOIN direcciones d ON p.id = d.id""").show()