Skip to content

refactor and pybind of OnlineWebsocketServer #1943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions python-api-examples/online-websocket-server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
# Copyright 2025 Uniphore

'''
Real-time speech recognition server using WebSockets.
Python API interface to start the server.
Python wrapper around implementation of online-websocket-server.cc in C++.

(1) Download streaming transducer model

curl -SL -O https://github.com/k2-fsa/sherpa-onnx/releases/download/asr-models/sherpa-onnx-streaming-zipformer-en-2023-06-26.tar.bz2
tar xvf sherpa-onnx-streaming-zipformer-en-2023-06-26.tar.bz2
rm sherpa-onnx-streaming-zipformer-en-2023-06-26.tar.bz2

(2) Starting websocket server using the downloaded model

python3 ./python-api-examples/online-websocket-server.py \
--tokens=./sherpa-onnx-streaming-zipformer-en-2023-06-26/tokens.txt \
--encoder=./sherpa-onnx-streaming-zipformer-en-2023-06-26/encoder-epoch-99-avg-1-chunk-16-left-128.onnx \
--decoder=./sherpa-onnx-streaming-zipformer-en-2023-06-26/decoder-epoch-99-avg-1-chunk-16-left-128.onnx \
--joiner=./sherpa-onnx-streaming-zipformer-en-2023-06-26/joiner-epoch-99-avg-1-chunk-16-left-128.onnx \
--max-batch-size=5 \
--loop-interval-ms=10

'''
import argparse
import sys
import signal
from sherpa_onnx import OnlineWebSocketServer

def signal_handler(sig, frame):
print('Exiting...')
sys.exit(0)

# Bind SIGINT to signal_handler
signal.signal(signal.SIGINT, signal_handler)

if __name__ == "__main__":
args = sys.argv[:]
OnlineWebSocketServer(server_args=args)
2 changes: 2 additions & 0 deletions sherpa-onnx/csrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ set(sources
online-transducer-model.cc
online-transducer-modified-beam-search-decoder.cc
online-transducer-nemo-model.cc
online-transducer-greedy-search-nemo-decoder.cc
online-websocket-server-impl.cc
online-wenet-ctc-model-config.cc
online-wenet-ctc-model.cc
online-zipformer-transducer-model.cc
Expand Down
154 changes: 98 additions & 56 deletions sherpa-onnx/csrc/online-websocket-server.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
// sherpa-onnx/csrc/online-websocket-server.cc
//
// Copyright (c) 2022-2023 Xiaomi Corporation
// Copyright (c) 2025 Uniphore (Author: Manickavela A)

#include "asio.hpp"
#include "sherpa-onnx/csrc/macros.h"
#include "sherpa-onnx/csrc/online-websocket-server-impl.h"
#include "sherpa-onnx/csrc/parse-options.h"
#include <string>
#include <csignal>

#include "sherpa-onnx/csrc/online-websocket-server.h"

static constexpr const char *kUsageMessage = R"(
Automatic speech recognition with sherpa-onnx using websocket.
Expand All @@ -30,80 +31,121 @@ Please refer to
for a list of pre-trained models to download.
)";

