Proposal: Low Level Client vs Managed Client with Subscription Handling #225

@JaggerJo

🐛 Bug Report

The client currently exposes a list of Subscriptions.

When subscribing to a new topic, a subscription is added to the list. When unsubscribing the topic is removed.

There are certain issues with the current implementation, and I would argue that subscriptions should not be held in the client at all.

IMHO it would be best to have a low level client that does not try to do anything smart, and then add a ManagedClient that can focus on providing nice user features built on the solid low level client. It would also reduce the API surface of the low level client and simplify thread safety.
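To make the proposed split concrete, here is a minimal sketch. `ILowLevelClient`, `ManagedClient`, and `RecordingClient` are hypothetical names for illustration only, not part of the current library. The sketch assumes the broker does not keep the session across connections, so subscriptions have to be sent again after a reconnect.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical low-level surface: it only sends packets and keeps no subscription list.
public interface ILowLevelClient
{
    Task ConnectAsync();
    Task SubscribeAsync(string topicFilter);
    Task UnsubscribeAsync(string topicFilter);
}

// Hypothetical managed wrapper that owns all subscription state.
public sealed class ManagedClient
{
    private readonly ILowLevelClient inner;

    // Keyed by topic filter, so subscribing twice leaves a single entry.
    private readonly ConcurrentDictionary<string, bool> filters = new();

    public ManagedClient(ILowLevelClient inner) => this.inner = inner;

    public ICollection<string> Subscriptions => this.filters.Keys;

    public async Task ConnectAsync()
    {
        await this.inner.ConnectAsync();

        // Re-establish every tracked subscription on the new connection.
        foreach (var filter in this.filters.Keys)
        {
            await this.inner.SubscribeAsync(filter);
        }
    }

    public async Task SubscribeAsync(string topicFilter)
    {
        await this.inner.SubscribeAsync(topicFilter);
        this.filters[topicFilter] = true;
    }

    public async Task UnsubscribeAsync(string topicFilter)
    {
        await this.inner.UnsubscribeAsync(topicFilter);
        this.filters.TryRemove(topicFilter, out _);
    }
}

// In-memory stand-in for a broker connection, used only to exercise the sketch.
public sealed class RecordingClient : ILowLevelClient
{
    public List<string> Sent { get; } = new();

    public Task ConnectAsync() { this.Sent.Add("CONNECT"); return Task.CompletedTask; }

    public Task SubscribeAsync(string topicFilter) { this.Sent.Add("SUBSCRIBE " + topicFilter); return Task.CompletedTask; }

    public Task UnsubscribeAsync(string topicFilter) { this.Sent.Add("UNSUBSCRIBE " + topicFilter); return Task.CompletedTask; }
}
```

With this shape the low-level client needs no shared subscription list, and the duplicate and reconnect behaviour described under Issues becomes a concern of the managed layer only.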

@pglombardo what do you think? I'm happy to chat about this in private, feel free to send me an e-mail.

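Issues:

  • Subscriptions are not re-established after a disconnect and reconnect, although the client still lists them.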
        var client =
            new HiveMQClient(
                new HiveMQClientOptionsBuilder()
                    .WithClientId("ConcurrentSubscribe")
                    .Build()
            );

        var inbox = new ConcurrentBag<string>();

        client.OnMessageReceived += (_, e) =>
        {
            inbox.Add(e.PublishMessage.Topic!);
        };

        var connectResult = await client.ConnectAsync();

        Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
        Assert.True(client.IsConnected());

        var subscribeResult = await client.SubscribeAsync("/test/#", QualityOfService.ExactlyOnceDelivery, false);

        Assert.True(subscribeResult.Subscriptions.Count == 1);
        Assert.True(subscribeResult.Subscriptions[0].SubscribeReasonCode == SubAckReasonCode.GrantedQoS2);

        Assert.Equal(1, client.Subscriptions.Count(s => s.TopicFilter.Topic == "/test/#"));

        await client.PublishAsync("/test/1", "whatever");

        while (!inbox.Contains("/test/1"))
        {
            await Task.Delay(50);
        }

        await client.DisconnectAsync();

        await Task.Delay(1000);

        var reconnectResult = await client.ConnectAsync();

        Assert.True(reconnectResult.ReasonCode == ConnAckReasonCode.Success);
        Assert.True(client.IsConnected());

        await client.PublishAsync("/test/2", "whatever");

        // client still contains subscription.
        Assert.True(client.Subscriptions.Any(s => s.TopicFilter.Topic == "/test/#"));

        // we never receive the message, because the subscription is not re-established
        while (!inbox.Contains("/test/2"))
        {
            await Task.Delay(50);
        }
  • Subscribing to the same topic multiple times adds duplicate entries to the subscription list. The duplicate entries also include the handler.
        var subscribeResultA = await client.SubscribeAsync("/test/#", QualityOfService.ExactlyOnceDelivery, false);

        Assert.True(subscribeResultA.Subscriptions.Count == 1);
        Assert.True(subscribeResultA.Subscriptions[0].SubscribeReasonCode == SubAckReasonCode.GrantedQoS2);

        var subscribeResultB = await client.SubscribeAsync("/test/#", QualityOfService.ExactlyOnceDelivery, false);

        Assert.True(subscribeResultB.Subscriptions.Count == 1);
        Assert.True(subscribeResultB.Subscriptions[0].SubscribeReasonCode == SubAckReasonCode.GrantedQoS2);

        Assert.Equal(2, client.Subscriptions.Count(s => s.TopicFilter.Topic == "/test/#"));
  • Subscription-level message handlers cost CPU time even when they are not used. For every received message, all subscriptions are iterated and checked for a topic match using MatchTopic, which is quite expensive.

        foreach (var subscription in this.Subscriptions)
        {
            if (packet.Message.Topic != null && MatchTopic(subscription.TopicFilter.Topic, packet.Message.Topic))
            {
                if (subscription.MessageReceivedHandler != null && subscription.MessageReceivedHandler.GetInvocationList().Length > 0)
                {
                    // We have a per-subscription message handler.
                    _ = Task.Run(() => subscription.MessageReceivedHandler?.Invoke(this, eventArgs)).ContinueWith(
                        t =>
                        {
                            if (t.IsFaulted)
                            {
                                Logger.Error($"per-subscription MessageReceivedEventLauncher faulted ({packet.Message.Topic}): " + t.Exception?.Message);
                            }
                        },
                        TaskScheduler.Default);
                    messageHandled = true;
                }
            }
        }
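One way to avoid paying for topic matching when no per-subscription handler exists is to keep handlers in their own collection and return early when it is empty. The sketch below is an assumption-laden illustration: `TopicMatcher` and `HandlerRegistry` are hypothetical names, the matcher is a simplified stand-in for the library's MatchTopic (it ignores the special rules for topics starting with `$`), and the registry is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

public static class TopicMatcher
{
    // Simplified MQTT filter match: '+' matches one level, '#' matches all remaining levels.
    public static bool Matches(string filter, string topic)
    {
        var f = filter.Split('/');
        var t = topic.Split('/');

        for (var i = 0; i < f.Length; i++)
        {
            if (f[i] == "#")
            {
                return true;
            }

            if (i >= t.Length)
            {
                return false;
            }

            if (f[i] != "+" && f[i] != t[i])
            {
                return false;
            }
        }

        return f.Length == t.Length;
    }
}

public sealed class HandlerRegistry
{
    // Only filters that actually have a handler are stored here.
    private readonly List<(string Filter, Action<string> Handler)> handlers = new();

    public void Register(string filter, Action<string> handler) => this.handlers.Add((filter, handler));

    // Invokes matching handlers and returns how many were called.
    public int Dispatch(string topic)
    {
        if (this.handlers.Count == 0)
        {
            // Fast path: no handlers registered, so no topic matching at all.
            return 0;
        }

        var invoked = 0;
        foreach (var (filter, handler) in this.handlers)
        {
            if (TopicMatcher.Matches(filter, topic))
            {
                handler(topic);
                invoked++;
            }
        }

        return invoked;
    }
}
```

Clients that rely only on the global OnMessageReceived event would then never run the matcher for incoming messages.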

🔬 How To Reproduce

Archive.zip

Environment

Current master or latest NuGet package
