¿Qué es Workerpool en Go y cómo puede transformar tus proyectos?
El mundo de la programación es vasto y está lleno de paradigmas interesantes. Uno de los conceptos más poderosos en el lenguaje de programación Go es el de workerpool, que permite la ejecución concurrente de tareas específicas por múltiples "trabajadores" o workers. Por tanto, es esencial entender cómo implementar esto en tus proyectos para optimizar tareas que demandan tiempo. ¡Veamos cómo funciona en Go!
¿Cómo se inicia un proyecto de workerpool en Go?
El primer paso es crear un archivo en Go, por ejemplo, wp.go. Dicho archivo comenzará como la mayoría de los programas en Go, definiendo el paquete con la clausula package main. Luego, se definirá una función para el cálculo de la serie de Fibonacci. Esta función, que utiliza una implementación recursiva, será la pieza central del trabajo que realizarán los trabajadores.
funcfibonacci(n int)int{if n <=1{return n
}returnfibonacci(n-1)+fibonacci(n-2)}
¿Cómo se crea un worker en Go?
Un worker es una función que recibe tareas (jobs) a través de un canal, las procesa y envía el resultado a través de otro canal. Es fundamental asignar un ID a cada worker para identificar qué trabajador realiza cada tarea.
funcworker(id int, jobs <-chanint, results chan<-int){for job :=range jobs { fmt.Printf("Worker %d started job %d\n", id, job) results <-fibonacci(job) fmt.Printf("Worker %d finished job %d\n", id, job)}}
¿Cómo implementar el workerpool en la función principal?
La función main es donde se coordina todo el proceso: definir las tareas, crear los canales de comunicación, iniciar los workers y manejar los resultados.
Definir las tareas: Se utiliza un slice para almacenar los números a los que se les aplicará la serie de Fibonacci.
Configurar los workers: Crear varios workers, asignándoles un ID y pasando los canales de tareas y resultados.
Manejar los canales: Asignar las tareas a los workers a través de un canal y recibir los resultados por otro.
funcmain(){ tasks :=[]int{2,3,4,5,7,10,12,40} numOfWorkers :=3 jobs :=make(chanint,len(tasks)) results :=make(chanint,len(tasks))for w :=1; w <= numOfWorkers; w++{goworker(w, jobs, results)}for_, task :=range tasks { jobs <- task
}close(jobs)for i :=0; i <len(tasks); i++{<-results
}}
¿Cómo optimiza el uso de go rutinas y canales en Go?
La magia de Go se encuentra en su capacidad para manejar la concurrencia de manera eficiente y sencilla. Al utilizar go rutinas y canales de comunicación, es posible crear programas altamente concurrentes sin la complejidad que otros lenguajes requieren. Cada worker es manejado como una gorutina, procesando tareas de manera concurrente y colaborando mediante canales de comunicación.
Esta estructura no solo mejora el tiempo de ejecución de tareas pesadas, como el cálculo de la serie de Fibonacci para grandes números, sino que también garantiza que los recursos del sistema se usen de manera eficiente.
Con esta implementación, aumentamos nuestra capacidad de procesar tareas concurrentemente, algo vital para aplicaciones que requieren manejar cientos o miles de acciones simultáneamente.
Explorar y utilizar workerpools en Go abre la puerta a automáticamente escalar el procesamiento de tareas y es una manera efectiva de manejar cargas de trabajo masivas. Te animo a que profundices más en este tema y sigas experimentando con lo que Go tiene para ofrecer en programación concurrente. ¡La práctica te permitirá dominar estas herramientas poderosas!
Escribí unas modificaciones del código visto en clase para observar realmente cómo es que las gorutines y los workers funcionan a gran escala (o bueno eso creo), el código al final de su ejecución dice cuánto tiempo tomó y en mi caso, realizando las series de Fibonacci con los workers y gorutines tomó 9.544643109 segundos, de la forma convencional tomó 18.054724068 les dejo el código por si alguien quiere probarlo, sólo deben cambiar el valor de la constante withGoRutinesAndWorkers para hacer la ejecución de una forma u otra.
Código:
package main
import("fmt""time")func Worker(id int, jobs <-chan int, results chan<- int){forjob:= range jobs { fmt.Printf("Worker with id %d started fib with %d\n", id, job)fib:=Fibonacci(job) fmt.Printf("Worker with id %d has finished the job %d, and fib %d\n", id, job, fib) results <- fib
}}func Fibonacci(n int) int {if n <=1{return n
}returnFibonacci(n-1)+Fibonacci(n-2)}func main(){start:= time.Now()tasks:=[]int{2,34,36,17,10,40,6,22,41,44,33,2,5,2,34,36,17,10,40,6,22,41,44,33,2,5,17,10,40,6,22,2,5,2,34,36,22,41,44}const withGoRutinesAndWorkers =falseif withGoRutinesAndWorkers {nWorkers:=12jobs:=make(chan int,len(tasks))results:=make(chan int,len(tasks))fori:=0; i <= nWorkers; i++{ go Worker(i, jobs, results)}for _,value:= range tasks { jobs <- value
}close(jobs)fori:=0; i <len(tasks); i++{<-results
}}else{for _,v:= range tasks {val:=Fibonacci(v) fmt.Println("val->", val)}}elapsed:= time.Since(start) fmt.Printf("Program took %s", elapsed)}
Worker pools are a model in which a fixed number of m workers (implemented in Go with goroutines) work their way through n tasks in a work queue (implemented in Go with a channel). Work stays in a queue until a worker finishes up its current task and pulls a new one off.
Basically there are a set amount of workers and n tasks. Every worker takes a task, and when it's done it takes another one. This is very useful for breakable task that require a lot of processing.
This is an example program:
package main
import"fmt"funcWorker(id int, jobs <-chanint, results chan<-int){for job :=range jobs { fmt.Printf("--> Worker #%d started fib with %d\n", id, job) fib :=Fibo(job) fmt.Printf("Worker #%d finished. Job: %d; Fibo: %d\n", id, job, fib) results <- fib
}}funcFibo(n int)int{if n <=1{return n
}returnFibo(n -1)+Fibo(n -2)}funcmain(){ tasks :=[]int{2,3,4,7,10,15} nWorkers :=3 jobs :=make(chanint,len(tasks)) results :=make(chanint,len(tasks))// creates the workersfor i :=0; i < nWorkers; i++{goWorker(i, jobs, results)}// sends them to workfor_, v :=range tasks { jobs <- v
}// close channel to indicate that's all the work to doclose(jobs)for i :=0; i <len(tasks); i++{<- results
}}
Explanation
Excellent annotation, thanks
Basicamente un worker es la combinación de Buffered Channels, GoRoutines y se emplea el patrón de semáforos.
Si alguien no entiende puede tratar con este codigo, tienes unos prints y unos sleep para comprender mejor lo q pasa paso a paso
package main
import("fmt""time")func Worker(id int, jobs <-chan int, results chan<- int){ fmt.Println("Worker with id", id,"started")forj:= range jobs { fmt.Printf("Worker with id %v started job %v\n", id, j) results <-Fibonacci(j) fmt.Printf("Worker with id %v finished job %v\n", id, j)}}func Fibonacci(n int) int {if n <=1{return n
}returnFibonacci(n-1)+Fibonacci(n-2)}func main(){tasks:=[]int{7,12,15,18,21,24,27,30,33,36,39,42}jobs:=make(chan int,len(tasks))results:=make(chan int,len(tasks))forw:=1; w <=3; w++{ go Worker(w, jobs, results)} time.Sleep(3* time.Second)for _,task:= range tasks {println("Sending task", task) jobs <- task
time.Sleep(1* time.Second)}close(jobs)fora:=1; a <=len(tasks); a++{ fmt.Println(<-results)}}
buenas no me queda claro el uso del result,? se que es necesario pero no tengo claro aun el porque?
Por lo que entiendo es más que todo para el control de la concurrencia y su salida.
El de jobs se encarga de la asignación de job a los Workers a disposición en Worker(...), y el de results para mostrar/terminar el job que realizó determinado worker. Sin este canal, al realizar el Worker(...) en la línea 13 se bloquearía el programa.
Les dejo un ejemplo de este ejercicio usando WaitGroups, en lugar de un channel extra (results). El canal results no me parecio necesario asi que decidi borrarlo y tiene el mismo funcionamiento.
package main
import("fmt""sync")var( wg = sync.WaitGroup{})func Worker(id int, jobs <-chan int){forjob:= range jobs { fmt.Printf("Worker with id %d started fib with %d\n", id, job)fib:=Fibonacci(job) fmt.Printf("Worker with id %d, job %d fib with %d\n", id, job, fib) wg.Done()}}func Fibonacci(n int) int {if n <=1{return n
}returnFibonacci(n-1)+Fibonacci(n-2)}func main(){tasks:=[]int{2,3,4,5,7,10,12,40}nWorkers:=3jobs:=make(chan int,len(tasks))fori:=0; i < nWorkers; i++{ go Worker(i, jobs)}for _,value:= range tasks { wg.Add(1) jobs <- value
}close(jobs) wg.Wait()}
Hola, aunque el orden no importe, no tiene más sentido para la lógica del script que primero hagamos el for loop de agregar los valores de tasks dentro del canal de jobs antes de hacer el for loop de iniciar los workers ?
for _,value:= range tasks { jobs <- value
}close(jobs)fori:=0; i < nWorkers; i++{ go Worker(i, jobs, results)}
Hola Slissto, una pregunta muy interesante, en este ejemplo en específico podemos alterar ese orden porque estamos usando un channel con el buffer exactamente igual a la longitud de tareas, pero ahora quiero que te imagines un caso donde no sabes con cuantas tareas vas a contar o que por algún motivo supere el tamaño del buffer del canal que estás usando, si implementaramos primero que el channel envía los valores de las tasks lo que ocurrirá muy probablemente es un deadlock porque recuerda que si llegamos a la capacidad máxima del buffer de un channel, tiene que haber algo que esté leyendo de allí para evitar ese bloqueo que te he mencionado.
// Tenemos 20 tareas que ejecutar
tasks := []int{Slice Con 20 elementos}
// Pero creamos un canal con la capacidad de la mitad de las tareas, es decir 10
jobs := make(chan int, len(tasks) /2)
...
...
for _, value := range tasks { //Esto va a tener 20 iteraciones, pero nuestro channel solo tiene capacidad para 10
jobs <- value // Cuando vaya por la iteración 11 va a haber problemas porque el channel está lleno y no hay nadie quien esté leyendo y procesando las tasks
}
for i := 0; i < nWorkers; i++ {
go Worker(i, jobs, results) // Por más seguridad es mejor poner esto de primero.
}
¿Cómo se puede ver el flujo en que los canales se ejecutan?
No queda muy claro la manera en que se va leyendo cada 'buffer' del canal por el proceso Worker.
existe alguna otra referencia de este ejemplo específico?
Hola Luis, podrías compartir el fragmento específico de código que te genera esta confusión para poderte ayudar mejor?
Hola Nestor, gracias por responder.
Cuando creamos una subrutina (Worker), esta subrutina recorre todo el canal de entrada cada vez que se ejecuta. Por ejemplo el worker0 solo ejecuta al mismo tiempo los que le permite el canal de entrada jobs que podrían ser todos los jobs .
En qué momento las rutinas worker1 y worker2 entran en acción? Y cómo se libera el espacio cada espacio jobs si como mencionas en la pregunta de arriba reduces el tamaño del canal a algo menos a la cantidad de tareas?
Es interesante saber como funciona la función recursiva para resolver el fibonacci, dejo link donde lo explican:
Fibonacci
package main
import"fmt"// Worker is a function that does the work.func Worker(id int, jobs <-chan int, results chan<- int){forj:= range jobs { fmt.Println("worker", id,"started job", j)fib:=Fibonnaci(j) results <- fib
fmt.Println("worker", id,"finished job", j,"result", fib)}}// Fibonacci returns the nth number in the Fibonacci sequence.func Fibonnaci(n int) int {if n <2{return n
}returnFibonnaci(n-1)+Fibonnaci(n-2)}func main(){// tasks to dotasks:=[]int{2,3,4,5,7,10,12,35,37,40,41,42}nWorkers:=5jobs:=make(chan int,len(tasks))results:=make(chan int,len(tasks))// start the workersforw:=1; w <= nWorkers; w++{ go Worker(w, jobs, results)}// give the workers jobsfor _,t:= range tasks { jobs <- t
}close(jobs)// get the results (consume the channel)fora:=1; a <=len(tasks); a++{<-results
}}
jobs es un work queue, no un semáforo.
- En un semáforo, el valor enviado/recibido es un "token" sin significado y se usa como acquire/release simétrico.
- Aquí, cada int es el trabajo (payload). El channel conecta producer -> workers.
El buffer de jobs solo controla cuanto puede encolar main sin bloquea, si se llena, el send bloquea (backpressure) sobre el producer.
La concurrencia real la limita nWorkers
Pregunta, pueden haber 2 worker que lean en forma simultanea el channel? o existe un mecanismo safe para que solamente el valor de un channel sea leido en un lugar a la vez?
Se le puede dar más prioridad a uno de los workers para que tome más trabajos en caso de escuchar el canal de jobs al mismo tiempo que otros workers?
por que no se pasa como referencia usando '&' ?
Estaba viendo ¿Por qué en los resultados sólo hay dos IDs? Y resulta que es por que en la condición del “for” puso “i<3” por lo que solo se crearon 2 workers y no 3