No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Compra acceso a todo Platzi por 1 año

Suscríbete

Termina en:

14D
5H
59M
50S

# Reutilización de computación intensiva

7/19
Recursos

Aportes 10

Preguntas 2

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

Mi solución 😄

``````package main

import (
"fmt"
"sync"
"time"
)

// ExpensiveFibonacci computes the n-th Fibonacci number by naive double
// recursion (exponential time). The input arrives as an interface{};
// any value that is not an int yields 0.
func ExpensiveFibonacci(n interface{}) int {
	v, ok := n.(int)
	if !ok {
		return 0
	}
	if v <= 1 {
		return v
	}
	return ExpensiveFibonacci(v-1) + ExpensiveFibonacci(v-2)
}

// Memory memoizes results of a Function and coordinates concurrent
// callers so each key is computed by at most one goroutine at a time.
type Memory struct {
f     Function                       // function whose results are cached
cache map[interface{}]FunctionResult // computed results keyed by input
lock  sync.RWMutex                   // guards cache, InProgress and IsPending

InProgress map[interface{}]bool                  // keys currently being computed
IsPending  map[interface{}][]chan FunctionResult // waiters blocked on each in-flight key
}

// A Function receives a key and returns a value and an error.
type Function func(key interface{}) (interface{}, error)

// FunctionResult bundles the value/error pair produced by a Function.
type FunctionResult struct {
value interface{} // result value
err   error       // error returned by the Function, if any
}

// NewCache builds a Memory wrapping f, with every internal map initialized.
func NewCache(f Function) *Memory {
	m := &Memory{}
	m.f = f
	m.cache = make(map[interface{}]FunctionResult)
	m.lock = sync.RWMutex{}
	m.InProgress = make(map[interface{}]bool)
	m.IsPending = make(map[interface{}][]chan FunctionResult)
	return m
}

// service computes f(key), guaranteeing that only one goroutine runs the
// computation for a given key at a time; concurrent callers for the same
// key block until the in-flight computation publishes its result.
//
// Fixes over the previous version: the InProgress check and the claim were
// done under separate locks, so two goroutines could both start the same
// job; and a waiter registering between the IsPending snapshot and the
// reset was never notified and blocked forever. Check-and-claim and
// publish-and-reset are now each a single critical section. The final log
// line also used %d on a struct, which is an invalid verb.
func (m *Memory) service(key interface{}) (interface{}, error) {
	m.lock.Lock()
	if m.InProgress[key] {
		// Someone else owns this key: register a result channel and wait.
		// Buffer of 1 lets the producer deliver without blocking on us.
		response := make(chan FunctionResult, 1)
		m.IsPending[key] = append(m.IsPending[key], response)
		m.lock.Unlock()

		res := <-response
		return res.value, res.err
	}
	// We own the computation for this key (claimed under the same lock
	// that performed the check).
	m.InProgress[key] = true
	m.lock.Unlock()

	fmt.Printf("Starting job %v\n", key)
	fnresult, err := m.f(key)
	res := FunctionResult{value: fnresult, err: err}
	if err != nil {
		fmt.Printf("Error: %v\n", err)
	}

	// Publish atomically: cache the result, snapshot the waiters, and
	// clear the in-progress state in one critical section so no waiter
	// can register after the snapshot.
	m.lock.Lock()
	pendingWorkers := m.IsPending[key]
	delete(m.InProgress, key)
	delete(m.IsPending, key)
	m.cache[key] = res
	m.lock.Unlock()

	// Deliver to every waiter captured in the snapshot.
	for _, worker := range pendingWorkers {
		worker <- res
	}

	fmt.Printf("Finished job %v, got %v\n", key, res)
	return res.value, res.err
}

// Get returns the value for key, serving it from the cache when present
// and delegating to service on a miss.
func (m *Memory) Get(key interface{}) (interface{}, error) {
	m.lock.RLock()
	cached, hit := m.cache[key]
	m.lock.RUnlock()

	if hit {
		return cached.value, cached.err
	}
	return m.service(key)
}

// GetFibonacci adapts ExpensiveFibonacci to the Function signature used
// by the cache; it never fails, so the error is always nil.
func GetFibonacci(key interface{}) (interface{}, error) {
return ExpensiveFibonacci(key), nil
}

