No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Reutilización de computación intensiva

7/19
Recursos

Aportes 12

Preguntas 2

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

Mi solucion 😄

package main

import (
	"fmt"
	"sync"
	"time"
)

func ExpensiveFibonacci(n interface{}) int {
	switch n := n.(type) {
	case int:
		if n <= 1 {
			return n
		}
		return ExpensiveFibonacci(n-1) + ExpensiveFibonacci(n-2)
	default:
		return 0
	}
}

// Memory holds a function and a map of results
type Memory struct {
	f     Function                       // Function to be used
	cache map[interface{}]FunctionResult // Map of results for a given key
	lock  sync.RWMutex                   // Lock to protect the cache

	InProgress map[interface{}]bool                  // Map of jobs in progress
	IsPending  map[interface{}][]chan FunctionResult // Map of jobs waiting for a response
}

// A function has to recive a value and return a value and an error
type Function func(key interface{}) (interface{}, error)

// The result of a function
type FunctionResult struct {
	value interface{}
	err   error
}

// NewCache creates a new cache
func NewCache(f Function) *Memory {
	return &Memory{
		f:          f,
		cache:      make(map[interface{}]FunctionResult),
		lock:       sync.RWMutex{},
		InProgress: make(map[interface{}]bool),
		IsPending:  make(map[interface{}][]chan FunctionResult),
	}
}

func (m *Memory) service(key interface{}) (interface{}, error) {
	// Check if the job is already in progress
	m.lock.RLock()
	_, ok := m.InProgress[key]
	m.lock.RUnlock()

	// If the job is already in progress, then wait for the response
	if ok {
		// If the job is already in progress, then wait for the response
		response := make(chan FunctionResult)
		defer close(response)

		// Add the channel to the pending list
		m.lock.Lock()
		m.IsPending[key] = append(m.IsPending[key], response)
		m.lock.Unlock()

		// Wait for the response
		res := <-response
		return res.value, res.err
	}

	// If the job is not in progress, then start the job
	m.lock.Lock()
	m.InProgress[key] = true
	m.lock.Unlock()

	// Start the job
	fmt.Printf("Starting job %d\n", key)
	fnresult, err := m.f(key)
	res := FunctionResult{value: fnresult, err: err}
	if err != nil {
		fmt.Printf("Error: %v\n", err)
	}

	// Get the pending workers for this job
	m.lock.RLock()
	pendingWorkers, exist := m.IsPending[key]
	m.lock.RUnlock()

	// If there are pending workers, then send the response
	if exist {
		for _, worker := range pendingWorkers {
			worker <- res
		}
	}

	// We are done with this job, reset the state
	m.lock.Lock()
	m.InProgress[key] = false
	m.IsPending[key] = make([]chan FunctionResult, 0)
	m.lock.Unlock()

	// Add the value to the cache
	m.lock.Lock()
	m.cache[key] = res
	m.lock.Unlock()

	fmt.Printf("Finished job %d, got %d\n", key, res)
	return res.value, res.err

}

// Get returns the value for a given key
func (m *Memory) Get(key interface{}) (interface{}, error) {

	// Lock the cache
	m.lock.RLock()

	// Check if the value is in the cache
	res, exist := m.cache[key]

	// Unlock the cache
	m.lock.RUnlock()

	// If the value is in the cache, return it
	if exist {
		return res.value, res.err
	}

	// If the value is not in the cache, then start the service
	res.value, res.err = m.service(key)
	return res.value, res.err

}

// Function to be used in the cache
func GetFibonacci(key interface{}) (interface{}, error) {
	return ExpensiveFibonacci(key), nil
}

func main() {
	empezar := time.Now()
	// Create a cache and some values
	cache := NewCache(GetFibonacci)
	values := []int{46, 46, 42, 42, 41, 41, 46, 46, 46, 42, 42, 46}

	var wg sync.WaitGroup

	// For each value to calculate, get the value and print the time it took to calculate
	for _, v := range values {
		go func(v int) {
			defer wg.Done()

			start := time.Now()

			res, err := cache.Get(v)
			if err != nil {
				fmt.Printf("Error: %v\n", err)
			}

			fmt.Printf("%v:%d took %v\n", v, res, time.Since(start))
		}(v)
		wg.Add(1)
	}
	wg.Wait()

	// This is to prove that cache is working
	fmt.Println()
	fmt.Println("Doing it all again!")
	for _, v := range values {
		go func(v int) {
			defer wg.Done()

			start := time.Now()

			res, err := cache.Get(v)
			if err != nil {
				fmt.Printf("Error: %v\n", err)
			}

			fmt.Printf("%v:%d took %v\n", v, res, time.Since(start))
		}(v)
		wg.Add(1)
	}

	wg.Wait()

	fmt.Printf("Total time: %v\n", time.Since(empezar))
}

