From 2c3a4b5d3c6e53672e607f2b9f028195f7719d8a Mon Sep 17 00:00:00 2001 From: Chris Markiewicz Date: Thu, 18 Jul 2024 14:42:01 -0400 Subject: [PATCH 1/2] TEST: Fix plugin invocation, use an initializer that can be verified --- niworkflows/engine/tests/test_plugin.py | 30 ++++++++++++------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/niworkflows/engine/tests/test_plugin.py b/niworkflows/engine/tests/test_plugin.py index e17ecd3f903..b95aaea2068 100644 --- a/niworkflows/engine/tests/test_plugin.py +++ b/niworkflows/engine/tests/test_plugin.py @@ -72,28 +72,28 @@ def test_plugin_defaults(workflow, caplog): def test_plugin_args_noconfig(workflow, caplog): """Test the plugin works with typical nipype arguments.""" caplog.set_level(logging.CRITICAL, logger="nipype.workflow") - workflow.run( - plugin=MultiProcPlugin(), - plugin_args={"n_procs": 2, "memory_gb": 0.1}, - ) + workflow.run(plugin=MultiProcPlugin(plugin_args={"n_procs": 2, "memory_gb": 0.1})) + +def touch_file(file_path): + """Module-level functions play more nicely with multiprocessing.""" + with open(file_path, "w") as f: + f.write("flag") -def test_plugin_app_config(workflow, caplog, capsys): + +def test_plugin_app_config(tmp_path, workflow, caplog): """Test the plugin works with a nipreps-style configuration.""" - def init_print(): - print("Custom init") + init_flag = tmp_path / "init_flag.txt" app_config = SimpleNamespace( - environment=SimpleNamespace(total_memory_gb=1), - _process_initializer=init_print(), - file_path='/does/not/need/to/exist/for/testing', + environment=SimpleNamespace(total_memory=1), + _process_initializer=touch_file, + file_path=init_flag, ) - caplog.set_level(logging.CRITICAL, logger="nipype.workflow") + caplog.set_level(logging.INFO, logger="nipype.workflow") workflow.run( - plugin=MultiProcPlugin(), - plugin_args={"n_procs": 2, "app_config": app_config}, + plugin=MultiProcPlugin(plugin_args={"n_procs": 2, "app_config": app_config}) ) - captured = capsys.readouterr() - assert "Custom init" in captured.out + assert init_flag.exists() and init_flag.read_text() == "flag" From a5e7c67eb5cc658b95d48abdedf78f5ea096721e Mon Sep 17 00:00:00 2001 From: Chris Markiewicz Date: Fri, 19 Jul 2024 14:11:03 -0400 Subject: [PATCH 2/2] doc: Update MultiProcPlugin.__init__ docstring to explain app_config --- niworkflows/engine/plugin.py | 10 ++++++++++ niworkflows/engine/tests/test_plugin.py | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/niworkflows/engine/plugin.py b/niworkflows/engine/plugin.py index 8c074e9cbe8..14c2547f0c3 100644 --- a/niworkflows/engine/plugin.py +++ b/niworkflows/engine/plugin.py @@ -401,6 +401,16 @@ def __init__(self, pool=None, plugin_args=None): """ Initialize the plugin. + If passed a nipreps-style configuration object in `plugin_args["app_config"]`, + the following fields must be present: + + app_config.environment.total_memory : :obj:`float` + Memory available to the workflow in gigabytes. + app_config._process_initializer : :obj:`callable` + A function that accepts a file path and returns None, to be run in each worker. + app_config.file_path : :obj:`str` + The path to a file that will be passed to the initializer. + Arguments --------- pool : :obj:`~concurrent.futures.Executor` diff --git a/niworkflows/engine/tests/test_plugin.py b/niworkflows/engine/tests/test_plugin.py index b95aaea2068..6956ba19c6f 100644 --- a/niworkflows/engine/tests/test_plugin.py +++ b/niworkflows/engine/tests/test_plugin.py @@ -75,7 +75,7 @@ def test_plugin_args_noconfig(workflow, caplog): workflow.run(plugin=MultiProcPlugin(plugin_args={"n_procs": 2, "memory_gb": 0.1})) -def touch_file(file_path): +def touch_file(file_path: str) -> None: """Module-level functions play more nicely with multiprocessing.""" with open(file_path, "w") as f: f.write("flag") @@ -89,7 +89,7 @@ def test_plugin_app_config(tmp_path, workflow, caplog): app_config = SimpleNamespace( environment=SimpleNamespace(total_memory=1), _process_initializer=touch_file, - file_path=init_flag, + file_path=str(init_flag), ) caplog.set_level(logging.INFO, logger="nipype.workflow") workflow.run(