Creación y uso de UDFs en PySpark para manejo de datos faltantes
Clase 21 de 25 • Curso de Fundamentos de Spark para Big Data
Resumen
¿Cómo crear una UDF en PySpark?
¡Bienvenido al fascinante mundo de Spark! En esta clase, aprenderemos a crear nuestra primera función definida por el usuario (UDF) en PySpark. Una UDF nos permite manejar datos complejos y realizar transformaciones personalizadas directamente en los data frames. Acompáñame en este viaje para descubrir cómo manejar datos erróneos y transformarlos eficazmente con una UDF.
¿Cuál es el primer paso para crear una UDF?
Antes de empezar, asegúrate de tener tus datos listos. En nuestro caso, estamos trabajando con un archivo CSV llamado deportista_error.csv
. Observamos que este archivo tiene valores faltantes, lo que podría causar problemas al procesarlo. Aquí es donde entra en juego la UDF: nos ayudará a gestionar estos datos erróneos.
-
Carga del RDD: Primero, cargamos el archivo y eliminamos encabezados.
deportista_error = SparkContext.textFile("ruta/deportista_error.csv") rdd_sin_encabezado = deportista_error.mapPartitionsWithIndex( lambda idx, it: iter(list(it)[1:]) if idx == 0 else it )
-
Definición del esquema: Especificamos las columnas y sus tipos de datos.
esquema = StructType([ StructField("id", StringType(), False), StructField("nombre", StringType(), False), StructField("genero", StringType(), False), StructField("edad", StringType(), False), StructField("altura", StringType(), True), StructField("peso", StringType(), True), StructField("equipo_id", StringType(), False), ])
¿Cómo definir una UDF para transformar datos?
Las UDF se definen mediante funciones de Python que luego se registran como funciones en Spark. Nuestra UDF convertirá campos numéricos erróneos en valor nulo (None
) si están vacíos.
-
Importación necesaria: Importamos desde
pyspark.sql.functions
.from pyspark.sql.functions import udf from pyspark.sql.types import IntegerType
-
Creación de la UDF: Definimos una función de Python y la registramos.
def convertir_a_entero(valor): return int(valor) if valor and len(valor) > 0 else None convertir_a_entero_udf = udf(convertir_a_entero, IntegerType())
-
Registro de la UDF: Con el
SQLContext
, registramos la UDF.sqlContext.udf.register("convertir_a_entero", convertir_a_entero_udf)
¿Cómo aplicar la UDF a un DataFrame?
Una vez creada y registrada nuestra UDF, es hora de aplicarla a datos reales. Este proceso transforma entradas erróneas o vacías al formato deseado.
-
Creación del DataFrame: Convertimos el RDD al DataFrame usando el esquema.
df_deportista_error = sqlContext.createDataFrame(rdd_sin_encabezado, esquema)
-
Aplicación de la UDF: Utilizando
select()
, transformamos datos específicos.df_transformado = df_deportista_error.select( "id", "nombre", "genero", "edad", convertir_a_entero_udf("altura").alias("altura"), convertir_a_entero_udf("peso").alias("peso"), "equipo_id" )
-
Visualización de resultados: Esto nos permite ver cómo se eliminaron los errores y se corrigieron los datos.
df_transformado.show()
¡Y ahí lo tienes! Has aprendido a manejar datos erróneos mediante una UDF personalizada. La correcta comprensión y uso de UDFs te permitirá realizar transformaciones complejas y vitales en tus datasets. Te animamos a seguir explorando Spark y a profundizar en cómo estas funciones pueden optimizar el trabajo con grandes volúmenes de datos. ¡Hasta la próxima!