Les comparto mi solución al reto.
Un sistema caché concurrente con Go: https://gist.github.com/UnPolinomio/5098c4146e97bd3c3aedc26e791e8785

Creo que con esta version logré evitar los deadlocks existentes en la version de la clase asi como el acceso a calculos multiples de un mismo numero.

Aunque si por alguna razon algun canal de opResultsChan no obtiene una respuesta se puede quedar ahí zombie

package main

import (
	"log"
	"sync"
)

type FiboService struct {
	mutex sync.RWMutex
	cache map[int]int

	opInProgress  map[int]bool
	opResultsChan map[int][]chan int
}

func NewFiboService() *FiboService {
	return &FiboService{
		cache:         make(map[int]int),
		opInProgress:  make(map[int]bool),
		opResultsChan: make(map[int][]chan int),
	}
}

func (f *FiboService) GetFromCache(key int) (v int, ok bool) {
	f.mutex.RLock()
	v, ok = f.cache[key]
	f.mutex.RUnlock()
	return
}

func (f *FiboService) PutToCache(key, value int) {
	f.mutex.Lock()
	f.cache[key] = value
	f.mutex.Unlock()
}

func (f *FiboService) Fibonacci(n int) int {
	// return easy results
	if n <= 1 {
		return n
	}

	// Check if previously calculated
	if v, ok := f.GetFromCache(n); ok {
		return v
	}

	// Check if already in progress
	f.mutex.Lock()
	if f.opInProgress[n] {
		resultChan := make(chan int)

		f.opResultsChan[n] = append(f.opResultsChan[n], resultChan)
		f.mutex.Unlock()

		log.Println("waiting for result of job: ", n)
		result := <-resultChan
		log.Println("Got result for job: ", n)
		return result
	}

	// calculate new operation
	f.opInProgress[n] = true
	f.mutex.Unlock()

	result := f.Fibonacci(n-1) + f.Fibonacci(n-2)

	f.PutToCache(n, result)

	go f.notifyResultChans(n, result)

	return result
}

func (f *FiboService) notifyResultChans(job, result int) {
	log.Println("Notifying to channels waiting for job: ", job)
	f.mutex.Lock()

	if resultChans := f.opResultsChan[job]; resultChans != nil {
		for _, c := range resultChans {
			c <- result
			close(c)
		}
	}

	delete(f.opInProgress, job)
	delete(f.opResultsChan, job)
	f.mutex.Unlock()

	log.Println("Completed notifying to channels waiting for job: ", job)
}

func main() {
	fiboSvc := NewFiboService()

	jobs := []int{1, 2, 3, 4, 5, 6, 7, 8}
	var wg sync.WaitGroup
	wg.Add(len(jobs))

	for _, n := range jobs {
		go func(job int) {
			defer wg.Done()
			result := fiboSvc.Fibonacci(job)

			log.Printf("completed job: %d with result %d", job, result)
		}(n)
	}
	wg.Wait()
}