int32_t main(int32_t argc, char *argv[]) {
sherpa_onnx::ParseOptions po(kUsageMessage);

// Global server instance pointer for signal handling
OnlineWebsocketServerApp *global_server_instance = nullptr;

// Signal handler to stop the server
void SignalHandler(int signal) {
if (signal == SIGINT || signal == SIGTERM) {
SHERPA_ONNX_LOGE("\nSignal %d received. Stopping server...", signal);
if (global_server_instance) {
global_server_instance->Stop();
}
}
}

OnlineWebsocketServerApp::OnlineWebsocketServerApp(
int32_t argc, char *argv[]) : argc_(argc), argv_(argv) {}

void OnlineWebsocketServerApp::Run() {
sherpa_onnx::ParseOptions po(kUsageMessage);
sherpa_onnx::OnlineWebsocketServerConfig config;

// the server will listen on this port
int32_t port = 6006;
// the server will listen on this port
int32_t port = 6006;

// size of the thread pool for handling network connections
int32_t num_io_threads = 1;
// size of the thread pool for handling network connections
int32_t num_io_threads = 1;

// size of the thread pool for neural network computation and decoding
int32_t num_work_threads = 3;
// size of the thread pool for neural network computation and decoding
int32_t num_work_threads = 3;

po.Register("num-io-threads", &num_io_threads,
"Thread pool size for network connections.");
po.Register("num-io-threads", &num_io_threads,
"Thread pool size for network connections.");

po.Register("num-work-threads", &num_work_threads,
"Thread pool size for for neural network "
"computation and decoding.");
po.Register("num-work-threads", &num_work_threads,
"Thread pool size for for neural network "
"computation and decoding.");

po.Register("port", &port, "The port on which the server will listen.");
po.Register("port", &port, "The port on which the server will listen.");

config.Register(&po);
config.Register(&po);

if (argc == 1) {
po.PrintUsage();
exit(EXIT_FAILURE);
}
if (argc_ == 1) {
po.PrintUsage();
exit(EXIT_FAILURE);
}

po.Read(argc, argv);
po.Read(argc_, argv_);

if (po.NumArgs() != 0) {
SHERPA_ONNX_LOGE("Unrecognized positional arguments!");
po.PrintUsage();
exit(EXIT_FAILURE);
}
if (po.NumArgs() != 0) {
SHERPA_ONNX_LOGE("Unrecognized positional arguments!");
po.PrintUsage();
exit(EXIT_FAILURE);
}

config.Validate();
config.Validate();

asio::io_context io_conn; // for network connections
asio::io_context io_work; // for neural network and decoding
// Set the global instance for signal handling
global_server_instance = this;

sherpa_onnx::OnlineWebsocketServer server(io_conn, io_work, config);
server.Run(port);
// Handle SIGINT and SIGTERM
std::signal(SIGINT, SignalHandler);
std::signal(SIGTERM, SignalHandler);

SHERPA_ONNX_LOGE("Started!");
SHERPA_ONNX_LOGE("Listening on: %d", port);
SHERPA_ONNX_LOGE("Number of work threads: %d", num_work_threads);
// io_conn for network connections
// io_work for neural network and decoding

// give some work to do for the io_work pool
auto work_guard = asio::make_work_guard(io_work);
sherpa_onnx::OnlineWebsocketServer server(io_conn_, io_work_, config);
server.Run(port);

std::vector<std::thread> io_threads;
SHERPA_ONNX_LOGE("Started!");
SHERPA_ONNX_LOGE("Listening on: %d", port);
SHERPA_ONNX_LOGE("Number of work threads: %d", num_work_threads);

// decrement since the main thread is also used for network communications
for (int32_t i = 0; i < num_io_threads - 1; ++i) {
io_threads.emplace_back([&io_conn]() { io_conn.run(); });
}
// give some work to do for the io_work pool
auto work_guard = asio::make_work_guard(io_work_);

std::vector<std::thread> work_threads;
for (int32_t i = 0; i < num_work_threads; ++i) {
work_threads.emplace_back([&io_work]() { io_work.run(); });
}
std::vector<std::thread> io_threads;

io_conn.run();
// decrement since the main thread is also used for network communications
for (int32_t i = 0; i < num_io_threads - 1; ++i) {
io_threads.emplace_back([this]() { io_conn_.run(); });
}

for (auto &t : io_threads) {
t.join();
}
std::vector<std::thread> work_threads;
for (int32_t i = 0; i < num_work_threads; ++i) {
work_threads.emplace_back([this]() { io_work_.run(); });
}

for (auto &t : work_threads) {
t.join();
}
// Main thread handles IO
io_conn_.run();

return 0;
for (auto &t : io_threads) {
t.join();
}

for (auto &t : work_threads) {
t.join();
}
SHERPA_ONNX_LOGE("Server shut down gracefully.");
}

void OnlineWebsocketServerApp::Stop() {
shutdown_requested_.store(true);
io_conn_.stop();
io_work_.stop();
SHERPA_ONNX_LOGE("Server stopping...");
}

int32_t main(int32_t argc, char *argv[]) {
OnlineWebsocketServerApp app(argc, argv);
app.Run();
return 0;
}

void StartServer(int32_t argc, char *argv[]) {
OnlineWebsocketServerApp app(argc, argv);
app.Run();
}
32 changes: 32 additions & 0 deletions sherpa-onnx/csrc/online-websocket-server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// sherpa-onnx/csrc/online-wenet-ctc-model-config.h
//
// Copyright (c) 2025 Uniphore (Author: Manickavela A)s

#ifndef SHERPA_ONNX_ONLINE_WEBSOCKET_SERVER_H
#define SHERPA_ONNX_ONLINE_WEBSOCKET_SERVER_H

#include "asio.hpp"
#include "sherpa-onnx/csrc/macros.h"
#include "sherpa-onnx/csrc/online-websocket-server-impl.h"
#include "sherpa-onnx/csrc/parse-options.h"

class OnlineWebsocketServerApp {
public:
OnlineWebsocketServerApp(int32_t argc, char *argv[]);
void Run();
void Stop();

private:
int32_t argc_;
char **argv_;
asio::io_context io_conn_; // ASIO context for connections
asio::io_context io_work_; // ASIO context for work
std::atomic<bool> shutdown_requested_{false};
std::vector<std::thread> io_threads_;
std::vector<std::thread> work_threads_;
};

// Declare StartServer so it's accessible for Pybind
void StartServer(int32_t argc, char *argv[]);

#endif // SHERPA_ONNX_ONLINE_WEBSOCKET_SERVER_H
6 changes: 6 additions & 0 deletions sherpa-onnx/python/csrc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ set(srcs
voice-activity-detector.cc
wave-writer.cc
)

list(APPEND srcs
${CMAKE_SOURCE_DIR}/sherpa-onnx/csrc/online-websocket-server.cc
online-websocket-server-app.cc
)

if(SHERPA_ONNX_HAS_ALSA)
list(APPEND srcs ${CMAKE_SOURCE_DIR}/sherpa-onnx/csrc/alsa.cc alsa.cc)
else()
Expand Down
39 changes: 39 additions & 0 deletions sherpa-onnx/python/csrc/online-websocket-server-app.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// sherpa-onnx/python/csrc/online-websocket-server.cc
//
// Copyright (c) 2025 Uniphore (Author: Manickavela A)

#include "sherpa-onnx/python/csrc/online-websocket-server-app.h"

#include <string>

#include "sherpa-onnx/csrc/online-websocket-server.h"
#include "sherpa-onnx/csrc/macros.h"

namespace sherpa_onnx {

void StartServerWrapper(py::list args) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest that you refactor the OnlineWebSocketServer so that it can accept a pointer to an OnlineRecognizer in its constructor.

In this way, it can simplify how you construct an OnlineWebsocketServer.

You can reuse the methods form online_recognizer.py to construct an OnlineRecognizer and pass it to OnlineWebSocketServer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using OnlineRecognizer seems a little bit of a hassle as underlying OnlineWebsocketServer and Online
also has to be refactored and its seems like a bigger job

I would suggest OnlineWebsocketServer accepting OnlineRecognizerConfig in python/csrc/online-websocket-server-app.cc
I will take functions defined in online_recognizer.py and make them aligned for OnlineWebSocketServerConfig and feed it to pybind and c++, C++ functions will have a wrapper to accept this and start the server

let me know if its sounding good

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you describe how much you need to refactor?

int argc = args.size();
std::vector<std::string> args_str; // Store actual strings
std::vector<char *> argv; // Store pointers to those strings

for (const auto &arg : args) {
args_str.push_back(arg.cast<std::string>());
}

// Fill argv with pointers to the actual string data
for (auto &str : args_str) {
argv.push_back(str.data());
}

argv.push_back(nullptr); // Null-terminate like C-style arrays

// Call your server
StartServer(argc, argv.data());
}

void PybindOnlineWebsocketServerWrapperApp(py::module *m) {
m->def("start_server", &StartServerWrapper, "Start the WebSocket server",
py::call_guard<py::gil_scoped_release>());
}

} // namespace sherpa_onnx
16 changes: 16 additions & 0 deletions sherpa-onnx/python/csrc/online-websocket-server-app.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// sherpa-onnx/python/csrc/online-websocket-server.h
//
// Copyright (c) 2025 Uniphore (Author: Manickavela A)

