Entendiendo el código
Introducción
CaracterÃsticas esenciales de Go
Qué aprenderás y qué necesitas saber
Repaso general: variables, condicionales, slices y map
Repaso general: GoRoutines y apuntadores
Programación orientada a objetos
¿Es Go orientado a objetos?
Structs vs. clases
Métodos y funciones
Constructores
Herencia
Interfaces
Aplicando interfaces con Abstract Factory
Implementación final de Abstract Factory
Funciones anónimas
Funciones variadicas y retornos con nombre
Go Modules
Cómo utilizar los Go modules
Creando nuestro módulo
Testing
Testing
Code coverage
Profiling
Testing usando Mocks
Implementando Mocks
Concurrencia
Unbuffered channels y buffered channels
Waitgroup
Buffered channels como semáforos
Definiendo channels de lectura y escritura
Worker pools
Multiplexación con Select y Case
Proyecto: servidor con worker pools
Definiendo workers, jobs y dispatchers
Creando web server para procesar jobs
Conclusión
Continúa con el Curso de Go Avanzado
Aún no tienes acceso a esta clase
Crea una cuenta y continúa viendo este curso
Aportes 5
Preguntas 3
Entendiendo el código
Aqui esta con un poco de comentarios:
package main
import (
"fmt"
"time"
)
// Job represents a job to be executed, with a name and a number and a delay
type Job struct {
Name string // name of the job
Delay time.Duration // delay between each job
Number int // number to calculate on the fibonacci sequence
}
// Worker will be our concurrency-friendly worker
type Worker struct {
Id int // id of the worker
JobQueue chan Job // Jobs to be processed
WorkerPool chan chan Job // Pool of workers
Quit chan bool // Quit worker
}
// Dispatcher is a dispatcher that will dispatch jobs to workers
type Dispatcher struct {
WorkerPool chan chan Job // Pool of workers
MaxWorkers int // Maximum number of workers
JobQueue chan Job // Jobs to be processed
}
// NewWorker returns a new Worker with the provided id and workerpool
func NewWorker(id int, workerPool chan chan Job) *Worker {
return &Worker{
Id: id,
WorkerPool: workerPool,
JobQueue: make(chan Job), // create a job queue
Quit: make(chan bool), // Channel to end jobs
}
}
// Start method starts all workers
func (w Worker) Start() {
go func() {
for {
w.WorkerPool <- w.JobQueue // add job to pool
// Multiplexing
select {
case job := <-w.JobQueue: // get job from queue
fmt.Printf("worker%d: started %s, %d\n", w.Id, job.Name, job.Number)
fib := Fibonacci(job.Number)
time.Sleep(job.Delay)
fmt.Printf("worker%d: finished %s, %d with result %d\n", w.Id, job.Name, job.Number, fib)
case <-w.Quit: // quit if worker is told to do so
fmt.Printf("Worker with id %d Stopped\n", w.Id)
return
}
}
}()
}
// Stop method stop the worker
func (w Worker) Stop() {
go func() {
w.Quit <- true
}()
}
// Fibonacci calculates the fibonacci sequence
func Fibonacci(n int) int {
if n <= 1 {
return n
}
return Fibonacci(n-1) + Fibonacci(n-2)
}
// NewDispatcher returns a new Dispatcher with the provided maxWorkers
func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{
WorkerPool: pool,
MaxWorkers: maxWorkers,
JobQueue: jobQueue,
}
}
// Dispatch will dispatch jobs to workers
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-d.JobQueue: // get job from queue
// Asign the job to a worker
go func() {
jobChannel := <-d.WorkerPool // get worker from pool
jobChannel <- job // Workers will read from this channel
}()
}
}
}
func (d *Dispatcher) Run() {
for i := 0; i < d.MaxWorkers; i++ {
worker := NewWorker(i+1, d.WorkerPool)
worker.Start()
}
go d.dispatch()
}
lo unico que no me gusta es que va explicando a la velicidad de la luz
Un pequeño resumen de lo que pude entender, espero los ayude
package project
import (
"fmt"
"time"
)
// Estructura de la tareas de procesar
type Job struct {
Name string //Nombre de la tarea
Delay time.Duration //Tiempo de espera
Number int // Numero a procesar
}
type Worker struct {
Id int // id del Worker
JobQueue chan Job // Canal de tareas del worker
WorkerPool chan chan Job //Canal de canales de tareas, este canal se comparte entre todos los workers
QuitChan chan bool //Canal para parar al worker
}
type Dispatcher struct {
WorkerPool chan chan Job //Canal de canales de tareas, este se les pasa a cada worker nuevo
MaxWorkers int //cantidad maxima de workers
JobQueue chan Job //Canal de tareas, se puede ver como un canal global de tareas que despues se reparten entre workers
}
func NewWorker(id int, workerPool chan chan Job) *Worker {
return &Worker{
Id: id, //Se asigna un id
WorkerPool: workerPool, //Se le indica el canal donde tiene quie agregar su canal de tareas
JobQueue: make(chan Job), //Canal de tareas del worker
QuitChan: make(chan bool), //Canal para parar al worker
}
}
func (w Worker) Start() {
//Se inicia de manera concurrente un ciclo sin fin
go func() {
for {
//Al worker pool se manda el canal de worker, este se manda cada vez iteracion, es decir cuando el worker termino de hacer un jobs
w.WorkerPool <- w.JobQueue
//Se multiplexean los canales del worker
select {
case job := <-w.JobQueue:
//Si se recibe un job en el canal de tareas del worker se ejecuta
fmt.Printf("Worker with id %d Started\n", w.Id)
fib := Fibonacci(job.Number)
time.Sleep(job.Delay)
fmt.Printf("Worker with id %d Finishes with result %d\n", w.Id, fib)
case <-w.QuitChan:
//Si se recibe un job en el canal de salida se para el worker (lo sca del ciclo)
fmt.Printf("Worker with id %d Stopped\n", w.Id)
return
}
}
}()
}
//La funcion stop manda un true al canl de salida del worker
func (w Worker) Stop() {
go func() {
w.QuitChan <- true
}()
}
//El dispatcher cuenta con el el canal global de jobs y un canal de todos los canales de los workers
func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher {
worker := make(chan chan Job, maxWorkers)
return &Dispatcher{
JobQueue: jobQueue,
MaxWorkers: maxWorkers,
WorkerPool: worker,
}
}
func (d *Dispatcher) Dispatch() {
//Inicia de manera indefinidad a mandar jobs a los canales que se van recibiendo en el canal de caneles de jobs
for {
select {
case job := <-d.JobQueue:
go func() {
workerJobQueue := <-d.WorkerPool
workerJobQueue <- job
}()
}
}
}
func Fibonacci(n int) int {
if n <= 1 {
return n
}
return Fibonacci(n-1) + Fibonacci(n-2)
}
La verdad es que hasta ahora venÃa bien el curso pero acá faltó un trabajo previo de explicación de qué es un Worker Pool y conceptualmente qué vamos a querer implementar antes de empezar a picar código.
¿Quieres ver más aportes, preguntas y respuestas de la comunidad? Crea una cuenta o inicia sesión.