// main computes a job list twice (the second pass demonstrates cache
// hits) and reports per-job and total timing.
//
// BUG FIX: the original never called wg.Add, so the first wg.Done panics
// with "sync: negative WaitGroup counter".
func main() {
	empezar := time.Now()
	// Create a cache and the values to compute.
	cache := NewCache(GetFibonacci)
	values := []int{46, 46, 42, 42, 41, 41, 46, 46, 46, 42, 42, 46}

	var wg sync.WaitGroup

	// runAll launches one goroutine per value and waits for all of them.
	runAll := func() {
		for _, v := range values {
			wg.Add(1) // must precede the goroutine's wg.Done
			go func(v int) {
				defer wg.Done()

				start := time.Now()

				res, err := cache.Get(v)
				if err != nil {
					fmt.Printf("Error: %v\n", err)
				}

				fmt.Printf("%v:%d took %v\n", v, res, time.Since(start))
			}(v)
		}
		wg.Wait()
	}

	runAll()

	// Second pass proves the cache serves repeats without recomputing.
	fmt.Println()
	fmt.Println("Doing it all again!")
	runAll()

	fmt.Printf("Total time: %v\n", time.Since(empezar))
}

``````

Les comparto mi solución al reto.
Un sistema caché concurrente con Go: https://gist.github.com/UnPolinomio/5098c4146e97bd3c3aedc26e791e8785

Creo que con esta versión logré evitar los deadlocks existentes en la versión de la clase, así como los cálculos múltiples de un mismo número.

Aunque, si por alguna razón, algún canal de `opResultsChan` no obtiene una respuesta, se puede quedar ahí zombie.

``````package main

import (
"log"
"sync"
)

// FiboService caches Fibonacci results and deduplicates concurrent
// computations of the same n.
type FiboService struct {
mutex sync.RWMutex // guards all three maps below
cache map[int]int  // memoized results

opInProgress  map[int]bool       // n values currently being computed
opResultsChan map[int][]chan int // waiters per in-flight n
}

// NewFiboService returns a ready-to-use service with empty state.
func NewFiboService() *FiboService {
	svc := new(FiboService)
	svc.cache = map[int]int{}
	svc.opInProgress = map[int]bool{}
	svc.opResultsChan = map[int][]chan int{}
	return svc
}

// GetFromCache returns the memoized value for key and whether it exists.
func (f *FiboService) GetFromCache(key int) (v int, ok bool) {
	f.mutex.RLock()
	defer f.mutex.RUnlock()
	v, ok = f.cache[key]
	return v, ok
}

// PutToCache stores value under key, overwriting any previous entry.
func (f *FiboService) PutToCache(key, value int) {
	f.mutex.Lock()
	defer f.mutex.Unlock()
	f.cache[key] = value
}

// Fibonacci returns fib(n), memoizing results and ensuring each n is
// computed by at most one goroutine; concurrent callers for the same n
// block on a per-call channel until the owner publishes the result.
func (f *FiboService) Fibonacci(n int) int {
// Base cases need no locking or caching.
if n <= 1 {
return n
}

// Fast path: previously computed and cached.
if v, ok := f.GetFromCache(n); ok {
return v
}

// Check and register atomically: if another goroutine already owns
// this n, enqueue a result channel under the same lock and wait on it.
f.mutex.Lock()
if f.opInProgress[n] {
resultChan := make(chan int)

f.opResultsChan[n] = append(f.opResultsChan[n], resultChan)
f.mutex.Unlock()

log.Println("waiting for result of job: ", n)
result := <-resultChan
log.Println("Got result for job: ", n)
return result
}

// We own the computation for this n (still under the lock).
f.opInProgress[n] = true
f.mutex.Unlock()

// Recursive calls go through the same cache/dedup machinery.
result := f.Fibonacci(n-1) + f.Fibonacci(n-2)

f.PutToCache(n, result)

// Notify waiters asynchronously so this goroutine returns at once.
go f.notifyResultChans(n, result)

return result
}

