AÃºn no tienes acceso a esta clase

Crea una cuenta y continÃºa viendo este curso

# Definiendo workers, jobs y dispatchers

28/30
Recursos

Aportes 5

Preguntas 3

Ordenar por:

Â¿Quieres ver mÃ¡s aportes, preguntas y respuestas de la comunidad? Crea una cuenta o inicia sesiÃ³n.

Entendiendo el cÃ³digo

Aqui esta con un poco de comentarios:

``````package main

import (
"fmt"
"time"
)

// Job represents a job to be executed, with a name and a number and a delay
type Job struct {
Name   string        // name of the job
Delay  time.Duration // delay between each job
Number int           // number to calculate on the fibonacci sequence
}

// Worker will be our concurrency-friendly worker
type Worker struct {
Id         int           // id of the worker
JobQueue   chan Job      // Jobs to be processed
WorkerPool chan chan Job // Pool of workers
Quit       chan bool     // Quit worker
}

// Dispatcher is a dispatcher that will dispatch jobs to workers
type Dispatcher struct {
WorkerPool chan chan Job // Pool of workers
MaxWorkers int           // Maximum number of workers
JobQueue   chan Job      // Jobs to be processed
}

// NewWorker returns a new Worker with the provided id and workerpool
func NewWorker(id int, workerPool chan chan Job) *Worker {
return &Worker{
Id:         id,
WorkerPool: workerPool,
JobQueue:   make(chan Job),  // create a job queue
Quit:       make(chan bool), // Channel to end jobs
}
}

// Start method starts all workers
func (w Worker) Start() {
go func() {
for {
w.WorkerPool <- w.JobQueue // add job to pool

// Multiplexing
select {
case job := <-w.JobQueue: // get job from queue
fmt.Printf("worker%d: started %s, %d\n", w.Id, job.Name, job.Number)
fib := Fibonacci(job.Number)
time.Sleep(job.Delay)
fmt.Printf("worker%d: finished %s, %d with result %d\n", w.Id, job.Name, job.Number, fib)
case <-w.Quit: // quit if worker is told to do so
fmt.Printf("Worker with id %d Stopped\n", w.Id)
return
}
}
}()
}

// Stop method stop the worker
func (w Worker) Stop() {
go func() {
w.Quit <- true
}()
}

// Fibonacci calculates the fibonacci sequence
func Fibonacci(n int) int {
if n <= 1 {
return n
}
return Fibonacci(n-1) + Fibonacci(n-2)
}

// NewDispatcher returns a new Dispatcher with the provided maxWorkers
func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{
WorkerPool: pool,
MaxWorkers: maxWorkers,
JobQueue:   jobQueue,
}
}

// Dispatch will dispatch jobs to workers
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-d.JobQueue: // get job from queue
// Asign the job to a worker
go func() {
jobChannel := <-d.WorkerPool // get worker from pool
jobChannel <- job            // Workers will read from this channel
}()
}
}
}

func (d *Dispatcher) Run() {
for i := 0; i < d.MaxWorkers; i++ {
worker := NewWorker(i+1, d.WorkerPool)
worker.Start()
}

go d.dispatch()
}

``````

lo unico que no me gusta es que va explicando a la velicidad de la luz

Un pequeÃ±o resumen de lo que pude entender, espero los ayude

• El dispatcher recibe todos los jobs, se puede decir que es como el componente global
• Cada worker tiene su canal de jobs, y saben cual es el canal del disptacher, es decir el workerpool es el mismo canal para todos los workers.
• Cada worker esta enviando su canal al canal del dispatcher
• En la medida que el dispatcher recibe jobs este los va repartiendo entre los workers a travÃ©s de sus canales
``````package project

import (
"fmt"
"time"
)

// Estructura de la tareas de procesar
type Job struct {
Name   string        //Nombre de la tarea
Delay  time.Duration //Tiempo de espera
Number int           // Numero a procesar
}

type Worker struct {
Id         int           // id del Worker
JobQueue   chan Job      // Canal de tareas del worker
WorkerPool chan chan Job //Canal de canales de tareas, este canal se comparte entre todos los workers
QuitChan   chan bool     //Canal para parar al worker
}

type Dispatcher struct {
WorkerPool chan chan Job //Canal de canales de tareas, este se les pasa a cada worker nuevo
MaxWorkers int           //cantidad maxima de workers
JobQueue   chan Job      //Canal de tareas, se puede ver como un canal global de tareas que despues se reparten entre workers
}

func NewWorker(id int, workerPool chan chan Job) *Worker {
return &Worker{
Id:         id,              //Se asigna un id
WorkerPool: workerPool,      //Se le indica el canal donde tiene quie agregar su canal de tareas
JobQueue:   make(chan Job),  //Canal de tareas del worker
QuitChan:   make(chan bool), //Canal para parar al worker
}
}

func (w Worker) Start() {

//Se inicia de manera concurrente un ciclo sin fin
go func() {
for {

//Al worker pool se manda el canal de worker, este se manda cada vez iteracion, es decir cuando el worker termino de hacer un jobs
w.WorkerPool <- w.JobQueue

//Se multiplexean los canales del worker
select {
case job := <-w.JobQueue:
//Si se recibe un job en el canal de tareas del worker se ejecuta
fmt.Printf("Worker with id %d Started\n", w.Id)
fib := Fibonacci(job.Number)
time.Sleep(job.Delay)
fmt.Printf("Worker with id %d Finishes with result %d\n", w.Id, fib)

case <-w.QuitChan:
//Si se recibe un job en el canal de salida se para el worker (lo sca del ciclo)
fmt.Printf("Worker with id %d Stopped\n", w.Id)
return
}

}
}()
}

//La funcion stop manda un true al canl de salida del worker
func (w Worker) Stop() {
go func() {
w.QuitChan <- true
}()
}

//El dispatcher cuenta con el el canal global de jobs y un canal de todos los canales de los workers

func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher {

worker := make(chan chan Job, maxWorkers)
return &Dispatcher{
JobQueue:   jobQueue,
MaxWorkers: maxWorkers,
WorkerPool: worker,
}
}

func (d *Dispatcher) Dispatch() {

//Inicia de manera indefinidad a mandar jobs a los canales que se van recibiendo en el canal de caneles de jobs
for {
select {
case job := <-d.JobQueue:
go func() {
workerJobQueue := <-d.WorkerPool
workerJobQueue <- job
}()
}
}
}

func Fibonacci(n int) int {
if n <= 1 {
return n
}

return Fibonacci(n-1) + Fibonacci(n-2)
}

``````

La verdad es que hasta ahora venÃ­a bien el curso pero acÃ¡ faltÃ³ un trabajo previo de explicaciÃ³n de quÃ© es un Worker Pool y conceptualmente quÃ© vamos a querer implementar antes de empezar a picar cÃ³digo.