Skip to content

Commit 44814ec

Browse files
authored
Merge pull request #876 from effigies/fix/nipype-plugin
FIX: Remove accidental MRIQC dependency, allow app config to be passed to workflow plugin
2 parents 699fa75 + 841dcf8 commit 44814ec

File tree

2 files changed

+122
-11
lines changed

2 files changed

+122
-11
lines changed

niworkflows/engine/plugin.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#
2323
"""A lightweight NiPype MultiProc execution plugin."""
2424

25-
# Import packages
2625
import os
2726
import sys
2827
from copy import deepcopy
@@ -32,6 +31,8 @@
3231
from traceback import format_exception
3332
import gc
3433

34+
from nipype.utils.misc import str2bool
35+
3536

3637
# Run node
3738
def run_node(node, updatehash, taskid):
@@ -239,8 +240,6 @@ def _clear_task(self, taskid):
239240
raise NotImplementedError
240241

241242
def _clean_queue(self, jobid, graph, result=None):
242-
from mriqc import config
243-
244243
if self._status_callback:
245244
self._status_callback(self.procs[jobid], "exception")
246245
if result is None:
@@ -250,7 +249,7 @@ def _clean_queue(self, jobid, graph, result=None):
250249
}
251250

252251
crashfile = self._report_crash(self.procs[jobid], result=result)
253-
if config.nipype.stop_on_first_crash:
252+
if str2bool(self._config["execution"]["stop_on_first_crash"]):
254253
raise RuntimeError("".join(result["traceback"]))
255254
if jobid in self.mapnodesubids:
256255
# remove current jobid
@@ -292,9 +291,7 @@ def _submit_mapnode(self, jobid):
292291
return False
293292

294293
def _local_hash_check(self, jobid, graph):
295-
from mriqc import config
296-
297-
if not config.nipype.local_hash_check:
294+
if not str2bool(self.procs[jobid].config["execution"]["local_hash_check"]):
298295
return False
299296

300297
try:
@@ -368,9 +365,8 @@ def _remove_node_dirs(self):
368365
"""Remove directories whose outputs have already been used up."""
369366
import numpy as np
370367
from shutil import rmtree
371-
from mriqc import config
372368

373-
if config.nipype.remove_node_directories:
369+
if str2bool(self._config["execution"]["remove_node_directories"]):
374370
indices = np.nonzero((self.refidx.sum(axis=1) == 0).__array__())[0]
375371
for idx in indices:
376372
if idx in self.mapnodesubids:
@@ -413,8 +409,6 @@ def __init__(self, pool=None, plugin_args=None):
413409
A Nipype-compatible dictionary of settings.
414410
415411
"""
416-
from mriqc import config
417-
418412
super().__init__(plugin_args=plugin_args)
419413
self._taskresult = {}
420414
self._task_obj = {}
@@ -424,6 +418,24 @@ def __init__(self, pool=None, plugin_args=None):
424418
# change to it when workers are set up
425419
self._cwd = os.getcwd()
426420

421+
# Retrieve a nipreps-style configuration object
422+
try:
423+
config = plugin_args["app_config"]
424+
except (KeyError, TypeError):
425+
from types import SimpleNamespace
426+
from nipype.utils.profiler import get_system_total_memory_gb
427+
428+
config = SimpleNamespace(
429+
environment=SimpleNamespace(
430+
# Nipype default
431+
total_memory=get_system_total_memory_gb()
432+
),
433+
# concurrent.futures default
434+
_process_initializer=None,
435+
# Just needs to exist
436+
file_path=None,
437+
)
438+
427439
# Read in options or set defaults.
428440
self.processors = self.plugin_args.get("n_procs", mp.cpu_count())
429441
self.memory_gb = self.plugin_args.get(
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import logging
2+
from types import SimpleNamespace
3+
4+
import pytest
5+
from nipype.pipeline import engine as pe
6+
from nipype.interfaces import utility as niu
7+
8+
from ..plugin import MultiProcPlugin
9+
10+
11+
def add(x, y):
12+
return x + y
13+
14+
15+
def addall(inlist):
16+
import time
17+
18+
time.sleep(0.2) # Simulate some work
19+
return sum(inlist)
20+
21+
22+
@pytest.fixture
23+
def workflow(tmp_path):
24+
workflow = pe.Workflow(name="test_wf", base_dir=tmp_path)
25+
26+
inputnode = pe.Node(niu.IdentityInterface(fields=["x", "y"]), name="inputnode")
27+
outputnode = pe.Node(niu.IdentityInterface(fields=["z"]), name="outputnode")
28+
29+
# Generate many nodes and claim a lot of memory
30+
add_nd = pe.MapNode(
31+
niu.Function(function=add, input_names=["x", "y"], output_names=["z"]),
32+
name="add",
33+
iterfield=["x"],
34+
mem_gb=0.8,
35+
)
36+
37+
# Regular node
38+
sum_nd = pe.Node(niu.Function(function=addall, input_names=["inlist"]), name="sum")
39+
40+
# Run without submitting is another code path
41+
add_more_nd = pe.Node(
42+
niu.Function(function=add, input_names=["x", "y"], output_names=["z"]),
43+
name="add_more",
44+
run_without_submitting=True,
45+
)
46+
47+
workflow.connect(
48+
[
49+
(inputnode, add_nd, [("x", "x"), ("y", "y")]),
50+
(add_nd, sum_nd, [("z", "inlist")]),
51+
(sum_nd, add_more_nd, [("out", "x")]),
52+
(inputnode, add_more_nd, [("y", "y")]),
53+
(add_more_nd, outputnode, [("z", "z")]),
54+
]
55+
)
56+
57+
inputnode.inputs.x = list(range(30))
58+
inputnode.inputs.y = 4
59+
60+
# Avoid unnecessary sleeps
61+
workflow.config["execution"]["poll_sleep_duration"] = 0
62+
63+
return workflow
64+
65+
66+
def test_plugin_defaults(workflow, caplog):
67+
"""Test the plugin works without any arguments."""
68+
caplog.set_level(logging.CRITICAL, logger="nipype.workflow")
69+
workflow.run(plugin=MultiProcPlugin())
70+
71+
72+
def test_plugin_args_noconfig(workflow, caplog):
73+
"""Test the plugin works with typical nipype arguments."""
74+
caplog.set_level(logging.CRITICAL, logger="nipype.workflow")
75+
workflow.run(
76+
plugin=MultiProcPlugin(),
77+
plugin_args={"n_procs": 2, "memory_gb": 0.1},
78+
)
79+
80+
81+
def test_plugin_app_config(workflow, caplog, capsys):
82+
"""Test the plugin works with a nipreps-style configuration."""
83+
84+
def init_print():
85+
print("Custom init")
86+
87+
app_config = SimpleNamespace(
88+
environment=SimpleNamespace(total_memory_gb=1),
89+
_process_initializer=init_print(),
90+
file_path='/does/not/need/to/exist/for/testing',
91+
)
92+
caplog.set_level(logging.CRITICAL, logger="nipype.workflow")
93+
workflow.run(
94+
plugin=MultiProcPlugin(),
95+
plugin_args={"n_procs": 2, "app_config": app_config},
96+
)
97+
98+
captured = capsys.readouterr()
99+
assert "Custom init" in captured.out

0 commit comments

Comments
 (0)