Bidirectional gRPC streaming for Go

Disclaimer: This article is about gRPC rather than a core Go package or tool.

gRPC supports streaming endpoints on the server as well as streaming calls in its clients. Bidirectional streaming is useful when the server and the client both need to send messages to each other independently, in a full-duplex fashion.

In this article, I will dig into how to use the streaming gRPC Go client to talk to a streaming API endpoint.

I do not expect readers to implement a server, so I will use an existing service. Google recently released the Cloud Speech API, which lets its users caption their audio input. The Speech API also offers a bidirectional streaming endpoint where you can send audio data continuously while you wait for responses from the server on a separate incoming channel.

Initialize a client and open the stream (conn is an already established gRPC connection and ctx is a context.Context):

stream, err := speech.NewSpeechClient(conn).StreamingRecognize(ctx)
if err != nil {
    log.Fatal(err)
}

We want to pipe stdin to the API while printing the results. Therefore, we need two goroutines: one sending audio data to the service and another retrieving the results.

The program reads from os.Stdin into an intermediate buffer and immediately pushes the bytes it read to the service.

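go func() {
    // Pipe stdin to the API.
    buf := make([]byte, 1024)
    for {
        n, err := os.Stdin.Read(buf)
        if n > 0 {
            // Read may return data together with an error, so send first.
            if err := stream.Send(&speech.StreamingRecognizeRequest{
                StreamingRequest: &speech.StreamingRecognizeRequest_AudioContent{
                    AudioContent: buf[:n],
                },
            }); err != nil {
                log.Printf("could not send audio: %v", err)
                return
            }
        }
        if err == io.EOF {
            // Nothing else to pipe: half-close the stream so the service
            // knows the audio has ended, then end this goroutine.
            if err := stream.CloseSend(); err != nil {
                log.Printf("could not close the send side: %v", err)
            }
            return
        }
        if err != nil {
            // A failed read is unlikely to recover; stop instead of spinning.
            log.Printf("could not read from stdin: %v", err)
            return
        }
    }
}()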
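
The read-and-forward loop can also be factored into a small helper that is easy to exercise without a network connection. The following is a minimal sketch using only the standard library. The names pipeChunks and splitIntoChunks are hypothetical, and the send callback is an assumed stand-in for a call such as stream.Send. It forwards any bytes returned by Read before inspecting the error and stops on the first non-EOF error.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// pipeChunks reads r in chunks of at most size bytes and hands each chunk
// to send. It returns nil once r reports io.EOF, or the first other error.
// send is a hypothetical stand-in for a call such as stream.Send.
func pipeChunks(r io.Reader, size int, send func(chunk []byte) error) error {
	buf := make([]byte, size)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			// A Reader may return data together with an error,
			// so forward the bytes before inspecting err.
			if sendErr := send(buf[:n]); sendErr != nil {
				return sendErr
			}
		}
		if err == io.EOF {
			return nil // end of input is not a failure
		}
		if err != nil {
			return err
		}
	}
}

// splitIntoChunks is a demo helper: it pipes s through pipeChunks and
// collects every chunk that would have been sent.
func splitIntoChunks(s string, size int) ([]string, error) {
	var chunks []string
	err := pipeChunks(strings.NewReader(s), size, func(c []byte) error {
		chunks = append(chunks, string(c)) // string(c) copies, so buf can be reused
		return nil
	})
	return chunks, err
}

func main() {
	chunks, err := splitIntoChunks("hello, stream", 5)
	fmt.Printf("%q %v\n", chunks, err)
}
```

Passing the sender as a callback keeps the loop independent of any particular generated client, so the same helper could wrap a real stream or a test double.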

At the same time, the program reads the responses in the main goroutine and prints the captions as the service pushes them:

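for {
    resp, err := stream.Recv()
    if err == io.EOF {
        break // the service has closed the stream
    }
    if err != nil {
        // A non-EOF error ends the stream; further Recv calls would
        // keep failing, so stop instead of retrying.
        log.Fatalf("could not receive results: %v", err)
    }
    if resp.Error != nil {
        // The service reported an error inside the response message.
        log.Printf("recognition error: %v", resp.Error)
        continue
    }
    for _, result := range resp.Results {
        fmt.Printf("result: %+v\n", result)
    }
}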

The full example lives in a gist, where you can learn more about initializing the gRPC connection and other details.

Note that the same send-and-receive pattern applies to any gRPC bidirectional streaming client.
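
To illustrate the pattern independently of the Speech API, here is a minimal sketch that runs without gRPC at all. Everything in it is an assumption made for illustration: BidiStream is a hypothetical interface shaped like a streaming client (real generated clients use typed request and response messages), and upperStream is an in-memory stand-in for a service that replies to each chunk with its upper-cased text. One goroutine sends and half-closes, while the calling goroutine receives until io.EOF.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// BidiStream is a hypothetical, minimal view of a bidirectional streaming
// client. Generated gRPC clients expose typed messages instead.
type BidiStream interface {
	Send(chunk []byte) error
	CloseSend() error
	Recv() (string, error)
}

// upperStream is an in-memory stand-in for a service: it answers every
// chunk with its upper-cased text and ends the stream after CloseSend.
type upperStream struct {
	in  chan []byte
	out chan string
}

func newUpperStream() *upperStream {
	s := &upperStream{in: make(chan []byte), out: make(chan string)}
	go func() {
		defer close(s.out)
		for chunk := range s.in {
			s.out <- strings.ToUpper(string(chunk))
		}
	}()
	return s
}

func (s *upperStream) Send(chunk []byte) error {
	s.in <- append([]byte(nil), chunk...) // copy, the caller reuses its buffer
	return nil
}

func (s *upperStream) CloseSend() error { close(s.in); return nil }

func (s *upperStream) Recv() (string, error) {
	msg, ok := <-s.out
	if !ok {
		return "", io.EOF
	}
	return msg, nil
}

// exchange sends r to the stream from one goroutine while the calling
// goroutine receives. It returns the responses and the first error seen.
func exchange(stream BidiStream, r io.Reader, chunkSize int) ([]string, error) {
	sendErr := make(chan error, 1)
	go func() {
		defer stream.CloseSend() // signal that no more data is coming
		buf := make([]byte, chunkSize)
		for {
			n, err := r.Read(buf)
			if n > 0 {
				if serr := stream.Send(buf[:n]); serr != nil {
					sendErr <- serr
					return
				}
			}
			if err != nil {
				if err == io.EOF {
					err = nil // end of input is not a failure
				}
				sendErr <- err
				return
			}
		}
	}()

	var results []string
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			return results, <-sendErr
		}
		if err != nil {
			return results, err // stream errors are terminal
		}
		results = append(results, msg)
	}
}

// upperAll runs the exchange against the in-memory stream.
func upperAll(s string, chunkSize int) ([]string, error) {
	return exchange(newUpperStream(), strings.NewReader(s), chunkSize)
}

func main() {
	results, err := upperAll("speech", 4)
	fmt.Printf("%q %v\n", results, err)
}
```

The buffered sendErr channel lets the sending goroutine report its outcome and exit even if the receiver has not reached the point of reading it yet, which avoids leaking the goroutine on the normal path.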