Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions chronos/asyncsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,15 @@ proc unregister*(ab: AsyncEventQueue, key: EventQueueKey) {.
ab.readers.delete(index)
ab.compact()

proc resetRegistration*(ab: AsyncEventQueue, key: EventQueueKey) {.raises: [Defect].} =
let index = ab.getReaderIndex(key)
if index >= 0:
ab.unregister(key)
let reader = EventQueueReader(key: key,
offset: ab.offset + len(ab.queue),
overflow: false)
ab.readers.add(reader)

proc close*(ab: AsyncEventQueue) {.raises: [Defect].} =
for reader in ab.readers.items():
if not(isNil(reader.waiter)) and not(reader.waiter.finished()):
Expand Down
18 changes: 18 additions & 0 deletions tests/testsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,24 @@ suite "Asynchronous sync primitives test suite":
expect AsyncEventQueueFullError:
let res {.used.} = dataFut1.read()
check len(eventQueue) == 0

# Once overflowed, forever overflowed...
eventQueue.emit(900)
check len(eventQueue) == 0
let dataFut1a = eventQueue.waitEvents(key1)
check dataFut1a.finished() == true
expect AsyncEventQueueFullError:
let res {.used.} = dataFut1a.read()
# ... unless the reader is reset
eventQueue.resetRegistration(key1)
eventQueue.emit(1000)
check len(eventQueue) == 1
let dataFut1b = eventQueue.waitEvents(key1)
check:
dataFut1b.finished() == true
dataFut1b.read() == @[1000]
check len(eventQueue) == 0

eventQueue.unregister(key1)
check len(eventQueue) == 0

Expand Down