Streaming bidireccional en gRPC con Go

Resumen

El streaming bidireccional con gRPC te permite que cliente y servidor envíen y reciban datos al mismo tiempo a través de un único canal. Aquí aprendes a construir una función TakeTest en Go donde el servidor envía preguntas y el cliente responde, todo dentro del mismo flujo. Es ideal si trabajas en backend y quieres dominar comunicación en tiempo real.

¿Qué es el streaming bidireccional en gRPC?

Es una técnica que combina el streaming del lado del cliente y del lado del servidor en un solo canal full duplex. Eso significa que ambas partes pueden enviar y recibir mensajes simultáneamente sin abrir conexiones distintas.

¿Qué significa full duplex en gRPC? Es la capacidad de un mismo stream para enviar y recibir datos al mismo tiempo, sin necesidad de abrir dos canales separados.

En el caso práctico que vamos a construir, el servidor enviará las preguntas de un test y el cliente responderá cada una. Todo viaja por el mismo stream [00:14].

¿Cómo defino un RPC bidireccional en el archivo proto?

Lo primero es declarar el contrato en el archivo .proto dentro de la carpeta test.pb. Aquí marcas que tanto el parámetro de entrada como el de salida son streams.

proto rpc TakeTest(stream TakeTestRequest) returns (stream Question);

message TakeTestRequest { string answer = 1; }

La palabra clave stream aparece en ambos lados: en el request y en el response. Eso es lo que le indica a gRPC que la lectura y la escritura ocurrirán a través de data streaming [01:05]. El mensaje TakeTestRequest lleva un único campo answer, que es la respuesta que el cliente envía desde el front.

¿Cómo agrego la consulta de preguntas al repositorio?

Necesitas una función que devuelva las preguntas asociadas a un test específico. Esto se hace en dos capas: la interfaz del repositorio y la implementación concreta en PostgreSQL.

Interfaz abstracta del repositorio

En repository.go añades el método a la interfaz:

go GetQuestionsPerTest(ctx context.Context, testID string) ([]*models.Question, error)

Luego, en la capa abstracta, simplemente delegas la llamada a implementation.GetQuestionsPerTest. Es un patrón limpio que separa el contrato de la implementación [02:30].

Implementación concreta en PostgreSQL

Dentro de database/postgres.go creas la función sobre el tipo PostgresRepository. La consulta selecciona el id y la question desde la tabla questions filtrando por test_id.

go func (repo *PostgresRepository) GetQuestionsPerTest(ctx context.Context, testID string) ([]*models.Question, error) { rows, err := repo.db.QueryContext(ctx, "SELECT id, question FROM questions WHERE test_id = $1", testID) if err != nil { return nil, err } defer func() { if err := rows.Close(); err != nil { log.Fatal(err) } }()

var questions []*models.Question for rows.Next() { var question = models.Question{} if err = rows.Scan(&question.Id, &question.Question); err == nil { questions = append(questions, &question) } } if err = rows.Err(); err != nil { return nil, err } return questions, nil

}

Usa defer con una función anónima para cerrar los rows y captura cualquier error de cierre con log.Fatal. El rows.Scan mapea las columnas a la estructura models.Question [03:50].

¿Cómo implemento TakeTest en el servidor gRPC?

En server/test.go defines la función TakeTest sobre el TestServer. Y aquí viene lo interesante: recibes un solo parámetro stream, no dos.

go func (s *TestServer) TakeTest(stream testpb.TestService_TakeTestServer) error { questions, err := s.repo.GetQuestionsPerTest(context.Background(), "t1") if err != nil { return err }

i := 0 var currentQuestion = &models.Question{} for { if i < len(questions) { currentQuestion = questions[i] } if i <= len(questions) { questionToSend := &testpb.Question{ Id: currentQuestion.Id, Question: currentQuestion.Question, } if err := stream.Send(questionToSend); err != nil { return err } i++ } answer, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err } log.Println("Answer received:", answer.Answer) }

}

Antes de que el autocompletado funcione, debes recompilar los archivos .proto con el comando que generaste el paquete de Go. Tras compilar, el tipo TestService_TakeTestServer queda disponible con sus métodos Send y Recv, las dos piezas que necesita el streaming bidireccional [06:20].

¿Cuál es la diferencia entre Send y Recv en gRPC? Send enruta un mensaje hacia el otro extremo del stream; Recv lee un mensaje entrante. Ambos operan sobre el mismo objeto stream.

¿Por qué uso una variable contadora?

La variable i controla qué pregunta toca enviar y evita que mandes más preguntas de las que hay en la base de datos. La variable currentQuestion guarda la pregunta activa que se enviará al cliente en cada iteración del bucle. La validación i < len(questions) impide accesos fuera de rango.

¿Cómo manejo el fin del stream?

Cuando stream.Recv() devuelve io.EOF, significa que el cliente cerró su lado del canal. Ahí retornas nil para terminar TakeTest limpiamente. Cualquier otro error se propaga como retorno de la función [09:15].

Puntos clave del streaming bidireccional

Lo que distingue a esta técnica:

  • Un único objeto stream maneja envío y recepción.
  • El servidor empuja preguntas con stream.Send y lee respuestas con stream.Recv.
  • El cliente hace lo mismo en sentido contrario.
  • El cierre se detecta con io.EOF, no con timeouts.

Es full duplex puro: cada extremo decide cuándo hablar y cuándo escuchar. ¿Te animas a hacer configurable el testID en lugar de dejarlo fijo en "t1"? Cuéntame en los comentarios cómo lo resolverías.