Skip to content

Commit b638e61

Browse files
committed
Reorganized Activator as Scheduler
1 parent edcb654 commit b638e61

File tree

3 files changed

+62
-41
lines changed

3 files changed

+62
-41
lines changed

timely/src/progress/subgraph.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,10 +300,11 @@ where
300300
self.propagate_pointstamps();
301301

302302
{ // Enqueue active children; scoped to let borrow drop.
303+
use crate::scheduling::activate::Scheduler;
303304
let temp_active = &mut self.temp_active;
304305
self.activations
305306
.borrow_mut()
306-
.for_extensions(&self.path[..], |index| temp_active.push(Reverse(index)));
307+
.extensions(&self.path[..], temp_active);
307308
}
308309

309310
// Schedule child operators.

timely/src/scheduling/activate.rs

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,45 @@ use std::time::{Duration, Instant};
88
use std::cmp::Reverse;
99
use crossbeam_channel::{Sender, Receiver};
1010

11-
/// Methods required to act as a timely scheduler.
11+
/// Methods required to act as a scheduler for timely operators.
1212
///
13-
/// The core methods are the activation of "paths", sequences of integers, and
14-
/// the enumeration of active paths by prefix. A scheduler may delay the report
15-
/// of a path indefinitely, but it should report at least one extension for the
16-
/// empty path `&[]` or risk parking the worker thread without a certain unpark.
13+
/// Operators are described by "paths" of integers, indicating the path along
14+
/// a tree of regions, arriving at the operator. Each path is either "idle"
15+
/// or "active", where the latter indicates that someone has requested that the
16+
/// operator be scheduled by the worker. Operators go from idle to active when
17+
/// the `activate(path)` method is called, and from active to idle when the path
18+
/// is returned through a call to `extensions(path, _)`.
1719
///
18-
/// There is no known harm to "spurious wake-ups" where a not-active path is
19-
/// returned through `extensions()`.
20+
/// The worker will continually probe for extensions to the root empty path `[]`,
21+
/// and then follow all returned addresses, recursively. A scheduler need not
22+
/// schedule all active paths, but it should return *some* active path when the
23+
/// worker probes the empty path, or the worker may put the thread to sleep.
24+
///
25+
/// There is no known harm to scheduling an idle path.
26+
/// The worker may speculatively schedule paths of its own accord.
2027
pub trait Scheduler {
2128
/// Mark a path as immediately schedulable.
29+
///
30+
/// The scheduler is not required to immediately schedule the path, but it
31+
/// should not signal that it has no work until the path has been scheduled.
2232
fn activate(&mut self, path: &[usize]);
2333
/// Populates `dest` with next identifiers on active extensions of `path`.
2434
///
2535
/// This method is where a scheduler is allowed to exercise some discretion,
2636
/// in that it does not need to present *all* extensions, but it can instead
27-
/// present only those that the runtime should schedule.
28-
fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>);
37+
/// present only those that the runtime should immediately schedule.
38+
///
39+
/// The worker *will* schedule all extensions before probing new prefixes.
40+
/// The scheduler is invited to rely on this, and to schedule in "batches",
41+
/// where the next time the worker probes for extensions to the empty path
42+
/// then all addresses in the batch have certainly been scheduled.
43+
fn extensions(&mut self, path: &[usize], dest: &mut BinaryHeap<Reverse<usize>>);
2944
}
3045

3146
// Trait objects can be schedulers too.
3247
impl Scheduler for Box<dyn Scheduler> {
3348
fn activate(&mut self, path: &[usize]) { (**self).activate(path) }
34-
fn extensions(&mut self, path: &[usize], dest: &mut Vec<usize>) { (**self).extensions(path, dest) }
49+
fn extensions(&mut self, path: &[usize], dest: &mut BinaryHeap<Reverse<usize>>) { (**self).extensions(path, dest) }
3550
}
3651

3752
/// Allocation-free activation tracker.
@@ -93,7 +108,7 @@ impl Activations {
93108
}
94109

