## Curso de Go Intermedio: Programación Orientada a Objetos y Concurrencia Néstor Escoto

# Worker pools


Worker pools are a model in which a fixed number of m workers (implemented in Go with goroutines) work their way through n tasks in a work queue (implemented in Go with a channel). Work stays in a queue until a worker finishes up its current task and pulls a new one off.

Basically, there is a fixed number of workers and n tasks. Every worker takes a task, and when it's done it takes another one. This is very useful for work that can be split into independent tasks that each require a lot of processing.

This is an example program:

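```go
package main

import "fmt"

// Worker pulls jobs from the jobs channel until it is closed and
// sends each Fibonacci result to the results channel.
func Worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("--> Worker #%d started fib with %d\n", id, job)
		fib := Fibo(job)
		fmt.Printf("Worker #%d finished. Job: %d; Fibo: %d\n", id, job, fib)
		results <- fib
	}
}

// Fibo returns the nth Fibonacci number using naive recursion.
func Fibo(n int) int {
	if n <= 1 {
		return n
	}

	return Fibo(n-1) + Fibo(n-2)
}

func main() {
	tasks := []int{2, 3, 4, 7, 10, 15}
	nWorkers := 3

	// buffered channels sized to the task list, so sending jobs and
	// sending results never blocks
	jobs := make(chan int, len(tasks))
	results := make(chan int, len(tasks))

	// creates the workers
	for i := 0; i < nWorkers; i++ {
		go Worker(i, jobs, results)
	}

	// sends them to work
	for _, v := range tasks {
		jobs <- v
	}

	// close channel to indicate that's all the work to do
	close(jobs)

	// wait for one result per task before exiting
	for i := 0; i < len(tasks); i++ {
		<-results
	}
}
```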

### Explanation

https://gobyexample.com/worker-pools
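One detail of the pattern above is that results arrive in completion order, not in task order. The following is a minimal sketch of one way to keep track of which answer belongs to which task; the `result` struct and the `runPool` helper are hypothetical names introduced here for illustration and are not part of the class code.

```go
package main

import "fmt"

// result pairs a job with its answer (hypothetical type for this sketch).
type result struct {
	job int
	fib int
}

// fibo returns the nth Fibonacci number using naive recursion.
func fibo(n int) int {
	if n <= 1 {
		return n
	}
	return fibo(n-1) + fibo(n-2)
}

// worker reads jobs until the channel is closed and reports each answer
// together with the job that produced it.
func worker(jobs <-chan int, results chan<- result) {
	for job := range jobs {
		results <- result{job: job, fib: fibo(job)}
	}
}

// runPool (hypothetical helper) computes fibo for every task using
// nWorkers goroutines and returns the answers keyed by task.
func runPool(tasks []int, nWorkers int) map[int]int {
	jobs := make(chan int, len(tasks))
	results := make(chan result, len(tasks))

	for i := 0; i < nWorkers; i++ {
		go worker(jobs, results)
	}
	for _, t := range tasks {
		jobs <- t
	}
	close(jobs)

	// collect exactly one result per task
	out := make(map[int]int, len(tasks))
	for range tasks {
		r := <-results
		out[r.job] = r.fib
	}
	return out
}

func main() {
	fmt.Println(runPool([]int{2, 3, 4, 7, 10, 15}, 3))
}
```

Keying by task value is enough here because repeated tasks would produce the same answer; with other workloads an index or ID field would be a safer key.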

I modified the code from class to see how goroutines and workers behave at a larger scale (or so I think). At the end of its run, the program prints how long it took. In my case, computing the Fibonacci values with workers and goroutines took 9.544643109 seconds, while the conventional (sequential) way took 18.054724068 seconds. Here is the code in case anyone wants to try it: just change the value of the constant withGoRutinesAndWorkers to run it one way or the other.

Code:

```go
package main

import (
	"fmt"
	"time"
)

// Worker pulls jobs until the channel is closed and sends each result back.
func Worker(id int, jobs <-chan int, results chan<- int) {
	for job := range jobs {
		fmt.Printf("Worker with id %d started fib with %d\n", id, job)
		fib := Fibonacci(job)
		fmt.Printf("Worker with id %d has finished the job %d, and fib %d\n", id, job, fib)
		results <- fib
	}
}

// Fibonacci returns the nth Fibonacci number using naive recursion.
func Fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return Fibonacci(n-1) + Fibonacci(n-2)
}

func main() {
	start := time.Now()
	tasks := []int{2, 34, 36, 17, 10, 40, 6, 22, 41, 44, 33, 2, 5, 2, 34, 36, 17, 10, 40, 6, 22, 41, 44, 33, 2, 5, 17, 10, 40, 6, 22, 2, 5, 2, 34, 36, 22, 41, 44}

	// set to true to use the worker pool, false to run sequentially
	const withGoRutinesAndWorkers = false
	if withGoRutinesAndWorkers {
		nWorkers := 12

		// buffered channels sized to the task list so sends never block
		jobs := make(chan int, len(tasks))
		results := make(chan int, len(tasks))

		// i < nWorkers starts exactly nWorkers goroutines
		for i := 0; i < nWorkers; i++ {
			go Worker(i, jobs, results)
		}

		for _, value := range tasks {
			jobs <- value
		}
		close(jobs)

		// wait for one result per task
		for i := 0; i < len(tasks); i++ {
			<-results
		}
	} else {
		for _, v := range tasks {
			val := Fibonacci(v)
			fmt.Println("val->", val)
		}
	}

	elapsed := time.Since(start)
	fmt.Printf("Program took %s\n", elapsed)
}
```
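To make the comparison above easier to repeat, here is a minimal sketch that runs both variants in one execution and checks that they agree. The helpers `sequential` and `pooled` are hypothetical names for this sketch, and sizing the pool with `runtime.NumCPU()` is an assumption, not something the class prescribes. For CPU-bound work like this, the speedup is limited by the number of cores and by the slowest single task, so timings will vary by machine.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// fibonacci returns the nth Fibonacci number using naive recursion.
func fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return fibonacci(n-1) + fibonacci(n-2)
}

// sequential computes every task on the calling goroutine and returns
// the sum of the results.
func sequential(tasks []int) int {
	sum := 0
	for _, t := range tasks {
		sum += fibonacci(t)
	}
	return sum
}

// pooled computes every task with nWorkers goroutines and returns the
// sum of the results.
func pooled(tasks []int, nWorkers int) int {
	jobs := make(chan int, len(tasks))
	results := make(chan int, len(tasks))

	for i := 0; i < nWorkers; i++ {
		go func() {
			for job := range jobs {
				results <- fibonacci(job)
			}
		}()
	}
	for _, t := range tasks {
		jobs <- t
	}
	close(jobs)

	sum := 0
	for range tasks {
		sum += <-results
	}
	return sum
}

func main() {
	tasks := []int{30, 31, 32, 30, 31, 32, 30, 31}

	start := time.Now()
	seq := sequential(tasks)
	seqTime := time.Since(start)

	start = time.Now()
	par := pooled(tasks, runtime.NumCPU())
	parTime := time.Since(start)

	fmt.Printf("sequential: sum=%d in %s\n", seq, seqTime)
	fmt.Printf("pooled:     sum=%d in %s\n", par, parTime)
}
```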

If anyone is having trouble following, try this code. It has some prints and sleeps that make it easier to see what happens step by step.

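```go
package main

import (
	"fmt"
	"time"
)

// Worker announces itself, then processes jobs until the channel is closed.
func Worker(id int, jobs <-chan int, results chan<- int) {
	fmt.Println("Worker with id", id, "started")
	for j := range jobs {
		fmt.Printf("Worker with id %v started job %v\n", id, j)
		results <- Fibonacci(j)
		fmt.Printf("Worker with id %v finished job %v\n", id, j)
	}
}

// Fibonacci returns the nth Fibonacci number using naive recursion.
func Fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return Fibonacci(n-1) + Fibonacci(n-2)
}

func main() {
	tasks := []int{7, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42}

	// buffered channels sized to the task list so sends never block
	jobs := make(chan int, len(tasks))
	results := make(chan int, len(tasks))

	for w := 1; w <= 3; w++ {
		go Worker(w, jobs, results)
	}

	// pause so the "started" messages appear before any job is sent
	time.Sleep(3 * time.Second)

	// send one job per second to make the hand-off visible
	for _, t := range tasks {
		jobs <- t
		time.Sleep(1 * time.Second)
	}
	close(jobs)

	for a := 1; a <= len(tasks); a++ {
		fmt.Println(<-results)
	}
}
```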

Basically, a worker pool is a combination of buffered channels and goroutines, and it applies the semaphore pattern.

It's interesting to see how the recursive function that computes Fibonacci works. Here is a link where it's explained:
Fibonacci
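As a small illustration of how the recursion unfolds, this sketch counts how many calls the naive version makes. The `fibCalls` function and its counter parameter are hypothetical additions for this example. The call count grows roughly as fast as the Fibonacci numbers themselves, which is why larger inputs make good CPU-heavy tasks for a worker pool.

```go
package main

import "fmt"

// fibCalls returns the nth Fibonacci number and increments *calls once
// per invocation, so the caller can see how much work the recursion does.
func fibCalls(n int, calls *int) int {
	*calls++
	if n <= 1 {
		return n // base cases: fib(0) = 0, fib(1) = 1
	}
	// each non-base call branches into two smaller subproblems
	return fibCalls(n-1, calls) + fibCalls(n-2, calls)
}

func main() {
	for _, n := range []int{5, 10, 20, 30} {
		calls := 0
		value := fibCalls(n, &calls)
		fmt.Printf("fib(%d) = %d after %d calls\n", n, value, calls)
	}
}
```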

```go
package main

import "fmt"

// Worker is a function that does the work.
func Worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Println("worker", id, "started  job", j)
		fib := Fibonacci(j)
		results <- fib
		fmt.Println("worker", id, "finished job", j, "result", fib)
	}
}

// Fibonacci returns the nth number in the Fibonacci sequence.
func Fibonacci(n int) int {
	if n < 2 {
		return n
	}
	return Fibonacci(n-1) + Fibonacci(n-2)
}

func main() {
	tasks := []int{2, 3, 4, 5, 7, 10, 12, 35, 37, 40, 41, 42}

	nWorkers := 5

	// buffered channels sized to the task list so sends never block
	jobs := make(chan int, len(tasks))
	results := make(chan int, len(tasks))

	// start the workers
	for w := 1; w <= nWorkers; w++ {
		go Worker(w, jobs, results)
	}

	// give the workers jobs
	for _, t := range tasks {
		jobs <- t
	}
	close(jobs)

	// get the results (consume the channel)
	for a := 1; a <= len(tasks); a++ {
		<-results
	}
}
```

Here is an example of this exercise using WaitGroups instead of an extra channel (results). The results channel didn't seem necessary to me, so I removed it, and the program behaves the same.

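``````go
package main

import (
	"fmt"
	"sync"
)

var (
	wg = sync.WaitGroup{}
)

// Worker processes jobs until the channel is closed and marks each one
// as done on the shared WaitGroup.
func Worker(id int, jobs <-chan int) {
	for job := range jobs {
		fmt.Printf("Worker with id %d started fib with %d\n", id, job)
		fib := Fibonacci(job)
		fmt.Printf("Worker with id %d, job %d fib with %d\n", id, job, fib)
		wg.Done()
	}
}

// Fibonacci returns the nth Fibonacci number using naive recursion.
func Fibonacci(n int) int {
	if n <= 1 {
		return n
	}
	return Fibonacci(n-1) + Fibonacci(n-2)
}

func main() {
	tasks := []int{2, 3, 4, 5, 7, 10, 12, 40}
	nWorkers := 3

	// buffered channel sized to the task list so sends never block
	jobs := make(chan int, len(tasks))

	for i := 0; i < nWorkers; i++ {
		go Worker(i, jobs)
	}

	for _, value := range tasks {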