Mi Solución: ```js package main import ( "fmt" "sync" ) func Fibonacci(n int) int { if n <= 1 { return n } return Fibonacci(n-1) + Fibonacci(n-2) } type Function func(key int) (interface{}, error) type FunctionResult struct { value interface{} err error } type CacheMemory struct { F Function Cache map[int]FunctionResult Mutex sync.RWMutex IsInProcess map[int]bool IsPending map[int][]chan int } func NewCache(f Function) *CacheMemory { return &CacheMemory{ F: f, Cache: make(map[int]FunctionResult), IsInProcess: make(map[int]bool), IsPending: make(map[int][]chan int), } } func GetFibonacci(n int) (interface{}, error) { return Fibonacci(n), nil } func (m *CacheMemory) Get(key int) (interface{}, error) { m.Mutex.RLock() result, exists := m.Cache[key] m.Mutex.RUnlock() if exists { return result.value, result.err } m.Mutex.Lock() exists = m.IsInProcess[key] m.Mutex.Unlock() if exists { response := make(chan int) defer close(response) m.Mutex.Lock() m.IsPending[key] = append(m.IsPending[key], response) m.Mutex.Unlock() fmt.Println("Waiting for response") resp := <-response fmt.Println("Response received: ", resp) return resp, nil } m.Mutex.Lock() m.IsInProcess[key] = true m.Mutex.Unlock() fmt.Println("Calculating fibonacci(", key, ")") response, err := m.F(key) m.Mutex.Lock() m.Cache[key] = FunctionResult{value: response, err: err} for _, pendingWorker := range m.IsPending[key] { pendingWorker <- response.(int) } delete(m.IsPending, key) delete(m.IsInProcess, key) m.Mutex.Unlock() return response, err } func main() { cache := NewCache(GetFibonacci) fibo := []int{42, 41, 38, 40, 38, 42, 42, 42, 42, 42, 42, 41, 38, 40} var wg sync.WaitGroup for _, n := range fibo { wg.Add(1) go func(n int) { defer wg.Done() value, err := cache.Get(n) if err != nil { fmt.Println(err) } fmt.Printf("Fibonacci(%d) = %d\n", n, value) }(n) } wg.Wait() } ```
Mi solución unificando ambos métodos. package main import ( "fmt" "sync" "time") /\*\*Método que calcula el valor de Fibonacci para N Recibe un Entero N Retorna el valor calculado como Entero N \*\*/func Fibonacci(n int) int { if n <= 1 { return n } return Fibonacci(n-1) + Fibonacci(n-2)} /\*\* Clase de Memoria Almacena resultados obtenidos para el Entero N \*\*/type Memory struct { cache map\[int]int} /\*\* Método para Extraer Datos de Memoria Retorna el valor y booleano. El valor falso implicará que la llave no existe. \*\*/func (m \*Memory) Get(key int) (int, bool) { res, exist := m.cache\[key] return res, exist} /\*\* Método para almacenar un nuevo valor en Memoria \*\*/func (m \*Memory) Set(key int, value int) { m.cache\[key] = value} /\*\* Instanciar una nueva Memoria. Así se evita sobreescribir para diferentes funciones. \*\*/func NewCache() \*Memory { return \&Memory{make(map\[int]int)}} /\*\* Clase de servicio. Tiene la memoria, los mapas que definen si un trabajo está en progreso o si está siendo esperado por otros y el lock para detener la ejecución. \*\*/type Service struct { Memory \*Memory InProgress map\[int]bool IsPending map\[int]\[]chan int Lock sync.RWMutex} /\*\* Método para Instanciar la clase Service. Recibe una instancia de Memory Retorna un nuevo Service con: - Memory - mapa de trabajos en Progreso - mapa de trabajos en Espera de Resultados \*\*/func NewService(m \*Memory) \*Service { return \&Service{ Memory: m, InProgress: make(map\[int]bool), IsPending: make(map\[int]\[]chan int), }} /\*\* Método principal Verifica en memoria, luego en trabajos en progreso para ponerse a la espera, o se marca como en progreso y se pone en ejecución. \*\*/func (s \*Service) Work(job int) { start := time.Now() // Verificar si está almacenado en memoria. value, inMemory := s.Memory.Get(job) if inMemory { fmt.Printf("Value For job %d in Memory: %d\n", job, value) } else { // Verificar si ya está en progreso s.Lock.RLock() isInProgress := s.InProgress\[job] s.Lock.RUnlock() // Si está En Progreso if isInProgress { // Crear un nuevo canal para recibir la respuesta responseChannel := make(chan int) defer close(responseChannel) // Añadir el canal al listado de canales pendientes s.Lock.Lock() s.IsPending\[job] = append(s.IsPending\[job], responseChannel) s.Lock.Unlock() fmt.Printf("Waiting for Response job: %d\n", job) // Se congela la ejecución a la espera que el canal de respuesta reciba algo res := <-responseChannel fmt.Printf("Response received: %d\n", res) // Si no está en progreso } else { // Congelo toda la ejecución y marco que lo voy a comenzar s.Lock.Lock() s.InProgress\[job] = true s.Lock.Unlock() fmt.Printf("Calculating Job: %d\n", job) // Calculo resultado y lo almaceno en memoria result := Fibonacci(job) s.Memory.Set(job, result) // Busco todos los trabajos que estan esperando este resultado s.Lock.RLock() pendingWorkers, workersWaiting := s.IsPending\[job] s.Lock.RUnlock() // Si existen, comienzo a enviarles el resultado por el canal. if workersWaiting { for \_, pendingWorker := range pendingWorkers { pendingWorker <- result } fmt.Printf("Result sent to all pending workers for job %d\n", job) } else { // Imprimimos de todas maneras resultados que ningún canal esperaba. fmt.Printf("Obtained a result for job %d not awaited: %d\n", job, result) } // Limpiamos los canales de progreso y espera. s.Lock.Lock() s.InProgress\[job] = false s.IsPending\[job] = make(\[]chan int, 0) s.Lock.Unlock() } } fmt.Printf("This job took: %s \n", time.Since(start))} func main() { // Funcionamiento conjunto fibonacciCache := NewCache() fibonacciService := NewService(fibonacciCache) jobs := \[]int{40, 42, 42, 39, 40, 44, 35, 30, 31, 40, 25, 30, 42} var wg sync.WaitGroup wg.Add(len(jobs)) for \_, n := range jobs { go func(job int) { defer wg.Done() fibonacciService.Work(job) }(n) // Para poder simular que no todos lleguen de golpe // y que algunos alcancen a almacenarse en memoria // Ponemos un sleep de 200ms time.Sleep(200 \* time.Millisecond) } wg.Wait()} ```js package main import ( "fmt" "sync" "time" ) /* *Método que calcula el valor de Fibonacci para N Recibe un Entero N Retorna el valor calculado como Entero N * */ func Fibonacci(n int) int { if n <= 1 { return n } return Fibonacci(n-1) + Fibonacci(n-2) } /* * Clase de Memoria Almacena resultados obtenidos para el Entero N * */ type Memory struct { cache map[int]int } /* * Método para Extraer Datos de Memoria Retorna el valor y booleano. El valor falso implicará que la llave no existe. * */ func (m *Memory) Get(key int) (int, bool) { res, exist := m.cache[key] return res, exist } /* * Método para almacenar un nuevo valor en Memoria * */ func (m *Memory) Set(key int, value int) { m.cache[key] = value } /* * Instanciar una nueva Memoria. Así se evita sobreescribir para diferentes funciones. * */ func NewCache() *Memory { return &Memory{make(map[int]int)} } /* * Clase de servicio. Tiene la memoria, los mapas que definen si un trabajo está en progreso o si está siendo esperado por otros y el lock para detener la ejecución. * */ type Service struct { Memory *Memory InProgress map[int]bool IsPending map[int][]chan int Lock sync.RWMutex } /* * Método para Instanciar la clase Service. Recibe una instancia de Memory Retorna un nuevo Service con: - Memory - mapa de trabajos en Progreso - mapa de trabajos en Espera de Resultados * */ func NewService(m *Memory) *Service { return &Service{ Memory: m, InProgress: make(map[int]bool), IsPending: make(map[int][]chan int), } } /* * Método principal Verifica en memoria, luego en trabajos en progreso para ponerse a la espera, o se marca como en progreso y se pone en ejecución. * */ func (s *Service) Work(job int) { start := time.Now() // Verificar si está almacenado en memoria. value, inMemory := s.Memory.Get(job) if inMemory { fmt.Printf("Value For job %d in Memory: %d\n", job, value) } else { // Verificar si ya está en progreso s.Lock.RLock() isInProgress := s.InProgress[job] s.Lock.RUnlock() // Si está En Progreso if isInProgress { // Crear un nuevo canal para recibir la respuesta responseChannel := make(chan int) defer close(responseChannel) // Añadir el canal al listado de canales pendientes s.Lock.Lock() s.IsPending[job] = append(s.IsPending[job], responseChannel) s.Lock.Unlock() fmt.Printf("Waiting for Response job: %d\n", job) // Se congela la ejecución a la espera que el canal de respuesta reciba algo res := <-responseChannel fmt.Printf("Response received: %d\n", res) // Si no está en progreso } else { // Congelo toda la ejecución y marco que lo voy a comenzar s.Lock.Lock() s.InProgress[job] = true s.Lock.Unlock() fmt.Printf("Calculating Job: %d\n", job) // Calculo resultado y lo almaceno en memoria result := Fibonacci(job) s.Memory.Set(job, result) // Busco todos los trabajos que estan esperando este resultado s.Lock.RLock() pendingWorkers, workersWaiting := s.IsPending[job] s.Lock.RUnlock() // Si existen, comienzo a enviarles el resultado por el canal. if workersWaiting { for _, pendingWorker := range pendingWorkers { pendingWorker <- result } fmt.Printf("Result sent to all pending workers for job %d\n", job) } else { // Imprimimos de todas maneras resultados que ningún canal esperaba. fmt.Printf("Obtained a result for job %d not awaited: %d\n", job, result) } // Limpiamos los canales de progreso y espera. s.Lock.Lock() s.InProgress[job] = false s.IsPending[job] = make([]chan int, 0) s.Lock.Unlock() } } fmt.Printf("This job took: %s \n", time.Since(start)) } func main() { // Funcionamiento conjunto fibonacciCache := NewCache() fibonacciService := NewService(fibonacciCache) jobs := []int{40, 42, 42, 39, 40, 44, 35, 30, 31, 40, 25, 30, 42} var wg sync.WaitGroup wg.Add(len(jobs)) for _, n := range jobs { go func(job int) { defer wg.Done() fibonacciService.Work(job) }(n) // Para poder simular que no todos lleguen de golpe // y que algunos alcancen a almacenarse en memoria // Ponemos un sleep de 200ms time.Sleep(200 * time.Millisecond) } wg.Wait() } ```

Se produce un Deadlock que no es detectado por Go. Un canal está esperando una respuesta mientras el WaitGroup espera el Done. Entonces el programa nunca termina.
Sucede cuando el proceso obtiene la respuesta de Fibonacci y la manda a los canales PendingWorkers. En ese momento no hay bloqueo y es true InProgress, por lo que otro proceso puede encolarse pero nunca será contestado. Porque después del bucle se coloca un canal vacio en IsPending y la variable InProgress en false.

Mi solución al reto combina las dos soluciones anteriores creando una funcion anónima que toma un struct caché para que lo use el struct service:

package main

// usar cache para calcular los valores ya calculados y usar estados de procesos para saber si ya estan calculando un valor
import (
	"fmt"
	"log"
	"sync"
)

func Fibonacci(n int) int {
	//fmt.Printf("Calculate Expensive Fibonacci %d\n", n)
	if n <= 1 {
		return n
	}
	return Fibonacci(n-1) + Fibonacci(n-2)
}

//----------------------
type Memory struct {
	origen Function
	cache  map[int]FunctionResult
	lock   sync.Mutex
}

type Function func(key int) (interface{}, error)
type FunctionResult struct {
	value interface{}
	err   error
}

func NewCache(f Function) *Memory {
	return &Memory{
		origen: f,
		cache:  make(map[int]FunctionResult),
	}
}

func (m *Memory) Get(key int) (interface{}, error) {
	m.lock.Lock()
	result, exists := m.cache[key]
	m.lock.Unlock()

	if !exists {
		result.value, result.err = m.origen(key)
		m.lock.Lock()
		m.cache[key] = result
		m.lock.Unlock()
	}
	return result.value, result.err
}

//----------------------
type FunctionService func(key int) interface{}

func GetFibonacci(n int) (interface{}, error) {
	return Fibonacci(n), nil
}

type Service struct {
	InProgress map[int]bool
	IsPending  map[int][]chan int
	Lock       sync.RWMutex
	Origen     FunctionService
}

func (s *Service) Work(job int) {
	s.Lock.Lock()
	exists := s.InProgress[job]
	if exists {
		response := make(chan int)
		defer close(response)

		s.IsPending[job] = append(s.IsPending[job], response)
		s.Lock.Unlock()
		fmt.Printf("Waiting for response job %d\n", job)
		resp := <-response
		fmt.Printf("Response Done, received %d\t", resp)
		fmt.Printf("For job: %d result: %d\n", job, resp)
		return
	}
	s.InProgress[job] = true
	s.Lock.Unlock()

	fmt.Printf("Calculating Fibonacci for %d\n", job)
	result := s.Origen(job)
	fmt.Printf("For job: %d result: %d\n", job, result)

	s.Lock.Lock()
	pendingWorkers, exists := s.IsPending[job]
	s.InProgress[job] = false
	s.IsPending[job] = make([]chan int, 0)
	s.Lock.Unlock()

	if exists {
		// workers sleeping
		wz := 0
		for _, pendingWorker := range pendingWorkers {
			// pasamos el tipo interface a entero
			// This is known as type assertion in golang, and it is a common practice.
			resultinteger := result.(int)
			pendingWorker <- int(resultinteger)
			wz++
		}
		fmt.Printf("Result sent - all pending workers ready job: %d result: %d size: %d\n", job, result, wz)
	}
}
func NewService(F FunctionService) *Service {
	return &Service{
		InProgress: make(map[int]bool),
		IsPending:  make(map[int][]chan int),
		Origen:     F,
	}
}

func main() {
	var wg sync.WaitGroup
	cache := NewCache(GetFibonacci)
	service := NewService(
		func(n int) interface{} {
			value, err := cache.Get(n)
			if err != nil {
				log.Println(err)
			}
			return value
		},
	)
	fibo := []int{46, 46, 42, 42, 41, 41, 46, 46, 46, 42, 42, 46}
	wg.Add(len(fibo))
	for _, n := range fibo {
		go func(job int) {
			defer wg.Done()
			service.Work(job)
		}(n)
	}
	wg.Wait()

	fmt.Println()
	fmt.Println("Doing it all again!")
	for _, n := range fibo {
		wg.Add(1)
		go func(job int) {
			defer wg.Done()
			service.Work(job)
		}(n)
	}
	wg.Wait()

	fmt.Println()
	fmt.Println("Doing it all again!")
	fibo = []int{43, 42, 46, 42, 46, 42, 46, 42, 41, 46, 41, 46}
	for _, n := range fibo {
		wg.Add(1)
		go func(job int) {
			defer wg.Done()
			service.Work(job)
		}(n)
	}
	wg.Wait()
}

My solution to the challenge

package main

import (
	"fmt"
	"sync"
)

func Fibonacci(n int) int {
	if n < 2 {
		return n
	}
	return Fibonacci(n-1) + Fibonacci(n-2)
}

type Memory struct {
	cache map[int]int
	lock  sync.RWMutex
}

type Service struct {
	InProgress map[int]bool
	IsPending  map[int][]chan int
	Lock       sync.RWMutex
}

func (s *Service) Work(job int, m *Memory, c chan int) {
	if isCached(job, m) {
		fmt.Printf("Job %d is cached, the result is %d\n", job, m.cache[job])
		return
	}
	s.Lock.RLock()
	if s.InProgress[job] {
		s.Lock.RUnlock()
		response := make(chan int)
		s.Lock.Lock()
		s.IsPending[job] = append(s.IsPending[job], response)
		s.Lock.Unlock()
		fmt.Printf("Job %d is in progress, waiting for it to finish\n", job)
		fmt.Printf("Response Done, the result is %d for job %d\n", <-response, job)
		return
	}
	s.Lock.RUnlock()
	s.Lock.Lock()
	s.InProgress[job] = true
	s.Lock.Unlock()
	fmt.Printf("Job %d is not in progress, starting it\n", job)
	result := Fibonacci(job)
	m.lock.Lock()
	m.cache[job] = result
	m.lock.Unlock()
	s.Lock.RLock()
	pendingWorkers, ok := s.IsPending[job]
	s.Lock.RUnlock()
	if ok {
		fmt.Printf("Job %d is done, notifying %d workers\n", job, len(pendingWorkers))
		for _, worker := range pendingWorkers {
			worker <- result
		}
	}
	s.Lock.Lock()
	s.InProgress[job] = false
	delete(s.IsPending, job)
	s.Lock.Unlock()
}

func NewService() *Service {
	return &Service{
		InProgress: make(map[int]bool),
		IsPending:  make(map[int][]chan int),
	}
}

func NewMemory() *Memory {
	return &Memory{
		cache: make(map[int]int),
	}
}

func isCached(n int, m *Memory) bool {
	m.lock.RLock()
	_, ok := m.cache[n]
	m.lock.RUnlock()
	return ok
}

func main() {
	workers := 7
	c := make(chan int, workers)
	service := NewService()
	memory := NewMemory()
	jobs := []int{44, 46, 47, 49, 44, 34, 47, 41, 41, 33,
		43, 40, 46, 43, 45, 49, 34, 47, 36, 43, 48, 41,
		35, 46, 33, 42, 49, 47, 32, 30, 50, 50, 30, 40,
		44, 30, 49, 34, 48, 43, 50, 42, 48, 31, 35, 30,
		33, 40, 40, 50, 49, 47, 36, 43, 48, 41, 35, 46,
	}
	var wg sync.WaitGroup
	for _, job := range jobs {
		c <- 1
		wg.Add(1)
		go func(job int) {
			defer wg.Done()
			service.Work(job, memory, c)
			<-c
		}(job)
	}
	wg.Wait()
}

Mi solucion al reto consiste en implementar el struct Service de esta clase y aplicar el sistema de caching de la clase anterior. En la ejecución de la funcion main, intento simular una ejecucion de jobs tiempo despues, pero con los mismos valores, para asegurar que el sistema de caching funcione como es esperado.

package main

import (
	"fmt"
	"sync"
	"time"
)

func ExpensiveFibonacci(n int) int {
	if n <= 1 {
		return n
	}

	return ExpensiveFibonacci(n-1) + ExpensiveFibonacci(n-2)
}

type Function func(key int) (interface{}, error)
type FunctionResult struct {
	value interface{}
	err   error
}

type Service struct {
	f          Function
	cache      map[int]FunctionResult
	InProgress map[int]bool
	IsPending  map[int][]chan FunctionResult
	Lock       sync.RWMutex
}

func (s *Service) Work(job int) FunctionResult {
	s.Lock.RLock()
	exists := s.InProgress[job]
	if exists {
		s.Lock.RUnlock()
		response := make(chan FunctionResult)
		defer close(response)

		s.Lock.Lock()
		s.IsPending[job] = append(s.IsPending[job], response)
		s.Lock.Unlock()
		fmt.Printf("Waiting for Response job: %d\n", job)
		return <-response
	}
	s.Lock.RUnlock()

	s.Lock.RLock()
	result, exists := s.cache[job]
	s.Lock.RUnlock()

	if !exists {
		s.Lock.Lock()
		s.InProgress[job] = true
		s.Lock.Unlock()
		
		result.value, result.err = s.f(job)

		s.Lock.RLock()
		pendingWorkers, exists := s.IsPending[job]
		s.Lock.RUnlock()

		if exists {
			for _, pendingWorker := range pendingWorkers {
				pendingWorker <- result
			}
			fmt.Printf("Result send - all pending workers ready job: %d\n", job)
			fmt.Println()
		}

		s.Lock.Lock()
		s.InProgress[job] = false
		s.IsPending[job] = make([]chan FunctionResult, 0)
		s.cache[job] = result
		s.Lock.Unlock()
	}

	return result
}

func NewService(f Function) *Service {
	return &Service{
		f:          f,
		cache:      make(map[int]FunctionResult),
		InProgress: make(map[int]bool),
		IsPending:  make(map[int][]chan FunctionResult),
	}
}

func GetFibonacci(key int) (interface{}, error) {
	return ExpensiveFibonacci(key), nil
}

func main() {
	service := NewService(GetFibonacci)
	jobs := []int {7,10,15,3,2,25,40,13,15,10,40,5,2,1}
	var wg sync.WaitGroup
	wg.Add(len(jobs))

	fmt.Println()
	fmt.Println("First attempt")
	for _, n := range jobs {
		go func (job int) {
			defer wg.Done()
			start := time.Now()
			res := service.Work(job)
			fmt.Printf("Calculate: Value: %d, Time: %s, Result: %d\n", job, time.Since(start), res.value)
		}(n)
	}
	wg.Wait()
	
	fmt.Println()
	fmt.Println("Second attempt")
	wg.Add(len(jobs))
	for _, n := range jobs {
		go func (job int) {
			defer wg.Done()
			start := time.Now()
			res := service.Work(job)
			fmt.Printf("Calculate: Value: %d, Time: %s, Result: %d\n", job, time.Since(start), res.value)
		}(n)
	}
	wg.Wait()
}

Integre el código de cache para poder mejorar el ejecicio anterior que presentaba problemas, ya no se calcula varias veces el fibonacci de un número que ya ha sido enviado:
|

¡IMPORTANTE!: Tener claro que el ejercicio del cache del número fibonacci no es el correcto para demostrar las cualidades del código, pero la idea es demostrar que no se hacen calculos duplicados y que a su vez se puedan realizar de manera concurrente

package main

import (
	"log"
	"sync"
	"time"
)

type Function func(key int) (interface{}, error)

type FunctionResult struct {
	value interface{}
	err   error
}

type Memory struct {
	f     Function
	cache map[int]FunctionResult
	rwm   sync.RWMutex
}

func (m *Memory) Get(key int) (interface{}, error) {
	m.rwm.RLock()
	result, ok := m.cache[key]
	m.rwm.RUnlock()
	if !ok {
		log.Printf("Calculate for %d\n", key)
		result.value, result.err = m.f(key)

		m.rwm.Lock()
		m.cache[key] = result
		m.rwm.Unlock()
	} else {
		log.Printf("Found for %d\n", key)
	}

	return result.value, result.err
}

type Service struct {
	InProgress map[int]bool
	IsPending  map[int][]chan FunctionResult
	Mutex      sync.RWMutex
	memory     *Memory
}

func (s *Service) Work(job int) {
	s.Mutex.Lock()
	exists := s.InProgress[job]
	s.Mutex.Unlock()
	if exists {
		response := make(chan FunctionResult)
		defer close(response)

		s.Mutex.Lock()
		s.IsPending[job] = append(s.IsPending[job], response)
		s.Mutex.Unlock()
		log.Printf("Waiting for response job: %d\n", job)
		res := <-response
		log.Printf("Response for job %d done, received %d\n", job, res)
		return
	}

	s.Mutex.Lock()
	s.InProgress[job] = true
	s.Mutex.Unlock()

	start := time.Now()
	result, err := s.memory.Get(job)
	log.Printf("Execution time [%d, %s, %d]\n", job, time.Since(start), result)

	s.Mutex.RLock()
	pendingWorkers, exists := s.IsPending[job]
	s.Mutex.RUnlock()

	if exists {
		for _, pendingWorker := range pendingWorkers {
			fResult := FunctionResult{
				value: result,
				err:   err,
			}
			pendingWorker <- fResult
		}
	}

	s.Mutex.Lock()
	s.InProgress[job] = false
	s.IsPending[job] = make([]chan FunctionResult, 0)
	s.Mutex.Unlock()
}

func NewService(f Function) *Service {
	return &Service{
		InProgress: make(map[int]bool),
		IsPending:  make(map[int][]chan FunctionResult),
		memory: &Memory{
			f:     f,
			cache: make(map[int]FunctionResult),
		},
	}
}

func ExpensiveFibonacci(n int) int {
	if n <= 1 {
		return n
	}

	return ExpensiveFibonacci(n-1) + ExpensiveFibonacci(n-2)
}

func main() {
	log.SetFlags(log.Ldate + log.Ltime + log.Lmicroseconds)
	service := NewService(func(n int) (interface{}, error) {
		return ExpensiveFibonacci(n), nil
	})
	jobs := []int{42, 40, 41, 42, 38, 38, 2, 42}
	wg := new(sync.WaitGroup)
	wg.Add(len(jobs))

	for _, n := range jobs {
		go func(job int) {
			defer wg.Done()
			service.Work(job)
		}(n)
	}
	wg.Wait()

	wg.Add(2)
	for i := 0; i < 2; i++ {
		go func() {
			defer wg.Done()
			service.Work(42)
		}()
	}
	wg.Wait()
}

Les comparto la solución que desarrollé para este desafío en éste enlace. Realicé un sistema de cache en memoria que se persiste en el sistema de archivos, al reiniciar la aplicación se vuelve a cargar el caché en memoria (característica inspirada en Redis)

Mi solución al reto: la función que calcula Fibonacci se ejecuta solo la cantidad de veces que es necesaria, de igual forma se guarda el caché de todo número previamente calculado. Esto mejora considerablemente el performance ya que se evita el consumo de CPU en la repetición del cálculo de Fibonacci para números repetidos.

package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

func Fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return Fibonacci(n-1) + Fibonacci(n-2)
}