// notifyResultChans delivers result to every channel registered for job,
// closes them, and clears the job's bookkeeping. The sends happen while
// holding the mutex; waiters receive without taking the lock, so the
// unbuffered sends complete as each waiter picks up its value.
func (f *FiboService) notifyResultChans(job, result int) {
log.Println("Notifying to channels waiting for job: ", job)
f.mutex.Lock()

if resultChans := f.opResultsChan[job]; resultChans != nil {
for _, c := range resultChans {
c <- result
close(c)
}
}

// Clearing both maps lets the next caller for this n hit the cache.
delete(f.opInProgress, job)
delete(f.opResultsChan, job)
f.mutex.Unlock()

log.Println("Completed notifying to channels waiting for job: ", job)
}

// main runs a batch of Fibonacci jobs concurrently and waits for all of
// them to finish.
//
// BUG FIX: the original never called wg.Add, so the first wg.Done panics
// with "sync: negative WaitGroup counter".
func main() {
	fiboSvc := NewFiboService()

	jobs := []int{1, 2, 3, 4, 5, 6, 7, 8}
	var wg sync.WaitGroup

	for _, n := range jobs {
		wg.Add(1) // must precede the goroutine's wg.Done
		go func(job int) {
			defer wg.Done()
			result := fiboSvc.Fibonacci(job)

			log.Printf("completed job: %d with result %d", job, result)
		}(n)
	}
	wg.Wait()
}

``````

Se produce un Deadlock que no es detectado por Go. Un canal está esperando una respuesta mientras el WaitGroup espera el Done. Entonces el programa nunca termina.
Sucede cuando el proceso obtiene la respuesta de Fibonacci y la manda a los canales de PendingWorkers. En ese momento no hay bloqueo e InProgress es true, por lo que otro proceso puede encolarse, pero nunca será contestado, porque después del bucle se coloca una lista vacía en IsPending y la variable InProgress en false.

Mi solución al reto combina las dos soluciones anteriores creando una función anónima que toma un struct caché para que lo use el struct service:

``````package main

// usar cache para calcular los valores ya calculados y usar estados de procesos para saber si ya estan calculando un valor
import (
"fmt"
"log"
"sync"
)

// Fibonacci returns the n-th Fibonacci number via naive double recursion.
func Fibonacci(n int) int {
	if n > 1 {
		return Fibonacci(n-1) + Fibonacci(n-2)
	}
	return n
}

//----------------------

// Memory caches results of origen, keyed by the int input.
type Memory struct {
origen Function               // source-of-truth function
cache  map[int]FunctionResult // memoized results
lock   sync.Mutex             // guards cache
}

// Function computes a value for an int key, possibly failing.
type Function func(key int) (interface{}, error)

// FunctionResult is a cached value/error pair.
type FunctionResult struct {
value interface{}
err   error
}

// NewCache wraps f in a Memory with an empty result cache.
func NewCache(f Function) *Memory {
	results := make(map[int]FunctionResult)
	return &Memory{origen: f, cache: results}
}

// Get returns the memoized result for key, invoking origen on a miss and
// storing what it produced. The lock is released while origen runs, so
// concurrent misses for the same key may each invoke it.
func (m *Memory) Get(key int) (interface{}, error) {
	m.lock.Lock()
	result, hit := m.cache[key]
	m.lock.Unlock()

	if hit {
		return result.value, result.err
	}

	result.value, result.err = m.origen(key)

	m.lock.Lock()
	m.cache[key] = result
	m.lock.Unlock()

	return result.value, result.err
}

//----------------------

// FunctionService computes a value for a job key (no error reporting).
type FunctionService func(key int) interface{}

// GetFibonacci adapts Fibonacci to the Function signature; never fails.
func GetFibonacci(n int) (interface{}, error) {
return Fibonacci(n), nil
}

// Service deduplicates concurrent jobs: the first caller for a job
// computes it via Origen, later callers wait on a channel for the result.
type Service struct {
InProgress map[int]bool       // jobs currently being computed
IsPending  map[int][]chan int // waiters per in-flight job
Lock       sync.RWMutex       // guards both maps
Origen     FunctionService    // performs the actual computation
}

