Skip to content

Commit 2690763

Browse files
authored
feat(hermes): Add subscription to .new topic (#706)
* subscribe topic new Signed-off-by: bkioshn <bkioshn@gmail.com> * remove wrong doc Signed-off-by: bkioshn <bkioshn@gmail.com> * remove .syn subscription Signed-off-by: bkioshn <bkioshn@gmail.com> * fix ipfs subscribe test Signed-off-by: bkioshn <bkioshn@gmail.com> * use default channel name Signed-off-by: bkioshn <bkioshn@gmail.com> --------- Signed-off-by: bkioshn <bkioshn@gmail.com>
1 parent 12efb30 commit 2690763

File tree

4 files changed

+32
-30
lines changed

4 files changed

+32
-30
lines changed

hermes/apps/athena/modules/doc-sync/src/lib.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,16 @@ use shared::{
3535
/// Doc Sync component - thin wrapper calling host-side implementation.
3636
struct Component;
3737

38+
/// Default channel name for doc-sync operations
39+
const DOC_SYNC_CHANNEL: &str = "documents";
40+
3841
impl exports::hermes::init::event::Guest for Component {
3942
/// Initialize the module.
4043
fn init() -> bool {
4144
log::init(log::LevelFilter::Trace);
4245
info!(target: "doc_sync::init", "Doc sync module initialized");
46+
// Create the channel during initialization
47+
SyncChannel::new(DOC_SYNC_CHANNEL);
4348
true
4449
}
4550
}
@@ -125,9 +130,6 @@ fn json_response(
125130
})
126131
}
127132

