diff --git a/examples/river_kmeans.ipynb b/examples/river_kmeans.ipynb
index d3f30d97..10aafb3b 100644
--- a/examples/river_kmeans.ipynb
+++ b/examples/river_kmeans.ipynb
@@ -7,7 +7,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
+    "import functools\n",
     "import random\n",
+    "import time\n",
     "\n",
     "import pandas as pd\n",
     "\n",
@@ -17,6 +19,7 @@
     "from river import cluster\n",
     "import holoviews as hv\n",
     "from panel.pane.holoviews import HoloViews\n",
+    "import panel as pn\n",
     "hv.extension('bokeh')"
    ]
   },
@@ -29,6 +32,7 @@
    "source": [
     "model = cluster.KMeans(n_clusters=3, sigma=0.1, mu=0.5)\n",
     "centres = [[random.random(), random.random()] for _ in range(3)]\n",
+    "count = [0]\n",
     "\n",
     "def gen(move_chance=0.05):\n",
     "    centre = int(random.random() * 3)  # 3x faster than random.randint(0, 2)\n",
@@ -37,6 +41,7 @@
     "        centres[centre][1] += random.random() / 5 - 0.1\n",
     "    value = {'x': random.random() / 20 + centres[centre][0],\n",
     "             'y': random.random() / 20 + centres[centre][1]}\n",
+    "    count[0] += 1\n",
     "    return value\n",
     "\n",
     "\n",
@@ -53,18 +58,55 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "s = Stream.from_periodic(gen, 0.03)\n",
+    "cadence = 0.01\n",
+    "\n",
+    "ex = pd.DataFrame({'x': [0.5], 'y': [0.5]})\n",
+    "pipe_in = hv.streams.Pipe(data=ex)\n",
+    "pipe_out = hv.streams.Pipe(data=ex)\n",
+    "\n",
+    "# set up pipes\n",
+    "s = Stream.from_periodic(gen, cadence)\n",
+    "\n",
+    "# Branch 0: Input/Observations\n",
+    "obs = s.map(lambda x: pd.DataFrame([x]))\n",
+    "\n",
+    "# Branch 1: Output/River ML clusters\n",
     "km = RiverTrain(model, pass_model=True)\n",
     "s.map(lambda x: (x,)).connect(km)  # learn takes a tuple of (x,[ y[, w]])\n",
-    "ex = pd.DataFrame({'x': [0.5], 'y': [0.5]})\n",
-    "ooo = s.map(lambda x: pd.DataFrame([x])).to_dataframe(example=ex)\n",
-    "out = km.map(get_clusters)\n",
+    "clusters = km.map(get_clusters)\n",
+    "\n",
+    "concat = functools.partial(pd.concat, ignore_index=True)\n",
+    "\n",
+    "def accumulate(previous, new, last_lines=50):\n",
+    "    return concat([previous, new]).iloc[-last_lines:, :]\n",
+    "\n",
+    "partition_obs = 10\n",
+    "partition_clusters = 10\n",
+    "backlog_obs = 100\n",
+    "\n",
+    "# .partition gathers a fixed number of points\n",
+    "# before sending them to the plots;\n",
+    "# .accumulate keeps a backlog of the most recent points\n",
+    "\n",
+    "(\n",
+    "    obs\n",
+    "    .partition(partition_obs)\n",
+    "    .map(concat)\n",
+    "    .accumulate(functools.partial(accumulate, last_lines=backlog_obs))\n",
+    "    .sink(pipe_in.send)\n",
+    ")\n",
+    "(\n",
+    "    clusters\n",
+    "    .partition(partition_clusters)\n",
+    "    .map(pd.concat)\n",
+    "    .sink(pipe_out.send)\n",
+    ")\n",
     "\n",
     "# start things\n",
     "s.emit(gen())  # set initial model\n",
     "for i, (x, y) in enumerate(centres):\n",
     "    model.centers[i]['x'] = x\n",
-    "    model.centers[i]['y'] = y\n"
+    "    model.centers[i]['y'] = y"
    ]
   },
   {
@@ -74,31 +116,36 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "pout = out.to_dataframe(example=ex)\n",
-    "pl = (ooo.hvplot.scatter('x', 'y', color=\"blue\", backlog=50) *\n",
-    "      pout.hvplot.scatter('x', 'y', color=\"red\", backlog=3))\n",
+    "button_start = pn.widgets.Button(name='Start')\n",
+    "button_stop = pn.widgets.Button(name='Stop')\n",
+    "\n",
+    "t0 = 0\n",
+    "\n",
+    "def start(event):\n",
+    "    s.start()\n",
+    "    global t0\n",
+    "    t0 = time.time()\n",
+    "\n",
+    "def stop(event):\n",
+    "    print(count[0], \"events\")\n",
+    "    global t0\n",
+    "    t_spent = time.time() - t0\n",
+    "    print(\"frequency\", count[0] / t_spent, \"Hz\")\n",
+    "    print(\"Current centres\", centres)\n",
+    "    print(\"Output centres\", [list(c.values()) for c in model.centers.values()])\n",
+    "    s.stop()\n",
+    "\n",
+    "button_start.on_click(start)\n",
+    "button_stop.on_click(stop)\n",
+    "\n",
+    "scatter_dmap_input = hv.DynamicMap(hv.Scatter, streams=[pipe_in]).opts(color=\"blue\")\n",
+    "scatter_dmap_output = hv.DynamicMap(hv.Scatter, streams=[pipe_out]).opts(color=\"red\")\n",
+    "pl = scatter_dmap_input * scatter_dmap_output\n",
     "pl.opts(xlim=(-0.2, 1.2), ylim=(-0.2, 1.2), height=600, width=600)\n",
-    "pl"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "c24d2363",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "s.start()"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "18cfd94e",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "s.stop()"
+    "\n",
+    "pan = HoloViews(pl)\n",
+    "app = pn.Row(pn.Column(button_start, button_stop), pan)\n",
+    "app"
    ]
   },
   {
diff --git a/streamz/tests/test_core.py b/streamz/tests/test_core.py
index 56a661d7..cdde04d9 100644
--- a/streamz/tests/test_core.py
+++ b/streamz/tests/test_core.py
@@ -1283,7 +1283,7 @@ def test_from_file():
 
     source.start()
 
-    yield await_for(lambda: len(L) == 3, timeout=5)
+    yield await_for(lambda: len(L) == 3, timeout=15)
 
     assert L == [1, 2, 3]
 
@@ -1309,11 +1309,11 @@ def test_from_file_end():
     out = source.sink_to_list()
     source.start()
     assert out == []
-    yield await_for(lambda: source.started, 2, period=0.02)
+    yield await_for(lambda: source.started, 12, period=0.02)
 
     f.write('data2\n')
     f.flush()
-    yield await_for(lambda: out == ['data2\n'], timeout=5, period=0.1)
+    yield await_for(lambda: out == ['data2\n'], timeout=15, period=0.1)
 
 
 @gen_test()
@@ -1754,5 +1754,5 @@ def run(self):
     source.connect(sout)
     source.start()
 
-    wait_for(lambda: L == [0], 0.01)
+    wait_for(lambda: L == [0], 1, period=0.01)
     assert len(source_list) > 0
diff --git a/streamz/tests/test_kafka.py b/streamz/tests/test_kafka.py
index 27755655..eb2a98d9 100644
--- a/streamz/tests/test_kafka.py
+++ b/streamz/tests/test_kafka.py
@@ -70,7 +70,7 @@ def predicate():
             return b'kafka entered RUNNING state' in out
         except subprocess.CalledProcessError:
             pass
 
-    wait_for(predicate, 10, period=0.1)
+    wait_for(predicate, 20, period=0.1)
     return cid
 
@@ -123,12 +123,12 @@ def test_from_kafka():
         kafka.produce(TOPIC, b'value-%d' % i)
     kafka.flush()
     # it takes some time for messages to come back out of kafka
-    wait_for(lambda: len(out) == 10, 10, period=0.1)
+    wait_for(lambda: len(out) == 10, 15, period=0.1)
     assert out[-1] == b'value-9'
 
     kafka.produce(TOPIC, b'final message')
     kafka.flush()
-    wait_for(lambda: out[-1] == b'final message', 10, period=0.1)
+    wait_for(lambda: out[-1] == b'final message', 15, period=0.1)
 
     stream._close_consumer()
     kafka.produce(TOPIC, b'lost message')
@@ -154,7 +154,7 @@ def test_to_kafka():
     source.emit('final message')
     kafka.flush()
 
-    wait_for(lambda: len(out) == 11, 10, period=0.1)
+    wait_for(lambda: len(out) == 11, 15, period=0.1)
     assert out[-1] == b'final message'
 
@@ -175,12 +175,12 @@ def test_from_kafka_thread():
         kafka.produce(TOPIC, b'value-%d' % i)
     kafka.flush()
     # it takes some time for messages to come back out of kafka
-    yield await_for(lambda: len(out) == 10, 10, period=0.1)
+    yield await_for(lambda: len(out) == 10, 15, period=0.1)
     assert out[-1] == b'value-9'
 
     kafka.produce(TOPIC, b'final message')
     kafka.flush()
-    yield await_for(lambda: out[-1] == b'final message', 10, period=0.1)
+    yield await_for(lambda: out[-1] == b'final message', 15, period=0.1)
 
     stream._close_consumer()
     kafka.produce(TOPIC, b'lost message')
@@ -205,12 +205,12 @@ def test_kafka_batch():
     stream = Stream.from_kafka_batched(TOPIC, ARGS, max_batch_size=4, keys=True)
     out = stream.sink_to_list()
     stream.start()
-    wait_for(lambda: stream.upstream.started, 10, 0.1)
+    wait_for(lambda: stream.upstream.started, 15, 0.1)
     for i in range(10):
         kafka.produce(TOPIC, b'value-%d' % i, b'%d' % i)
     kafka.flush()
     # out may still be empty or first item of out may be []
-    wait_for(lambda: any(out) and out[-1][-1]['value'] == b'value-9', 10, period=0.2)
+    wait_for(lambda: any(out) and out[-1][-1]['value'] == b'value-9', 15, period=0.2)
     assert out[-1][-1]['key'] == b'9'
     # max_batch_size checks
     assert len(out[0]) == len(out[1]) == 4 and len(out) == 3
@@ -233,7 +233,7 @@ async def test_kafka_dask_batch(c, s, w1, w2):
     for i in range(10):
         kafka.produce(TOPIC, b'value-%d' % i)
     kafka.flush()
-    await await_for(lambda: any(out), 10, period=0.2)
+    await await_for(lambda: any(out), 15, period=0.2)
     assert {'key': None, 'value': b'value-1'} in out[0]
     stream.stop()
     await asyncio.sleep(0)
@@ -280,8 +280,8 @@ def test_kafka_batch_npartitions():
                                         npartitions=1)
     out2 = stream2.gather().sink_to_list()
     stream2.start()
-    wait_for(lambda: stream2.upstream.started, 10, 0.1)
-    wait_for(lambda: len(out2) == 1 and len(out2[0]) == 5, 10, 0.1)
+    wait_for(lambda: stream2.upstream.started, 15, 0.1)
+    wait_for(lambda: len(out2) == 1 and len(out2[0]) == 5, 15, 0.1)
     stream2.upstream.stopped = True
 
     stream3 = Stream.from_kafka_batched(TOPIC, ARGS2,
@@ -289,8 +289,8 @@ def test_kafka_batch_npartitions():
                                         npartitions=4)
     out3 = stream3.gather().sink_to_list()
     stream3.start()
-    wait_for(lambda: stream3.upstream.started, 10, 0.1)
-    wait_for(lambda: len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10, 10, 0.1)
+    wait_for(lambda: stream3.upstream.started, 15, 0.1)
+    wait_for(lambda: len(out3) == 2 and (len(out3[0]) + len(out3[1])) == 10, 15, 0.1)
     stream3.upstream.stopped = True
 
@@ -322,8 +322,8 @@ def test_kafka_refresh_partitions():
                                         poll_interval='2s')
     out = stream.gather().sink_to_list()
     stream.start()
-    wait_for(lambda: stream.upstream.started, 10, 0.1)
-    wait_for(lambda: len(out) == 2 and (len(out[0]) + len(out[1])) == 10, 10, 0.1)
+    wait_for(lambda: stream.upstream.started, 15, 0.1)
+    wait_for(lambda: len(out) == 2 and (len(out[0]) + len(out[1])) == 10, 15, 0.1)
 
     subprocess.call(shlex.split("docker exec streamz-kafka "
                                 "/opt/kafka_2.11-0.10.1.0/bin/kafka-topics.sh "
@@ -338,7 +338,7 @@ def test_kafka_refresh_partitions():
     kafka.flush()
 
     wait_for(lambda: len(out) == 4 and (len(out[2]) + len(out[3])) == 10
-             and out[3][4] == b'value-19', 10, 0.1)
+             and out[3][4] == b'value-19', 15, 0.1)
 
     stream.upstream.stopped = True
 
@@ -367,7 +367,7 @@ def test_kafka_batch_checkpointing_sync_nodes():
     stream1 = Stream.from_kafka_batched(TOPIC, ARGS1)
     out1 = stream1.map(split).filter(lambda x: x[-1] % 2 == 1).sink_to_list()
     stream1.start()
-    wait_for(lambda: any(out1) and out1[-1][-1] == 9, 10, period=0.2)
+    wait_for(lambda: any(out1) and out1[-1][-1] == 9, 15, period=0.2)
     stream1.upstream.stopped = True
     stream2 = Stream.from_kafka_batched(TOPIC, ARGS1)
     out2 = stream2.map(split).filter(lambda x: x[-1] % 2 == 1).sink_to_list()
@@ -378,7 +378,7 @@ def test_kafka_batch_checkpointing_sync_nodes():
     stream3 = Stream.from_kafka_batched(TOPIC, ARGS2)
     out3 = stream3.map(split).filter(lambda x: x[-1] % 2 == 1).sink_to_list()
     stream3.start()
-    wait_for(lambda: any(out3) and out3[-1][-1] == 9, 10, period=0.2)
+    wait_for(lambda: any(out3) and out3[-1][-1] == 9, 15, period=0.2)
     stream3.upstream.stopped = True
 
@@ -407,7 +407,7 @@ async def test_kafka_dask_checkpointing_sync_nodes(c, s, w1, w2):
                                         dask=True)
     out1 = stream1.map(split).gather().filter(lambda x: x[-1] % 2 == 1).sink_to_list()
     stream1.start()
-    await await_for(lambda: any(out1) and out1[-1][-1] == 9, 10, period=0.2)
+    await await_for(lambda: any(out1) and out1[-1][-1] == 9, 15, period=0.2)
     stream1.upstream.stopped = True
     stream2 = Stream.from_kafka_batched(TOPIC, ARGS1, asynchronous=True,
                                         dask=True)
@@ -420,7 +420,7 @@ async def test_kafka_dask_checkpointing_sync_nodes(c, s, w1, w2):
                                         dask=True)
     out3 = stream3.map(split).gather().filter(lambda x: x[-1] % 2 == 1).sink_to_list()
     stream3.start()
-    await await_for(lambda: any(out3) and out3[-1][-1] == 9, 10, period=0.2)
+    await await_for(lambda: any(out3) and out3[-1][-1] == 9, 15, period=0.2)
     stream3.upstream.stopped = True
 
@@ -449,7 +449,7 @@ def test_kafka_batch_checkpointing_async_nodes_1():
     stream2 = Stream.from_kafka_batched(TOPIC, ARGS)
     out2 = stream2.partition(2).sliding_window(2, return_partial=False).sink_to_list()
     stream2.start()
-    wait_for(lambda: stream2.upstream.started, 10, 0.1)
+    wait_for(lambda: stream2.upstream.started, 15, 0.1)
     for i in range(2,6):
         kafka.produce(TOPIC, b'value-%d' % i)
     kafka.flush()
@@ -462,9 +462,9 @@ def test_kafka_batch_checkpointing_async_nodes_1():
     stream3 = Stream.from_kafka_batched(TOPIC, ARGS)
     out3 = stream3.sink_to_list()
     stream3.start()
-    wait_for(lambda: stream3.upstream.started, 10, 0.1)
+    wait_for(lambda: stream3.upstream.started, 15, 0.1)
     #Stream picks up from where it left before, i.e., from the last committed offset.
-    wait_for(lambda: len(out3) == 1 and out3[0] == [b'value-3', b'value-4', b'value-5'], 10, 0.1)
+    wait_for(lambda: len(out3) == 1 and out3[0] == [b'value-3', b'value-4', b'value-5'], 15, 0.1)
     stream3.upstream.stopped = True
     stream3.destroy()
 
@@ -586,7 +586,7 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
     stream1 = Stream.from_kafka_batched(TOPIC, ARGS, asynchronous=True)
     out1 = stream1.map(split).gather().sink_to_list()
     stream1.start()
-    wait_for(lambda: stream1.upstream.started, 10, period=0.1)
+    wait_for(lambda: stream1.upstream.started, 15, period=0.1)
     '''
     Stream has started, so these are read.
     '''
@@ -596,7 +596,7 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
     kafka.flush()
 
     wait_for(lambda: len(out1) == 3 and (len(out1[0]) + len(out1[1]) + len(out1[2])) == 30,
-             10, period=0.1)
+             15, period=0.1)
     '''
     Stream stops but checkpoint has been created.
     '''
@@ -617,12 +617,12 @@ def test_kafka_checkpointing_auto_offset_reset_latest():
     '''
     Stream restarts here.
     '''
     stream2.start()
-    wait_for(lambda: stream2.upstream.started, 10, 0.1)
+    wait_for(lambda: stream2.upstream.started, 15, 0.1)
     for i in range(30):
         kafka.produce(TOPIC, b'value-%d' % i)
     kafka.flush()
     wait_for(lambda: len(out2) == 6 and (len(out2[3]) + len(out2[4]) + len(out2[5])) == 30,
-             10, period=0.1)
+             15, period=0.1)
 
     stream2.upstream.stopped = True
diff --git a/streamz/tests/test_sinks.py b/streamz/tests/test_sinks.py
index 155796be..e13fa77b 100644
--- a/streamz/tests/test_sinks.py
+++ b/streamz/tests/test_sinks.py
@@ -83,7 +83,7 @@ def test_ws_roundtrip():
     s.to_websocket("ws://localhost:8989")
     s.start()
 
-    wait_for(lambda: data == l, timeout=1)
+    wait_for(lambda: data == l, timeout=5)
     s.stop()
     s0.stop()
 
@@ -99,7 +99,7 @@ def test_mqtt_roundtrip():
     s.to_mqtt("mqtt.eclipseprojects.io", 1883, "streamz/sensor/temperature")
     s.start()
 
-    wait_for(lambda: data == l, timeout=1)
+    wait_for(lambda: data == l, timeout=5)
     s.stop()
     s0.stop()
diff --git a/streamz/tests/test_sources.py b/streamz/tests/test_sources.py
index b0ad87ba..376e3c9f 100644
--- a/streamz/tests/test_sources.py
+++ b/streamz/tests/test_sources.py
@@ -13,8 +13,8 @@ def test_periodic():
     l = s.sink_to_list()
     assert s.stopped
     s.start()
-    wait_for(lambda: l, 0.3, period=0.01)
-    wait_for(lambda: len(l) > 1, 0.3, period=0.01)
+    wait_for(lambda: l, 1.3, period=0.01)
+    wait_for(lambda: len(l) > 1, 1.3, period=0.01)
     assert all(l)
 
@@ -24,7 +24,7 @@ def test_tcp():
     s = Source.from_tcp(port)
     out = s.sink_to_list()
     s.start()
-    wait_for(lambda: s.server is not None, 2, period=0.02)
+    wait_for(lambda: s.server is not None, 12, period=0.02)
 
     try:
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -39,7 +39,7 @@ def test_tcp():
         sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock2.connect(("localhost", port))
         sock2.send(b'data2\n')
-        wait_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2,
+        wait_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 12,
                  period=0.01)
     finally:
         s.stop()
@@ -54,7 +54,7 @@ def test_tcp_async():
     s = Source.from_tcp(port)
     out = s.sink_to_list()
     s.start()
-    yield await_for(lambda: s.server is not None, 2, period=0.02)
+    yield await_for(lambda: s.server is not None, 12, period=0.02)
 
     try:
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -69,7 +69,7 @@ def test_tcp_async():
         sock2 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock2.connect(("localhost", port))
         sock2.send(b'data2\n')
-        yield await_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 2,
+        yield await_for(lambda: out == [b'data\n', b'data\n', b'data2\n'], 12,
                         period=0.01)
     finally:
         s.stop()
@@ -83,14 +83,14 @@ def test_http():
     s = Source.from_http_server(port)
     out = s.sink_to_list()
     s.start()
-    wait_for(lambda: s.server is not None, 2, period=0.02)
+    wait_for(lambda: s.server is not None, 12, period=0.02)
 
     r = requests.post('http://localhost:%i/' % port, data=b'data')
-    wait_for(lambda: out == [b'data'], 2, period=0.01)
+    wait_for(lambda: out == [b'data'], 12, period=0.01)
     assert r.ok
 
     r = requests.post('http://localhost:%i/other' % port, data=b'data2')
-    wait_for(lambda: out == [b'data', b'data2'], 2, period=0.01)
+    wait_for(lambda: out == [b'data', b'data2'], 12, period=0.01)
     assert r.ok
 
     s.stop()
@@ -111,7 +111,7 @@ def test_process():
         watcher.attach_loop(s.loop.asyncio_loop)
     out = s.sink_to_list()
     s.start()
-    yield await_for(lambda: out == [b'0123'], timeout=5)
+    yield await_for(lambda: out == [b'0123'], timeout=15)
     s.stop()
 
@@ -127,7 +127,7 @@ def test_process_str():
         watcher.attach_loop(s.loop.asyncio_loop)
     out = s.sink_to_list()
     s.start()
-    yield await_for(lambda: out == [b'0\n', b'1\n', b'2\n', b'3\n'], timeout=5)
+    yield await_for(lambda: out == [b'0\n', b'1\n', b'2\n', b'3\n'], timeout=15)
     s.stop()
 
@@ -135,7 +135,7 @@ def test_from_iterable():
     source = Source.from_iterable(range(3))
     L = source.sink_to_list()
     source.start()
-    wait_for(lambda: L == [0, 1, 2], 0.1)
+    wait_for(lambda: L == [0, 1, 2], 1, period=0.1)
 
 
 def test_from_iterable_backpressure():
@@ -144,7 +144,7 @@ def test_from_iterable_backpressure():
     L = source.rate_limit(0.1).sink_to_list()
     source.start()
 
-    wait_for(lambda: L == [0], 1, period=0.01)
+    wait_for(lambda: L == [0], 5, period=0.01)
     assert next(it) == 2  # 1 is in blocked _emit
 
@@ -155,9 +155,9 @@ def test_from_iterable_stop():
     L = source.rate_limit(0.01).sink_to_list()
     source.start()
-    wait_for(lambda: L == [0], 1)
+    wait_for(lambda: L == [0], 11)
    source.stop()
     assert source.stopped
     with pytest.raises(Failed):
-        wait_for(lambda: L == [0, 1, 2], 0.1)
+        wait_for(lambda: L == [0, 1, 2], 1, period=0.1)
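
Note on the notebook change: the partition/accumulate backlog pattern introduced above can be exercised on its own, without the HoloViews/Panel plumbing. The following is a minimal sketch, not part of the diff, assuming only streamz and pandas are installed; `sink_to_list` stands in for the notebook's `pipe_in.send`, and the batch size of 10 and backlog of 100 rows are arbitrary illustration values:

    import functools

    import pandas as pd
    from streamz import Stream

    concat = functools.partial(pd.concat, ignore_index=True)

    def accumulate(previous, new, last_lines=50):
        # Keep only the most recent `last_lines` rows as a rolling backlog.
        return concat([previous, new]).iloc[-last_lines:, :]

    source = Stream()
    out = (
        source
        .map(lambda x: pd.DataFrame([x]))  # one row per event
        .partition(10)                     # gather 10 rows before emitting
        .map(concat)                       # tuple of frames -> single frame
        .accumulate(functools.partial(accumulate, last_lines=100))
        .sink_to_list()                    # stand-in for pipe_in.send
    )

    for i in range(25):
        source.emit({'x': i * 0.01, 'y': i * 0.02})

    # Two batches of 10 passed through; the last emission holds the 20-row backlog.
    assert len(out) == 2 and len(out[-1]) == 20

Because streamz's `accumulate` seeds its state with the first element when no `start` value is given, the first emitted frame is the first batch itself; each later emission is the concatenation of the state and the new batch, trimmed to the backlog length.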