RPC in Go: The Client Side

RPC in Go: The Client Side

Conceptually, each client maintains a connection to the server, on which, encoded requests and responses are transmitted, and a pending list, which maintains requests already sent out and waiting for responses.

In order to match responses with pending requests, every call is assigned a monotonically increasing sequential number. The pending list is in fact a mapping from the sequential number to a rpc.Call struct.

Every client has a goroutine that collects responses and match them with pending requests. If correctly matched, the goroutine notifies the caller about the completion and remove the matched request from the pending list.

The modification of the pending list is protected by mutex rpc.Call.mutex, thus avoids race condition causes by parallel request sending and response receiving.

The increment of the sequential number is protected by another mutex rpc.Call.sending, so concurrent sending does not make any confusion about the sequential number.

Establish the Connection

We can establish a TCP connection between the client and the server by calling rpc.Dial, which invokes net.Dial to establish a connection and calls rpc.NewClient to wrap up this connection as an RPC client, an rpc.Client struct.

We can also establish a TCP connection by using the CONNECT in HTTP protocol. This is done by calling rpc.DialHTTP, which invokes rpc.DialHTTPPath withDefaultRPCPath="/_goRPC"rpc.DialHTTPPath then invokes net.Dial to establish a connection and sends an HTTP request whose method is CONNECT and URI isDefaultRPCPath. The RPC server, which must be a HTTP server, should understand this request is to make a TCP connection for later RPC calls, it should keep that connection alive. Once the connection is established, a call to rpc.NewClient wraps up the connection by a client.

Encoding and Decoding

rpc.Client struct contains a member rpc.Client.codec with type rpc.ClientCodec, which wraps up the network connection. It encodes all requests and decodes all responses.

Client created by rpc.NewClient has a rpc.gobClientCodec codec, an implementation of the rpc.ClientCodec interface. It is also possible to specify another implementation by creating the client using rpc.NewClientWithCodec. The rpc.Dial* methods callrpc.NewClient, but rpc.NewClientWithCodec was called by jsonrpc.NewClient.

Indeed, rpc.NewClient invokes rpc.NewClientWithCodec, and the latter, before returning the client object, invokes method go client.input, which recieves respones of pending requests and notifies callers the completion of their calls.

  • Seems that there is not a CreateServerWithCodec on the server side. So how should I write a JSON RPC server?

Make Calls

Conceptually, every RPC call consists of the name of service and method, an argument and the reply. More than that an error might occur during the call and a done channel is used to notify the completion of the call. All these are described by the rpc.Call struct.

To make a call, we can all rpc.Client.Go, which requires the service/method name, the argument, the holder of reply, and the done channel. rpc.Client.Go encapsulate all these inputs into a rpc.Call struct, and gives it to the call to rpc.Client.send.

The rpc.Call struct has a method done, which, when invoked, notifies the completion of the call by sending the rpc.Call strucut itself to the done channel, which is of type chan *Call. As the done channel is provided by the caller, the caller is able to know the reply or the error once it reads an rpc.Call struct out from the channel.

Calling Patterns

The caller can provide a done channel to multiple RPC calls, and waits to read all responses from the channel. This can be very useful in some cases.

Consider that we have a bunch of downstream services, and it is OK if we get response from any of them. We can make calls like:

var clients []rpc.Client          // to the N=10 services.    
done := make(chan * rpc.Call, 10) // buffer size >=10 to avoid unnecessary blocking.
for _, c := range clients {  // make calls to all these instances.
  c.Go("AService", arg, reply, done)
}
call := <- done  // blocks until any instance replies.

Another common case is to collect information from a bunch of servers in order to build a request for the next stage of RPC invocation. This can be done by changing the last line of above code:

var req ARequest
for _, call := range done {
  if call.Error != nil {
    log.Fatal(call.Error)
  }
  req.arg[call.ServiceMethod] = call.Reply
}
// Make another RPC call with req as the argument.

Sending Request

The method rpc.Client.send is a critical section protected by mutexrpc.Client.sending. The method checks if the client is currently under closing or had been shutdown. If so, it calls call.done to finish the call before transimitting it over the network connection.

Otherwise, it assign a new sequential number to the call. This sequential number is used as the key when it adds the call to the pending list. Then the sequential number, together with service/method name and the argument are written to the server by callingclient.codec.WriteRequest.

Receiving Response

The goroutine created by rpc.NewClientWithCodec (which is invoked by rpc.NewClient) runs rpc.Client.input is in charge of collecting responses and matching responses with pending requests.

The response consists of a header and a body. If errors occur during decoding the header,rpc.Client.input terminates itself. Otherwise, the matched call is removed from the pending list.

Otherwise, if the response has no matching pending request, there might had been something wrong with the call to rpc.ClientCodec.WriteRequest, and the body should be read the discarded. Or, if the response header notifies some errors on the server, the body should also be read and discarded. Only when everything is alright, the body is read and decoded intocall.Reply.

rpc.Client.input correctly invokes call.Done with either an error from the server or with the reply.

Client Codec

Above procedure also explains why the interface rpc.ClientCodec interface contains the following methods:

  • WriteRequest(r *Request, body interface{}) error
  • ReadResponseHeader(r *Response) error
  • ReadResponseBody(body interface{}) error
  • Close

Method WriteRequest encodes and writes rpc.Request, which contains the sequential number and service/method name, and the argument (as body).

Method ReadResponseHeader reads and decodes rpc.Response, which contains the sequential number, service/method name and a possible error.

Method ReadResponseBody reads and decodes the reply.