¿Cómo crear un sistema de caché concurrente en Go?
La eficiencia en el cálculo de la serie de Fibonacci demuestra los beneficios de un sistema de caché bien diseñado. Sin embargo, cuando introducimos concurrencia, la ejecución puede llevar a condiciones de carrera que afectan la consistencia y fiabilidad del programa. Aquí exploraremos cómo abordar estos desafíos y optimizar un sistema de caché concurrente usando la robusta capacidad de concurrencia de Go.
¿Qué cambios hicimos en nuestra función principal para manejar concurrencia?
Para manipular concurrente las funciones de nuestro sistema de caché, debimos realizar ajustes en la función main:
Creación de WaitGroup: Este grupo nos permite sincronizar las goroutines y asegurarnos de que todas se completen antes de finalizar el programa.
Uso de Funciones Anónimas: Introducimos funciones anónimas junto con la palabra clave go para ejecutar las operaciones en nuevas goroutines. Este enfoque permite que cada cálculo de Fibonacci se realice de manera concurrente.
Manejo de Contadores: Se utilizó wg.Add(1) para incrementar el contador al iniciar una goroutine y wg.Done() para decrementar al finalizar, garantizando la correcta espera del programa principal mediante wg.Wait().
var wg sync.WaitGroup
for i, n :=range numbers { wg.Add(1)gofunc(index, num int){defer wg.Done()calcularFibonacci(num)}(i, n)}wg.Wait()
¿Cómo resolvimos las condiciones de carrera con logs?
Las condiciones de carrera pueden manifestarse cuando varias goroutines acceden a recursos compartidos de forma desincronizada. Para mitigar este problema, implementamos sync.Mutex en nuestro código:
Sincronización con Mutex: Antes de acceder a datos compartidos, usamos un lock para asegurar la exclusividad y desbloqueamos justo antes de retornar los resultados. Esto evita que múltiples goroutines modifiquen datos simultáneamente, lo que podría llevar a inconsistencias.
Optimización del uso de locks: Inicialmente, bloqueamos todo el bloque de funcionalidad. Sin embargo, al mejorar, bloqueamos sólo cuando es necesario. Movimos el lock para después de verificar la existencia del valor, reduciendo así el impacto negativo en el rendimiento.
¿Cómo mejoramos el rendimiento del sistema?
Después de establecer los locks, notamos un decremento en el rendimiento debido al bloqueo excesivo:
Reanching los locks: Ajustamos los lugares donde se aplicaban los locks, aumentando la eficiencia. Al hacerlo, evitamos bloquear completamente el programa durante operaciones que no lo requieren.
Pruebas de carrera: Usamos la herramienta -race de Go para verificar que no se presenten nuevas condiciones de carrera, garantizando así la correcta implementación de nuestras mejoras.
go run -race main.go
Este enfoque resultó en un sistema más eficiente que previene el recalculo innecesario de datos y optimiza el uso de memoria.
Con estos cambios, hemos creado un sistema de caché concurrente robusto que utiliza únicamente las librerías estándar de Go. Este sistema, que ahora maneja la concurrencia de manera efectiva, permite almacenar resultados de cálculos previos para evitar cálculos repetitivos y mejorar el rendimiento significativamente. ¡Continúa explorando los vastos horizontes de la programación concurrente en Go!
Se ha empeorado notablemente el rendimiento en esta clase.
Realmente no se hace uso del sistema de cache en la ejecución de los ejemplos. Ahora ya ninguno se calcula en nanosegundos.
Se han metido elementos de control de bloqueo en un problema que no lo requiere. No es un buen ejemplo docente.
Tampoco se ha diseñado para poder usar el potencial de la cache en la función de fibonacci. No tiene sentido que si ya se ha calculado el valor fib(42), para calcular el fib(41) le cueste prácticamente lo mismo porque no ha cacheado los resultados intermedios.
No se puede vender esto como un éxito con un resultado manifiestamente peor que el anterior y que encima realmente no usa la cache en ningún momento para beneficiarse de ella, teniendo resultados iguales o peores (el 41 tarde incluso el doble).
Un desastre de clase.
Creo que hay un error de concepto. Hay que entender que la idea de la case es mostrar que existen condiciones de carrera (¿se pudo tomar un mejor ejemplo?, Sí). En este caso esas condiciones de carrera nos estan "beneficiando" ya que se ven resultados de nanosegundos en procesos que esperamos que los tengan porque ya hemos "guardado" valores en la cache.
El punto aqui es que realmente en otra situacion (como llamadas a api's) estas condiciones de carrera pueden hacer que hagamos multiples peticiones sin necesidad alguna. Es ahi donde entra sync.Mutex, pues se asegura de que haya consistencia en los valores (personalmente lo tomo un poco como el concepto de transactions en SQL).
Los valores de tiempo que se ven en consola no es porque al programa le haya tomado todo ese tiempo en realizar la operacion, lo que sucede es que iniciamos el contador al iniciar el llamado de la funcion, por lo que hay un bloqueo mientras las demas funciones consultan los datos pero no se esta procesando nada en ese momento, solo se esta esperando.
Por esa misma razon es que funciona el "semaforo" que vi implementado en otro comentario, no es que el semaforo impida desperdiciar procesamiento, es que esta bloqueando el programa antes de que se inicie el contador y es por ello que cuando se desocupa el canal, se ven "mejoras" en el rendimiento ya que el contador se inicia mucho despues de lo que se debio iniciar sin el semaforo.
Tienes razón en que las condiciones de carrera podrían ayudarnos un poco, pero es un tanto peligroso por que si una operación se llega a anular con otra esto nos dará un resultado distinto (aquí no se nota por que son pocos números y es poco probable que suceda)
Hice algunos cambios en el main para denotar más el uso de cache al buscar el número 42. La primera vez toma 12s, pero el resto de las veces tarda unos pocos ns.
Básicamente lo que hice fue agregar un canal
channel :=make(chanint, maxGoroutines)
para limitar la cantidad de goroutines que calculan. Use solo dos goroutines para evitar que varias Goroutines entren a calcular de forma simultánea el mismo número.
ooh!, muy buena solución evitando que varios procesos calculen el mismo numero el rendimiento mejora, ya que de este modo podemos leer del cache números previamente calculados.
Exactamente!
En esta clase no se logró el objetivo de utilizar concurrencia porque a la final se demora igual o más para calcular el fibonacci de los números repetidos.
Quedé decepcionado, profesor sinceramente le faltó más profundización en este ejemplo
Aunque entiendo que es un objetivo didáctico para ver distintas situaciones que nos podemos encontrar con Gouroutines y como aplicar soluciones, tal y como han comentado otros compañeros el uso de gouroutines empeora el funcionamiento.
.
En este caso es absurdo realizar goroutines, ya que haciendo un sistema de cache potente, donde se guardan todos los cálculos, este proceso de cálculo se vuelve instantáneo para "cualquier" valor (podéis ver el código en mi aporte de la anterior clase).
Como podéis ver, el fibonacci de 150 lo calcula sin despeinarse. Aunque sea a modo ejemplo, también es importante que quede claro cuando aplicar una solución u otra, ya que las goroutines molan mucho, pero no son siempre aplicables. Del mismo modo que un sistema de cache no siempre es viable tal y como lo he planteado.
Mi código con ejemplos extra:
package main
import("fmt""log""sync""time")func Fibonacci(n int) int {if n <=1{return n
}returnFibonacci(n-1)+Fibonacci(n-2)}type Memory struct { origen Function cache map[int]FunctionResult lock sync.Mutex}type Functionfunc(key int)(interface{}, error)type FunctionResult struct { value interface{} err error
}func NewCache(f Function)*Memory{return&Memory{origen: f,cache:make(map[int]FunctionResult),}}func(m *Memory)Get(key int)(interface{}, error){ m.lock.Lock() result,exists:= m.cache[key] m.lock.Unlock()if!exists { result.value, result.err= m.origen(key) m.lock.Lock() m.cache[key]= result
m.lock.Unlock()}return result.value, result.err}func GetFibonacci(n int)(interface{}, error){returnFibonacci(n), nil
}func main(){var wg sync.WaitGroupcache:=NewCache(GetFibonacci)fibo:=[]int{42,41,40,42,38}for _,n:= range fibo { wg.Add(1) go func(index int){ defer wg.Done()start:= time.Now() value,err:= cache.Get(index)if err != nil { log.Println(err)} fmt.Printf("Calculate: %d, \t\tTime: %s, \tResult: %d\n", index, time.Since(start), value)}(n)} wg.Wait() fmt.Println("Y luego llega más trabajo") fibo =[]int{40,38,42,42,41}for _,n:= range fibo { wg.Add(1) go func(index int){ defer wg.Done()start:= time.Now() value,err:= cache.Get(index)if err != nil { log.Println(err)} fmt.Printf("Calculate: %d, \t\tTime: %s, \tResult: %d\n", index, time.Since(start), value)}(n)} wg.Wait() fmt.Println("Y llega el final") fibo =[]int{43,42,41,40,39}for _,n:= range fibo { wg.Add(1) go func(index int){ defer wg.Done()start:= time.Now() value,err:= cache.Get(index)if err != nil { log.Println(err)} fmt.Printf("Calculate: %d, \t\tTime: %s, \tResult: %d\n", index, time.Since(start), value)}(n)} wg.Wait()}
Espero que no sea tarde mi aporte de sistema de cache con concurrencia
Creo que el ejemplo fue el que se pudo haber escogido mejor, o los casos de prueba.
De resto la explicación estuvo bien en su mayoría , aunque a veces muy apresurada y eso hace que se pierda el hilo del curso, ya que al no explicar detalles puntuales se presta a confusiones
De todas formas se recalcula, no funciona el sistema de cache
Fibonacci no es el ejemplo adecuado para concurrencia porque al final se afecta cada operación por separado.
Bastante descontento. Vengo de otros cursos de Go con Néstor y hay cosas que obvia/no explica bien. Por qué implementar un sistema de concurrencia para hacer cosas paralelas y perder menos tiempo si al final estamos usando bloqueadores... Entonces, implementamos concurrencia para deshabilitarla? No entiendo, sinceramente...
Encima no veo al profesor ni a ningún docente respondiendo a las dudas aquí.
Decepcionado...
Creo que las mejoras que le hemos hecho al codigo siguiendo la clase no ven un resultado con pocos datos. Dado que el 42 pregunta al mismo tiempo que el otro 42 y ambos obtienen que no existe nada en la caché. Eso sucede porque el for es muy rapido y practicamente las peticiones se hacen al mismo tiempo, pero en un servidor, esto pasaría al inicio pero al tiempo veremos como la caché vuelve a funcionar. Si tuvieramos mil peticiones de numeros fibonachi y tuvieramos el codigo anterior, el usuario 1000 esperaría una ternidad, mas de 40 segundos por su respuesta, pero en pararelo aún todas las peticiones realizadas a la vez esperaría solo 40 s, y la segunda ves o el usuario 1001 esperaría nada. Interesante porque esta mejora no se puede apreciar con pocos datos, y veo muchos comentarios de que no sirve, como cuesta adaptar el codigo cuando los datos son masivos.
Mi codigo con comentarios y con los channles que comento el usuario Tan:
package main
import("fmt""sync""time")// Function to calculate fibonaccifunc Fibonacci(n int) int {if n <=1{return n
}returnFibonacci(n-1)+Fibonacci(n-2)}// Memory holds a function and a map of resultstype Memory struct { f Function// Function to be used cache map[int]FunctionResult// Map of results for a given key lock sync.RWMutex// Lock to protect the cache}// A function has to recive a value and return a value and an errortype Functionfunc(key int)(interface{}, error)// The result of a functiontype FunctionResult struct { value interface{} err error
}// NewCache creates a new cachefunc NewCache(f Function)*Memory{return&Memory{f,make(map[int]FunctionResult), sync.RWMutex{}}}// Get returns the value for a given keyfunc(m *Memory)Get(key int)(interface{}, error){// Lock the cache m.lock.Lock()// Check if the value is in the cache res,exist:= m.cache[key]// Unlock the cache m.lock.Unlock()// If the value is not in the cache, calculate itif!exist { m.lock.Lock() res.value, res.err= m.f(key)// Calculate the value m.cache[key]= res // Store the value in the cache m.lock.Unlock()}return res.value, res.err}// Function to be used in the cachefunc GetFibonacci(key int)(interface{}, error){returnFibonacci(key), nil
}func main(){empezar:= time.Now()// Create a cache and some valuescache:=NewCache(GetFibonacci)values:=[]int{42,40,41,42,38,41,42}var wg sync.WaitGroupmaxGoroutines:=2channel:=make(chan int, maxGoroutines)// For each value to calculate, get the value and print the time it took to calculatefor _,v:= range values { go func(v int){ defer wg.Done() channel <-1start:= time.Now() res,err:= cache.Get(v)if err != nil { fmt.Printf("Error: %v\n", err)} fmt.Printf("%v:%d took %v\n", v, res, time.Since(start))<-channel
}(v) wg.Add(1)} wg.Wait() fmt.Printf("Tiempo total: %v\n", time.Since(empezar))}
Este resumen muestra la importancia de manejar correctamente la concurrencia cuando se implementa un sistema de caché en Go, ya que múltiples goroutines pueden generar condiciones de carrera si acceden a los mismos datos sin sincronización. Me parece interesante el uso de herramientas como WaitGroup para coordinar la ejecución concurrente y Mutex para proteger los datos compartidos. Además, el uso del detector de race conditions demuestra la importancia de probar la seguridad del código concurrente para garantizar un sistema eficiente y confiable.
Copié el código idénticamente y no me sale la condición de carrera al compilar con la bandera y correr el programa
Qué puede ser?
Hola!
Podrías compartir tu repositorio para ver qué es lo que puede pasar?
Hice unos cambios para que detecte que valores que no se repitan tomará para el calculo y además sea en paralelo, bloqueando sólo cuando sea necesario:
Los resultados:
package main
import("fmt""sync""time")// Function to calculate fibonaccifunc Fibonacci(n int) int {if n <=1{return n
}returnFibonacci(n-1)+Fibonacci(n-2)}// Memory holds a function and a map of resultstype Memory struct { f Function// Function to be used cache map[int]FunctionResult// Map of results for a given key mux sync.RWMutex}// A function has to recive a value and return a value and an errortype Functionfunc(key int)(interface{}, error)// The result of a functiontype FunctionResult struct { value interface{} err error
}// NewCache creates a new cachefunc NewCache(f Function)*Memory{return&Memory{f,make(map[int]FunctionResult), sync.RWMutex{}}}// Get returns the value for a given keyfunc(m *Memory)Get(key int)(interface{}, error){isCalculating:=falsevar value interface{} m.mux.Lock() res,exist:= m.cache[key]if!exist { isCalculating =true m.cache[key]=FunctionResult{}} m.mux.Unlock()for value == nil {if isCalculating { fmt.Println("calculating: ", key) res.value, res.err= m.f(key)// Calculate the value m.mux.Lock() m.cache[key]= res // Store the value in the cache value = res.value m.mux.Unlock()}else{ m.mux.RLock() res, exist = m.cache[key] value = res.value m.mux.RUnlock()}}return res.value, res.err}// Function to be used in the cachefunc GetFibonacci(key int)(interface{}, error){returnFibonacci(key), nil
}func main(){// Create a cache and some valuescache:=NewCache(GetFibonacci)fibo:=[]int{42,40,41,42,38,41}var wg sync.WaitGroup// For each value to calculate, get the value and print the time it took to calculatefor _,n:= range fibo { wg.Add(1) go func(index int){ defer wg.Done()start:= time.Now() value,err:= cache.Get(index)if err != nil {panic(err)} fmt.Printf("%d: %d t: %s \n", index, value, time.Since(start))}(n)} wg.Wait()}
Mi versión del código para evitar que se bloquean todas los restantes cálculos cuando se está calculando 1, solo se bloquea en caso de que ya se esté trabajando con el número que queremos calcular y posteriormente usará el caché
package main
import("fmt""log""sync""time")var fibs map[int]*Fib= map[int]*Fib{0:&Fib{}}func Fibonacci(n int) int {if n <=1{return n
}returnFibonacci(n-1)+Fibonacci(n-2)}type Functionfunc(key Fib)(interface{}, error)type FunctionResult struct { value interface{} err error
}type Fib struct { n int
lock *sync.Mutex}func NewFib(n int)*Fib{flag:=falsevar lock sync.Mutexvar fib *Fibfor k,_:= range fibs {if k == n { flag =truebreak}}if flag { fmt.Printf("Take created %d\n", n) fib = fibs[n]}else{ fmt.Printf("Create %d\n", n) fib =&Fib{n: n,lock:&lock,} fibs[n]= fib
}return fib
}func NewListFib(list ...int)[]*Fib{var fib *Fibvar res []*Fibfor _,n:= range list { fib =NewFib(n) res =append(res, fib)}return res
}func(f Fib)Get(cache map[Fib]FunctionResult)FunctionResult{ fmt.Printf("Started %d\n", f.n) result,exists:= cache[f]//fmt.Println(exists)//fmt.Println(result)if!exists {//fmt.Println("yes") result.value=Fibonacci(f.n) result.err= nil
cache[f]= result
}else{ fmt.Println("cache")//fmt.Println(result)} fmt.Printf("Finished %d\n", f.n)//fmt.Println(result)return result
}type Memory struct { cache map[Fib]FunctionResult}func NewCache()*Memory{return&Memory{cache:make(map[Fib]FunctionResult),}}func(m *Memory)Get(key Fib)(interface{}, error){ key.lock.Lock() defer key.lock.Unlock()result:= key.Get(m.cache)//fmt.Printf("Cache: %v\n", m.cache)return result.value, result.err}func main(){cache:=NewCache()fibo:=NewListFib(42,40,41,42,38)var wg sync.WaitGroupfor _,n:= range fibo { wg.Add(1) go func(index Fib, wg *sync.WaitGroup){ defer wg.Done()start:= time.Now() value,err:= cache.Get(index)if err != nil { log.Println(err)} fmt.Printf("%d, %s, %d\n", index, time.Since(start), value)}(*n,&wg)} wg.Wait()}