Skip to content

Commit 168681d

Browse files
rafal-chcong-or
andauthored
feat(hermes): Add the Pre-Publish step to DocSync (#705)
* feat: implement doc-sync channel::post API for issue #628 - Add channel::post(document_bytes) API as requested - Integrate hermes-ipfs library for IPFS operations - Add HTTP endpoints for testing (/api/doc-sync/*) - Configure HTTP gateway routing - Implement IPFS add, pin, and PubSub publish workflow * feat: implement doc-sync channel::post API for issue #628 - Add channel::post(document_bytes) API as requested - Integrate hermes-ipfs library for IPFS operations - Add HTTP endpoints for testing (/api/doc-sync/*) - Configure HTTP gateway routing - Implement IPFS add, pin, and PubSub publish workflow * hermes ipfs version * fix: enable WASM compilation for doc-sync module and add host stubs WASM compilation fixes: - Made tokio runtime features conditional (rt-multi-thread only for non-WASM) - Added separate WASM/native implementations using futures::executor for WASM - Conditionally compile Runtime usage and block_on calls Host implementation: - Replaced panicking todo!() with warning messages and stub return values - Added Resource stub creation for SyncChannel::new() - Functions now print warnings but don't crash runtime This allows the doc-sync module to compile for wasm32-wasip2 targets and run without panicking, though full functionality requires proper host implementation. * Remove all cfg attributes and consolidate WASM/non-WASM code paths into single implementations using futures::executor. Simplify HTTP handlers and reduce complexity to clearly demonstrate the 4-step workflow. * refactor(doc-sync): simplify for demo workflow Remove conditional compilation, OnceLock pattern, and unnecessary complexity to clearly show the 4-step IPFS PubSub workflow. * refactor(doc-sync): use WIT bindings directly for demo Replace async hermes-ipfs library with direct WIT function calls (file_add, file_pin, pubsub_publish). Remove conditional compilation and async dependencies to simplify the 4-step workflow demo. * refactor(doc-sync): use WIT bindings directly for demo Replace async hermes-ipfs library with direct WIT function calls (file_add, file_pin, pubsub_publish). Remove conditional compilation and async dependencies to simplify the 4-step workflow demo. * refactor(doc-sync): integrate with PR #691 subscription flow Replace async hermes-ipfs library with synchronous WIT bindings (file_add, file_pin, pubsub_publish, pubsub_subscribe). Add actual channel subscription in SyncChannel::new() and document complete pub/sub flow with PR #691 infrastructure. Changes: - Use WIT IPFS functions directly instead of async library - Call pubsub_subscribe() to register DocSync subscriptions - Document how on_new_doc events are triggered by PR #691 - Remove conditional compilation and async dependencies - Show clear 4-step workflow: add → pin → validate → publish * docs(doc-sync): document PR #691 integration requirements Add comprehensive comments explaining PR #691 requirement for subscription event routing. Document the complete pub/sub flow, what works now vs what needs PR #691, and how to integrate when it merges * fix(doc-sync): import GuestSyncChannel trait and clarify PR #691 comment Fix compilation error and clarify that publishing to PubSub works now; PR #691 is only needed to route incoming messages to event handlers. * update docs * refactor(doc-sync): Focus module on publishing workflow only Remove subscription logic and simplify documentation. Module now demonstrates only the 4-step publishing workflow: file_add, file_pin, pre-publish validation, and pubsub_publish. * fmt * refactor(doc-sync): Move post logic to host Execute the 4-step publishing workflow (file_add, file_pin, pre-publish, pubsub_publish) on the host side instead of in the WASM module. Reduces boundary crossings from 6 to 2 for better performance. * fmt * fmt * fix(doc-sync): Fix compilation errors - Fix SyncChannel resource import and usage - Update channel::post() to call host implementation correctly * refactor(doc-sync): Replace eprintln with tracing Use tracing macros (info/warn/error) instead of eprintln for logging in doc-sync host implementation. * refactor(doc-sync): Replace eprintln with tracing Use tracing macros (info/warn/error) instead of eprintln for logging in doc-sync host implementation. * fmt * refactor(doc-sync): Extract constants and improve error logging - Add DOC_SYNC_TOPIC and DOC_SYNC_CHANNEL constants - Add error logging to id_for() method * refactor(doc-sync): Improve error logging and remove redundant conversions - Log actual error details instead of discarding with - Remove redundant .to_string() on CID response - Add error logging for failed post operations * fmt * fmt * refactor(doc-sync): Replace stub implementations with todo!() Replace placeholder implementations with explicit todo!() markers: - id_for: Remove incorrect file_add call with side effects - sync-channel::new: Remove placeholder resource ID (42) - sync-channel::close: Remove stub return value These are out of scope for post() implementation and should panic if called. * Update hermes/bin/src/ipfs/task.rs Co-authored-by: Rafał Chabowski <rafal.chabowski@iohk.io> * docs(doc-sync): Move crate docs to README and remove duplication Move module documentation from lib.rs to a crate-level README.md for better visibility and remove duplicated content between crate-level and channel module docs. * docs(doc-sync): Move crate docs to README and remove duplication Move module documentation from lib.rs to a crate-level README.md for better visibility and remove duplicated content between crate-level and channel module docs. * fix(doc-sync): Fix markdown linting errors in README Split long sentences to one per line and ensure lines stay under 132 character limit. * fix(doc-sync): Replace todo!() with placeholder implementations Replace 5 todo!() macros in doc_sync host with minimal placeholder implementations to pass clippy lint checks. Functions now return empty/default values instead of panicking. * fix(doc-sync): Resolve all clippy pedantic lints Fixes documentation formatting, format string inlining, unnecessary value passing, wildcard imports, and missing error documentation to satisfy clippy::pedantic requirements * Log pin result * IPFS file add operation now also returns CID * Fixes after the merge * Remove the pin step * Remove the pin step * Cleanup * Cleanup * DHT provide on the host side * Wire the `IpfsCommand::DhtProvide` command * Add `is_pre_publish_completed()` * Implement basic timeout * Implement DHT get providers * Use proper peer ID * Update test component * Temp switch to `main` branch in libs * Cleanup * Split `fn post()` into auxiliary functions * Update `hermes-ipfs` * Update comment * Fix typo * Fix typo * Revert unnecessary comment change * Use correct error in `dht_provide()` * Use proper error in `IpfsCommand::DhtGetProviders()` --------- Co-authored-by: cong-or <conchubhar.gannon@gmail.com>
1 parent 2690763 commit 168681d

File tree

9 files changed

+409
-104
lines changed

9 files changed

+409
-104
lines changed

hermes/Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hermes/bin/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ path = "tests/integration/tests/mod.rs"
3131

3232
[dependencies]
3333
# Catalyst Internal Crates
34-
hermes-ipfs = { version = "0.0.6", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "hermes-ipfs/v0.0.6" }
34+
hermes-ipfs = { version = "0.0.7", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "hermes-ipfs/v0.0.7" }
3535
cardano-blockchain-types = { version = "0.0.9", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-blockchain-types/v0.0.9" }
3636
cardano-chain-follower = { version = "0.0.19", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-chain-follower/v0.0.19" }
3737
catalyst-types = { version = "0.0.7", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-types/v0.0.7" }
@@ -113,6 +113,7 @@ serial_test = { version = "3.2.0", features = ["file_locks"] }
113113
wasmtime = { version = "38.0.3", default-features = false, features = ["runtime", "cranelift", "component-model", "wat", "addr2line"] }
114114
httpmock = "0.8.2"
115115
wasmtime-wasi = "38.0.3"
116+
test-case = "3.3.1"
116117

117118
[package.metadata.cargo-machete]
118119
# This is required for HDF5.

hermes/bin/src/ipfs/api.rs

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@ use crate::{
1111
pub(crate) fn hermes_ipfs_add_file(
1212
app_name: &ApplicationName,
1313
contents: IpfsFile,
14-
) -> Result<IpfsPath, Errno> {
14+
) -> Result<hermes_ipfs::IpfsPath, Errno> {
1515
tracing::debug!(app_name = %app_name, "adding IPFS file");
1616
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
17-
let ipfs_path = ipfs.file_add(contents)?.to_string();
18-
tracing::debug!(app_name = %app_name, path = %ipfs_path, "added IPFS file");
19-
ipfs.apps.pinned_file(app_name.clone(), &ipfs_path)?;
17+
let ipfs_path = ipfs.file_add(contents)?;
18+
let ipfs_path_str = ipfs_path.to_string();
19+
tracing::debug!(app_name = %app_name, path = %ipfs_path_str, "added IPFS file");
20+
ipfs.apps.pinned_file(app_name.clone(), &ipfs_path_str)?;
2021
Ok(ipfs_path)
2122
}
2223

@@ -106,6 +107,46 @@ pub(crate) fn hermes_ipfs_put_dht_value(
106107
Ok(status)
107108
}
108109

110+
/// Marks a node as a provider for the content under the given key.
111+
pub(crate) fn hermes_ipfs_dht_provide(
112+
app_name: &ApplicationName,
113+
key: DhtKey,
114+
) -> Result<(), Errno> {
115+
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
116+
let key_str = format!("{key:x?}");
117+
tracing::debug!(app_name = %app_name, dht_key = %key_str, "DHT provide");
118+
ipfs.dht_provide(key)?;
119+
tracing::debug!(app_name = %app_name, dht_key = %key_str, "DHT provided");
120+
Ok(())
121+
}
122+
123+
/// Gets providers of the content under the given key.
124+
pub(crate) fn hermes_ipfs_dht_get_providers(
125+
app_name: &ApplicationName,
126+
key: DhtKey,
127+
) -> Result<Vec<PeerId>, Errno> {
128+
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
129+
let key_str = format!("{key:x?}");
130+
tracing::debug!(app_name = %app_name, dht_key = %key_str, "get DHT providers");
131+
let providers = ipfs.dht_get_providers(key)?;
132+
tracing::debug!(app_name = %app_name, dht_key = %key_str, "got DHT providers");
133+
134+
let providers = providers.iter().map(ToString::to_string).collect();
135+
Ok(providers)
136+
}
137+
138+
/// Returns the peer id of the node.
139+
pub(crate) fn hermes_ipfs_get_peer_identity(
140+
app_name: &ApplicationName
141+
) -> Result<hermes_ipfs::PeerInfo, Errno> {
142+
let ipfs = HERMES_IPFS.get().ok_or(Errno::ServiceUnavailable)?;
143+
tracing::debug!(app_name = %app_name, "Get peer identity");
144+
let identity = ipfs.get_peer_identity()?;
145+
tracing::debug!(app_name = %app_name, "Got peer identity");
146+
147+
Ok(identity)
148+
}
149+
109150
/// Subscribe to a topic
110151
pub(crate) fn hermes_ipfs_subscribe(
111152
app_name: &ApplicationName,

hermes/bin/src/ipfs/mod.rs

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use std::{
77
};
88

99
pub(crate) use api::{
10-
hermes_ipfs_add_file, hermes_ipfs_content_validate, hermes_ipfs_evict_peer,
11-
hermes_ipfs_get_dht_value, hermes_ipfs_get_file, hermes_ipfs_pin_file, hermes_ipfs_publish,
10+
hermes_ipfs_add_file, hermes_ipfs_content_validate, hermes_ipfs_dht_get_providers,
11+
hermes_ipfs_dht_provide, hermes_ipfs_evict_peer, hermes_ipfs_get_dht_value,
12+
hermes_ipfs_get_file, hermes_ipfs_get_peer_identity, hermes_ipfs_pin_file, hermes_ipfs_publish,
1213
hermes_ipfs_put_dht_value, hermes_ipfs_subscribe, hermes_ipfs_unpin_file,
1314
};
1415
use dashmap::DashMap;
@@ -252,6 +253,48 @@ where N: hermes_ipfs::rust_ipfs::NetworkBehaviour<ToSwarm = Infallible> + Send +
252253
cmd_rx.blocking_recv().map_err(|_| Errno::DhtGetError)?
253254
}
254255

256+
/// Provide a DHT value
257+
fn dht_provide(
258+
&self,
259+
key: DhtKey,
260+
) -> Result<(), Errno> {
261+
let (cmd_tx, cmd_rx) = oneshot::channel();
262+
self.sender
263+
.as_ref()
264+
.ok_or(Errno::DhtProvideError)?
265+
.blocking_send(IpfsCommand::DhtProvide(key, cmd_tx))
266+
.map_err(|_| Errno::DhtProvideError)?;
267+
cmd_rx.blocking_recv().map_err(|_| Errno::DhtProvideError)?
268+
}
269+
270+
/// Get providers of a DHT value
271+
fn dht_get_providers(
272+
&self,
273+
key: DhtKey,
274+
) -> Result<HashSet<hermes_ipfs::PeerId>, Errno> {
275+
let (cmd_tx, cmd_rx) = oneshot::channel();
276+
self.sender
277+
.as_ref()
278+
.ok_or(Errno::DhtGetProvidersError)?
279+
.blocking_send(IpfsCommand::DhtGetProviders(key, cmd_tx))
280+
.map_err(|_| Errno::DhtGetProvidersError)?;
281+
cmd_rx
282+
.blocking_recv()
283+
.map_err(|_| Errno::DhtGetProvidersError)?
284+
}
285+
286+
/// Get the peer identity
287+
// TODO[rafal-ch]: We should not be using API errors here.
288+
fn get_peer_identity(&self) -> Result<hermes_ipfs::PeerInfo, Errno> {
289+
let (cmd_tx, cmd_rx) = oneshot::channel();
290+
self.sender
291+
.as_ref()
292+
.ok_or(Errno::GetPeerIdError)?
293+
.blocking_send(IpfsCommand::Identity(None, cmd_tx))
294+
.map_err(|_| Errno::GetPeerIdError)?;
295+
cmd_rx.blocking_recv().map_err(|_| Errno::GetPeerIdError)?
296+
}
297+
255298
/// Publish message to a `PubSub` topic
256299
fn pubsub_publish(
257300
&self,

hermes/bin/src/ipfs/task.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//! IPFS Task
2-
use std::str::FromStr;
2+
use std::{collections::HashSet, str::FromStr};
33

44
use hermes_ipfs::{AddIpfsFile, Cid, HermesIpfs, IpfsPath as PathIpfsFile, PeerId as TargetPeerId};
55
use tokio::{
@@ -32,15 +32,28 @@ pub(crate) enum IpfsCommand {
3232
GetDhtValue(DhtKey, oneshot::Sender<Result<DhtValue, Errno>>),
3333
/// Put DHT value
3434
PutDhtValue(DhtKey, DhtValue, oneshot::Sender<Result<bool, Errno>>),
35+
/// Provide a DHT value
36+
DhtProvide(DhtKey, oneshot::Sender<Result<(), Errno>>),
37+
/// Get providers of a DHT value
38+
DhtGetProviders(
39+
DhtKey,
40+
oneshot::Sender<Result<HashSet<hermes_ipfs::PeerId>, Errno>>,
41+
),
3542
/// Publish to a topic
3643
Publish(PubsubTopic, MessageData, oneshot::Sender<Result<(), Errno>>),
3744
/// Subscribe to a topic
3845
Subscribe(PubsubTopic, oneshot::Sender<Result<JoinHandle<()>, Errno>>),
3946
/// Evict Peer from node
4047
EvictPeer(PeerId, oneshot::Sender<Result<bool, Errno>>),
48+
/// Gets the peer identity
49+
Identity(
50+
Option<PeerId>,
51+
oneshot::Sender<Result<hermes_ipfs::PeerInfo, Errno>>,
52+
),
4153
}
4254

4355
/// Handle IPFS commands in asynchronous task.
56+
#[allow(clippy::too_many_lines)]
4457
pub(crate) async fn ipfs_command_handler(
4558
hermes_node: HermesIpfs,
4659
mut queue_rx: mpsc::Receiver<IpfsCommand>,
@@ -130,6 +143,41 @@ pub(crate) async fn ipfs_command_handler(
130143
let status = hermes_node.ban_peer(peer_id).await.is_ok();
131144
send_response(Ok(status), tx);
132145
},
146+
IpfsCommand::DhtProvide(key, tx) => {
147+
let response = hermes_node.dht_provide(key.clone()).await.map_err(|err| {
148+
tracing::error!(dht_key = ?key, "DHT provide failed: {}", err);
149+
Errno::DhtProvideError
150+
});
151+
send_response(response, tx);
152+
},
153+
IpfsCommand::DhtGetProviders(key, tx) => {
154+
let response = hermes_node
155+
.dht_get_providers(key.clone())
156+
.await
157+
.map_err(|err| {
158+
tracing::error!(dht_key = ?key, "DHT get providers failed: {}", err);
159+
Errno::DhtGetProvidersError
160+
});
161+
send_response(response, tx);
162+
},
163+
IpfsCommand::Identity(peer_id, tx) => {
164+
let peer_id = match peer_id {
165+
Some(peer_id) => {
166+
Some(
167+
hermes_ipfs::PeerId::from_str(&peer_id)
168+
.map_err(|_| Errno::InvalidPeerId)?,
169+
)
170+
},
171+
None => None,
172+
};
173+
174+
let response = hermes_node.identity(peer_id).await.map_err(|err| {
175+
tracing::error!(peer_id = ?peer_id, "Identity failed: {}", err);
176+
Errno::GetPeerIdError
177+
});
178+
179+
send_response(response, tx);
180+
},
133181
}
134182
}
135183
hermes_node.stop().await;

0 commit comments

Comments
 (0)