// Work runs job once even under concurrency: the first caller computes it
// through Origen, concurrent duplicates register a channel and block until
// the owner pushes the result. Check-and-register happens atomically
// under Lock, as does the snapshot-and-reset, which avoids the lost-wakeup
// race described elsewhere in this thread.
func (s *Service) Work(job int) {
s.Lock.Lock()
exists := s.InProgress[job]
if exists {
// Duplicate call: enqueue a result channel while still holding the
// lock, then wait for the owner to deliver.
response := make(chan int)
defer close(response)

s.IsPending[job] = append(s.IsPending[job], response)
s.Lock.Unlock()
fmt.Printf("Waiting for response job %d\n", job)
resp := <-response
fmt.Printf("For job: %d result: %d\n", job, resp)
return
}
// First caller: claim the job before releasing the lock.
s.InProgress[job] = true
s.Lock.Unlock()

fmt.Printf("Calculating Fibonacci for %d\n", job)
result := s.Origen(job)
fmt.Printf("For job: %d result: %d\n", job, result)

// Snapshot the waiters and clear the job state in one critical section
// so no new waiter can register after the snapshot.
s.Lock.Lock()
pendingWorkers, exists := s.IsPending[job]
s.InProgress[job] = false
s.IsPending[job] = make([]chan int, 0)
s.Lock.Unlock()

if exists {
// Deliver the result to every waiter captured in the snapshot.
wz := 0
for _, pendingWorker := range pendingWorkers {
// Origen returns interface{}; type-assert back to int before sending.
resultinteger := result.(int)
pendingWorker <- int(resultinteger)
wz++
}
fmt.Printf("Result sent - all pending workers ready job: %d result: %d size: %d\n", job, result, wz)
}
}
// NewService builds a Service backed by F with empty bookkeeping maps.
func NewService(F FunctionService) *Service {
	s := &Service{Origen: F}
	s.InProgress = map[int]bool{}
	s.IsPending = map[int][]chan int{}
	return s
}

// main exercises the service three times over overlapping job lists to
// show that cached jobs return instantly and duplicates are deduplicated.
//
// BUG FIX: the original never called wg.Add in any of its three loops, so
// the first wg.Done panics with "sync: negative WaitGroup counter".
func main() {
	var wg sync.WaitGroup
	cache := NewCache(GetFibonacci)
	service := NewService(
		func(n int) interface{} {
			value, err := cache.Get(n)
			if err != nil {
				log.Println(err)
			}
			return value
		},
	)

	// runJobs launches one goroutine per job and waits for all of them.
	runJobs := func(jobs []int) {
		for _, n := range jobs {
			wg.Add(1) // must precede the goroutine's wg.Done
			go func(job int) {
				defer wg.Done()
				service.Work(job)
			}(n)
		}
		wg.Wait()
	}

	fibo := []int{46, 46, 42, 42, 41, 41, 46, 46, 46, 42, 42, 46}
	runJobs(fibo)

	fmt.Println()
	fmt.Println("Doing it all again!")
	runJobs(fibo)

	fmt.Println()
	fmt.Println("Doing it all again!")
	runJobs([]int{43, 42, 46, 42, 46, 42, 46, 42, 41, 46, 41, 46})
}

``````

My solution to the challenge

``````package main

import (
"fmt"
"sync"
)

// Fibonacci returns the n-th Fibonacci number (naive recursion).
func Fibonacci(n int) int {
	switch {
	case n < 2:
		return n
	default:
		return Fibonacci(n-1) + Fibonacci(n-2)
	}
}

// Memory is a mutex-guarded result cache keyed by job number.
type Memory struct {
cache map[int]int  // memoized Fibonacci results
lock  sync.RWMutex // guards cache
}

// Service tracks in-flight jobs and the waiters queued behind them.
type Service struct {
InProgress map[int]bool       // jobs currently being computed
IsPending  map[int][]chan int // waiters per in-flight job
Lock       sync.RWMutex       // guards both maps
}

