Skip to content

Commit 20c9256

Browse files
committed
[examples] add hybrid-task-scheduling example.
1 parent 843ad64 commit 20c9256

File tree

4 files changed

+168
-0
lines changed

4 files changed

+168
-0
lines changed

Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ members = [
3131
"examples/httpserver",
3232
"examples/httpserver",
3333
"examples/shell",
34+
"examples/hybrid-task-schedule",
3435
]
3536

3637
[workspace.package]
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "hybrid-task-schedule"
3+
version.workspace = true
4+
edition.workspace = true
5+
authors.workspace = true
6+
license.workspace = true
7+
homepage.workspace = true
8+
documentation.workspace = true
9+
repository.workspace = true
10+
keywords.workspace = true
11+
categories.workspace = true
12+
13+
[dependencies]
14+
axstd = { workspace = true, features = [
15+
"alloc",
16+
"multitask",
17+
"net",
18+
"irq",
19+
], optional = true }
20+
axasync-std = { workspace = true }
21+
rand = { version = "0.9.1", default-features = false, features = ["small_rng"] }
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
//! Hybrid task scheduling examples
2+
3+
#![cfg_attr(feature = "axstd", no_std)]
4+
#![cfg_attr(feature = "axstd", no_main)]
5+
6+
#[macro_use]
7+
#[cfg(feature = "axstd")]
8+
extern crate axstd as std;
9+
10+
extern crate axasync_std as async_std;
11+
12+
use rand::{RngCore, SeedableRng, rngs::SmallRng};
13+
use std::thread;
14+
use std::{sync::Arc, vec::Vec};
15+
16+
#[cfg(feature = "axstd")]
17+
use std::os::arceos::api::task::{self as api, AxWaitQueueHandle};
18+
19+
const NUM_DATA: usize = 2_000_000;
20+
const NUM_TASKS: usize = 100;
21+
22+
#[cfg(feature = "axstd")]
23+
fn barrier() {
24+
use std::sync::atomic::{AtomicUsize, Ordering};
25+
static BARRIER_WQ: AxWaitQueueHandle = AxWaitQueueHandle::new();
26+
static BARRIER_COUNT: AtomicUsize = AtomicUsize::new(0);
27+
28+
BARRIER_COUNT.fetch_add(1, Ordering::Relaxed);
29+
api::ax_wait_queue_wait_until(
30+
&BARRIER_WQ,
31+
|| BARRIER_COUNT.load(Ordering::Relaxed) == NUM_TASKS,
32+
None,
33+
);
34+
api::ax_wait_queue_wake(&BARRIER_WQ, u32::MAX); // wakeup all
35+
}
36+
37+
#[cfg(feature = "axstd")]
38+
async fn barrier_f() {
39+
use std::sync::atomic::{AtomicUsize, Ordering};
40+
static BARRIER_WQ: AxWaitQueueHandle = AxWaitQueueHandle::new();
41+
static BARRIER_COUNT: AtomicUsize = AtomicUsize::new(0);
42+
43+
BARRIER_COUNT.fetch_add(1, Ordering::Relaxed);
44+
api::ax_wait_queue_wait_until_f(
45+
&BARRIER_WQ,
46+
|| BARRIER_COUNT.load(Ordering::Relaxed) == NUM_TASKS,
47+
None,
48+
)
49+
.await;
50+
api::ax_wait_queue_wake(&BARRIER_WQ, u32::MAX); // wakeup all
51+
}
52+
53+
#[cfg(not(feature = "axstd"))]
54+
fn barrier() {
55+
use std::sync::{Barrier, OnceLock};
56+
static BARRIER: OnceLock<Barrier> = OnceLock::new();
57+
BARRIER.get_or_init(|| Barrier::new(NUM_TASKS)).wait();
58+
}
59+
60+
fn sqrt(n: &u64) -> u64 {
61+
let mut x = *n;
62+
loop {
63+
if x * x <= *n && (x + 1) * (x + 1) > *n {
64+
return x;
65+
}
66+
x = (x + *n / x) / 2;
67+
}
68+
}
69+
70+
#[cfg_attr(feature = "axstd", unsafe(no_mangle))]
71+
fn main() {
72+
let mut rng = SmallRng::seed_from_u64(0xdead_beef);
73+
let vec = Arc::new(
74+
(0..NUM_DATA)
75+
.map(|_| rng.next_u32() as u64)
76+
.collect::<Vec<_>>(),
77+
);
78+
let expect: u64 = vec.iter().map(sqrt).sum();
79+
80+
let mut tasks = Vec::with_capacity(NUM_TASKS);
81+
for i in 0..NUM_TASKS {
82+
let vec = vec.clone();
83+
tasks.push(async_std::task::spawn(move || async move {
84+
let left = i * (NUM_DATA / NUM_TASKS);
85+
let right = (left + (NUM_DATA / NUM_TASKS)).min(NUM_DATA);
86+
println!(
87+
"part {}: {:?} [{}, {})",
88+
i,
89+
thread::current().id(),
90+
left,
91+
right
92+
);
93+
94+
async_std::task::spawn(|| async {
95+
println!("spawn a thread");
96+
})
97+
.join()
98+
.unwrap();
99+
100+
let partial_sum: u64 = vec[left..right].iter().map(sqrt).sum();
101+
barrier();
102+
async_std::task::yield_now().await;
103+
#[cfg(feature = "axstd")]
104+
barrier_f().await;
105+
async_std::task::sleep(core::time::Duration::from_millis(1)).await;
106+
107+
println!("part {}: {:?} finished", i, thread::current().id());
108+
partial_sum
109+
}));
110+
}
111+
112+
let actual = tasks.into_iter().map(|t| t.join().unwrap()).sum();
113+
println!("sum = {}", actual);
114+
assert_eq!(expect, actual);
115+
116+
println!("Parallel summation tests run OK!");
117+
async_std::block_on! {hello_world()};
118+
async_std::callasync! {test()};
119+
}
120+
121+
async fn hello_world() {
122+
println!("hello world!");
123+
}
124+
125+
async fn test() -> i32 {
126+
let mut flag = false;
127+
core::future::poll_fn(|_cx| {
128+
if !flag {
129+
flag = true;
130+
core::task::Poll::Pending
131+
} else {
132+
core::task::Poll::Ready(())
133+
}
134+
})
135+
.await;
136+
43
137+
}

0 commit comments

Comments
 (0)