95110
/// Discards the current active set and presents the next active set.
96-
pub fn advance(&mut self) {
111+
fn advance(&mut self) {
97112

98113
// Drain inter-thread activations.
99114
while let Ok(path) = self.rx.try_recv() {
@@ -130,15 +145,15 @@ impl Activations {
130145
self.clean = self.bounds.len();
131146
}
132147

133-
/// Maps a function across activated paths.
134-
pub fn map_active(&self, logic: impl Fn(&[usize])) {
135-
for (offset, length) in self.bounds.iter() {
136-
logic(&self.slices[*offset .. (*offset + *length)]);
137-
}
138-
}
139-
140148
/// Sets as active any symbols that follow `path`.
141-
pub fn for_extensions(&self, path: &[usize], mut action: impl FnMut(usize)) {
149+
fn for_extensions(&mut self, path: &[usize], mut action: impl FnMut(usize)) {
150+
151+
// Each call for the root path is a moment where the worker has reset.
152+
// This relies on a worker implementation that follows the scheduling
153+
// instructions perfectly; any offered path left unexplored is discarded.
154+
if path.is_empty() {
155+
self.advance();
156+
}
142157

143158
let position =
144159
self.bounds[..self.clean]
@@ -211,13 +226,14 @@ impl Activations {
211226
std::thread::park();
212227
}
213228
}
229+
}
214230

215-
/// True iff there are no immediate activations.
216-
///
217-
/// Used by others to guard work done in anticipation of potentially parking.
218-
/// An alternate method name could be `would_park`.
219-
pub fn is_idle(&self) -> bool {
220-
self.bounds.is_empty() && self.timer.is_none()
231+
impl Scheduler for Activations {
232+
fn activate(&mut self, path: &[usize]) {
233+
self.activate(path);
234+
}
235+
fn extensions(&mut self, path: &[usize], dest: &mut BinaryHeap<Reverse<usize>>) {
236+
self.for_extensions(path, |index| dest.push(Reverse(index)));
221237
}
222238
}
223239

timely/src/worker.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
33
use std::rc::Rc;
44
use std::cell::{RefCell, RefMut};
5+
use std::cmp::Reverse;
56
use std::any::Any;
67
use std::str::FromStr;
78
use std::time::{Instant, Duration};
8-
use std::collections::HashMap;
9+
use std::collections::{HashMap, BinaryHeap};
910
use std::collections::hash_map::Entry;
1011
use std::sync::Arc;
1112

@@ -230,7 +231,7 @@ pub struct Worker<A: Allocate> {
230231
logging: Option<Rc<RefCell<crate::logging_core::Registry>>>,
231232

232233
activations: Rc<RefCell<Activations>>,
233-
active_dataflows: Vec<usize>,
234+
active_dataflows: BinaryHeap<Reverse<usize>>,
234235

235236
// Temporary storage for channel identifiers during dataflow construction.
236237
// These are then associated with a dataflow once constructed.
@@ -370,12 +371,20 @@ impl<A: Allocate> Worker<A> {
370371
}
371372
}
372373

373-
// Organize activations.
374-
self.activations
375-
.borrow_mut()
376-
.advance();
374+
// Commence a new round of scheduling, starting with dataflows.
375+
// We probe the scheduler for active prefixes, where an empty response
376+
// indicates that the scheduler has no work for us at the moment.
377+
{ // Scoped to let borrow of `self.active_dataflows` drop.
378+
use crate::scheduling::activate::Scheduler;
379+
let active_dataflows = &mut self.active_dataflows;
380+
self.activations
381+
.borrow_mut()
382+
.extensions(&[], active_dataflows);
383+
}
384+
385+
// If no dataflows are active, there is nothing to do. Consider parking.
386+
if self.active_dataflows.is_empty() {
377387

378-
if self.activations.borrow().is_idle() {
379388
// If the timeout is zero, don't bother trying to park.
380389
// More generally, we could put some threshold in here.
381390
if timeout != Some(Duration::new(0, 0)) {
@@ -393,15 +402,10 @@ impl<A: Allocate> Worker<A> {
393402
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
394403
}
395404
}
396-
else { // Schedule active dataflows.
397-
398-
let active_dataflows = &mut self.active_dataflows;
399-
self.activations
400-
.borrow_mut()
401-
.for_extensions(&[], |index| active_dataflows.push(index));
405+
else { // Schedule all active dataflows.
402406

403407
let mut dataflows = self.dataflows.borrow_mut();
404-
for index in active_dataflows.drain(..) {
408+
for Reverse(index) in self.active_dataflows.drain() {
405409
// Step dataflow if it exists, remove if complete.
406410
if let Entry::Occupied(mut entry) = dataflows.entry(index) {
407411
// TODO: This is a moment at which a scheduling decision is being made.
@@ -740,7 +744,7 @@ impl<A: Allocate> Clone for Worker<A> {
740744
dataflow_counter: Rc::clone(&self.dataflow_counter),
741745
logging: self.logging.clone(),
742746
activations: Rc::clone(&self.activations),
743-
active_dataflows: Vec::new(),
747+
active_dataflows: Default::default(),
744748
temp_channel_ids: Rc::clone(&self.temp_channel_ids),
745749
}
746750
}

0 commit comments

Comments
 (0)