128-
/// Default channel name for doc-sync operations
129-
const DOC_SYNC_CHANNEL: &str = "documents";
130-
131133
/// API for posting documents to IPFS `PubSub` channels.
132134
pub mod channel {
133135
use super::{DOC_SYNC_CHANNEL, DocData, SyncChannel, hermes};

hermes/bin/src/runtime_extensions/hermes/doc_sync/host.rs

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use stringzilla::stringzilla::Sha256;
55
use wasmtime::component::Resource;
66

77
use crate::{
8-
ipfs::hermes_ipfs_subscribe,
98
runtime_context::HermesRuntimeContext,
109
runtime_extensions::{
1110
bindings::hermes::{
@@ -32,9 +31,6 @@ const SHA2_256_CODE: u64 = 0x12;
3231
/// See: <https://github.com/ipld/cid-cbor/>
3332
const CID_CBOR_TAG: u64 = 42;
3433

35-
/// Default `PubSub` topic for doc-sync channel
36-
const DOC_SYNC_TOPIC: &str = "doc-sync/documents";
37-
3834
/// Wrapper for `hermes_ipfs::Cid` to implement `minicbor::Encode` for it.
3935
struct Cid(hermes_ipfs::Cid);
4036

@@ -90,11 +86,6 @@ impl HostSyncChannel for HermesRuntimeContext {
9086
///
9187
/// - `name`: The Name of the channel to Open. Creates if it doesn't exist, otherwise
9288
/// joins it.
93-
///
94-
/// **Returns**
95-
///
96-
/// - `ok(network)`: A resource network, if successfully create network resource.
97-
/// - `error(create-network-error)`: If creating network resource failed.
9889
fn new(
9990
&mut self,
10091
name: ChannelName,
@@ -128,11 +119,17 @@ impl HostSyncChannel for HermesRuntimeContext {
128119
}
129120
}
130121

131-
if let Err(err) = hermes_ipfs_subscribe(self.app_name(), name) {
122+
// When the channel is created, subscribe to .new <base>.<topic>
123+
if let Err(err) = self.pubsub_subscribe(format!("{name}.new")) {
124+
// FIXME - Do we want to remove the entry from the map here?
132125
DOC_SYNC_STATE.remove(&resource);
133-
return Err(wasmtime::Error::msg(format!("Subscription failed: {err}",)));
126+
return Err(wasmtime::Error::msg(format!(
127+
"Subscription to {name}.new failed: {err}",
128+
)));
134129
}
135130

131+
tracing::info!("Created Doc Sync Channel: {name}");
132+
136133
Ok(wasmtime::component::Resource::new_own(resource))
137134
}
138135

@@ -168,7 +165,7 @@ impl HostSyncChannel for HermesRuntimeContext {
168165
/// 3. Publish to `PubSub` (`pubsub_publish`)
169166
fn post(
170167
&mut self,
171-
_self_: Resource<SyncChannel>,
168+
self_: Resource<SyncChannel>,
172169
doc: DocData,
173170
) -> wasmtime::Result<Result<DocLoc, Errno>> {
174171
tracing::info!("📤 Posting {} bytes to doc-sync channel", doc.len());
@@ -202,18 +199,27 @@ impl HostSyncChannel for HermesRuntimeContext {
202199
//
203200
// Since Step 1 (add + pin) already succeeded, the document is safely stored
204201
// in IPFS. We treat "no peers" as a warning rather than a fatal error.
205-
let topic = DOC_SYNC_TOPIC.to_string();
206202

207-
// Subscribe to the topic first (required for Gossipsub - you must be subscribed
208-
// to a topic before you can publish to it)
209-
match self.pubsub_subscribe(topic.clone())? {
210-
Ok(_) => tracing::info!("✓ Subscribed to topic: {}", topic),
203+
let channel_name = DOC_SYNC_STATE
204+
.get(&self_.rep())
205+
.ok_or_else(|| wasmtime::Error::msg("Channel not found"))?
206+
.value()
207+
.clone();
208+
209+
let topic_new = format!("{channel_name}.new");
210+
211+
// The channel should already be subscribed to the `.new` topic (subscription
212+
// is performed in `new()`). Invoking the subscription again to ensure
213+
// the topic is active, because Gossipsub enforces that peers must subscribe
214+
// to a topic before they are permitted to publish on it.
215+
match self.pubsub_subscribe(topic_new.clone())? {
216+
Ok(_) => tracing::info!("✓ Subscribed to topic: {topic_new}"),
211217
Err(e) => tracing::warn!("⚠ Subscribe warning: {:?}", e),
212218
}
213219

214220
// Attempt to publish to PubSub
215-
if let Ok(()) = self.pubsub_publish(topic.clone(), doc)? {
216-
tracing::info!("✓ Step 3/3: Published to PubSub → {}", topic);
221+
if let Ok(()) = self.pubsub_publish(topic_new.clone(), doc)? {
222+
tracing::info!("✓ Step 3/3: Published to PubSub → {topic_new}",);
217223
} else {
218224
// Non-fatal: PubSub requires peer nodes to be subscribed to the topic.
219225
// In a single-node environment, this is expected to fail with
@@ -223,8 +229,7 @@ impl HostSyncChannel for HermesRuntimeContext {
223229
"⚠ Step 3/3: PubSub publish skipped (no peer nodes subscribed to topic)"
224230
);
225231
tracing::warn!(
226-
" Note: Gossipsub requires other nodes subscribing to '{}' to work",
227-
topic
232+
" Note: Gossipsub requires other nodes subscribing to '{topic_new}' to work",
228233
);
229234
tracing::info!(" Document is successfully stored in IPFS from Step 1");
230235
}

hermes/bin/tests/integration/tests/serial/ipfs_subscribe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ fn ipfs_subscribe() {
2727

2828
assert!(utils::assert::app_logs_contain(
2929
&temp_dir,
30-
"\"pubsub_topic\":\"ipfs_channel\""
30+
"\"pubsub_topic\":\"ipfs_channel.new\""
3131
));
3232

3333
// Uncomment the line below if you want to inspect the details

wasm/wasi/wit/deps/hermes-doc-sync/api.wit

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,6 @@ interface api {
3434
/// **Parameters**
3535
///
3636
/// - `name`: The Name of the channel to Open. Creates if it doesn't exist, otherwise joins it.
37-
///
38-
/// **Returns**
39-
///
40-
/// - `ok(network)`: A resource network, if successfully create network resource.
41-
/// - `error(create-network-error)`: If creating network resource failed.
4237
constructor(name: channel-name);
4338

4439
/// Close Doc Sync Channel

0 commit comments

Comments
 (0)