A fast, lightweight Python client for the Jolt in-memory messaging broker.
This is a Python port of the jolt-java-api, providing identical functionality with a Pythonic interface.
- Plain TCP + NDJSON (newline-delimited JSON)
- No external dependencies (standard library only)
- Supports:
  - auth - Authentication
  - sub/unsub - Subscribe/Unsubscribe to topics
  - pub - Publish messages
  - ping - Keep-alive
- Designed for low latency and high throughput
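
The NDJSON framing listed above can be pictured as one JSON object per line. The sketch below is illustrative only: the field names `op`, `topic`, and `data` are assumptions made for the example, not the broker's documented schema, and `encode_frame` and `FrameDecoder` are hypothetical helpers rather than part of this package.

```python
import json


def encode_frame(message: dict) -> bytes:
    """Serialize one message as a single NDJSON line (hypothetical helper)."""
    # json.dumps escapes newlines inside strings, so the only raw "\n"
    # in the output is the frame terminator appended here.
    return (json.dumps(message, separators=(",", ":")) + "\n").encode("utf-8")


class FrameDecoder:
    """Accumulate raw bytes and return every complete decoded message."""

    def __init__(self):
        self._buffer = b""

    def feed(self, chunk: bytes) -> list:
        # TCP delivers arbitrary chunks, so keep the trailing partial line
        # in the buffer until its terminating newline arrives.
        self._buffer += chunk
        *lines, self._buffer = self._buffer.split(b"\n")
        return [json.loads(line) for line in lines if line.strip()]


if __name__ == "__main__":
    # "op", "topic" and "data" are assumed field names for illustration.
    frame = encode_frame({"op": "pub", "topic": "chat.room1", "data": "hi"})
    decoder = FrameDecoder()
    print(decoder.feed(frame[:8]))   # partial frame: nothing decoded yet
    print(decoder.feed(frame[8:]))   # remainder completes the message
```

Buffering until a newline arrives is what lets a reader cope with messages that are split across TCP reads or packed several to a read.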
- Python 3.7 or newer
- A running Jolt server (the Go broker):
  - Example: ./broker -config=config.json
  - Default port: 8080 (unless changed in config.json)
git clone https://github.com/DevArqf/jolt-python-api.git
cd jolt-python-api
pip install -e .

Or install with pip:
pip install jolt-python-api

Project structure:
jolt-python-api/
├── src/
│   └── jolt/
│       ├── __init__.py
│       ├── client.py        # Main JoltClient class
│       ├── config.py        # JoltConfig and builder
│       ├── handler.py       # JoltMessageHandler abstract class
│       ├── request.py       # JoltRequestBuilder
│       ├── response.py      # Response parsers and models
│       └── exceptions.py    # JoltException
├── examples/
│   └── simple_example.py
├── tests/
├── setup.py
├── README.md
└── requirements.txt
from jolt import JoltClient, JoltConfig, JoltMessageHandler
from jolt import JoltErrorResponse, JoltTopicMessage
from typing import Optional

# 1. Configure connection
config = JoltConfig.new_builder() \
    .host("127.0.0.1") \
    .port(8080) \
    .build()

# 2. Define message handler
class MyHandler(JoltMessageHandler):
    def on_ok(self, raw_line: str):
        print(f"[OK] {raw_line}")

    def on_error(self, error: JoltErrorResponse, raw_line: str):
        print(f"[ERROR] {error.get_error()}")

    def on_topic_message(self, msg: JoltTopicMessage, raw_line: str):
        print(f"[MSG] {msg.get_topic()} -> {msg.get_data()}")

    def on_disconnected(self, cause: Optional[Exception]):
        print(f"[DISCONNECTED] {cause if cause else 'closed'}")

# 3. Create and connect client
handler = MyHandler()
client = JoltClient(config, handler)
client.connect()

# 4. Authenticate (if server requires it)
client.auth("username", "password")

# 5. Subscribe and publish
client.subscribe("chat.room1")
client.publish("chat.room1", "Hello from Python!")

# 6. Ping server
client.ping()

# 7. Clean shutdown
client.close()

# Complete example: subscribe, publish, and listen until Ctrl+C
import time
from jolt import JoltClient, JoltConfig, JoltMessageHandler
from jolt import JoltErrorResponse, JoltTopicMessage
from typing import Optional

class ChatHandler(JoltMessageHandler):
    def on_ok(self, raw_line: str):
        print("Operation successful")

    def on_error(self, error: JoltErrorResponse, raw_line: str):
        print(f"Error: {error.get_error()}")

    def on_topic_message(self, msg: JoltTopicMessage, raw_line: str):
        print(f"[{msg.get_topic()}] {msg.get_data()}")

    def on_disconnected(self, cause: Optional[Exception]):
        if cause:
            print(f"Disconnected: {cause}")
        else:
            print("Connection closed")

def main():
    # Setup
    config = JoltConfig.new_builder() \
        .host("127.0.0.1") \
        .port(8080) \
        .build()
    handler = ChatHandler()
    client = JoltClient(config, handler)

    try:
        # Connect
        print("Connecting to Jolt server...")
        client.connect()
        print("Connected!")

        # Auth (if needed)
        # client.auth("jolt-chat", "password123")

        # Subscribe to topics
        client.subscribe("chat.general")
        client.subscribe("notifications")

        # Publish some messages
        client.publish("chat.general", "Hello, everyone!")
        client.publish("chat.general", "This is from Python!")

        # Keep running to receive messages
        print("\nListening for messages (Ctrl+C to exit)...")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n\nShutting down...")
    finally:
        client.close()

if __name__ == "__main__":
    main()

JoltConfig: configuration object for the Jolt client.
# Using builder pattern
config = JoltConfig.new_builder() \
    .host("127.0.0.1") \
    .port(8080) \
    .build()

# Direct instantiation
config = JoltConfig(host="127.0.0.1", port=8080)

Methods:
- get_host() → str: Get the server host
- get_port() → int: Get the server port
JoltClient: main client for interacting with the Jolt broker.
Constructor:
client = JoltClient(config, handler)  # config: JoltConfig, handler: JoltMessageHandler

Methods:
- connect(): Connect to the Jolt server
- auth(username: str, password: str): Authenticate with the server
- subscribe(topic: str): Subscribe to a topic
- unsubscribe(topic: str): Unsubscribe from a topic
- publish(topic: str, data: str): Publish a message to a topic
- ping(): Send a ping to the server
- close(): Close the connection
- is_connected() → bool: Check connection status
JoltMessageHandler: abstract base class for handling server messages. Implement all methods:
class MyHandler(JoltMessageHandler):
    def on_ok(self, raw_line: str):
        """Called when receiving an OK response"""
        pass

    def on_error(self, error: JoltErrorResponse, raw_line: str):
        """Called when receiving an error response"""
        pass

    def on_topic_message(self, msg: JoltTopicMessage, raw_line: str):
        """Called when receiving a message on a subscribed topic"""
        pass

    def on_disconnected(self, cause: Optional[Exception]):
        """Called when disconnected from the server"""
        pass

JoltTopicMessage: represents a message received on a subscribed topic.
Methods:
- get_topic() → str: Get the topic name
- get_data() → str: Get the message data
JoltErrorResponse: represents an error response from the server.
Methods:
- get_error() → str: Get the error message
JoltException: exception raised for Jolt client errors.
This Python implementation maintains feature parity with the Java API:
| Feature | Java API | Python API |
|---|---|---|
| TCP Connection | ✓ | ✓ |
| NDJSON Protocol | ✓ | ✓ |
| Auth | ✓ | ✓ |
| Subscribe/Unsubscribe | ✓ | ✓ |
| Publish | ✓ | ✓ |
| Ping | ✓ | ✓ |
| Message Handler | ✓ | ✓ |
| Thread-safe Writing | ✓ | ✓ |
| Background Reader | ✓ | ✓ |
| No Dependencies | ✓ | ✓ |
API Naming Conventions:
The Python API follows Pythonic naming while maintaining similar structure:
- Java: JoltConfig.newBuilder() → Python: JoltConfig.new_builder()
- Java: error.getError() → Python: error.get_error()
- Java: msg.getTopic() → Python: msg.get_topic()
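
The naming rule above is mechanical: each Java camelCase method name maps to its snake_case Python counterpart. As a quick aid when porting Java snippets, the mapping can be sketched with a small helper. `to_snake_case` is a hypothetical function written for this illustration and is not part of the package.

```python
import re

# Zero-width boundary between a lowercase letter or digit and an uppercase letter.
_CAMEL_BOUNDARY = re.compile(r"(?<=[a-z0-9])(?=[A-Z])")


def to_snake_case(java_name: str) -> str:
    """Convert a camelCase Java method name to snake_case."""
    return _CAMEL_BOUNDARY.sub("_", java_name).lower()


if __name__ == "__main__":
    for java_name in ("newBuilder", "getError", "getTopic"):
        print(f"{java_name} -> {to_snake_case(java_name)}")
```

Names that are already lowercase, such as connect or ping, pass through unchanged.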
# Subscribe to multiple topics
topics = ["news", "sports", "weather"]
for topic in topics:
    client.subscribe(topic)

# Publish to different topics
client.publish("news", "Breaking: Python API released!")
client.publish("sports", "Game score: 3-2")

# Route messages by topic prefix
class SmartHandler(JoltMessageHandler):
    def on_topic_message(self, msg: JoltTopicMessage, raw_line: str):
        topic = msg.get_topic()
        data = msg.get_data()
        if topic.startswith("alert."):
            print(f"ALERT: {data}")
        elif topic.startswith("chat."):
            print(f"Chat: {data}")
        else:
            print(f"{topic}: {data}")

# Reconnect after a disconnect
import time
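from typing import Optional
from jolt import JoltMessageHandler

class ReconnectingHandler(JoltMessageHandler):
    # on_ok, on_error and on_topic_message are omitted here for brevity;
    # a concrete handler must implement them as well.
    def __init__(self, client):
        self.client = client
        # Set this to False before calling client.close() to skip reconnecting.
        self.should_reconnect = True

    def on_disconnected(self, cause: Optional[Exception]):
        print(f"Disconnected: {cause}")
        if self.should_reconnect:
            print("Attempting to reconnect...")
            time.sleep(5)  # fixed delay before a single retry
            try:
                self.client.connect()
                print("Reconnected!")
            except Exception as e:
                print(f"Reconnection failed: {e}")

- This API does not provide TLS. To secure transport: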
- Run Jolt behind a TLS-terminating proxy (e.g. Nginx, HAProxy), or
- Use OS-level tunnels (SSH, VPN)
- All JSON is handled using Python's built-in json module
- The client uses a single background thread for reading and a thread-safe writer
- Messages are automatically framed with newlines (NDJSON format)
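
The threading model described above (one background reader, a lock-guarded writer, newline framing) can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the package's actual implementation: `LineChannel` and `run_demo` are hypothetical names, and `socket.socketpair()` stands in for a real broker connection.

```python
import json
import socket
import threading


class LineChannel:
    """One background reader thread plus a lock-guarded NDJSON writer."""

    def __init__(self, sock, on_line):
        self._sock = sock
        self._on_line = on_line
        self._write_lock = threading.Lock()
        self._reader = threading.Thread(target=self._read_loop, daemon=True)

    def start(self):
        self._reader.start()

    def join(self, timeout=None):
        self._reader.join(timeout)

    def send(self, obj):
        # Serialize outside the lock; hold it only for the socket write so
        # lines written by different threads never interleave.
        payload = (json.dumps(obj) + "\n").encode("utf-8")
        with self._write_lock:
            self._sock.sendall(payload)

    def _read_loop(self):
        # makefile() gives line-oriented reads; iteration stops at EOF.
        with self._sock.makefile("r", encoding="utf-8", newline="\n") as stream:
            for line in stream:
                line = line.strip()
                if line:
                    self._on_line(line)


def run_demo(writers=4, per_writer=25):
    """Send messages from several threads and return what the reader saw."""
    left, right = socket.socketpair()
    received = []
    total = writers * per_writer
    done = threading.Event()

    def on_line(line):
        received.append(json.loads(line))
        if len(received) == total:
            done.set()

    reader_side = LineChannel(left, on_line)
    writer_side = LineChannel(right, lambda line: None)  # reader never started
    reader_side.start()

    def produce(worker):
        for seq in range(per_writer):
            writer_side.send({"worker": worker, "seq": seq})

    threads = [threading.Thread(target=produce, args=(w,)) for w in range(writers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    done.wait(timeout=5)
    right.close()              # EOF ends the reader loop on the other side
    reader_side.join(timeout=5)
    left.close()
    return received


if __name__ == "__main__":
    print(f"received {len(run_demo())} intact messages")
```

Holding the lock only around `sendall` keeps each line intact on the wire while letting callers serialize their payloads concurrently.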
Run tests (when implemented):
pytest tests/

Run with coverage:
pytest --cov=jolt tests/

Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests if applicable
- Submit a pull request
MIT License
Copyright (c) 2025 DevArqf
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
This is a Python port of the jolt-java-api created for the Jolt Database project.
For issues, questions, or contributions:
- Open an issue
- Contact the Jolt Database team