@@ -254,6 +254,9 @@ def __init__(self, upstream=None, upstreams=None, stream_name=None,
254
254
else :
255
255
self .upstreams = []
256
256
257
+ # Lazily loaded exception handler to avoid recursion
258
+ self ._on_exception = None
259
+
257
260
self ._set_asynchronous (asynchronous )
258
261
self ._set_loop (loop )
259
262
if ensure_io_loop and not self .loop :
@@ -445,13 +448,16 @@ def _emit(self, x, metadata=None):
445
448
446
449
result = []
447
450
for downstream in list (self .downstreams ):
448
- r = downstream .update (x , who = self , metadata = metadata )
451
+ try :
452
+ r = downstream .update (x , who = self , metadata = metadata )
453
+ except Exception as exc :
454
+ # Push this exception to the on_exception handler on the downstream that raised
455
+ r = downstream .on_exception ().update ((x , exc ) , who = downstream , metadata = metadata )
449
456
450
457
if type (r ) is list :
451
458
result .extend (r )
452
459
else :
453
460
result .append (r )
454
-
455
461
self ._release_refs (metadata )
456
462
457
463
return [element for element in result if element is not None ]
@@ -671,6 +677,30 @@ def _release_refs(self, metadata, n=1):
671
677
if 'ref' in m :
672
678
m ['ref' ].release (n )
673
679
680
+ def on_exception (self ):
681
+ """Returns the exception handler associated with this stream
682
+ """
683
+ self ._on_exception = self ._on_exception or _on_exception ()
684
+ return self ._on_exception
685
+
686
+
687
+ class InvalidDataError (Exception ):
688
+ pass
689
+
690
+ class _on_exception (Stream ):
691
+
692
+ def __init__ (self , * args , ** kwargs ):
693
+ self .silent = False
694
+ Stream .__init__ (self , * args , ** kwargs )
695
+
696
+ def update (self , x , who = None , metadata = None ):
697
+ cause , exc = x
698
+
699
+ if self .silent or len (self .downstreams ) > 0 :
700
+ return self ._emit (x , metadata = metadata )
701
+ else :
702
+ logger .exception (exc )
703
+ raise InvalidDataError (cause ) from exc
674
704
675
705
@Stream .register_api ()
676
706
class map (Stream ):
@@ -706,13 +736,8 @@ def __init__(self, upstream, func, *args, **kwargs):
706
736
Stream .__init__ (self , upstream , stream_name = stream_name )
707
737
708
738
def update (self , x , who = None , metadata = None ):
709
- try :
710
- result = self .func (x , * self .args , ** self .kwargs )
711
- except Exception as e :
712
- logger .exception (e )
713
- raise
714
- else :
715
- return self ._emit (result , metadata = metadata )
739
+ result = self .func (x , * self .args , ** self .kwargs )
740
+ self ._emit (result , metadata = metadata )
716
741
717
742
718
743
@Stream .register_api ()
@@ -890,11 +915,7 @@ def update(self, x, who=None, metadata=None):
890
915
else :
891
916
return self ._emit (x , metadata = metadata )
892
917
else :
893
- try :
894
- result = self .func (self .state , x , ** self .kwargs )
895
- except Exception as e :
896
- logger .exception (e )
897
- raise
918
+ result = self .func (self .state , x , ** self .kwargs )
898
919
if self .returns_state :
899
920
state , result = result
900
921
else :
0 commit comments