Skip to content

Commit 5a5ef6d

Browse files
committed
Release 0.2.3
- Add event queue
1 parent 5136c09 commit 5a5ef6d

File tree

4 files changed

+275
-2
lines changed

4 files changed

+275
-2
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "spnr-lib"
3-
version = "0.2.2"
3+
version = "0.2.3"
44
edition = "2021"
55
authors = [ "spnr@spnr.app" ]
66
description = "Rust library for building smart contracts on the Internet Computer, by the Spinner.Cash team."

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ More specifically it is used by [Spinner.Cash], a decentralized layer-2 protocol
1010
- [x] Stable storage.
1111
- [x] Append-only log using stable storage.
1212
- [x] Flat merkle tree.
13-
- [ ] Event queue.
13+
- [x] Event queue.
1414

1515
All source code is original and released under [GPLv3](./LICENSE).
1616
Please make sure you understand the requirement and risk before using them in your own projects.

src/event_queue.rs

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
/// An event queue ordered by expiry time.
2+
///
3+
/// Each event has an id and status, and they can be inserted/looked up/purged from the queue.
4+
use ic_cdk::export::candid::CandidType;
5+
use serde::{Deserialize, Serialize};
6+
use std::collections::{btree_map::Entry, BTreeMap};
7+
use std::{error, fmt};
8+
9+
#[derive(
10+
CandidType, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Hash,
11+
)]
12+
/// The `nonce` field of an `EventId` must be unique, and is usually randomly generated.
13+
pub struct EventId<I> {
14+
/// Expiration time since UNIX epoch (in nanoseconds).
15+
pub expiry: u64,
16+
/// Unique identifier of an event.
17+
pub nonce: I,
18+
}
19+
20+
impl<I> EventId<I> {
21+
pub fn new(expiry: u64, nonce: I) -> Self {
22+
Self { expiry, nonce }
23+
}
24+
}
25+
26+
#[derive(Clone, Debug)]
27+
/// Error when an operation cannot be performed because the event queue reaches its max capacity.
28+
pub struct EventQueueFullError;
29+
30+
impl error::Error for EventQueueFullError {}
31+
32+
impl fmt::Display for EventQueueFullError {
33+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34+
f.write_str("Event queue is full")
35+
}
36+
}
37+
38+
#[derive(CandidType, Serialize, Deserialize, Clone, Debug, Hash)]
39+
/// An event queue is an ordered list of events of nonce type `I` and status type `T`.
40+
/// Its max capacity is fixed at initialization.
41+
pub struct EventQueue<I: Ord, T> {
42+
capacity: usize,
43+
events: BTreeMap<EventId<I>, T>,
44+
index: BTreeMap<I, EventId<I>>,
45+
}
46+
47+
impl<I: Ord, T> Default for EventQueue<I, T> {
48+
/// Return an empty event queue of zero capacity.
49+
fn default() -> Self {
50+
EventQueue::new(0)
51+
}
52+
}
53+
54+
fn is_vacant<K, V>(entry: &Entry<K, V>) -> bool {
55+
matches!(entry, Entry::Vacant(_))
56+
}
57+
58+
impl<I: Ord, T> EventQueue<I, T> {
59+
/// Return an event queue of the given max capacity.
60+
pub fn new(capacity: usize) -> Self {
61+
Self {
62+
capacity,
63+
events: BTreeMap::default(),
64+
index: BTreeMap::default(),
65+
}
66+
}
67+
68+
/// Insert an event of the given id and status into the queue.
69+
/// Return error if the queue is full.
70+
/// This operation is `O(log(n))`.
71+
pub fn insert(&mut self, id: EventId<I>, event: T) -> Result<(), EventQueueFullError>
72+
where
73+
I: Clone,
74+
{
75+
let size = self.events.len();
76+
let entry = self.events.entry(id.clone());
77+
if is_vacant(&entry) {
78+
if size >= self.capacity {
79+
return Err(EventQueueFullError);
80+
}
81+
entry.or_insert(event);
82+
self.index.insert(id.nonce.clone(), id);
83+
} else {
84+
entry.and_modify(|v| *v = event);
85+
}
86+
Ok(())
87+
}
88+
89+
/// Remove and return the first matching event.
90+
/// This operation is linear in the events it has to search through.
91+
pub fn pop_front<F: Fn(&T) -> bool>(&mut self, matching: F) -> Option<(EventId<I>, T)>
92+
where
93+
I: Clone,
94+
{
95+
if let Some(id) =
96+
self.events
97+
.iter()
98+
.find_map(|(id, e)| if matching(e) { Some(id.clone()) } else { None })
99+
{
100+
self.index.remove(&id.nonce);
101+
self.events.remove(&id).map(|e| (id.clone(), e))
102+
} else {
103+
None
104+
}
105+
}
106+
107+
/// Remove all events of the matching status with an expiry less than the given expiry.
108+
/// This operation is in the number of events matching the expiry condition.
109+
pub fn purge<F: Fn(&T) -> bool>(&mut self, expiry: u64, matching: F)
110+
where
111+
I: Default,
112+
{
113+
let key = EventId {
114+
expiry,
115+
nonce: I::default(),
116+
};
117+
let mut newer_events = self.events.split_off(&key);
118+
self.events.retain(|k, v| {
119+
let to_remove = matching(v);
120+
if to_remove {
121+
self.index.remove(&k.nonce);
122+
}
123+
!to_remove
124+
});
125+
self.events.append(&mut newer_events);
126+
}
127+
128+
/// Remove all events with an expiry less than the given expiry.
129+
/// This operation is `O(log(n))`.
130+
pub fn purge_expired(&mut self, expiry: u64)
131+
where
132+
I: Default,
133+
{
134+
let key = EventId {
135+
expiry,
136+
nonce: I::default(),
137+
};
138+
self.events = self.events.split_off(&key);
139+
}
140+
141+
/// Lookup event status given an event id.
142+
/// This operation is `O(log(n))`.
143+
pub fn get(&self, id: &EventId<I>) -> Option<&T> {
144+
self.events.get(id)
145+
}
146+
147+
/// Modify the status of an event given its id.
148+
/// This operation is `O(log(n))`.
149+
pub fn modify<F: FnOnce(&mut T)>(&mut self, id: EventId<I>, f: F) {
150+
self.events.entry(id).and_modify(|v: &mut T| f(v));
151+
}
152+
153+
/// Return both an event id and its status if the given nonce is found.
154+
/// This operation is `O(log(n))`.
155+
pub fn find(&self, nonce: &I) -> Option<(&EventId<I>, &T)> {
156+
self.index
157+
.get(nonce)
158+
.and_then(|id| self.events.get(id).map(|e| (id, e)))
159+
}
160+
161+
/// Return number of events in the queue.
162+
/// This operation is constant time.
163+
pub fn len(&self) -> usize {
164+
self.events.len()
165+
}
166+
167+
/// Return true if the queue is empty.
168+
/// This operation is constant time.
169+
pub fn is_empty(&self) -> bool {
170+
self.events.is_empty()
171+
}
172+
173+
/// Return an iterator of all events in the order of their `EventId`, i.e., ordered by expiry first, then by nonce.
174+
pub fn iter(&self) -> Box<dyn Iterator<Item = (&EventId<I>, &T)> + '_> {
175+
Box::new(self.events.iter())
176+
}
177+
178+
/// Check consistency of the event queue.
179+
/// It should always return true if the implementation is correct.
180+
#[cfg(test)]
181+
fn selfcheck(&self) -> bool
182+
where
183+
I: Eq + Clone,
184+
{
185+
use std::collections::BTreeSet;
186+
self.len() <= self.capacity
187+
&& self.events.len() == self.index.len()
188+
&& self.events.keys().collect::<Vec<_>>() == self.index.values().collect::<Vec<_>>()
189+
&& self.index.keys().cloned().collect::<BTreeSet<_>>().len() == self.index.len()
190+
}
191+
}
192+
193+
#[cfg(test)]
194+
mod test {
195+
use super::*;
196+
use assert_matches::*;
197+
198+
#[derive(Debug, PartialEq, Eq)]
199+
enum Item {
200+
Begin(u8),
201+
Middle(u8),
202+
End(u8),
203+
}
204+
use Item::*;
205+
206+
fn is_begin(item: &Item) -> bool {
207+
matches!(item, Begin(_))
208+
}
209+
210+
fn is_middle(item: &Item) -> bool {
211+
matches!(item, Middle(_))
212+
}
213+
214+
fn is_end(item: &Item) -> bool {
215+
matches!(item, End(_))
216+
}
217+
218+
#[test]
219+
fn basic() {
220+
let mut q: EventQueue<u8, Item> = EventQueue::new(10);
221+
assert!(q.is_empty());
222+
assert_eq!(q.len(), 0);
223+
assert!(q.selfcheck());
224+
225+
for i in 0..10 {
226+
let expiry = if i < 5 { 0 } else { 1 };
227+
assert_matches!(q.insert(EventId::new(expiry, i), Begin(i)), Ok(_));
228+
assert!(q.selfcheck());
229+
}
230+
// cannot insert any more
231+
assert_matches!(
232+
q.insert(EventId::new(2, 0), Begin(0)),
233+
Err(EventQueueFullError)
234+
);
235+
assert!(q.selfcheck());
236+
// still allow insertion of existing key
237+
assert_matches!(q.insert(EventId::new(0, 1), Middle(10)), Ok(_));
238+
assert_matches!(q.insert(EventId::new(0, 4), End(40)), Ok(_));
239+
assert!(q.selfcheck());
240+
241+
// confirm if everything is in there
242+
for i in 0..10 {
243+
assert_matches!(q.find(&i), Some(_));
244+
}
245+
246+
// test pop_front
247+
assert_eq!(q.pop_front(is_begin), Some((EventId::new(0, 0), Begin(0))));
248+
assert!(q.selfcheck());
249+
assert_eq!(q.pop_front(is_begin), Some((EventId::new(0, 2), Begin(2))));
250+
assert!(q.selfcheck());
251+
assert_eq!(
252+
q.pop_front(is_middle),
253+
Some((EventId::new(0, 1), Middle(10)))
254+
);
255+
assert!(q.selfcheck());
256+
assert_eq!(q.len(), 7);
257+
258+
// test purge
259+
q.purge(1, is_begin);
260+
assert_eq!(q.len(), 6);
261+
assert!(q.selfcheck());
262+
assert_eq!(q.pop_front(is_begin), Some((EventId::new(1, 5), Begin(5))));
263+
assert!(q.selfcheck());
264+
assert_eq!(q.pop_front(is_end), Some((EventId::new(0, 4), End(40))));
265+
assert!(q.selfcheck());
266+
assert_eq!(q.len(), 4);
267+
268+
q.purge(2, is_begin);
269+
assert_eq!(q.len(), 0);
270+
assert!(q.selfcheck());
271+
}
272+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
//! [Internet Computer]: https://wiki.internetcomputer.org
1313
//! [Spinner.Cash]: https://github.com/spinner-cash
1414
15+
pub mod event_queue;
1516
pub mod flat_tree;
1617
pub mod log;
1718
pub mod storage;

0 commit comments

Comments
 (0)