#ifndef SHERPA_ONNX_PYTHON_CSRC_ONLINE_WEBSOCKET_SERVER_APP_H_
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change the header guard to match the filename.

HINT: There is no app in the filename.

#define SHERPA_ONNX_PYTHON_CSRC_ONLINE_WEBSOCKET_SERVER_APP_H_

#include "sherpa-onnx/python/csrc/sherpa-onnx.h"


namespace sherpa_onnx {

void PybindOnlineWebsocketServerWrapperApp(py::module *m);

}
#endif // SHERPA_ONNX_PYTHON_CSRC_ONLINE_WEBSOCKET_SERVER_H_
2 changes: 2 additions & 0 deletions sherpa-onnx/python/csrc/sherpa-onnx.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "sherpa-onnx/python/csrc/online-punctuation.h"
#include "sherpa-onnx/python/csrc/online-recognizer.h"
#include "sherpa-onnx/python/csrc/online-stream.h"
#include "sherpa-onnx/python/csrc/online-websocket-server-app.h"
#include "sherpa-onnx/python/csrc/speaker-embedding-extractor.h"
#include "sherpa-onnx/python/csrc/speaker-embedding-manager.h"
#include "sherpa-onnx/python/csrc/spoken-language-identification.h"
Expand Down Expand Up @@ -56,6 +57,7 @@ PYBIND11_MODULE(_sherpa_onnx, m) {
PybindOnlineModelConfig(&m);
PybindOnlineLMConfig(&m);
PybindOnlineStream(&m);
PybindOnlineWebsocketServerWrapperApp(&m);
PybindEndpoint(&m);
PybindOnlineRecognizer(&m);
PybindKeywordSpotter(&m);
Expand Down
1 change: 1 addition & 0 deletions sherpa-onnx/python/sherpa_onnx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@
from .keyword_spotter import KeywordSpotter
from .offline_recognizer import OfflineRecognizer
from .online_recognizer import OnlineRecognizer
from .online_websocket_server import OnlineWebSocketServer
from .utils import text2token
Loading