Definir funciones personalizadas que se ejecuten de forma paralela en todos los nodos de un clúster es una de las capacidades más potentes que ofrece Apache Spark. Las UDF (User Defined Functions) permiten exactamente eso: escribir lógica propia en el lenguaje de programación que prefieras y registrarla dentro de Spark para que opere de manera distribuida sobre grandes volúmenes de datos.
¿Qué son las UDF y por qué usarlas en Spark?
Spark incluye funciones nativas como map, filter y flatMap para realizar transformaciones sobre los datos. Sin embargo, existen escenarios donde necesitamos lógica que Spark no ofrece de forma nativa. Ahí es donde entran las UDF [01:03].
Una UDF es, en esencia, una función convencional de cualquier lenguaje de programación. La diferencia fundamental radica en que, al registrarla dentro de Apache Spark, esa función adquiere la capacidad de ejecutarse de manera distribuida en todos los nodos del clúster. Esa es su característica distintiva.
Es importante considerar que Spark se integra con diferentes lenguajes. Si trabajas con Python, la UDF se define en Python. Si trabajas con Scala o R, el código debe ser del lenguaje correspondiente [03:00]. La API que utilices determina el lenguaje en el que escribes la función.
¿Cómo se crea una UDF paso a paso en PySpark?
El proceso se divide en tres etapas claras: crear un DataFrame, definir la función y registrarla como UDF.
¿Cómo preparar los datos de trabajo?
Primero se crea un DataFrame utilizando createDataFrame, pasando un conjunto de datos de prueba y definiendo las columnas: ID, nombre, departamento y salario [01:40]. Al ejecutar .show(), los datos se visualizan en formato tabular de filas y columnas.
¿Cómo definir la lógica de la función?
Se define una función estándar en Python con la palabra reservada def [03:25]. En el ejemplo práctico, la función recibe el salario como parámetro y clasifica a cada persona según su nivel salarial:
Si el salario es mayor a 5000: se asigna high salary.
Si el salario es mayor a 2000: se asigna medium salary.
Si el salario es mayor a 0: se asigna low salary.
En cualquier otro caso: se marca como inválido.
La función retorna la variable level, que solo puede tomar esos cuatro valores de tipo texto [04:55]. Hasta este punto, no se ha hecho nada relacionado con UDF; es simplemente una función Python convencional.
¿Cómo registrar y aplicar la UDF en Spark?
Para convertir esa función en UDF se utiliza el método de registro, donde se especifican dos elementos clave [06:05]:
El nombre de la función previamente definida.
El tipo de dato de retorno, que en este caso es StringType porque la variable level devuelve cadenas de texto.
Como buena práctica, se antepone la palabra UDF al nombre de la función registrada, por ejemplo udf_salary_level, lo que facilita identificar rápidamente que se trata de una función distribuida [06:50].
Para aplicarla, se agrega una nueva columna al DataFrame llamada nivel de salario utilizando withColumn. Dentro de esa operación se invoca la UDF pasándole la columna salary como parámetro [07:20].
¿Qué resultados se obtienen al ejecutar la UDF?
Al ejecutar el código y visualizar el DataFrame resultante, la clasificación funciona correctamente [08:00]:
Un salario de 1000 dólares se clasifica como low salary.
Un salario de 3500 dólares aparece como medium salary.
Personas que ganan por encima de 5000 dólares reciben la etiqueta de high salary.
Esto demuestra cómo una función personalizada, que Spark no podría ofrecer de forma nativa, se ejecuta de manera paralela y distribuida sobre cada fila del DataFrame. Se pueden crear tantas UDF como el contexto lo requiera, adaptando la lógica y el tipo de retorno según la necesidad.
Si ya pusiste en práctica tu propia UDF, comparte en los comentarios los resultados y los retos que resolviste.
En Apache Spark, una UDF (User Defined Function) es una función definida por el usuario que puedes utilizar para realizar transformaciones personalizadas en los datos. Las UDF permiten aplicar lógica compleja a columnas de un DataFrame que no está disponible con las funciones integradas de Spark.
Características principales de las UDF:
Personalización: Permiten agregar lógica específica del negocio.
Compatibilidad: Se pueden usar con diferentes lenguajes como Python, Scala, Java y R.
Desempeño: Las UDF pueden ser más lentas que las funciones integradas de Spark porque se ejecutan en el intérprete del lenguaje (por ejemplo, Python) y no están optimizadas para la ejecución distribuida.
1. Crear una UDF en Python
Puedes registrar una función personalizada como una UDF y usarla en transformaciones.
Paso 1: Importar las funciones necesarias
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
Puedes registrar una UDF y usarla en consultas SQL.
# Registrar la UDF en el contexto SQL
spark.udf.register("categorize_salary", categorize_salary, StringType())
# Registrar la tabla temporal
df.createOrReplaceTempView("employees")
# Ejecutar una consulta SQL con la UDF
result = spark.sql("SELECT id, name, salary, categorize_salary(salary) AS salary_category FROM employees")
result.show()
3. Tipos de datos soportados
Cuando defines una UDF, necesitas especificar el tipo de dato de salida usando las clases de pyspark.sql.types:
Uso de funciones integradas: Siempre que sea posible, usa las funciones integradas de Spark en lugar de UDF, ya que están optimizadas para la ejecución distribuida.
Vectorización con Pandas UDF: Para mejorar el rendimiento, puedes usar Pandas UDFs, que están optimizadas para operaciones vectorizadas y aprovechan mejor los recursos de Spark.
Ejemplo de Pandas UDF:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
Esta clase al igual que las demás está excelente, me está gustando mucho este curso! Estas funciones UDF dan mucha libertad a la hora de trabajar el código.
Genial Cinddy :)
Les comparto mis resultados:
Saludos.
# Databricks notebook source# Importacion de las librerias y los utilitariosfrom pyspark.sql.types import*from datetime import datetime
import math
# COMMAND ----------# Definición de una variablePARAM_NUMERO =10# Crear un DataFrame de ejemplo solo númerosdata_numeros =[(1,),(2,),(3,),(4,)]columns =["numero"]df = spark.createDataFrame(data_numeros, columns)# Crear un DataFrame de ejemplo solo textodata_texto =[("Hola",),("Mundo",),("Databricks",),(None,)]columns =["cadena"]df_texto = spark.createDataFrame(data_texto, columns)# COMMAND ----------# MAGIC %md# MAGIC ### Ejercicio 1 cuadrado (UDF)# COMMAND ----------# Definir la función para calcular el cuadradodefcuadrado(numero):return numero **2# COMMAND ----------# Registro de la funcion: udf_calcular_cuadrado como UDFudf_cuadrado = udf(cuadrado, LongType())# COMMAND ----------#Aplicación de la UDFdf_con_cuadrado = df.withColumn("cuadrado", udf_cuadrado(df["numero"]))# Mostrar los resultadosdf_con_cuadrado.show()# COMMAND ----------# MAGIC %md# MAGIC ### Ejercicio 2 longitud (UDF)# COMMAND ----------# Definir la función para calcular la longitud de una cadenadeflongitud(cadena):returnlen(cadena)if cadena else0# COMMAND ----------# Registrar la función como un UDFudf_longitud = udf(longitud, LongType())# COMMAND ----------# Aplicar la UDF al DataFramedf_con_longitud = df_texto.withColumn("longitud", udf_longitud(df_texto["cadena"]))# Mostrar los resultadosdf_con_longitud.show()# COMMAND ----------# MAGIC %md# MAGIC ### Ejercicio 3 es_par (UDF)# COMMAND ----------# Definir la función para saber si un número es pardefes_par(numero):if numero %2==0:returnTruereturnFalse# COMMAND ----------# Registrar la función como un UDFudf_es_par = udf(es_par, BooleanType())# COMMAND ----------#Aplicación de la UDFprint(df)df_con_paridad = df.withColumn("cuadrado", udf_es_par (df["numero"]))# Mostrar los resultadosdf_con_paridad .show()# COMMAND ----------# MAGIC %md# MAGIC ### Ejercicio 4 revertir (UDF)# COMMAND ----------# Definir la función para revertir una cadena de textodefrevertir(cadena):if cadena isNone:returnNonereturn"".join(reversed(cadena))# COMMAND ----------# Registrar la función como UDFudf_revertir = udf(revertir, StringType())# COMMAND ----------# Aplicar la UDF al DataFramedf_texto_revertido = df_texto.withColumn("revertir", udf_revertir(df_texto["cadena"]))# Mostrar Resultadosdf_texto_revertido.show()# COMMAND ----------# MAGIC %md# MAGIC ### Ejercicio 5 sumar_elementos (UDF)# COMMAND ----------# Definir función para sumar elementosdefsumar_elementos(lista_numeros):returnsum(lista_numeros)# COMMAND ----------# Crear un DataFrame donde cada fila contiene listas de númerosdata_lista_numeros =[([1,3],),([2,5],),([3,9],),([4,4],)]columns =["lista_numeros"]df_lista_numeros = spark.createDataFrame(data_lista_numeros, columns)# COMMAND ----------# Registrar la función cómo UDFudf_sumar_elementos = udf(sumar_elementos, LongType())# COMMAND ----------# Aplicar la UDF al DataFramedf_suma = df_lista_numeros.withColumn("suma", udf_sumar_elementos(df_lista_numeros['lista_numeros']))# Mostrar resultadosdf_suma.show()# COMMAND ----------# MAGIC %md# MAGIC ### Ejercicio 6 calcular_area_circulo (UDF)# COMMAND ----------# Definir función para calcular el area de un circulodefcalcular_area_circulo(radio):return math.pi * radio **2# COMMAND ----------# Registrar la función cómo UDFudf_calcular_area_circulo = udf(calcular_area_circulo, DoubleType())# COMMAND ----------# Aplicar la UDF al DataFramedf_area = df.withColumn("area", calcular_area_circulo(df['numero']))# Mostrar resultadosdf_area.show()# COMMAND ----------# MAGIC %md# MAGIC ### Ejercicio 7 convertir_mayusculas (UDF)# COMMAND ----------# Definir función convertir una cadena a mayúsculasdefconvertir_mayusculas(cadena):if cadena isNone:returnNonereturn(cadena.upper())# COMMAND ----------# Registrar la función cómo UDFudf_convertir_mayusculas = udf(convertir_mayusculas, StringType())# COMMAND ----------# Aplicar la UDF al DataFramedf_area = df_texto.withColumn("texto", udf_convertir_mayusculas(df_texto['cadena']))# Mostrar resultadosdf_area.show()# COMMAND ----------# MAGIC %md# MAGIC ### Ejercicio 8 calcular_edad (UDF)# COMMAND ----------# Definir función para calcular la edad (Por defecto dejamos el año en el 2023 si no envían uno)defcalcular_edad(fecha_nacimiento_str, anio_actual=2023): fecha_nacimiento = datetime.strptime(fecha_nacimiento_str,"%Y-%m-%d") fecha_actual = datetime(anio_actual, datetime.now().month, datetime.now().day) edad = fecha_actual.year - fecha_nacimiento.year
if(fecha_actual.month < fecha_nacimiento.month or(fecha_actual.month == fecha_nacimiento.month and fecha_actual.day < fecha_nacimiento.day)): edad -=1return edad
# COMMAND ----------# Registrar la función cómo UDFudf_calcular_edad = udf(calcular_edad, IntegerType());# COMMAND ----------# Crear un DataFrame donde cada fila contiene listas de númerosdata_personas =[("Juan","1980-01-01",),("Pedro","1988-12-11",)]columns =["name","fecha_nacimiento"]df_personas = spark.createDataFrame(data_personas, columns)# COMMAND ----------#Aplicacion de la udfdf_personas_con_edad = df_personas.withColumn("edad", udf_calcular_edad("fecha_nacimiento"))df_personas_con_edad.show()# COMMAND ----------# MAGIC %md# MAGIC ### Ejercicio 9 es_vocal (UDF)# COMMAND ----------# Definir función para saber si una letra es una vocaldefes_vocal(letra): vocal =Falseif letra.lower()in['a','e','i','o','u']: vocal =Truereturn vocal
# COMMAND ----------# Crear un DataFrame con letasdata_letras =[("a",),("b",),("c",),("e",),("f",)]columns =["letras",]df_letras = spark.createDataFrame(data_letras, columns)# COMMAND ----------# Registrar la función cómo UDFudf_es_vocal = udf(es_vocal, BooleanType())# COMMAND ----------#Aplicacion de la udfdf_personas_con_edad = df_letras.withColumn("letra", udf_es_vocal("letras"))df_personas_con_edad.show()```# Databricks notebook source
\# Importacion de las librerias y los utilitarios
from pyspark.sql.types import \*from datetime import datetime
import math
\# COMMAND ----------
\# Definición de una variable
PARAM\_NUMERO =10
\# Crear un DataFrame de ejemplo solo números
data\_numeros = \[(1,),(2,),(3,),(4,)]columns = \["numero"]df = spark.createDataFrame(data\_numeros, columns)
\# Crear un DataFrame de ejemplo solo texto
data\_texto = \[("Hola",),("Mundo",),("Databricks",),(None,)]columns = \["cadena"]df\_texto = spark.createDataFrame(data\_texto, columns)\# COMMAND ----------\# MAGIC %md
\# MAGIC ### Ejercicio 1 cuadrado (UDF)\# COMMAND ----------
\# Definir la función para calcular el cuadrado
defcuadrado(numero):  return numero \*\* 2\# COMMAND ----------\# Registro de la funcion: udf\_calcular\_cuadrado como UDF
udf\_cuadrado = udf(cuadrado, LongType())\# COMMAND ----------
\#Aplicación de la UDF
df\_con\_cuadrado = df.withColumn("cuadrado", udf\_cuadrado(df\["numero"]))
\# Mostrar los resultados
df\_con\_cuadrado.show()\# COMMAND ----------\# MAGIC %md
\# MAGIC ### Ejercicio 2 longitud (UDF)\# COMMAND ----------
\# Definir la función para calcular la longitud de una cadena
deflongitud(cadena):  return len(cadena) if cadena else 0\# COMMAND ----------
\# Registrar la función como un UDF
udf\_longitud = udf(longitud, LongType())\# COMMAND ----------
\# Aplicar la UDF al DataFrame
df\_con\_longitud = df\_texto.withColumn("longitud", udf\_longitud(df\_texto\["cadena"]))
\# Mostrar los resultados
df\_con\_longitud.show()\# COMMAND ----------\# MAGIC %md
\# MAGIC ### Ejercicio 3 es\_par (UDF)\# COMMAND ----------
\# Definir la función para saber si un número es par
def es\_par(numero):  if numero % 2 == 0:  return True     return False\# COMMAND ----------
\# Registrar la función como un UDF
udf\_es\_par = udf(es\_par, BooleanType())\# COMMAND ----------
\#Aplicación de la UDF
print(df)df\_con\_paridad = df.withColumn("cuadrado", udf\_es\_par (df\["numero"]))
\# Mostrar los resultados
df\_con\_paridad .show()\# COMMAND ----------\# MAGIC %md
\# MAGIC ### Ejercicio 4 revertir (UDF)\# COMMAND ----------
\# Definir la función para revertir una cadena de texto
defrevertir(cadena):  if cadena is None:  return None  return "".join(reversed(cadena))\# COMMAND ----------
\# Registrar la función como UDF
udf\_revertir = udf(revertir, StringType())\# COMMAND ----------
\# Aplicar la UDF al DataFrame
df\_texto\_revertido = df\_texto.withColumn("revertir", udf\_revertir(df\_texto\["cadena"]))
\# Mostrar Resultados
df\_texto\_revertido.show() \# COMMAND ----------\# MAGIC %md
\# MAGIC ### Ejercicio 5 sumar\_elementos (UDF)\# COMMAND ----------
\# Definir función para sumar elementos
def sumar\_elementos(lista\_numeros):  return sum(lista\_numeros)\# COMMAND ----------
\# Crear un DataFrame donde cada fila contiene listas de números
data\_lista\_numeros = \[(\[1,3],),(\[2,5],),(\[3,9],),(\[4,4],)]columns = \["lista\_numeros"]df\_lista\_numeros = spark.createDataFrame(data\_lista\_numeros, columns)\# COMMAND ----------
\# Registrar la función cómo UDF
udf\_sumar\_elementos = udf(sumar\_elementos, LongType())\# COMMAND ----------
\# Aplicar la UDF al DataFrame
df\_suma = df\_lista\_numeros.withColumn("suma", udf\_sumar\_elementos(df\_lista\_numeros\['lista\_numeros']))
\# Mostrar resultados
df\_suma.show()\# COMMAND ----------\# MAGIC %md
\# MAGIC ### Ejercicio 6 calcular\_area\_circulo (UDF)\# COMMAND ----------
\# Definir función para calcular el area de un circulo
def calcular\_area\_circulo(radio):  return math.pi \* radio \*\* 2\# COMMAND ----------
\# Registrar la función cómo UDF
udf\_calcular\_area\_circulo = udf(calcular\_area\_circulo, DoubleType())\# COMMAND ----------
\# Aplicar la UDF al DataFrame
df\_area = df.withColumn("area", calcular\_area\_circulo(df\['numero']))
\# Mostrar resultados
df\_area.show()\# COMMAND ----------\# MAGIC %md
\# MAGIC ### Ejercicio 7 convertir\_mayusculas (UDF)\# COMMAND ----------
\# Definir función convertir una cadena a mayúsculas
def convertir\_mayusculas(cadena):  if cadena is None:  return None  return (cadena.upper()) \# COMMAND ----------
\# Registrar la función cómo UDF
udf\_convertir\_mayusculas = udf(convertir\_mayusculas, StringType())\# COMMAND ----------
\# Aplicar la UDF al DataFrame
df\_area = df\_texto.withColumn("texto", udf\_convertir\_mayusculas(df\_texto\['cadena']))
\# Mostrar resultados
df\_area.show()\# COMMAND ----------\# MAGIC %md
\# MAGIC ### Ejercicio 8 calcular\_edad (UDF)\# COMMAND ----------\# Definir función para calcular la edad (Por defecto dejamos el año en el 2023 si no envían uno)def calcular\_edad(fecha\_nacimiento\_str, anio\_actual=2023):  fecha\_nacimiento = datetime.strptime(fecha\_nacimiento\_str, "%Y-%m-%d")  fecha\_actual = datetime(anio\_actual, datetime.now().month, datetime.now().day)    edad = fecha\_actual.year - fecha\_nacimiento.year  if (fecha\_actual.month < fecha\_nacimiento.month or   (fecha\_actual.month == fecha\_nacimiento.month and fecha\_actual.day < fecha\_nacimiento.day)):  edad -= 1  return edad\# COMMAND ----------
\# Registrar la función cómo UDF
udf\_calcular\_edad = udf(calcular\_edad, IntegerType());\# COMMAND ----------
\# Crear un DataFrame donde cada fila contiene listas de números
data\_personas = \[("Juan","1980-01-01",),("Pedro","1988-12-11",)]columns = \["name","fecha\_nacimiento"]df\_personas = spark.createDataFrame(data\_personas, columns)\# COMMAND ----------
\#Aplicacion de la udf
df\_personas\_con\_edad = df\_personas.withColumn("edad", udf\_calcular\_edad("fecha\_nacimiento"))df\_personas\_con\_edad.show()\# COMMAND ----------\# MAGIC %md
\# MAGIC ### Ejercicio 9 es\_vocal (UDF)\# COMMAND ----------
\# Definir función para saber si una letra es una vocal
def es\_vocal(letra):  vocal = False  if letra.lower() in \['a', 'e', 'i', 'o', 'u']:  vocal = True  return vocal\# COMMAND ----------
\# Crear un DataFrame con letas
data\_letras = \[("a",),("b",),("c",),("e",),("f",)]columns = \["letras",]df\_letras = spark.createDataFrame(data\_letras, columns)\# COMMAND ----------
\# Registrar la función cómo UDF
udf\_es\_vocal = udf(es\_vocal, BooleanType())\# COMMAND ----------
\#Aplicacion de la udf
df\_personas\_con\_edad = df\_letras.withColumn("letra", udf\_es\_vocal("letras"))df\_personas\_con\_edad.show()
# Pregunta 1spark.sql("""
SELECT P.*
FROM personas P
WHERE edad > 28
""").show()#Pregunta 2f.createOrReplaceTempView("personas_all")spark.sql("""
SELECT
P.nombre,
mean(p.edad) as avr_age
FROM personas_all P
GROUP BY nombre
""").show()Pregunta 3spark.sql("""
SELECT
P.nombre, P.edad, D.direccion
FROM personas_all as P
LEFT JOIN (SELECT * FROM direcciones) as D ON P.id = D.id
where direccion is not NULL
""").show()
Esto es del reto de la clase anterior lo queria pegar alla pero no me di cuenta sorry!
No hay problema amor :)
Aquí mis respuestas (son válidas por 6 meses a partir de hoy 2024-04-03):
Gracias :)
¿En databricks se pueden ejecutar subprocesos?
Si, se puede especialmente con las versiones de pago, tmb. Por lo demas es algo configurable :)
¿Cómo se puede hacer un ciclo for en python que obviamente es secuencial para que sea distribuido?
¿Al ejecutar el ciclo for dentro de una UDF el mismo spark lo detecta y que lo hace distribuido?
Con una udf, tal cual como vimos en el curso :)
Para mi la UDF me significa una especie de CASE en SQL ¿O no?
No, en este contexto es lo que vimos en clase :) funciones definidas por el usuario! Saludos!
Una User Defined Function (UDF) es una función personalizada que defines para extender la funcionalidad nativa de Spark SQL. Te permite aplicar lógica que no está disponible en las funciones predefinidas de PySpark o Spark SQL.
Las UDFs son útiles cuando necesitas realizar cálculos o transformaciones personalizadas en columnas de un DataFrame. Las UDFs son similares a las funciones definidas por el usuario en lenguajes como SQL o Python, pero se ejecutan de manera distribuida en un clúster de Spark.
En el examen se pregunta
"Cual es proposito principal de las UDF" y la respuesta "correcta" es: "Filtrar datos en un DataFrame según un criterio personalizado" lo cual no es correcto, este NO es su principal proposito.
Su principal proposito, como su nombre lo dice, es permitir la creacion de funciones personalizadas que no estan cubiertas por las funciones estandar.