Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions .github/workflows/cd-pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
publish:
name: publish package to pypi
runs-on: ubuntu-latest
needs: test_publish_package
permissions:
id-token: write # IMPORTANT: mandatory for trusted publishing
steps:
Expand All @@ -59,14 +60,3 @@ jobs:
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}

publish_docs:
name: trigger docs pipeline
runs-on: ubuntu-latest
steps:
- name: trigger downstream workflow
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ secrets.PUBLISH_DOCS_TOKEN }}
repository: hololinked-dev/docs
event-type: trigger-downstream
18 changes: 9 additions & 9 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ Partial contributions are also taken if its easier to continue working on it. In
To start developing and complete the contribution, following steps may be followed:

1. Fork the repository and clone it to your local machine.
2. Setup python environment preferably using `uv`, instructions are in the [README](https://github.com/hololinked-dev/hololinked/blob/main/README.md#contributing).
2. Setup python environment, preferably using `uv`, instructions are in the [README](https://github.com/hololinked-dev/hololinked/blob/main/README.md#contributing).
3. There are some test things under `tests/helper-scripts` directory or in the [examples repository](https://gitlab.com/hololinked/examples), especially the simulators (they may not be up to date, one could take what one needs). These can be used to test your changes.
4. All code must follow [PEP 8](https://peps.python.org/pep-0008/) style guide. This is enforced by the CI pipeline. One needs to use ruff to check for style issues - `uvx ruff check hololinked`, otherwise the pipeline might fail.
5. Unit/Integration tests are recommended to be added for any new feature or bug fix. These tests can be run with `python -m unittest` under the `tests` directory. Existing tests need to pass as well.
6. Once you are done with your changes, make a pull request to the main repository. Your changes will be reviewed in detail. Any review comments added need to be resolved as well. When requirement are met, commits will be squashed and merged to the main branch.
4. All code must follow [PEP 8](https://peps.python.org/pep-0008/) style guide. One needs to use ruff to check for style issues - `uvx ruff check hololinked`.
5. Unit/Integration tests are recommended to be added for any new feature or bug fix. These tests can be run with `python -m unittest` under the `tests` directory.
6. The CI pipeline currently enforces codestyle checks and tests, therefore steps 4 & 5 must be completed before making a pull request.
7. Once you are done with your changes, make a pull request to the main repository. Your changes will be reviewed in detail and any review comments that may be added need to be resolved as well. When requirements are met and the pipeline passes, commits will be squashed and merged to the main branch.
8. For AI generated code, please make sure to review it properly. You need to understand the code that you write or submit. Same applies to documentation, comments etc., please write them only to the extent you would be willing to read them later, not to bloat them with autogenerated text.

There are also other repositories which can use your skills:

Expand Down Expand Up @@ -68,11 +70,9 @@ Otherwise, I will then take care of the issue as soon as possible.

A simpler model is used roughly based on [this article](https://www.bitsnbites.eu/a-stable-mainline-branching-model-for-git/) -

- main branch is where all stable developments are merged, all your branches must merge here
- main branch is merged to release branch when it is decided to created a release.
- A specific release is tagged and not created as its own branch. Instead release branch simply follows the main branch at the release time. People should clone the main branch for latest (mostly-) stable code base and release branch for released code base.
- other branches are feature or bug fix branches. A develop branch may be used to make general improvements as the package is constantly evolving, but its not a specific philosophy to use a develop branch.
- Bug fixes on releases must proceed from the tag of that release. Perhaps, even a new release can be made after fixing the bug by merging a bug fix branch to main branch.
- main branch is where all stable developments are merged, all your branches must merge here.
- A specific release is tagged and not created as its own branch.
- other branches are feature or bug fix branches.

## Attribution

Expand Down
22 changes: 19 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

As a novice, you have a requirement to control and capture data from your hardware, say in your electronics or science lab, and you want to show the data in a dashboard, provide a PyQt GUI or run automated scripts, `hololinked` can help. Even for isolated desktop applications or a small setup without networking, one can still separate the concerns of the tools that interact with the hardware & the hardware itself.

If you are a web developer or an industry professional looking for a web standards compatible (high-speed) IoT runtime, `hololinked` can be a decent choice. By conforming to [W3C Web of Things](https://www.w3.org/WoT/), one can expect a consistent API and flexible bidirectional message flow to interact with your devices, irrespective of the underlying protocol. Currently HTTP & ZMQ are supported. See [Use Cases Table](#use-cases-table).
If you are a web developer or an industry professional looking for a web standards compatible (high-speed) IoT runtime, `hololinked` can be a decent choice. By conforming to [W3C Web of Things](https://www.w3.org/WoT/), one can expect a consistent API and flexible bidirectional message flow to interact with your devices, irrespective of the underlying protocol. Currently HTTP, MQTT & ZMQ are supported. See [Use Cases Table](#use-cases-table).

This implementation is based on RPC, built ground-up in python keeping both the latest web technologies and python principles in mind.

Expand Down Expand Up @@ -631,11 +631,27 @@ Some other features that are currently supported:
</tr>
<tr>
<td>ZMQ INPROC</td>
<td>High Speed Desktop Applications (again, not exposed on network), currently you will need some CPP magic or disable GIL to leverage it fully</td>
<td>
High Speed Desktop Applications (again, not exposed on network), currently you will need some CPP magic or disable GIL to leverage it fully
</td>
</tr>
<tr>
<td>MQTT</td>
<td>Upcoming (October 2025)</td>
<td>
Reliable pub-sub & incorporating into existing systems that use MQTT for <br> lightweight messaging
</td>
<td>
<code>observeproperty</code>,
<code>unobserveproperty</code>,
<code>subscribeevent</code>,
<code>unsubscribeevent</code>
</td>
</tr>
<tr>
<td>MQTT with websockets</td>
<td>
Reliable pub-sub for web applications, planned for November 2025 release.
</td>
<td>
<code>observeproperty</code>,
<code>unobserveproperty</code>,
Expand Down
2 changes: 1 addition & 1 deletion doc
Submodule doc updated from d4e965 to de826d
168 changes: 153 additions & 15 deletions hololinked/client/factory.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import logging
import threading
import uuid
import base64
import warnings
import aiomqtt
import httpx
import ssl
from typing import Any
from paho.mqtt.client import Client as PahoMQTTClient, MQTTProtocolVersion, CallbackAPIVersion, MQTTMessage


from ..core import Thing, Action
from ..core.zmq import SyncZMQClient, AsyncZMQClient
Expand All @@ -23,6 +30,7 @@
WriteMultipleProperties,
ReadMultipleProperties,
)
from .mqtt.consumed_interactions import MQTTConsumer # only one type for now


set_global_event_loop_policy()
Expand Down Expand Up @@ -326,31 +334,161 @@ def http(self, url: str, **kwargs) -> ObjectProxy:

return object_proxy

@classmethod
def mqtt(
self,
hostname: str,
port: int,
thing_id: str,
protocol_version: MQTTProtocolVersion = MQTTProtocolVersion.MQTTv5,
qos: int = 1,
username: str = None,
password: str = None,
ssl_context: ssl.SSLContext = None,
**kwargs,
) -> ObjectProxy:
"""
Create an MQTT client for the specified broker.

Parameters
----------
hostname: str
The hostname of the MQTT broker
port: int
The port of the MQTT broker
qos: int
The Quality of Service level for MQTT messages (0, 1, or 2)
username: str, optional
The username for MQTT authentication
password: str, optional
The password for MQTT authentication
kwargs:
Additional configuration options:

- `logger`: `logging.Logger`, optional.
A custom logger instance to use for logging
- `log_level`: `int`, default `logging.INFO`.
The logging level to use for the client (e.g., logging.DEBUG, logging.INFO
"""
id = f"mqtt-client|{hostname}:{port}|{uuid.uuid4().hex[:8]}"
logger = kwargs.get("logger", get_default_logger(id, log_level=kwargs.get("log_level", logging.INFO)))

td_received_event = threading.Event()
TD = None

def fetch_td(client: PahoMQTTClient, userdata, message: MQTTMessage) -> None:
nonlocal TD, thing_id, logger
if message.topic != f"{thing_id}/thing-description":
return
TD = Serializers.json.loads(message.payload)
td_received_event.set()

def on_connect(
client: PahoMQTTClient,
userdata: Any,
flags: Any,
reason_code: list,
properties: dict[str, Any],
) -> None: # TODO fix signature
nonlocal qos
client.subscribe(f"{thing_id}/#", qos=qos)

sync_client = PahoMQTTClient(
callback_api_version=CallbackAPIVersion.VERSION2,
client_id=id,
clean_session=True if not protocol_version == MQTTProtocolVersion.MQTTv5 else None,
protocol=protocol_version,
)
if username and password:
sync_client.username_pw_set(username=username, password=password)
if ssl_context is not None:
sync_client.tls_set_context(ssl_context)
elif kwargs.get("ca_certs", None):
sync_client.tls_set(ca_certs=kwargs.get("ca_certs", None))
sync_client.on_connect = on_connect
sync_client.on_message = fetch_td
sync_client.connect(hostname, port)
sync_client.loop_start()

td_received_event.wait(timeout=10)
if not TD:
raise TimeoutError("Timeout while fetching Thing Description (TD) over MQTT")

if not sync_client._ssl_context and port != 1883:
warnings.warn(
"MQTT used without TLS, if you intended to use TLS with a recognised CA & you saw this warning, considering "
+ "opening an issue at https://github.com/hololinked-dev/hololinked. ",
category=RuntimeWarning,
)

async_client = aiomqtt.Client(
hostname=hostname,
port=port,
username=username,
password=password,
protocol=protocol_version,
tls_context=sync_client._ssl_context,
)

object_proxy = ObjectProxy(id=id, logger=logger, td=TD)

for name in TD.get("properties", []):
affordance = PropertyAffordance.from_TD(name, TD)
consumed_property = MQTTConsumer(
sync_client=sync_client,
async_client=async_client,
resource=affordance,
qos=qos,
logger=logger,
owner_inst=object_proxy,
)
self.add_property(object_proxy, consumed_property)
for name in TD.get("events", []):
affordance = EventAffordance.from_TD(name, TD)
consumed_event = MQTTConsumer(
sync_client=sync_client,
async_client=async_client,
resource=affordance,
qos=qos,
logger=logger,
owner_inst=object_proxy,
)
self.add_event(object_proxy, consumed_event)

return object_proxy

@classmethod
def add_action(self, client, action: ConsumedThingAction) -> None:
# if not func_info.top_owner:
# return
# raise RuntimeError("logic error")
# for dunder in ClientFactory.__wrapper_assignments__:
# if dunder == '__qualname__':
# info = '{}.{}'.format(client.__class__.__name__, func_info.get_dunder_attr(dunder).split('.')[1])
# else:
# info = func_info.get_dunder_attr(dunder)
# setattr(action, dunder, info)
setattr(action, "__name__", action.resource.name)
setattr(action, "__qualname__", f"{client.__class__.__name__}.{action.resource.name}")
setattr(
action,
"__doc__",
action.resource.description or "Invokes the action {} on the remote Thing".format(action.resource.name),
)
setattr(client, action.resource.name, action)

@classmethod
def add_property(self, client, property: ConsumedThingProperty) -> None:
# if not property_info.top_owner:
# return
# raise RuntimeError("logic error")
# for attr in ['__doc__', '__name__']:
# # just to imitate _add_method logic
# setattr(property, attr, property_info.get_dunder_attr(attr))
setattr(property, "__name__", property.resource.name)
setattr(property, "__qualname__", f"{client.__class__.__name__}.{property.resource.name}")
setattr(
property,
"__doc__",
property.resource.description
or "Represents the property {} on the remote Thing".format(property.resource.name),
)
setattr(client, property.resource.name, property)

@classmethod
def add_event(cls, client, event: ConsumedThingEvent) -> None:
setattr(event, "__name__", event.resource.name)
setattr(event, "__qualname__", f"{client.__class__.__name__}.{event.resource.name}")
setattr(
event,
"__doc__",
event.resource.description or "Represents the event {} on the remote Thing".format(event.resource.name),
)
if hasattr(event.resource, "observable") and event.resource.observable:
setattr(client, f"{event.resource.name}_change_event", event)
else:
Expand Down
Empty file.
95 changes: 95 additions & 0 deletions hololinked/client/mqtt/consumed_interactions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import aiomqtt
import logging
from typing import Any, Callable
from paho.mqtt.client import Client as PahoMQTTClient, MQTTMessage

from ..abstractions import SSE, ConsumedThingEvent
from ...td.interaction_affordance import EventAffordance
from ...td.forms import Form
from ...serializers import Serializers, BaseSerializer # noqa: F401


class MQTTConsumer(ConsumedThingEvent):
# An MQTT event consumer

def __init__(
self,
sync_client: PahoMQTTClient,
async_client: aiomqtt.Client,
resource: EventAffordance,
qos: int,
logger: logging.Logger,
owner_inst: Any,
) -> None:
super().__init__(resource=resource, logger=logger, owner_inst=owner_inst)
self.qos = qos
self.sync_client = sync_client
self.async_client = async_client
self.subscribed = True

def listen(self, form: Form, callbacks: list[Callable], concurrent: bool, deserialize: bool) -> None:
# This method is called from a different thread but also finishes quickly, we wont redo this way
# for the time being.
topic = f"{self.resource.thing_id}/{self.resource.name}"

def on_topic_message(client: PahoMQTTClient, userdata, message: MQTTMessage):
try:
payload = message.payload
# message.properties.readProperty("content_type") if message.properties else form.contentType
# TODO, fix this, make sure to that content_type is not empty after extracting
content_type = form.contentType or "application/json"
serializer = Serializers.content_types.get(content_type, None) # type: BaseSerializer
if deserialize and content_type and serializer:
try:
payload = serializer.loads(payload)
except Exception as ex:
self.logger.error(
f"Error deserializing MQTT message for topic {topic}, "
+ f"passing payload as it is. message: {ex}"
)
event_data = SSE()
event_data.data = payload
event_data.id = message.mid
self.schedule_callbacks(callbacks=callbacks, event_data=event_data, concurrent=concurrent)
except Exception as ex:
self.logger.error(f"Error handling MQTT message for topic {topic}: {ex}")

self.sync_client.message_callback_add(topic, on_topic_message)

async def async_listen(self, form: Form, callbacks: list[Callable], concurrent: bool, deserialize: bool) -> None:
topic = f"{self.resource.thing_id}/{self.resource.name}"
try:
await self.async_client.__aenter__()
except aiomqtt.MqttReentrantError:
pass
await self.async_client.subscribe(topic, qos=self.qos)
async for message in self.async_client.messages:
if not self.subscribed:
break
if not message.topic.matches(topic):
continue
try:
payload = message.payload
# message.properties.readProperty("content_type") if message.properties else form.contentType
# TODO, fix this, make sure to that content_type is not empty after extracting
content_type = form.contentType or "application/json"
serializer = Serializers.content_types.get(content_type, None) # type: BaseSerializer
if deserialize and content_type and serializer:
try:
payload = serializer.loads(payload)
except Exception as ex:
self.logger.error(
f"Error deserializing MQTT message for topic {topic}, "
+ f"passing payload as it is. message: {ex}"
)
event_data = SSE()
event_data.data = payload
event_data.id = message.mid
await self.async_schedule_callbacks(callbacks=callbacks, event_data=event_data, concurrent=concurrent)
except Exception as ex:
self.logger.error(f"Error handling MQTT message for topic {topic}: {ex}")
self.async_client.unsubscribe(topic)

def unsubscribe(self) -> None:
self.subscribed = False
self.sync_client.message_callback_remove(f"{self.resource.thing_id}/{self.resource.name}")
Loading
Loading