En el fascinante mundo de la programación concurrente en Go, los channels con buffer juegan un papel crucial. Estos pueden ser empleados para limitar la cantidad de procesos concurrentes que se ejecutan al mismo tiempo, actuando como un "semáforo" que regula el flujo de ejecución de las goroutines. Con un buffer, puedes predeterminar cuántas goroutines pueden estar activas simultáneamente, asegurando así un control más fino y evitando bloqueos por exceso de carga.
¿Cómo se implementa un channel con buffer?
Para entender mejor el uso de channels con buffer, vamos a ver un ejemplo práctico. Primero, se comienza creando un canal para transmitir datos de tipo entero. Luego, se define un buffer que permite que el canal maneje múltiples datos a la vez antes de ser leídos.
Código de ejemplo
A continuación, se presenta un ejemplo en Go donde se configura un canal con buffer:
package main
import("fmt""sync""time")funcdoSomething(id int, wg *sync.WaitGroup, ch chanint){defer wg.Done() fmt.Printf("Inicio del proceso: %d\n", id) time.Sleep(4* time.Second) fmt.Printf("Fin del proceso: %d\n", id)<-ch
}funcmain(){var wg sync.WaitGroup
ch :=make(chanint,2)for i :=0; i <10; i++{ wg.Add(1) ch <-1godoSomething(i,&wg, ch)} wg.Wait()}
Explicación del código
Creación del canal: Se crea un canal ch con un buffer de tamaño 2. Esto significa que puede contener hasta dos elementos antes de que se necesite leer un valor para añadir otro.
Función doSomething: Esta simula un proceso que requiere cuatro segundos para completarse. Utiliza un channel para indicar cuándo una goroutine ha finalizado, liberando así el espacio ocupado en el buffer.
Control del flujo concurrente: Utilizando un ciclo for, se lanzan diez goroutines. Sin embargo, debido al buffer del canal, solo dos pueden ejecutarse simultáneamente. Las demás esperan su turno, entrando en ejecución únicamente cuando una goroutine libera un espacio en el buffer.
¿Cómo actúa el buffer como un semáforo?
El concepto de buffer se asimila al de un semáforo al controlar el ritmo de ejecución de las goroutines. En el código anterior:
Cada vez que una llamada a doSomething finaliza, se libera un elemento del canal (<-ch), permitiendo que una nueva goroutine inicie.
Al tener un buffer de tamaño 2, se garantiza que solo dos goroutines se ejecutan simultáneamente, brindando una forma eficiente de gestionar recursos.
¿Qué sucede si modificamos la capacidad del buffer?
Aumentar la capacidad del buffer permitiría un mayor número de procesos concurrentes. Por ejemplo, si el buffer se cambia a 5, hasta cinco goroutines podrían ejecutarse al mismo tiempo antes de que se requiera liberar un espacio.
Prueba con un buffer mayor
Veamos qué ocurre al aumentar el buffer:
ch :=make(chanint,5)
Al ejecutar el programa con esta configuración, se observan más goroutines iniciando y finalizando en paralelo. Esto es ideal cuando se desea un balance entre concurrencia y control.
¿Cuándo utilizar channels con buffer?
El uso de channels con buffer resulta vital en escenarios donde la carga de procesos concurrentes puede saturar el sistema:
Control de concurrencia: Limitar el número de goroutines concurrentes ayuda a prevenir sobrecargas y asegura un uso eficiente de recursos.
Trabajo en colas (job queues): Son útiles para gestionar la ejecución de trabajos, especialmente cuando estos se pueden acumular rápidamente.
Al comprender y aprovechar las ventajas de los channels con buffer, puedes optimizar la concurrencia en tus programas en Go, asegurando rendimiento y estabilidad. ¡Continúa aprendiendo y experimentando con estos conceptos para perfeccionar tu habilidad en la programación concurrente!
Me encantó como se explicó cerca del minuto 7:00 lo de los cupos de los canales. ¡Gracias por esmerarte tanto Néstor!
Muy buena explicacion por parte del profe, me quedo muy claro :)
Esto es brutal. Yo desarrollé un integrador que lee datos de Softland ERP y los inserta en Dynamics CRM. Utilicé C#. Resulta que el API de microsoft permite solo 39 llamadas (Creo que eran como 39) después de ahí me daba un error de muchas peticiones el famoso 429. Yo quería que todo fuera rápido. Desarrollé la llamada de cada entidad con task y me las arreglé para hacerlo en pararelo. Pero se fueron toddas las lineas de una sola vez, si habían 100 lineas las 100 intentaban integrarse a la vez, por supuesto, esto provocó que el sistema me obligara a esperar hasta 5 horas, por lo que en lugar de ganar tiempo obtuve un resultado opuesto. Para evitar este problema dividí el total de ejecuciones en paquetes de 20 llamadas, si hay 100 lineas por integrar, tomo 20, las integro y paso a las siguientes 20. Realizar esta segmentación me costó mucho. Tuve que crear un metodo que ahora manejara los task (forma de trabajar concurrencia en c#) y un contador. Con los channels simplemente creo un objeto y lo paso por parámetro. Impresionante. Quiero migrar en go el integrador, es un refactor dificil, no es conveniente. Es un deseo de un junior lo sé, pero como lo deseo!!
¿Y lo hiciste en estos 3 años?
Code with comments:
package main
import("fmt""sync""time")/*
traffic light.
this uses channels and waint groups to 1. execute only 2 doSmth() func
at a time and 2. be able to wait for all of them.
in order of execution it'll:
c := [][] -- two free spaces
c := [routine][] -- one free space
c := [rountine][routine] -- all occupied
c := [][routine] -- one free space
*/func doSmth(i int, wg *sync.WaitGroup, c chan int){ defer wg.Done() fmt.Printf("Id: %d -> started...\n", i) time.Sleep(time.Second*4) fmt.Printf("Id: %d -> finished...\n", i)<- c // frees the space for new routines}func main(){c:=make(chan int,2)// creates a buffered channel with a capacity of twovar wg sync.WaitGroup// creates wait groupfori:=0; i <10; i++{ c <-1// alocate a new "instance" in the free space wg.Add(1)// adds to the wait group go doSmth(i,&wg, c)} wg.Wait()}
package main
import("fmt""sync""time")func main(){var wg sync.WaitGroupc:=make(chan int,5)fori:=0; i <10; i++{ wg.Add(1) c <-1 go doSomething(i,&wg, c)} fmt.Println("Waiting for goroutines to finish") wg.Wait()}func doSomething(i int, wg *sync.WaitGroup, c chan int){ defer wg.Done() fmt.Printf("id: %d started \n", i) time.Sleep(4* time.Second) fmt.Printf("id: %d finished \n", i)<-c
}
Gracias Nestor por la explicación literalmente me abriste la mente.
Que clase tan útil!
Se me hace que este concepto será súper útil al implementar una virtual estate machine (para smart contracts por ejemplo)
Aquí el mismo ejemplo de la clase comentado. En este caso a diferencia de un unbuffered channel (que está vacío y necesita hacer el handshake de inmediato) no hay deadlock ya que un buffer channel se bloquea temporalmente cuando está lleno (Backpressure).
this is the correct output
~/Devspace/go/sun ❯ go run main.go 4s 01:44:36 PM
Doing something 1...
Doing something 0...
Done!
Done!
Doing something 2...
Doing something 3...
Done!
Doing something 4...
Done!
Doing something 5...
Done!
Doing something 6...
Done!
Doing something 7...
Done!
Doing something 8...
Done!
Doing something 9...
Done!
Done!
Consulte al respecto de un concepto que nuestro tutor ejemplifica en el minuto 8:42 aprox. y se refiere a:
Una cola de trabajos (job queue) puede implementarse tanto con el modelo de concurrencia integrado del lenguaje (gorutinas y canales) para casos sencillos,
// Job define el tipo de tarea a procesartype Jobfunc()// Worker implementa la lógica de un trabajadorfunc Worker(id int, jobs <-chan Job, wg *sync.WaitGroup){ defer wg.Done()forj:= range jobs { fmt.Printf("Worker %d: Iniciando trabajo...\n", id)j()// Ejecuta la tarea fmt.Printf("Worker %d: Trabajo finalizado.\n", id)}}// setupWorkers crea una cantidad de trabajadoresfunc setupWorkers(numWorkers int, jobs <-chan Job, wg *sync.WaitGroup){fori:=1; i <= numWorkers; i++{ wg.Add(1) go Worker(i, jobs, wg)}}func main(){const numJobs =10const numWorkers =3// Crea un canal para los trabajosjobs:=make(chan Job, numJobs)var wg sync.WaitGroupsetupWorkers(numWorkers, jobs,&wg)// Enviar trabajos al canalfori:=0; i < numJobs; i++{job:=func(id int)Job{returnfunc(){// Simula un trabajo time.Sleep(time.Duration(id)*100* time.Millisecond)}}(i) jobs <- job
}// Cierra el canal para indicar que no hay más trabajosclose(jobs)// Espera a que todos los trabajadores terminen wg.Wait() fmt.Println("Todos los trabajos han sido procesados.")}
como utilizando librerías externas para manejar necesidades más complejas como persistencia, reintentos y escalabilidad. Se pueden usar brokers de mensajes como RabbitMQ o Redis, o sistemas de bases de datos como PostgreSQL, para construir soluciones robustas para procesar tareas en segundo plano.
Aquí un ejemplo sencillo con RabbitMQ:
import("log""github.com/streadway/amqp")func main(){// Establecer conexión con RabbitMQ conn,err:= amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil { log.Fatalf("Error al conectar con RabbitMQ: %v", err)} defer conn.Close()// Crear un canal ch,err:= conn.Channel()if err != nil { log.Fatalf("Error al crear canal: %v", err)} defer ch.Close()// Declarar una cola q,err:= ch.QueueDeclare("tareas",// namefalse,// durablefalse,// delete when unusedfalse,// exclusivefalse,// no-wait nil,// arguments)if err != nil { log.Fatalf("Error al declarar la cola: %v", err)}// Publicar un mensaje (una tarea) err = ch.Publish("",// exchange q.Name,// routing keyfalse,// mandatoryfalse,// immediate amqp.Publishing{ContentType:"text/plain",Body:[]byte("Procesar datos de usuario X"),})if err != nil { log.Fatalf("Error al publicar mensaje: %v", err)} log.Printf("Mensaje enviado a la cola: %s","Procesar datos de usuario X")}
¿Qué diferencia hay entre crear el sync.WaitGroup que hacerlo solo con chanels?
package main
import("fmt""time")func doSmth(i int, c chan int){ fmt.Printf("Id: %d -> started...\n", i) time.Sleep(time.Second*4) fmt.Printf("Id: %d -> finished...\n", i)<- c // frees the space for new routines}func main(){c:=make(chan int,2)// creates a buffered channel with a capacity of twofori:=0; i <10; i++{ c <-1// alocate a new "instance" in the free space go doSmth(i, c)}}
Que aqui hay un pequeño bug, basicamente cuando se logren ejecutar las primeras 8 "doSomething", el programa saldra del loop y terminara- Haciendo que los id 8 y 9 no terminen
Si está este error D:
Cuál es la relación entre la cantidad de núcleos del procesador, la gorutines y los hilos?
Creo este enlace da detalle y respuesta a tu pregunta amigo... ¡Ánimos!
Clic
Si alguien ve éste comentario, cuenten, ¿qué ha sido de sus vidas?, ¿ya trabajan con go?
Si comento todo lo de WaitGroup sigue funcionando igual. ¿Alguien me puede explicar por qué pusimos los wg aquí?
El canal con buffer funciona como semáforo dejando pasar hasta 5 rutinas en este ejemplo. Pero el Wait es el que permite esperar a que las rutinas terminen. Si el Wait el hilo principal puede terminar antes y dejar las rutinas inconclusas. Podes probarlo con diferentes sleep(n) para las rutinas para ver que sin el Wait algunas no terminan su ejecución.
¿Entonces los go channels actúan como si fueran "hilos del procesador"'?
Hola Alejandro, creo que hay una pequeña confusión, se podría decir que las GoRoutines son "hilos del procesador", pero para ser más exactos un hilo puede contener muchas GoRoutines a la vez, es decir que si tranquilamente podría tener un hilo que esté ejecutando cientos de GoRoutines. Los Channels son los métodos de comunicación que existen entre estas GoRoutines, como su nombre lo dicen son un canal de comunicación.
Si actúan como los hilos pero no lo son.
Un hilo contine muchas go rututines
El concepto de buffered channels como semaforos me parece de bastante utilidad para poder delimitar el consumo de recursos relacionados a las gorutinas que se ejecuten. Excelente explicación! :D
Excelente explicacion! ahora queda mucho mejor el concepto y uso de canales y waitgroup :D
La utilidad de los channels como semaforos esta francamente bien y tiene multiples usos. Cada dia me gusta más go