Skip to content

Commit 1ac75d7

Browse files
committed
Fixing existing tests and linting complaints
1 parent 59ae341 commit 1ac75d7

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

streamz/core.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
255255
self.upstreams = []
256256

257257
# Lazily loaded exception handler to avoid recursion
258-
self._on_exception = None
258+
self._on_exception = None
259259

260260
self._set_asynchronous(asynchronous)
261261
self._set_loop(loop)
@@ -678,30 +678,36 @@ def _release_refs(self, metadata, n=1):
678678
m['ref'].release(n)
679679

680680
def on_exception(self):
681-
"""Returns the exception handler associated with this stream
681+
""" Returns the exception handler associated with this stream. The exception handler is either lazily loaded
682+
at this point or (if alredy loaded) just returned.
682683
"""
683684
self._on_exception = self._on_exception or _on_exception()
684685
return self._on_exception
685686

686687

687688
class InvalidDataError(Exception):
688-
pass
689+
"""Generic error that is raised when data passed into a node causes an exception
690+
"""
691+
689692

690693
class _on_exception(Stream):
694+
""" Internal exception-handler for Stream-nodes.
695+
"""
691696

692697
def __init__(self, *args, **kwargs):
693698
self.silent = False
694699
Stream.__init__(self, *args, **kwargs)
695700

696701
def update(self, x, who=None, metadata=None):
697702
cause, exc = x
698-
703+
699704
if self.silent or len(self.downstreams) > 0:
700705
return self._emit(x, metadata=metadata)
701706
else:
702707
logger.exception(exc)
703708
raise InvalidDataError(cause) from exc
704709

710+
705711
@Stream.register_api()
706712
class map(Stream):
707713
""" Apply a function to every element in the stream
@@ -737,7 +743,7 @@ def __init__(self, upstream, func, *args, **kwargs):
737743

738744
def update(self, x, who=None, metadata=None):
739745
result = self.func(x, *self.args, **self.kwargs)
740-
self._emit(result, metadata=metadata)
746+
return self._emit(result, metadata=metadata)
741747

742748

743749
@Stream.register_api()

streamz/tests/test_core.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import streamz as sz
1717

1818
from streamz import RefCounter
19+
from streamz.core import InvalidDataError
1920
from streamz.sources import sink_to_file
2021
from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger, # noqa: F401
2122
clean, await_for, metadata, wait_for) # noqa: F401
@@ -933,7 +934,7 @@ def test_pluck():
933934
assert L == [2]
934935
a.emit([4, 5, 6, 7, 8, 9])
935936
assert L == [2, 5]
936-
with pytest.raises(IndexError):
937+
with pytest.raises(InvalidDataError):
937938
a.emit([1])
938939

939940

@@ -945,7 +946,7 @@ def test_pluck_list():
945946
assert L == [(1, 3)]
946947
a.emit([4, 5, 6, 7, 8, 9])
947948
assert L == [(1, 3), (4, 6)]
948-
with pytest.raises(IndexError):
949+
with pytest.raises(InvalidDataError):
949950
a.emit([1])
950951

951952

@@ -1579,7 +1580,7 @@ def test_map_errors_log():
15791580
def test_map_errors_raises():
15801581
a = Stream()
15811582
b = a.map(lambda x: 1 / x) # noqa: F841
1582-
with pytest.raises(ZeroDivisionError):
1583+
with pytest.raises(InvalidDataError):
15831584
a.emit(0)
15841585

15851586

@@ -1599,7 +1600,7 @@ def test_accumulate_errors_log():
15991600
def test_accumulate_errors_raises():
16001601
a = Stream()
16011602
b = a.accumulate(lambda x, y: x / y, with_state=True) # noqa: F841
1602-
with pytest.raises(ZeroDivisionError):
1603+
with pytest.raises(InvalidDataError):
16031604
a.emit(1)
16041605
a.emit(0)
16051606

0 commit comments

Comments
 (0)