// Work computes Fibonacci(job) once, caching the result in m; concurrent
// callers for the same job wait on a channel for the first caller's
// result. The c parameter is unused inside Work (main uses the channel as
// a worker-count semaphore around the call).
//
// NOTE(review): the InProgress check (RLock) and the later claim (Lock)
// are separate critical sections, so two goroutines can both observe
// "not in progress" and both compute the job. Likewise, a waiter that
// registers after the producer snapshots IsPending but before the delete
// below is never notified and blocks forever — confirm before relying on
// this code under contention.
func (s *Service) Work(job int, m *Memory, c chan int) {
if isCached(job, m) {
fmt.Printf("Job %d is cached, the result is %d\n", job, m.cache[job])
return
}
s.Lock.RLock()
if s.InProgress[job] {
s.Lock.RUnlock()
response := make(chan int)
s.Lock.Lock()
s.IsPending[job] = append(s.IsPending[job], response)
s.Lock.Unlock()
fmt.Printf("Job %d is in progress, waiting for it to finish\n", job)
fmt.Printf("Response Done, the result is %d for job %d\n", <-response, job)
return
}
s.Lock.RUnlock()
// Claim the job (racy with the check above — see NOTE).
s.Lock.Lock()
s.InProgress[job] = true
s.Lock.Unlock()
fmt.Printf("Job %d is not in progress, starting it\n", job)
result := Fibonacci(job)
// Publish the result to the shared cache.
m.lock.Lock()
m.cache[job] = result
m.lock.Unlock()
// Snapshot the waiters registered so far and deliver the result.
s.Lock.RLock()
pendingWorkers, ok := s.IsPending[job]
s.Lock.RUnlock()
if ok {
fmt.Printf("Job %d is done, notifying %d workers\n", job, len(pendingWorkers))
for _, worker := range pendingWorkers {
worker <- result
}
}
// Reset the job state.
s.Lock.Lock()
s.InProgress[job] = false
delete(s.IsPending, job)
s.Lock.Unlock()
}

// NewService returns a Service with empty tracking maps.
func NewService() *Service {
	s := new(Service)
	s.InProgress = map[int]bool{}
	s.IsPending = map[int][]chan int{}
	return s
}

// NewMemory returns a Memory with an empty result cache.
func NewMemory() *Memory {
	return &Memory{cache: map[int]int{}}
}

// isCached reports whether n already has a cached result in m.
func isCached(n int, m *Memory) bool {
	m.lock.RLock()
	defer m.lock.RUnlock()
	_, hit := m.cache[n]
	return hit
}

// main fans out a batch of jobs with at most `workers` running at once
// (channel c acts as a counting semaphore) and waits for completion.
//
// BUG FIX: the original never called wg.Add, so the first wg.Done panics
// with "sync: negative WaitGroup counter".
func main() {
	workers := 7
	c := make(chan int, workers)
	service := NewService()
	memory := NewMemory()
	jobs := []int{44, 46, 47, 49, 44, 34, 47, 41, 41, 33,
		43, 40, 46, 43, 45, 49, 34, 47, 36, 43, 48, 41,
		35, 46, 33, 42, 49, 47, 32, 30, 50, 50, 30, 40,
		44, 30, 49, 34, 48, 43, 50, 42, 48, 31, 35, 30,
		33, 40, 40, 50, 49, 47, 36, 43, 48, 41, 35, 46,
	}
	var wg sync.WaitGroup
	for _, job := range jobs {
		c <- 1    // acquire a worker slot (blocks when `workers` are busy)
		wg.Add(1) // must precede the goroutine's wg.Done
		go func(job int) {
			defer wg.Done()
			service.Work(job, memory, c)
			<-c // release the slot
		}(job)
	}
	wg.Wait()
}

``````

Mi solución al reto consiste en implementar el struct Service de esta clase y aplicar el sistema de caching de la clase anterior. En la ejecución de la función main, intento simular una ejecución de jobs tiempo después, pero con los mismos valores, para asegurar que el sistema de caching funcione como es esperado.

``````package main

import (
"fmt"
"sync"
"time"
)

// ExpensiveFibonacci returns the n-th Fibonacci number by naive
// exponential-time recursion.
func ExpensiveFibonacci(n int) int {
	if n > 1 {
		return ExpensiveFibonacci(n-1) + ExpensiveFibonacci(n-2)
	}
	return n
}

// Function computes a value for an int key, possibly failing.
type Function func(key int) (interface{}, error)

// FunctionResult is a cached value/error pair.
type FunctionResult struct {
value interface{}
err   error
}

// Service memoizes f's results and deduplicates concurrent calls for the
// same job.
type Service struct {
f          Function                      // computation source
cache      map[int]FunctionResult        // memoized results
InProgress map[int]bool                  // jobs currently running
IsPending  map[int][]chan FunctionResult // waiters per in-flight job
Lock       sync.RWMutex                  // guards all of the above
}

