-
Notifications
You must be signed in to change notification settings - Fork 90
Callback-based gRPC client with C interface #963
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
impl Service<http::Request<Body>> for GrpcMetricSvc { | ||
type Response = http::Response<Body>; | ||
type Error = tonic::transport::Error; | ||
type Error = Box<dyn std::error::Error + Send + Sync>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not believe changing this will cause any issues or serious performance concerns, but would like to have it double checked
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general looking good!
let req = GrpcRequest { | ||
service: path_parts.next().unwrap_or_default(), | ||
rpc: path_parts.next().unwrap_or_default(), | ||
headers: &parts.headers, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why manipulate the body at all? If we pass through the compression flag and length, we can document that and allow the callback implementer to handle compression if they want
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because most gRPC clients deal in protos, not full bodies with the extra 5 bytes in front. For callback implementers that are, say, delegating to their own in-language clients, those clients operate on protobuf bodies, not raw HTTP ones. I don't think we should ask them to carve up the body to get the proto out (or put it back). If there is a use case for needing to know the compression byte, we can add it, but we are in control of the client call so it will always be what we want anyways.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, what I'm saying is the callback client isn't really a pure gRPC client, because it may want to delegate to something that implements compression too. So seems to me we should just document that and leave it to the caller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They can't enable compression, we'd have to with our Tonic client which is what builds this body. But there's no value for us to do so in this case since it's all in memory. If they want to delegate to something that implements compression, they can/should. They can wrap the whole proto bytes in pre-negotiated compression if they'd like. But from our in-memory perspective, we give them proto bytes, how that's represented on the in-memory wire via the few bits of Rust code between Tonic and this callback should have no user effect. It's up to us and it should always be 0 (no compression) IMO. The compression byte is about pre-negotiated compression algorithm with the server, which doesn't apply to in-memory nor should it.
Overall, this compression is just a boolean used by gRPC HTTP, but has no value for in-memory representation and will always be 0. It doesn't compress the bytes or even tell you the algorithm, it's just a note saying the negotiated compression with the server is in effect (there is no server here). This is unrelated to whether a user wants to use compression to their upstream implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aaa, ok that makes sense
// We have to cast this to a literal pointer integer because we use spawn_blocking | ||
// and Rust can't validate things in either of two approaches. The first approach, | ||
// just moving the *mut to spawn_blocking closure, will not work because it is not | ||
// send (even if you wrap it in a marked-send struct). The second, approach, moving | ||
// the box to the closure and into_raw'ing it there won't work because Rust thinks | ||
// the "req" param to spawn_blocking may outlive this closure even though we're | ||
// confident in our oneshot use this will never happen. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes sense to me. AFAIK there is no safe way to express this (if you want to keep using spawn_blocking
).
Because spawn_blocking by definition requires the possibility of using another thread, the Send requirement exists. Raw pointers are never send. The lifetime issue is also unsolvable: https://without.boats/blog/the-scoped-task-trilemma/
This explanation is great, but I also like a quick summary of // SAFETY: This is safe because the spawned task is guaranteed to be joined, and the box reclaimed, before this function exits
However, writing that - is it actually safe in the error case? Seems like we might need to double check there that the user did call the response callback, and free the pointer if they didn't.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Raw pointers are never send
Rustinomicon says you can cheat this (https://doc.rust-lang.org/nomicon/send-and-sync.html), but it still was not working for me when I tried due to other issues (but I have cheated like this before).
However, writing that - is it actually safe in the error case? Seems like we might need to double check there that the user did call the response callback, and free the pointer if they didn't.
If they didn't call the respond call, receiver.await
never completes. And note, the only time the sender can ever be dropped is also in that same respond call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right but the error can get returned before that await point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hrmm, so from my reading, in this situation spawn_blocking
may return an error not caused by the user callback code panicking when 1) shutting down tokio runtime (is_cancelled) or 2) tokio cannot schedule the thread (a form of panic). Both should be rare, but I suppose technically possible. I will look into making an atomic free call on the error return there.
EDIT: Actually, I'm struggling to know whether it reached user code or not. We definitely don't want to free if it did right? What if user callback did the respond and then panicked (e.g. threw exception from their lang)? This would be the second free attempt but the memory would already be invalid so you can't check whether freed before. I can have some kind of send/arc bool I guess that I set just before invoking user callback so we know their code is responsible for freeing now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well that's the thing is you can't necessarily know (I agree it's very rare though). Having some bool flag that gets set when the response is called is what I was thinking.
What was changed
temporal_client::callback_based
module with a Tonic-compatible Tower service implementation that invokes a callback instead of making a network callconnect_no_namespace_with_service_override
overload that accepts an optional callback service (and moved stuff fromconnect_no_namespace
into there)temporal_client::metrics::GrpcMetricSvc
to work with channel or callback-based servicegrpc_override_callback
option in C bridge'sClientOptions
that can be set with a C function for callbackMissing/future features: