Skip to content

Commit 188d394

Browse files
Merge pull request #663 from frankmcsherry/timeless_option
Make `std::time::Instant` optional
2 parents 7ffb5e2 + 7578c1e commit 188d394

File tree

7 files changed

+69
-55
lines changed

7 files changed

+69
-55
lines changed

timely/examples/logging-send.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ fn main() {
1616
let probe = ProbeHandle::new();
1717

1818
// Register timely worker logging.
19-
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |time, data|
19+
worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
2020
if let Some(data) = data {
2121
data.iter().for_each(|x| println!("LOG1: {:?}", x))
2222
}
@@ -28,7 +28,7 @@ fn main() {
2828
// Register timely progress logging.
2929
// Less generally useful: intended for debugging advanced custom operators or timely
3030
// internals.
31-
worker.log_register().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
31+
worker.log_register().unwrap().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
3232
if let Some(data) = data {
3333
data.iter().for_each(|x| {
3434
println!("PROGRESS: {:?}", x);
@@ -50,7 +50,7 @@ fn main() {
5050
}
5151
);
5252

53-
worker.log_register().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
53+
worker.log_register().unwrap().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
5454
if let Some(data) = data {
5555
data.iter().for_each(|x| {
5656
println!("REACHABILITY: {:?}", x);
@@ -61,7 +61,7 @@ fn main() {
6161
}
6262
);
6363

64-
worker.log_register().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
64+
worker.log_register().unwrap().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
6565
if let Some(data) = data {
6666
data.iter().for_each(|(_, x)| {
6767
println!("SUMMARY: {:?}", x);
@@ -81,7 +81,7 @@ fn main() {
8181
});
8282

8383
// Register timely worker logging.
84-
worker.log_register().insert::<TimelyEventBuilder,_>("timely", |time, data|
84+
worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
8585
if let Some(data) = data {
8686
data.iter().for_each(|x| println!("LOG2: {:?}", x))
8787
}
@@ -100,7 +100,7 @@ fn main() {
100100

101101
// Register user-level logging.
102102
type MyBuilder = CapacityContainerBuilder<Vec<(Duration, ())>>;
103-
worker.log_register().insert::<MyBuilder,_>("input", |time, data|
103+
worker.log_register().unwrap().insert::<MyBuilder,_>("input", |time, data|
104104
if let Some(data) = data {
105105
for element in data.iter() {
106106
println!("Round tick at: {:?}", element.0);
@@ -111,7 +111,7 @@ fn main() {
111111
}
112112
);
113113

114-
let input_logger = worker.log_register().get::<MyBuilder>("input").expect("Input logger absent");
114+
let input_logger = worker.log_register().unwrap().get::<MyBuilder>("input").expect("Input logger absent");
115115

116116
let timer = std::time::Instant::now();
117117

timely/examples/threadless.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ fn main() {
66

77
// create a naked single-threaded worker.
88
let allocator = timely::communication::allocator::Thread::default();
9-
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator);
9+
let mut worker = timely::worker::Worker::new(WorkerConfig::default(), allocator, None);
1010

1111
// create input and probe handles.
1212
let mut input = InputHandle::new();

timely/src/dataflow/scopes/child.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ where
7373
fn peek_identifier(&self) -> usize {
7474
self.parent.peek_identifier()
7575
}
76-
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry> {
76+
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry>> {
7777
self.parent.log_register()
7878
}
7979
}
@@ -135,8 +135,8 @@ where
135135
let path = self.addr_for_child(index);
136136

137137
let type_name = std::any::type_name::<T2>();
138-
let progress_logging = self.log_register().get(&format!("timely/progress/{type_name}"));
139-
let summary_logging = self.log_register().get(&format!("timely/summary/{type_name}"));
138+
let progress_logging = self.logger_for(&format!("timely/progress/{type_name}"));
139+
let summary_logging = self.logger_for(&format!("timely/summary/{type_name}"));
140140

141141
let subscope = RefCell::new(SubgraphBuilder::new_from(path, identifier, self.logging(), summary_logging, name));
142142
let result = {

timely/src/execute.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ where
154154
F: FnOnce(&mut Worker<crate::communication::allocator::thread::Thread>)->T+Send+Sync+'static
155155
{
156156
let alloc = crate::communication::allocator::thread::Thread::default();
157-
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc);
157+
let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc, Some(std::time::Instant::now()));
158158
let result = func(&mut worker);
159159
while worker.has_dataflows() {
160160
worker.step_or_park(None);
@@ -320,7 +320,7 @@ where
320320
T: Send+'static,
321321
F: Fn(&mut Worker<<A as AllocateBuilder>::Allocator>)->T+Send+Sync+'static {
322322
initialize_from(builders, others, move |allocator| {
323-
let mut worker = Worker::new(worker_config.clone(), allocator);
323+
let mut worker = Worker::new(worker_config.clone(), allocator, Some(std::time::Instant::now()));
324324
let result = func(&mut worker);
325325
while worker.has_dataflows() {
326326
worker.step_or_park(None);

timely/src/progress/subgraph.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::cell::RefCell;
1010
use std::collections::BinaryHeap;
1111
use std::cmp::Reverse;
1212

13-
use crate::logging::{TimelyLogger as Logger, TimelyProgressEventBuilder};
13+
use crate::logging::TimelyLogger as Logger;
1414
use crate::logging::TimelySummaryLogger as SummaryLogger;
1515

1616
use crate::scheduling::Schedule;
@@ -182,10 +182,9 @@ where
182182
// The `None` argument is optional logging infrastructure.
183183
let type_name = std::any::type_name::<TInner>();
184184
let reachability_logging =
185-
worker.log_register()
186-
.get::<reachability::logging::TrackerEventBuilder<TInner>>(&format!("timely/reachability/{type_name}"))
187-
.map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
188-
let progress_logging = worker.log_register().get::<TimelyProgressEventBuilder<TInner>>(&format!("timely/progress/{type_name}"));
185+
worker.logger_for(&format!("timely/reachability/{type_name}"))
186+
.map(|logger| reachability::logging::TrackerLogger::new(self.identifier, logger));
187+
let progress_logging = worker.logger_for(&format!("timely/progress/{type_name}"));
189188
let (tracker, scope_summary) = builder.build(reachability_logging);
190189

191190
let progcaster = Progcaster::new(worker, Rc::clone(&self.path), self.identifier, self.logging.clone(), progress_logging);

timely/src/scheduling/activate.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ pub struct Activations {
4848
rx: Receiver<Vec<usize>>,
4949

5050
// Delayed activations.
51-
timer: Instant,
51+
timer: Option<Instant>,
5252
queue: BinaryHeap<Reverse<(Duration, Vec<usize>)>>,
5353
}
5454

5555
impl Activations {
5656

5757
/// Creates a new activation tracker.
58-
pub fn new(timer: Instant) -> Self {
58+
pub fn new(timer: Option<Instant>) -> Self {
5959
let (tx, rx) = crossbeam_channel::unbounded();
6060
Self {
6161
clean: 0,
@@ -77,13 +77,18 @@ impl Activations {
7777

7878
/// Schedules a future activation for the task addressed by `path`.
7979
pub fn activate_after(&mut self, path: &[usize], delay: Duration) {
80-
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
81-
if delay == Duration::new(0, 0) {
82-
self.activate(path);
83-
}
80+
if let Some(timer) = self.timer {
81+
// TODO: We could have a minimum delay and immediately schedule anything less than that delay.
82+
if delay == Duration::new(0, 0) {
83+
self.activate(path);
84+
}
85+
else {
86+
let moment = timer.elapsed() + delay;
87+
self.queue.push(Reverse((moment, path.to_vec())));
88+
}
89+
}
8490
else {
85-
let moment = self.timer.elapsed() + delay;
86-
self.queue.push(Reverse((moment, path.to_vec())));
91+
self.activate(path);
8792
}
8893
}
8994

@@ -96,11 +101,13 @@ impl Activations {
96101
}
97102

98103
// Drain timer-based activations.
99-
if !self.queue.is_empty() {
100-
let now = self.timer.elapsed();
101-
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
102-
let Reverse((_time, path)) = self.queue.pop().unwrap();
103-
self.activate(&path[..]);
104+
if let Some(timer) = self.timer {
105+
if !self.queue.is_empty() {
106+
let now = timer.elapsed();
107+
while self.queue.peek().map(|Reverse((t,_))| t <= &now) == Some(true) {
108+
let Reverse((_time, path)) = self.queue.pop().unwrap();
109+
self.activate(&path[..]);
110+
}
104111
}
105112
}
106113

@@ -173,12 +180,12 @@ impl Activations {
173180
/// indicates the amount of time before the thread should be unparked for the
174181
/// next scheduled activation.
175182
pub fn empty_for(&self) -> Option<Duration> {
176-
if !self.bounds.is_empty() {
183+
if !self.bounds.is_empty() || self.timer.is_none() {
177184
Some(Duration::new(0,0))
178185
}
179186
else {
180187
self.queue.peek().map(|Reverse((t,_a))| {
181-
let elapsed = self.timer.elapsed();
188+
let elapsed = self.timer.unwrap().elapsed();
182189
if t < &elapsed { Duration::new(0,0) }
183190
else { *t - elapsed }
184191
})

timely/src/worker.rs

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -201,23 +201,33 @@ pub trait AsWorker : Scheduler {
201201
/// The next worker-unique identifier to be allocated.
202202
fn peek_identifier(&self) -> usize;
203203
/// Provides access to named logging streams.
204-
fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry>;
204+
fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry>>;
205+
/// Acquires a logger by name, if the log register exists and the name is registered.
206+
///
207+
/// For a more precise understanding of why a result is `None` one can use the direct functions.
208+
fn logger_for<CB: timely_container::ContainerBuilder>(&self, name: &str) -> Option<timely_logging::Logger<CB>> {
209+
self.log_register().and_then(|l| l.get(name))
210+
}
205211
/// Provides access to the timely logging stream.
206-
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.log_register().get("timely").map(Into::into) }
212+
fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.logger_for("timely").map(Into::into) }
207213
}
208214

209215
/// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`,
210216
/// and has a list of dataflows that it manages.
211217
pub struct Worker<A: Allocate> {
212218
config: Config,
213-
timer: Instant,
219+
/// An optional instant from which the start of the computation should be reckoned.
220+
///
221+
/// If this is set to none, system time-based functionality will be unavailable or work badly.
222+
/// For example, logging will be unavailable, and activation after a delay will be unavailable.
223+
timer: Option<Instant>,
214224
paths: Rc<RefCell<HashMap<usize, Rc<[usize]>>>>,
215225
allocator: Rc<RefCell<A>>,
216226
identifiers: Rc<RefCell<usize>>,
217227
// dataflows: Rc<RefCell<Vec<Wrapper>>>,
218228
dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
219229
dataflow_counter: Rc<RefCell<usize>>,
220-
logging: Rc<RefCell<crate::logging_core::Registry>>,
230+
logging: Option<Rc<RefCell<crate::logging_core::Registry>>>,
221231

222232
activations: Rc<RefCell<Activations>>,
223233
active_dataflows: Vec<usize>,
@@ -255,7 +265,7 @@ impl<A: Allocate> AsWorker for Worker<A> {
255265

256266
fn new_identifier(&mut self) -> usize { self.new_identifier() }
257267
fn peek_identifier(&self) -> usize { self.peek_identifier() }
258-
fn log_register(&self) -> RefMut<crate::logging_core::Registry> {
268+
fn log_register(&self) -> Option<RefMut<crate::logging_core::Registry>> {
259269
self.log_register()
260270
}
261271
}
@@ -268,8 +278,7 @@ impl<A: Allocate> Scheduler for Worker<A> {
268278

269279
impl<A: Allocate> Worker<A> {
270280
/// Allocates a new `Worker` bound to a channel allocator.
271-
pub fn new(config: Config, c: A) -> Worker<A> {
272-
let now = Instant::now();
281+
pub fn new(config: Config, c: A, now: Option<std::time::Instant>) -> Worker<A> {
273282
Worker {
274283
config,
275284
timer: now,
@@ -278,7 +287,7 @@ impl<A: Allocate> Worker<A> {
278287
identifiers: Default::default(),
279288
dataflows: Default::default(),
280289
dataflow_counter: Default::default(),
281-
logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now))),
290+
logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now)))),
282291
activations: Rc::new(RefCell::new(Activations::new(now))),
283292
active_dataflows: Default::default(),
284293
temp_channel_ids: Default::default(),
@@ -414,7 +423,7 @@ impl<A: Allocate> Worker<A> {
414423
}
415424

416425
// Clean up, indicate if dataflows remain.
417-
self.logging.borrow_mut().flush();
426+
self.logging.as_ref().map(|l| l.borrow_mut().flush());
418427
self.allocator.borrow_mut().release();
419428
!self.dataflows.borrow().is_empty()
420429
}
@@ -485,7 +494,7 @@ impl<A: Allocate> Worker<A> {
485494
///
486495
/// let index = worker.index();
487496
/// let peers = worker.peers();
488-
/// let timer = worker.timer();
497+
/// let timer = worker.timer().unwrap();
489498
///
490499
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
491500
///
@@ -500,7 +509,7 @@ impl<A: Allocate> Worker<A> {
500509
///
501510
/// let index = worker.index();
502511
/// let peers = worker.peers();
503-
/// let timer = worker.timer();
512+
/// let timer = worker.timer().unwrap();
504513
///
505514
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
506515
///
@@ -516,13 +525,13 @@ impl<A: Allocate> Worker<A> {
516525
///
517526
/// let index = worker.index();
518527
/// let peers = worker.peers();
519-
/// let timer = worker.timer();
528+
/// let timer = worker.timer().unwrap();
520529
///
521530
/// println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
522531
///
523532
/// });
524533
/// ```
525-
pub fn timer(&self) -> Instant { self.timer }
534+
pub fn timer(&self) -> Option<Instant> { self.timer }
526535

527536
/// Allocate a new worker-unique identifier.
528537
///
@@ -546,13 +555,14 @@ impl<A: Allocate> Worker<A> {
546555
/// timely::execute_from_args(::std::env::args(), |worker| {
547556
///
548557
/// worker.log_register()
558+
/// .unwrap()
549559
/// .insert::<timely::logging::TimelyEventBuilder,_>("timely", |time, data|
550560
/// println!("{:?}\t{:?}", time, data)
551561
/// );
552562
/// });
553563
/// ```
554-
pub fn log_register(&self) -> ::std::cell::RefMut<crate::logging_core::Registry> {
555-
self.logging.borrow_mut()
564+
pub fn log_register(&self) -> Option<::std::cell::RefMut<crate::logging_core::Registry>> {
565+
self.logging.as_ref().map(|l| l.borrow_mut())
556566
}
557567

558568
/// Construct a new dataflow.
@@ -575,8 +585,7 @@ impl<A: Allocate> Worker<A> {
575585
T: Refines<()>,
576586
F: FnOnce(&mut Child<Self, T>)->R,
577587
{
578-
let logging = self.logging.borrow_mut().get("timely").map(Into::into);
579-
self.dataflow_core("Dataflow", logging, Box::new(()), |_, child| func(child))
588+
self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child))
580589
}
581590

582591
/// Construct a new dataflow with a (purely cosmetic) name.
@@ -599,8 +608,7 @@ impl<A: Allocate> Worker<A> {
599608
T: Refines<()>,
600609
F: FnOnce(&mut Child<Self, T>)->R,
601610
{
602-
let logging = self.logging.borrow_mut().get("timely").map(Into::into);
603-
self.dataflow_core(name, logging, Box::new(()), |_, child| func(child))
611+
self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child))
604612
}
605613

606614
/// Construct a new dataflow with specific configurations.
@@ -639,8 +647,8 @@ impl<A: Allocate> Worker<A> {
639647
let identifier = self.new_identifier();
640648

641649
let type_name = std::any::type_name::<T>();
642-
let progress_logging = self.logging.borrow_mut().get(&format!("timely/progress/{type_name}"));
643-
let summary_logging = self.logging.borrow_mut().get(&format!("timely/summary/{type_name}"));
650+
let progress_logging = self.logger_for(&format!("timely/progress/{}", type_name));
651+
let summary_logging = self.logger_for(&format!("timely/summary/{}", type_name));
644652
let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name);
645653
let subscope = RefCell::new(subscope);
646654

@@ -735,7 +743,7 @@ impl<A: Allocate> Clone for Worker<A> {
735743
identifiers: Rc::clone(&self.identifiers),
736744
dataflows: Rc::clone(&self.dataflows),
737745
dataflow_counter: Rc::clone(&self.dataflow_counter),
738-
logging: Rc::clone(&self.logging),
746+
logging: self.logging.clone(),
739747
activations: Rc::clone(&self.activations),
740748
active_dataflows: Vec::new(),
741749
temp_channel_ids: Rc::clone(&self.temp_channel_ids),

0 commit comments

Comments
 (0)