Description
🐛 Bug Report
The client currently exposes a list of Subscriptions.
When subscribing to a new topic, a subscription is added to the list. When unsubscribing, the subscription is removed.
There are certain issues with the current implementation, and I would argue that subscriptions should not be held in the client at all.
IMHO it would be best to have a low-level client that does not try to do anything smart, and then add a ManagedClient that focuses on providing nice user features on top of that solid low-level client. This would also reduce the API surface of the low-level client and simplify thread safety.
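To make the proposed split a bit more concrete, here is a minimal sketch. `ILowLevelClient`, `ManagedClient` and `RecordingClient` are hypothetical names for illustration only, not the current HiveMQtt API, and QoS is a plain `int` to keep the example self-contained.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical low-level surface: it only sends packets and keeps no subscription state.
public interface ILowLevelClient
{
    Task ConnectAsync();
    Task DisconnectAsync();
    Task SubscribeAsync(string topicFilter, int qos);
    Task UnsubscribeAsync(string topicFilter);
}

// Hypothetical managed wrapper that owns the subscription bookkeeping.
public sealed class ManagedClient
{
    private readonly ILowLevelClient inner;

    // Keyed by topic filter: access is thread-safe, and subscribing twice
    // to the same filter overwrites the entry instead of adding a duplicate.
    private readonly ConcurrentDictionary<string, int> subscriptions =
        new ConcurrentDictionary<string, int>();

    public ManagedClient(ILowLevelClient inner) => this.inner = inner;

    // Snapshot of the tracked topic filters.
    public IReadOnlyCollection<string> TopicFilters => this.subscriptions.Keys.ToArray();

    public async Task ConnectAsync()
    {
        await this.inner.ConnectAsync();

        // Re-establish every tracked subscription on the new connection.
        foreach (var (filter, qos) in this.subscriptions.ToArray())
        {
            await this.inner.SubscribeAsync(filter, qos);
        }
    }

    public Task DisconnectAsync() => this.inner.DisconnectAsync();

    public async Task SubscribeAsync(string topicFilter, int qos)
    {
        await this.inner.SubscribeAsync(topicFilter, qos);
        this.subscriptions[topicFilter] = qos;
    }

    public async Task UnsubscribeAsync(string topicFilter)
    {
        await this.inner.UnsubscribeAsync(topicFilter);
        this.subscriptions.TryRemove(topicFilter, out _);
    }
}

// Hypothetical in-memory fake, only here to make the sketch runnable.
public sealed class RecordingClient : ILowLevelClient
{
    public ConcurrentQueue<string> Sent { get; } = new ConcurrentQueue<string>();

    public Task ConnectAsync() => this.Record("CONNECT");

    public Task DisconnectAsync() => this.Record("DISCONNECT");

    public Task SubscribeAsync(string topicFilter, int qos) =>
        this.Record($"SUBSCRIBE {topicFilter} qos={qos}");

    public Task UnsubscribeAsync(string topicFilter) =>
        this.Record($"UNSUBSCRIBE {topicFilter}");

    private Task Record(string packet)
    {
        this.Sent.Enqueue(packet);
        return Task.CompletedTask;
    }
}
```

Whether the managed layer should resubscribe on reconnect or simply clear its list on disconnect probably depends on the session settings (with a persistent session the broker keeps the subscriptions itself). The point of the sketch is only that this policy would live in one place, outside the low-level client.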
@pglombardo what do you think? I'm happy to chat about this in private, feel free to send me an e-mail.
Issues:
- Access to the Subscription list is not synchronised
- Subscriptions are not cleared after a disconnect.
var client =
    new HiveMQClient(
        new HiveMQClientOptionsBuilder()
            .WithClientId("ConcurrentSubscribe")
            .Build()
    );
var inbox = new ConcurrentBag<string>();
client.OnMessageReceived += (_, e) =>
{
    inbox.Add(e.PublishMessage.Topic!);
};
var connectResult = await client.ConnectAsync();
Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success);
Assert.True(client.IsConnected());
var subscribeResult = await client.SubscribeAsync("/test/#", QualityOfService.ExactlyOnceDelivery, false);
Assert.True(subscribeResult.Subscriptions.Count == 1);
Assert.True(subscribeResult.Subscriptions[0].SubscribeReasonCode == SubAckReasonCode.GrantedQoS2);
Assert.Equal(1, client.Subscriptions.Count(s => s.TopicFilter.Topic == "/test/#"));
await client.PublishAsync("/test/1", "whatever");
while (!inbox.Contains("/test/1"))
{
    await Task.Delay(50);
}
await client.DisconnectAsync();
await Task.Delay(1000);
var reconnectResult = await client.ConnectAsync();
Assert.True(reconnectResult.ReasonCode == ConnAckReasonCode.Success);
Assert.True(client.IsConnected());
await client.PublishAsync("/test/2", "whatever");
// client still contains subscription.
Assert.True(client.Subscriptions.Any(s => s.TopicFilter.Topic == "/test/#"));
// we never receive the message, because the subscription is not re-established
while (!inbox.Contains("/test/2"))
{
    await Task.Delay(50);
}
- Subscribing multiple times results in multiple subscriptions (for the same topic) in the subscription list. This also includes the handler.
var subscribeResultA = await client.SubscribeAsync("/test/#", QualityOfService.ExactlyOnceDelivery, false);
Assert.True(subscribeResultA.Subscriptions.Count == 1);
Assert.True(subscribeResultA.Subscriptions[0].SubscribeReasonCode == SubAckReasonCode.GrantedQoS2);
var subscribeResultB = await client.SubscribeAsync("/test/#", QualityOfService.ExactlyOnceDelivery, false);
Assert.True(subscribeResultB.Subscriptions.Count == 1);
Assert.True(subscribeResultB.Subscriptions[0].SubscribeReasonCode == SubAckReasonCode.GrantedQoS2);
Assert.Equal(2, client.Subscriptions.Count(s => s.TopicFilter.Topic == "/test/#"));
- Subscription-level message handlers burn up CPU time, even when not used. For every incoming message, all Subscriptions are iterated and checked for a topic match using MatchTopic, which is quite expensive.
hivemq-mqtt-client-dotnet/Source/HiveMQtt/Client/HiveMQClientEvents.cs
Lines 245 to 265 in b3700c3
foreach (var subscription in this.Subscriptions)
{
    if (packet.Message.Topic != null && MatchTopic(subscription.TopicFilter.Topic, packet.Message.Topic))
    {
        if (subscription.MessageReceivedHandler != null && subscription.MessageReceivedHandler.GetInvocationList().Length > 0)
        {
            // We have a per-subscription message handler.
            _ = Task.Run(() => subscription.MessageReceivedHandler?.Invoke(this, eventArgs)).ContinueWith(
                t =>
                {
                    if (t.IsFaulted)
                    {
                        Logger.Error($"per-subscription MessageReceivedEventLauncher faulted ({packet.Message.Topic}): " + t.Exception?.Message);
                    }
                },
                TaskScheduler.Default);
            messageHandled = true;
        }
    }
}
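One possible way to avoid the per-message cost is to run the matcher only against filters that actually have a handler attached. The following is a rough sketch under that assumption: `TopicDispatch`, `Matches` and `Dispatch` are hypothetical names, and `Matches` is a simplified stand-in for the library's `MatchTopic` (it handles `+` and `#` only and ignores special cases such as `$`-prefixed topics).

```csharp
using System;
using System.Collections.Generic;

public static class TopicDispatch
{
    // Hypothetical, simplified stand-in for MatchTopic.
    public static bool Matches(string filter, string topic)
    {
        var f = filter.Split('/');
        var t = topic.Split('/');

        for (var i = 0; i < f.Length; i++)
        {
            if (f[i] == "#")
            {
                // Multi-level wildcard matches the rest of the topic.
                return true;
            }

            if (i >= t.Length)
            {
                // The filter has more levels than the topic.
                return false;
            }

            if (f[i] != "+" && f[i] != t[i])
            {
                return false;
            }
        }

        return f.Length == t.Length;
    }

    // Only filters that have a handler are stored in the dictionary, so a
    // client without per-subscription handlers never runs the matcher here.
    // Returns the number of handlers invoked.
    public static int Dispatch(
        IReadOnlyDictionary<string, Action<string>> handlersByFilter,
        string topic)
    {
        var invoked = 0;
        foreach (var entry in handlersByFilter)
        {
            if (Matches(entry.Key, topic))
            {
                entry.Value(topic);
                invoked++;
            }
        }

        return invoked;
    }
}
```

In the existing loop, a smaller change with a similar effect would presumably be to check `MessageReceivedHandler` for null before calling `MatchTopic` rather than after.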
🔬 How To Reproduce
Environment
Current master or latest NuGet package