type Memory struct {
	f          Function
	InProgress map[int]bool
	IsPending  map[int][]chan int
	cache      map[int]FunctionResult
	lock       sync.RWMutex
}

type Function func(key int) (int, error)

type FunctionResult struct {
	value interface{}
	err   error
}

func NewCache(f Function) *Memory {
	return &Memory{
		f:          f,
		InProgress: make(map[int]bool),
		IsPending:  make(map[int][]chan int),
		cache:      make(map[int]FunctionResult),
	}
}

func (m *Memory) Get(key int) (interface{}, error) {
	m.lock.Lock()
	result, exists := m.cache[key]
	m.lock.Unlock()
	if !exists {
		result.value, result.err = m.Work(key)
		m.lock.Lock()
		m.cache[key] = result
		m.lock.Unlock()
	}
	return result.value, result.err
}

func (m *Memory) Work(job int) (int, error) {
	m.lock.RLock()
	exists := m.InProgress[job]
	if exists {
		m.lock.RUnlock()
		response := make(chan int)
		defer close(response)

		m.lock.Lock()
		m.IsPending[job] = append(m.IsPending[job], response)
		m.lock.Unlock()
		resp := <-response
		return resp, nil
	}
	m.lock.RUnlock()

	m.lock.Lock()
	m.InProgress[job] = true
	m.lock.Unlock()

	result, _ := m.f(job)

	m.lock.RLock()
	pendingWorkers, exists := m.IsPending[job]
	m.lock.RUnlock()

	if exists {
		for _, pendingWorker := range pendingWorkers {
			pendingWorker <- result
		}
	}
	m.lock.Lock()
	m.InProgress[job] = false
	m.IsPending[job] = make([]chan int, 0)
	m.lock.Unlock()
	return result, nil
}

func GetFibonacci(n int) (int, error) {
	fmt.Println("Se ejecuto GetFibonacci")
	return Fibonacci(n), nil
}

func main() {
	timeInit := time.Now()
	cache := NewCache(GetFibonacci)
	fibo := []int{42, 40, 41, 42, 38, 40, 40, 41, 43, 43, 43}
	var wg sync.WaitGroup
	for _, n := range fibo {
		wg.Add(1)
		go func(index int) {
			defer wg.Done()
			start := time.Now()
			value, err := cache.Get(index)
			if err != nil {
				log.Println(err)
			}
			fmt.Printf("%d, %s, %d\n", index, time.Since(start), value)
		}(n)
	}
	wg.Wait()
	fmt.Printf("tiempo final %s\n", time.Since(timeInit))
}