Detect client context destruction from gRPC server
I have create an Async C++ gRPC server that offer several APIs similar with a signature similar to this:
service Foo {
rpc FunctionalityA(ARequest) returns (stream AResponse);
rpc FunctionalityB(BRequest) returns (stream BResponse);
}
The client creates one channel to connect to this service, and uses calls the various RPCs from separate threads, something like this:
class FooClient {
// ...
void FunctionalityA() {
auto stub = example::Foo::NewStub(m_channel);
grpc::ClientContext context;
example::ARequest request;
example::AResponse response;
auto reader = stub->FunctionalityA(&context, request);
for(int i = 0; i < 3; i++) {
reader->Read(&response);
}
}
void FunctionalityB() {
auto stub = example::Foo::NewStub(m_channel);
grpc::ClientContext context;
example::BRequest request;
example::BResponse response;
auto reader = stub->FunctionalityB(&context, request);
for(int i = 0; i < 3; i++) {
reader->Read(&response);
}
}
// ...
};
int main() {
// ...
FooClient client(grpc::CreateChannel("127.0.0.1:12345", grpc::InsecureChannelCredentials()));
auto ta = std::thread(&FooClient::FunctionalityA, &client);
auto tb = std::thread(&FooClient::FunctionalityB, &client);
// ...
}
I want to implement the server so that:
- when FunctionalityA is called, it start streaming objects of type AResponse
- when FunctionalityB is called, it start streaming objects of type BResponse
- when the context used to call FunctionalityA is cancelled, streaming of AResponse ends
- when the context used to call FunctionalityB is cancelled, streaming of BResponse ends
The problem I face is that even when the ClientContext associated with one of the two Functionalities goes out of scope (after the 3 reads in the example) the server does not receive any information and keeps writing, and the "ok" status remains true. The "ok" status goes to false and allows me to stop Writing only when the client disconnects.
Is this the intended behavior of gRPC? Does the client need to send a specific "kiss of death" message in order to inform the server to stop writing on the stream?
Here is an example of the implementation of a Functionality server side, for completeness:
void FunctionalityB::ProcessRequest(bool ok, RequestState state) {
if(!ok) {
if(state == RequestState::START) {
// the server has been Shutdown before this particular call got matched to an incoming RPC
delete this;
} else if(state == RequestState::WRITE || state == RequestState::FINISH) {
// not going to the wire because the call is already dead (i.e., canceled, deadline expired, other side dropped the channel, etc).
delete this;
} else {
// unhandled state
}
} else {
if(state == RequestState::START) {
// the RPC has indeed been started
m_writer.Write(m_response, CreateTag(RequestState::WRITE));
// the constructor of the functionality requests a new one to handle future new connections
new FunctionalityB(m_completion_queue, m_service, m_worker);
} else if(state == RequestState::WRITE) {
// TODO do some real work
std::this_thread::sleep_for(std::chrono::milliseconds(50));
m_writer.Write(m_response, CreateTag(RequestState::WRITE)); // this write will continue forever, even after client stops reading and TryCancel its context
} else if(state == RequestState::FINISH) {
delete this;
} else {
// unhandled state
}
}
}
Solution 1:
There are two ways to detect call cancellation on the server.
The first one is to check ServerContext::IsCancelled()
. That is something you can check right before you do a write, which in this case may be fine. In the general case, though, it may not be ideal, because your application might be waiting for some other event (other than the previous write completing) before it does another write, and you ideally want some async way of getting notified when the cancellation happens.
Which brings me to the second approach, which is to request an event on the completion queue when the call is cancelled by calling ServerContext::AsyncNotifyWhenDone()
before the RPC starts. This will give you async notification of the cancellation, but unfortunately, the API is very cumbersome and has a few sharp edges. (This is something that is handled much more cleanly in the new callback-based API, but that API isn't that performant in OSS until we finish the EventEngine effort.)
I hope this info is helpful.