// Work returns f(job), computing it at most once concurrently: the first
// caller runs f, duplicate callers block on a channel until the result is
// published, and finished results are served straight from the cache.
//
// Fixes over the previous version: the InProgress check and the waiter
// registration are now a single critical section (before, a job could be
// claimed twice because the check used RLock and the claim a separate
// Lock), and the waiter snapshot plus state reset also happen atomically
// (before, a waiter registering between the snapshot and the reset was
// never notified and blocked forever).
func (s *Service) Work(job int) FunctionResult {
	s.Lock.Lock()
	if s.InProgress[job] {
		// Duplicate call: register for the result and wait. Buffer of 1
		// lets the producer deliver without blocking on us.
		response := make(chan FunctionResult, 1)
		s.IsPending[job] = append(s.IsPending[job], response)
		s.Lock.Unlock()
		fmt.Printf("Waiting for Response job: %d\n", job)
		return <-response
	}
	if result, hit := s.cache[job]; hit {
		s.Lock.Unlock()
		return result
	}
	// First caller: claim the job while still holding the lock.
	s.InProgress[job] = true
	s.Lock.Unlock()

	var result FunctionResult
	result.value, result.err = s.f(job)

	// Publish atomically: cache the result, snapshot the waiters, and
	// clear the job state in one critical section.
	s.Lock.Lock()
	pendingWorkers := s.IsPending[job]
	s.InProgress[job] = false
	delete(s.IsPending, job)
	s.cache[job] = result
	s.Lock.Unlock()

	if len(pendingWorkers) > 0 {
		for _, pendingWorker := range pendingWorkers {
			pendingWorker <- result
		}
		fmt.Printf("Result send - all pending workers ready job: %d\n", job)
		fmt.Println()
	}

	return result
}

// NewService builds a Service around f with empty internal maps.
func NewService(f Function) *Service {
	s := &Service{f: f}
	s.cache = map[int]FunctionResult{}
	s.InProgress = map[int]bool{}
	s.IsPending = map[int][]chan FunctionResult{}
	return s
}

// GetFibonacci adapts ExpensiveFibonacci to the Function signature used
// by the service; it never fails.
func GetFibonacci(key int) (interface{}, error) {
return ExpensiveFibonacci(key), nil
}

// main runs the same job list twice; the second pass should be served
// from the cache almost instantly.
//
// BUG FIX: the original never called wg.Add, so the first wg.Done panics
// with "sync: negative WaitGroup counter".
func main() {
	service := NewService(GetFibonacci)
	jobs := []int{7, 10, 15, 3, 2, 25, 40, 13, 15, 10, 40, 5, 2, 1}
	var wg sync.WaitGroup

	// runAll launches every job concurrently and waits for completion.
	runAll := func() {
		for _, n := range jobs {
			wg.Add(1) // must precede the goroutine's wg.Done
			go func(job int) {
				defer wg.Done()
				start := time.Now()
				res := service.Work(job)
				fmt.Printf("Calculate: Value: %d, Time: %s, Result: %d\n", job, time.Since(start), res.value)
			}(n)
		}
		wg.Wait()
	}

	fmt.Println()
	fmt.Println("First attempt")
	runAll()

	fmt.Println()
	fmt.Println("Second attempt")
	runAll()
}
``````

Integré el código de caché para mejorar el ejercicio anterior, que presentaba problemas; ya no se calcula varias veces el Fibonacci de un número que ya ha sido enviado:
|

¡IMPORTANTE!: Tener claro que el ejercicio del caché del número Fibonacci no es el ideal para demostrar las cualidades del código, pero la idea es demostrar que no se hacen cálculos duplicados y que, a su vez, se pueden realizar de manera concurrente.

``````package main

import (
"log"
"sync"
"time"
)

// Function computes a value for an int key, possibly failing.
type Function func(key int) (interface{}, error)

// FunctionResult is a value/error pair as produced by a Function.
type FunctionResult struct {
value interface{}
err   error
}

// Memory memoizes f's results behind an RWMutex.
type Memory struct {
f     Function               // computation source
cache map[int]FunctionResult // memoized results
rwm   sync.RWMutex           // guards cache
}

