Callback-based gRPC client with C interface #963

Open
cretz wants to merge 4 commits into master

Conversation

@cretz (Member) commented Jul 23, 2025

What was changed

  • Added temporal_client::callback_based module with a Tonic-compatible Tower service implementation that invokes a callback instead of making a network call (a rough sketch of the idea follows this list)
  • Added a connect_no_namespace_with_service_override overload that accepts an optional callback service (and moved the existing logic from connect_no_namespace into it)
  • Adapted temporal_client::metrics::GrpcMetricSvc to work with channel or callback-based service
  • Added grpc_override_callback option in C bridge's ClientOptions that can be set with a C function for callback
  • Added supporting structures and methods for C-based callbacks, with a careful eye toward lifetimes
  • Added a minimal test to confirm some behaviors
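
A rough sketch of the callback-based service idea described in the first bullet (not the PR's actual code: the names, the plain Vec<u8> bodies, and the synchronous callback type are simplifications):

use std::{
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use tower::Service;

// Hypothetical callback type: takes the whole request and produces a response.
type GrpcCallback = dyn Fn(http::Request<Vec<u8>>) -> http::Response<Vec<u8>> + Send + Sync;

#[derive(Clone)]
struct CallbackService {
    callback: Arc<GrpcCallback>,
}

impl Service<http::Request<Vec<u8>>> for CallbackService {
    type Response = http::Response<Vec<u8>>;
    type Error = Box<dyn std::error::Error + Send + Sync>;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // There is no connection to establish, so the service is always ready.
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: http::Request<Vec<u8>>) -> Self::Future {
        let cb = self.callback.clone();
        // Instead of dialing the network, hand the request to the callback.
        Box::pin(async move { Ok(cb(req)) })
    }
}

A boxed error type along these lines is presumably what motivates the GrpcMetricSvc error-type change shown in the diff further down.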

Missing/future features:

  • Cancellation support - There is currently no way to notify the callback implementer when a call needs to be canceled, which is an important feature
  • Easy way to delegate to traditional Core client from inside callback - This is needed to be able to use the callback-based client as an interception mechanism for langs

@cretz requested a review from a team as a code owner, July 23, 2025 21:44
 impl Service<http::Request<Body>> for GrpcMetricSvc {
     type Response = http::Response<Body>;
-    type Error = tonic::transport::Error;
+    type Error = Box<dyn std::error::Error + Send + Sync>;

@cretz (Member, Author):

I do not believe changing this will cause any issues or serious performance concerns, but would like to have it double checked

@Sushisource (Member) left a comment:

In general looking good!

let req = GrpcRequest {
    service: path_parts.next().unwrap_or_default(),
    rpc: path_parts.next().unwrap_or_default(),
    headers: &parts.headers,

@Sushisource (Member):

Why manipulate the body at all? If we pass through the compression flag and length, we can document that and allow the callback implementer to handle compression if they want

@cretz (Member, Author) commented Jul 24, 2025:

Because most gRPC clients deal in protos, not full bodies with the extra 5 bytes in front. For callback implementers that are, say, delegating to their own in-language clients, those clients operate on protobuf bodies, not raw HTTP ones. I don't think we should ask them to carve up the body to get the proto out (or put it back). If there is a use case for needing to know the compression byte, we can add it, but we are in control of the client call, so it will always be what we want anyway.
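
For context, the "extra 5 bytes in front" is the standard gRPC length-prefixed message framing: one compression-flag byte followed by a four-byte big-endian message length. A minimal sketch of stripping and re-adding that prefix (illustrative names, not the PR's code):

/// Strip the 5-byte gRPC message prefix (1 compression-flag byte + 4-byte
/// big-endian length) to recover the raw protobuf message bytes.
fn strip_grpc_prefix(body: &[u8]) -> Option<&[u8]> {
    if body.len() < 5 {
        return None;
    }
    let _compressed = body[0]; // 0 = uncompressed, which is all the in-memory path produces
    let len = u32::from_be_bytes([body[1], body[2], body[3], body[4]]) as usize;
    body.get(5..5 + len)
}

/// Re-wrap protobuf message bytes with an uncompressed gRPC prefix.
fn add_grpc_prefix(proto: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(proto.len() + 5);
    out.push(0); // compression flag: not compressed
    out.extend_from_slice(&(proto.len() as u32).to_be_bytes());
    out.extend_from_slice(proto);
    out
}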

@Sushisource (Member):

Right, what I'm saying is the callback client isn't really a pure gRPC client, because it may want to delegate to something that implements compression too. So it seems to me we should just document that and leave it to the caller.

@cretz (Member, Author) commented Jul 25, 2025:

They can't enable compression; we'd have to, with our Tonic client, which is what builds this body. But there's no value for us to do so in this case since it's all in memory. If they want to delegate to something that implements compression, they can/should. They can wrap the whole proto bytes in pre-negotiated compression if they'd like. But from our in-memory perspective, we give them proto bytes; how that's represented on the in-memory wire via the few bits of Rust code between Tonic and this callback should have no user effect. It's up to us, and it should always be 0 (no compression) IMO. The compression byte is about a pre-negotiated compression algorithm with the server, which doesn't apply in-memory, nor should it.

Overall, this compression flag is just a boolean used by gRPC over HTTP, but it has no value for the in-memory representation and will always be 0. It doesn't compress the bytes or even tell you the algorithm; it's just a note saying the compression negotiated with the server is in effect (there is no server here). This is unrelated to whether a user wants to use compression with their upstream implementation.

@Sushisource (Member):

Ahh, ok, that makes sense.

Comment on lines +170 to +176
// We have to cast this to a literal pointer integer because we use spawn_blocking
// and Rust can't validate things in either of two approaches. The first approach,
// just moving the *mut into the spawn_blocking closure, will not work because it is
// not Send (even if you wrap it in a marked-Send struct). The second approach, moving
// the box into the closure and into_raw'ing it there, won't work because Rust thinks
// the "req" param to spawn_blocking may outlive this closure, even though we're
// confident that with our oneshot use this will never happen.
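
A minimal sketch of the pattern this comment describes, with hypothetical names (the real code passes a richer request and completes the call through a oneshot in the respond function):

use tokio::task::spawn_blocking;

struct CallbackRequest {
    proto: Vec<u8>,
}

// A raw pointer is not Send, so it cannot be moved into spawn_blocking directly;
// erasing it to a usize sidesteps that, at the cost of manual safety reasoning.
async fn hand_to_callback(req: Box<CallbackRequest>, cb: extern "C" fn(*mut CallbackRequest)) {
    let req_ptr = Box::into_raw(req) as usize;
    spawn_blocking(move || {
        // SAFETY: the pointer came from Box::into_raw above; the callback (via its
        // respond function) is responsible for reclaiming it exactly once.
        cb(req_ptr as *mut CallbackRequest);
    })
    .await
    .expect("callback task failed");
}
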
@Sushisource (Member):

This makes sense to me. AFAIK there is no safe way to express this (if you want to keep using spawn_blocking).

Because spawn_blocking by definition requires the possibility of using another thread, the Send requirement exists. Raw pointers are never send. The lifetime issue is also unsolvable: https://without.boats/blog/the-scoped-task-trilemma/

This explanation is great, but I'd also like a quick summary along the lines of // SAFETY: This is safe because the spawned task is guaranteed to be joined, and the box reclaimed, before this function exits.

However, writing that - is it actually safe in the error case? Seems like we might need to double check there that the user did call the response callback, and free the pointer if they didn't.

@cretz (Member, Author) commented Jul 24, 2025:

> Raw pointers are never send

The Rustonomicon says you can cheat this (https://doc.rust-lang.org/nomicon/send-and-sync.html), but it still was not working for me when I tried due to other issues (but I have cheated like this before).

> However, writing that - is it actually safe in the error case? Seems like we might need to double check there that the user did call the response callback, and free the pointer if they didn't.

If they didn't call the respond call, receiver.await never completes. And note, the only time the sender can ever be dropped is also in that same respond call.

@Sushisource (Member):

Right, but the error can get returned before that await point.

@cretz (Member, Author) commented Jul 25, 2025:

Hrmm, so from my reading, in this situation spawn_blocking may return an error not caused by the user callback code panicking when 1) shutting down the tokio runtime (is_cancelled) or 2) tokio cannot schedule the thread (a form of panic). Both should be rare, but I suppose technically possible. I will look into making an atomic free call on the error return there.

EDIT: Actually, I'm struggling to know whether it reached user code or not. We definitely don't want to free if it did, right? What if the user callback did the respond and then panicked (e.g. threw an exception from their lang)? That would be a second free attempt, but the memory would already be invalid, so you can't check whether it was freed before. I can have some kind of send/arc bool, I guess, that I set just before invoking the user callback so we know their code is responsible for freeing from that point on.

@Sushisource (Member):

Well, that's the thing: you can't necessarily know (I agree it's very rare, though). Having some bool flag that gets set when the response is called is what I was thinking.
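
A sketch of how such a flag might look, extending the earlier pointer-erasure sketch (hypothetical names; here the flag is set just before user code is invoked, so an error from spawn_blocking can tell whether freeing is still our responsibility):

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use tokio::task::spawn_blocking;

struct CallbackRequest {
    proto: Vec<u8>,
}

async fn hand_to_callback(req: Box<CallbackRequest>, cb: extern "C" fn(*mut CallbackRequest)) {
    let handed_off = Arc::new(AtomicBool::new(false));
    let handed_off_in_task = handed_off.clone();
    let req_ptr = Box::into_raw(req) as usize;

    let result = spawn_blocking(move || {
        // From here on, user code (via its respond call) owns the request.
        handed_off_in_task.store(true, Ordering::SeqCst);
        cb(req_ptr as *mut CallbackRequest);
    })
    .await;

    if result.is_err() && !handed_off.load(Ordering::SeqCst) {
        // The task never reached user code (e.g. runtime shutdown), so the box is
        // still ours to reclaim.
        // SAFETY: the pointer came from Box::into_raw above and was never handed off.
        unsafe { drop(Box::from_raw(req_ptr as *mut CallbackRequest)) };
    }
}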
