Skip to content

Commit edcb654

Browse files
committed
Move parking from communication into scheduler
1 parent 7578c1e commit edcb654

File tree

8 files changed

+44
-68
lines changed

8 files changed

+44
-68
lines changed

communication/src/allocator/generic.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,15 +113,6 @@ impl Allocate for Generic {
113113
fn receive(&mut self) { self.receive(); }
114114
fn release(&mut self) { self.release(); }
115115
fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
116-
fn await_events(&self, _duration: Option<std::time::Duration>) {
117-
match self {
118-
Generic::Thread(t) => t.await_events(_duration),
119-
Generic::Process(p) => p.await_events(_duration),
120-
Generic::ProcessBinary(pb) => pb.await_events(_duration),
121-
Generic::ZeroCopy(z) => z.await_events(_duration),
122-
Generic::ZeroCopyBinary(z) => z.await_events(_duration),
123-
}
124-
}
125116
}
126117

127118

communication/src/allocator/mod.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::rc::Rc;
44
use std::cell::RefCell;
5-
use std::time::Duration;
65

76
pub use self::thread::Thread;
87
pub use self::process::Process;
@@ -57,14 +56,6 @@ pub trait Allocate {
5756
/// into a performance problem.
5857
fn events(&self) -> &Rc<RefCell<Vec<usize>>>;
5958

60-
/// Awaits communication events.
61-
///
62-
/// This method may park the current thread, for at most `duration`,
63-
/// until new events arrive.
64-
/// The method is not guaranteed to wait for any amount of time, but
65-
/// good implementations should use this as a hint to park the thread.
66-
fn await_events(&self, _duration: Option<Duration>) { }
67-
6859
/// Ensure that received messages are surfaced in each channel.
6960
///
7061
/// This method should be called to ensure that received messages are

communication/src/allocator/process.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::rc::Rc;
44
use std::cell::RefCell;
55
use std::sync::{Arc, Mutex};
66
use std::any::Any;
7-
use std::time::Duration;
87
use std::collections::{HashMap};
98
use std::sync::mpsc::{Sender, Receiver};
109

@@ -182,10 +181,6 @@ impl Allocate for Process {
182181
self.inner.events()
183182
}
184183

185-
fn await_events(&self, duration: Option<Duration>) {
186-
self.inner.await_events(duration);
187-
}
188-
189184
fn receive(&mut self) {
190185
let mut events = self.inner.events().borrow_mut();
191186
while let Ok(index) = self.counters_recv.try_recv() {

communication/src/allocator/thread.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
33
use std::rc::Rc;
44
use std::cell::RefCell;
5-
use std::time::Duration;
65
use std::collections::VecDeque;
76

87
use crate::allocator::{Allocate, AllocateBuilder};
@@ -36,16 +35,6 @@ impl Allocate for Thread {
3635
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
3736
&self.events
3837
}
39-
fn await_events(&self, duration: Option<Duration>) {
40-
if self.events.borrow().is_empty() {
41-
if let Some(duration) = duration {
42-
std::thread::park_timeout(duration);
43-
}
44-
else {
45-
std::thread::park();
46-
}
47-
}
48-
}
4938
}
5039

5140
/// Thread-local counting channel push endpoint.

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,4 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
314314
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
315315
self.inner.events()
316316
}
317-
fn await_events(&self, duration: Option<std::time::Duration>) {
318-
self.inner.await_events(duration);
319-
}
320317
}

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -242,14 +242,4 @@ impl Allocate for ProcessAllocator {
242242
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
243243
&self.events
244244
}
245-
fn await_events(&self, duration: Option<std::time::Duration>) {
246-
if self.events.borrow().is_empty() {
247-
if let Some(duration) = duration {
248-
std::thread::park_timeout(duration);
249-
}
250-
else {
251-
std::thread::park();
252-
}
253-
}
254-
}
255245
}

timely/src/scheduling/activate.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ impl Activations {
179179
/// This method should be used before putting a worker thread to sleep, as it
180180
/// indicates the amount of time before the thread should be unparked for the
181181
/// next scheduled activation.
182-
pub fn empty_for(&self) -> Option<Duration> {
182+
fn empty_for(&self) -> Option<Duration> {
183183
if !self.bounds.is_empty() || self.timer.is_none() {
184184
Some(Duration::new(0,0))
185185
}
@@ -191,6 +191,34 @@ impl Activations {
191191
})
192192
}
193193
}
194+
195+
/// Indicates that there is nothing to do for `timeout`, and that the scheduler
196+
/// can allow the thread to sleep until then.
197+
///
198+
/// The method does not *need* to park the thread, and indeed it may elect to
199+
/// unpark earlier if there are deferred activations.
200+
pub fn park_timeout(&self, timeout: Option<Duration>) {
201+
let empty_for = self.empty_for();
202+
let timeout = match (timeout, empty_for) {
203+
(Some(x), Some(y)) => Some(std::cmp::min(x,y)),
204+
(x, y) => x.or(y),
205+
};
206+
207+
if let Some(timeout) = timeout {
208+
std::thread::park_timeout(timeout);
209+
}
210+
else {
211+
std::thread::park();
212+
}
213+
}
214+
215+
/// True iff there are no immediate activations.
216+
///
217+
/// Used by others to guard work done in anticipation of potentially parking.
218+
/// An alternate method name could be `would_park`.
219+
pub fn is_idle(&self) -> bool {
220+
self.bounds.is_empty() && self.timer.is_none()
221+
}
194222
}
195223

196224
/// A thread-safe handle to an `Activations`.

timely/src/worker.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ impl<A: Allocate> Worker<A> {
346346
/// worker.step_or_park(Some(Duration::from_secs(1)));
347347
/// });
348348
/// ```
349-
pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {
349+
pub fn step_or_park(&mut self, timeout: Option<Duration>) -> bool {
350350

351351
{ // Process channel events. Activate responders.
352352
let mut allocator = self.allocator.borrow_mut();
@@ -375,28 +375,23 @@ impl<A: Allocate> Worker<A> {
375375
.borrow_mut()
376376
.advance();
377377

378-
// Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
379-
let empty_for = self.activations.borrow().empty_for();
380-
// Determine the minimum park duration, where `None` are an absence of a constraint.
381-
let delay = match (duration, empty_for) {
382-
(Some(x), Some(y)) => Some(std::cmp::min(x,y)),
383-
(x, y) => x.or(y),
384-
};
378+
if self.activations.borrow().is_idle() {
379+
// If the timeout is zero, don't bother trying to park.
380+
// More generally, we could put some threshold in here.
381+
if timeout != Some(Duration::new(0, 0)) {
382+
// Log parking and flush log.
383+
if let Some(l) = self.logging().as_mut() {
384+
l.log(crate::logging::ParkEvent::park(timeout));
385+
l.flush();
386+
}
385387

386-
if delay != Some(Duration::new(0,0)) {
388+
// We have just drained `allocator.events()` up above;
389+
// otherwise we should first check it for emptiness.
390+
self.activations.borrow().park_timeout(timeout);
387391

388-
// Log parking and flush log.
389-
if let Some(l) = self.logging().as_mut() {
390-
l.log(crate::logging::ParkEvent::park(delay));
391-
l.flush();
392+
// Log return from unpark.
393+
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
392394
}
393-
394-
self.allocator
395-
.borrow()
396-
.await_events(delay);
397-
398-
// Log return from unpark.
399-
self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
400395
}
401396
else { // Schedule active dataflows.
402397

0 commit comments

Comments
 (0)