// Get returns the memoized result for key, calling f and caching the
// outcome on a miss. The lock is released while f runs, so concurrent
// misses for the same key may each invoke f.
func (m *Memory) Get(key int) (interface{}, error) {
	m.rwm.RLock()
	result, hit := m.cache[key]
	m.rwm.RUnlock()

	if hit {
		log.Printf("Found for %d\n", key)
		return result.value, result.err
	}

	log.Printf("Calculate for %d\n", key)
	result.value, result.err = m.f(key)

	m.rwm.Lock()
	m.cache[key] = result
	m.rwm.Unlock()

	return result.value, result.err
}

// Service deduplicates concurrent Work calls per job and delegates the
// actual (cached) computation to memory.
type Service struct {
InProgress map[int]bool                  // jobs currently running
IsPending  map[int][]chan FunctionResult // waiters per in-flight job
Mutex      sync.RWMutex                  // guards both maps
memory     *Memory                       // memoized computation source
}

// Work computes the job through the memoized service, ensuring only one
// goroutine runs a given job at a time; duplicate callers block until the
// owner delivers the result on their channel.
//
// Fixes over the previous version: the InProgress check and claim are now
// one critical section (a job could previously be claimed twice), the
// waiter snapshot and the state reset are likewise atomic (a late waiter
// could previously be left blocked forever), and the FunctionResult was
// logged with the %d verb, which is invalid for a struct.
func (s *Service) Work(job int) {
	s.Mutex.Lock()
	if s.InProgress[job] {
		// Duplicate call: register a buffered channel and wait; the
		// buffer lets the producer send without blocking on us.
		response := make(chan FunctionResult, 1)
		s.IsPending[job] = append(s.IsPending[job], response)
		s.Mutex.Unlock()
		log.Printf("Waiting for response job: %d\n", job)
		res := <-response
		log.Printf("Response for job %d done, received %v\n", job, res)
		return
	}
	// First caller: claim the job while still holding the lock.
	s.InProgress[job] = true
	s.Mutex.Unlock()

	start := time.Now()
	result, err := s.memory.Get(job)
	log.Printf("Execution time [%d, %s, %v]\n", job, time.Since(start), result)

	// Snapshot the waiters and reset the job state atomically.
	s.Mutex.Lock()
	pendingWorkers := s.IsPending[job]
	s.InProgress[job] = false
	delete(s.IsPending, job)
	s.Mutex.Unlock()

	fResult := FunctionResult{value: result, err: err}
	for _, pendingWorker := range pendingWorkers {
		pendingWorker <- fResult
	}
}

// NewService wires a Service to a fresh Memory backed by f.
func NewService(f Function) *Service {
	mem := &Memory{f: f, cache: map[int]FunctionResult{}}
	return &Service{
		InProgress: map[int]bool{},
		IsPending:  map[int][]chan FunctionResult{},
		memory:     mem,
	}
}

// ExpensiveFibonacci returns fib(n) by naive exponential recursion.
func ExpensiveFibonacci(n int) int {
	switch {
	case n <= 1:
		return n
	default:
		return ExpensiveFibonacci(n-1) + ExpensiveFibonacci(n-2)
	}
}

// main runs a batch of jobs, then two duplicate calls for 42 to show the
// second round is answered from the cache.
//
// BUG FIX: the original never called wg.Add in either loop, so the first
// wg.Done panics with "sync: negative WaitGroup counter".
func main() {
	log.SetFlags(log.Ldate + log.Ltime + log.Lmicroseconds)
	service := NewService(func(n int) (interface{}, error) {
		return ExpensiveFibonacci(n), nil
	})
	jobs := []int{42, 40, 41, 42, 38, 38, 2, 42}
	wg := new(sync.WaitGroup)

	for _, n := range jobs {
		wg.Add(1) // must precede the goroutine's wg.Done
		go func(job int) {
			defer wg.Done()
			service.Work(job)
		}(n)
	}
	wg.Wait()

	for i := 0; i < 2; i++ {
		wg.Add(1) // same missing Add in the second round
		go func() {
			defer wg.Done()
			service.Work(42)
		}()
	}
	wg.Wait()
}
``````

Les comparto la solución que desarrollé para este desafío en este enlace. Realicé un sistema de caché en memoria que se persiste en el sistema de archivos; al reiniciar la aplicación se vuelve a cargar el caché en memoria (característica inspirada en Redis).

Mi solución al reto: la función que calcula Fibonacci se ejecuta solo la cantidad de veces que es necesaria, de igual forma se guarda el caché de todo número previamente calculado. Esto mejora considerablemente el performance ya que se evita el consumo de CPU en la repetición del cálculo de Fibonacci para números repetidos.

``````package main

import (
"fmt"
"log"
"sync"
"time"
)

// Fibonacci returns the n-th Fibonacci number (naive recursion).
func Fibonacci(n int) int {
	if n > 1 {
		return Fibonacci(n-1) + Fibonacci(n-2)
	}
	return n
}

// Memory combines a result cache with job-deduplication bookkeeping.
type Memory struct {
f          Function               // computation source
InProgress map[int]bool           // jobs currently running
IsPending  map[int][]chan int     // waiters per in-flight job
cache      map[int]FunctionResult // memoized results
lock       sync.RWMutex           // guards all of the above
}

// Function computes an int result for an int key, possibly failing.
type Function func(key int) (int, error)

// FunctionResult is a cached value/error pair.
type FunctionResult struct {
value interface{}
err   error
}

// NewCache builds a Memory around f with empty internal maps.
func NewCache(f Function) *Memory {
	m := &Memory{f: f}
	m.InProgress = map[int]bool{}
	m.IsPending = map[int][]chan int{}
	m.cache = map[int]FunctionResult{}
	return m
}

// Get returns the cached result for key, delegating to Work on a miss and
// caching whatever Work produced.
func (m *Memory) Get(key int) (interface{}, error) {
	m.lock.Lock()
	result, hit := m.cache[key]
	m.lock.Unlock()

	if hit {
		return result.value, result.err
	}

	result.value, result.err = m.Work(key)

	m.lock.Lock()
	m.cache[key] = result
	m.lock.Unlock()

	return result.value, result.err
}

// Work computes f(job), letting the first caller run f while duplicate
// concurrent callers wait on a channel for the result.
//
// NOTE(review): the InProgress check (RLock) and the claim (Lock) are
// separate critical sections, so two goroutines can both start the same
// job; and a waiter that registers between the IsPending snapshot and the
// reset below is never notified and blocks forever. Folding each pair
// into a single critical section would close both races — confirm before
// relying on this under contention.
func (m *Memory) Work(job int) (int, error) {
m.lock.RLock()
exists := m.InProgress[job]
if exists {
m.lock.RUnlock()
// Duplicate call: register a channel and wait for the owner.
response := make(chan int)
defer close(response)

m.lock.Lock()
m.IsPending[job] = append(m.IsPending[job], response)
m.lock.Unlock()
resp := <-response
return resp, nil
}
m.lock.RUnlock()

// Claim the job (racy with the check above — see NOTE).
m.lock.Lock()
m.InProgress[job] = true
m.lock.Unlock()

// f's error is discarded; Work always reports nil.
result, _ := m.f(job)

// Snapshot the waiters registered so far.
m.lock.RLock()
pendingWorkers, exists := m.IsPending[job]
m.lock.RUnlock()

if exists {
for _, pendingWorker := range pendingWorkers {
pendingWorker <- result
}
}
// Reset the job state.
m.lock.Lock()
m.InProgress[job] = false
m.IsPending[job] = make([]chan int, 0)
m.lock.Unlock()
return result, nil
}

// GetFibonacci computes Fibonacci(n), printing on each actual invocation
// so cache hits (which skip this function) are visible; it never fails.
func GetFibonacci(n int) (int, error) {
fmt.Println("Se ejecuto GetFibonacci")
return Fibonacci(n), nil
}

// main computes a list of Fibonacci jobs concurrently, timing each one
// and the whole batch.
//
// BUG FIX: the original never called wg.Add, so the first wg.Done panics
// with "sync: negative WaitGroup counter".
func main() {
	timeInit := time.Now()
	cache := NewCache(GetFibonacci)
	fibo := []int{42, 40, 41, 42, 38, 40, 40, 41, 43, 43, 43}
	var wg sync.WaitGroup
	for _, n := range fibo {
		wg.Add(1) // must precede the goroutine's wg.Done
		go func(index int) {
			defer wg.Done()
			start := time.Now()
			value, err := cache.Get(index)
			if err != nil {
				log.Println(err)
			}
			fmt.Printf("%d, %s, %d\n", index, time.Since(start), value)
		}(n)
	}
	wg.Wait()
	fmt.Printf("tiempo final %s\n", time.Since(timeInit))
}
``````