Commit 1f3a9d3

Add a ClusterManager using one-sided communication
1 parent 4054396 commit 1f3a9d3

8 files changed: +473 -21 lines

8 files changed

+473
-21
lines changed

README.md

Lines changed: 7 additions & 2 deletions
@@ -160,14 +160,14 @@ Fields `j2mpi` and `mpi2j` of `MPIManager` are associative collections mapping j

 This launches a total of 5 processes, mpi rank 0 is the julia pid 1. mpi rank 1 is julia pid 2 and so on.

-The program must call `MPI.start(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`.
+The program must call `MPI.start_main_loop(TCP_TRANSPORT_ALL)` with argument `TCP_TRANSPORT_ALL`.
 On mpi rank 0, it returns a `manager` which can be used with `@mpi_do`
 On other processes (i.e., the workers) the function does not return


 ### MPIManager
 ### (MPI transport - all processes execute MPI code)
-`MPI.start` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport.
+`MPI.start_main_loop` must be called with option `MPI_TRANSPORT_ALL` to use MPI as transport.
 `mpirun -np 5 julia 06-cman-transport.jl MPI` will run the example using MPI as transport.

 ## Julia MPI-only interface

@@ -183,6 +183,11 @@ juliacomm = MPI.COMM_WORLD
 ccomm = MPI.CComm(juliacomm)
 ```

+### MPIWindowIOManager
+This manager is started using the `MPI_WINDOW_IO` or `MPI_WINDOW_NOWAIT` transports. It uses asynchronous IO
+based on MPI windows. The `MPI_WINDOW_NOWAIT` transport only uses the cluster manager for code preceded by the `@cluster`
+macro. See `test_windowcman.jl` and `test_windowcman_nowait.jl` for examples.
+
 ### Currently wrapped MPI functions
 Convention: `MPI_Fun => MPI.Fun`

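A condensed sketch of the workflow this README section describes for `TCP_TRANSPORT_ALL` (the script name is illustrative, and `MPI.stop_main_loop` is assumed to be the shutdown counterpart of `start_main_loop`):

```julia
# Run with: mpirun -np 5 julia tcp_transport_example.jl   (hypothetical script name)
using MPI

# The workers never return from this call; MPI rank 0 becomes Julia pid 1 and
# receives a manager object, so everything below runs on rank 0 only.
manager = MPI.start_main_loop(TCP_TRANSPORT_ALL)

@mpi_do manager begin
    comm = MPI.COMM_WORLD
    println("Hello from MPI rank $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))")
end

MPI.stop_main_loop(manager)   # assumed shutdown call, mirroring the window manager API
```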

src/MPI.jl

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ include(depfile)
 include("mpi-base.jl")
 include("cman.jl")
 include("window-io.jl")
+include("window-cman.jl")

 const mpitype_dict = Dict{DataType, Cint}()
 const mpitype_dict_inverse = Dict{Cint, DataType}()

src/cman.jl

Lines changed: 28 additions & 17 deletions
@@ -1,28 +1,34 @@
 import Base: launch, manage, kill, procs, connect
 export MPIManager, launch, manage, kill, procs, connect, mpiprocs, @mpi_do
-export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL
+export TransportMode, MPI_ON_WORKERS, TCP_TRANSPORT_ALL, MPI_TRANSPORT_ALL, MPI_WINDOW_IO, MPI_WINDOW_NOWAIT



-################################################################################
-# MPI Cluster Manager
-# Note: The cluster manager object lives only in the manager process,
-# except for MPI_TRANSPORT_ALL
+"""
+MPI Cluster Manager
+Note: The cluster manager object lives only in the manager process,
+except for MPI_TRANSPORT_ALL and MPI_WINDOW_IO
+
+There are four different transport modes:

-# There are three different transport modes:
+MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This
+allows interactive use from a Julia shell, using the familiar `addprocs`
+interface.

-# MPI_ON_WORKERS: Use MPI between the workers only, not for the manager. This
-# allows interactive use from a Julia shell, using the familiar `addprocs`
-# interface.
+MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager
+process. This corresponds to the "usual" way in which MPI is used in a
+headless mode, e.g. submitted as a script to a queueing system.

-# MPI_TRANSPORT_ALL: Use MPI on all processes; there is no separate manager
-# process. This corresponds to the "usual" way in which MPI is used in a
-# headless mode, e.g. submitted as a script to a queueing system.
+TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its
+communication between processes. MPI can still be used by the user.

-# TCP_TRANSPORT_ALL: Same as MPI_TRANSPORT_ALL, but Julia uses TCP for its
-# communication between processes. MPI can still be used by the user.
+MPI_WINDOW_IO: Uses the MPI shared memory model with passive communication on all processes.
+The program must be started with mpirun or equivalent.

-@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL
+MPI_WINDOW_NOWAIT: Sets up a cluster manager, but only uses it for code enclosed in the @cluster
+macro. All other code runs as regular MPI code (single program, multiple data).
+"""
+@enum TransportMode MPI_ON_WORKERS MPI_TRANSPORT_ALL TCP_TRANSPORT_ALL MPI_WINDOW_IO MPI_WINDOW_NOWAIT

 mutable struct MPIManager <: ClusterManager
     np::Int # number of worker processes (excluding the manager process)

@@ -313,8 +319,9 @@ end
 ################################################################################
 # Alternative startup model: All Julia processes are started via an external
 # mpirun, and the user does not call addprocs.
-
-# Enter the MPI cluster manager's main loop (does not return on the workers)
+"""
+Enter the MPI cluster manager's main loop (does not return on the workers)
+"""
 function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
                          comm::MPI.Comm=MPI.COMM_WORLD)
     !MPI.Initialized() && MPI.Init()

@@ -379,6 +386,10 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
             MPI.Finalize()
             exit()
         end
+    elseif mode == MPI_WINDOW_IO
+        start_window_worker(comm, true)
+    elseif mode == MPI_WINDOW_NOWAIT
+        start_window_worker(comm, false)
     else
         error("Unknown mode $mode")
     end
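The first mode in the docstring above is the interactive one; a minimal sketch of `MPI_ON_WORKERS` use from a Julia shell (the worker count and the `@mpi_do` body are arbitrary):

```julia
using MPI

# The interactive session stays a plain Julia process; mpirun is launched
# behind the scenes to spawn 4 MPI worker processes.
manager = MPIManager(np=4)
addprocs(manager)

# MPI code runs on the workers only; the manager process is not part of the MPI communicator.
@mpi_do manager begin
    comm = MPI.COMM_WORLD
    println("rank $(MPI.Comm_rank(comm)) of $(MPI.Comm_size(comm))")
end
```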

src/window-cman.jl

Lines changed: 185 additions & 0 deletions
@@ -0,0 +1,185 @@
+import Base: launch, kill, manage, connect
+export MPIWindowIOManager, launch, kill, manage, connect, @cluster
+
+"""
+Stores the buffers needed for communication, in one instance per rank. Loop stops when the stop_condition is triggered
+"""
+mutable struct MPIWindowIOManager <: ClusterManager
+    comm::MPI.Comm
+    connection_windows::Vector{WindowIO}
+    stdio_windows::Vector{WindowIO}
+    workers_wait::Bool
+
+    function MPIWindowIOManager(comm::MPI.Comm, workers_wait::Bool)
+        nb_procs = MPI.Comm_size(comm)
+        connection_windows = Vector{WindowIO}(nb_procs)
+        stdio_windows = Vector{WindowIO}(nb_procs)
+
+        for i in 1:nb_procs
+            connection_windows[i] = WindowIO(comm)
+            stdio_windows[i] = WindowIO(comm)
+        end
+
+        # Make sure all windows are created before continuing
+        MPI.Barrier(comm)
+
+        return new(comm, connection_windows, stdio_windows, workers_wait)
+    end
+end
+
+# Closes all local MPI Windows in a manager. Must be called collectively on all ranks
+function closeall(manager::MPIWindowIOManager)
+    for w in manager.connection_windows
+        close(w)
+    end
+    for w in manager.stdio_windows
+        close(w)
+    end
+end
+
+function launch(mgr::MPIWindowIOManager, params::Dict,
+                instances::Array, cond::Condition)
+    try
+        nprocs = MPI.Comm_size(mgr.comm)
+        for cnt in 1:(nprocs-1)
+            push!(instances, WorkerConfig())
+        end
+        notify(cond)
+    catch e
+        println("Error in MPI launch $e")
+        rethrow(e)
+    end
+end
+
+function kill(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
+    @spawnat pid notify(_stop_requested)
+    Distributed.set_worker_state(Distributed.Worker(pid), Distributed.W_TERMINATED)
+end
+
+function manage(mgr::MPIWindowIOManager, id::Integer, config::WorkerConfig, op::Symbol) end
+
+function connect(mgr::MPIWindowIOManager, pid::Int, config::WorkerConfig)
+    myrank = MPI.Comm_rank(mgr.comm)
+    if myrank == 0
+        proc_stdio = mgr.stdio_windows[pid]
+        @schedule while !eof(proc_stdio)
+            try
+                println("\tFrom worker $(pid):\t$(readline(proc_stdio))")
+            catch e
+            end
+        end
+    end
+    return (mgr.connection_windows[pid], WindowWriter(mgr.connection_windows[myrank+1], pid-1))
+end
+
+function redirect_to_mpi(s::WindowWriter)
+    (rd, wr) = redirect_stdout()
+    @schedule while !eof(rd) && isopen(s.winio)
+        av = readline(rd)
+        if isopen(s.winio)
+            println(s,av)
+            flush(s)
+        end
+    end
+end
+
+function checkworkers()
+    for w in workers()
+        if w != (@fetchfrom w myid())
+            error("worker $w is not waiting")
+        end
+    end
+end
+
+function notify_workers()
+    for w in workers()
+        @spawnat(w, notify(_stop_requested))
+    end
+end
+
+function wait_for_events()
+    global _stop_requested
+    wait(_stop_requested)
+end
+
+"""
+Initialize the current process as a Julia parallel worker. Must be called on all ranks.
+If comm is not supplied, MPI is initialized and MPI_COMM_WORLD is used.
+"""
+function start_window_worker(comm::Comm, workers_wait)
+    rank = MPI.Comm_rank(comm)
+    N = MPI.Comm_size(comm)
+
+    manager = MPIWindowIOManager(comm, workers_wait)
+    cookie = string(comm)
+    if length(cookie) > Base.Distributed.HDR_COOKIE_LEN
+        cookie = cookie[1:Base.Distributed.HDR_COOKIE_LEN]
+    end
+
+    try
+        if rank == 0
+            Base.cluster_cookie(cookie)
+            MPI.Barrier(comm)
+            addprocs(manager)
+            @assert nprocs() == N
+            @assert nworkers() == (N == 1 ? 1 : N-1)
+
+            if !workers_wait
+                checkworkers()
+                notify_workers()
+            end
+        else
+            init_worker(cookie, manager)
+            MPI.Barrier(comm)
+            redirect_to_mpi(WindowWriter(manager.stdio_windows[rank+1], 0))
+            for i in vcat([1], (rank+2):N)
+                # Receiving end of connections to all higher workers and master
+                Base.process_messages(manager.connection_windows[i], WindowWriter(manager.connection_windows[rank+1], i-1))
+            end
+
+            global _stop_requested = Condition()
+            wait_for_events()
+        end
+    catch e
+        Base.display_error(STDERR,"exception $e on rank $rank",backtrace())
+    end
+
+    if workers_wait && rank != 0
+        closeall(manager)
+        MPI.Finalize()
+        exit(0)
+    end
+
+    return manager
+end
+
+"""
+Stop the manager. This closes all windows and calls MPI.Finalize on all workers
+"""
+function stop_main_loop(manager::MPIWindowIOManager)
+    if myid() != 1
+        wait_for_events()
+    else
+        checkworkers()
+        if nprocs() > 1
+            rmprocs(workers())
+        end
+    end
+    closeall(manager)
+    MPI.Finalize()
+end
+
+"""
+Runs the given expression using the Julia parallel cluster. Useful when running with MPI_WINDOW_NOWAIT,
+since this will temporarily activate the worker event loops to listen for messages.
+"""
+macro cluster(expr)
+    quote
+        if myid() != 1
+            wait_for_events()
+        else
+            $(esc(expr))
+            notify_workers()
+        end
+    end
+end
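Putting the pieces of this file together, a program using the non-blocking window transport might look like the following sketch (the script name is hypothetical, and it assumes `start_main_loop` returns the `MPIWindowIOManager` on every rank in `MPI_WINDOW_NOWAIT` mode):

```julia
# Run with: mpirun -np 4 julia window_nowait_example.jl   (hypothetical script name)
using MPI

# Every rank enters here; with MPI_WINDOW_NOWAIT the workers come back out
# and keep executing the script as ordinary SPMD MPI code.
manager = MPI.start_main_loop(MPI_WINDOW_NOWAIT)

rank = MPI.Comm_rank(MPI.COMM_WORLD)
println("SPMD section on rank $rank")

# Inside @cluster only Julia pid 1 runs the body; the other ranks temporarily
# serve the cluster manager's messages until notify_workers() releases them.
@cluster begin
    @everywhere println("hello from Julia worker $(myid())")
end

# Collective shutdown: closes the windows and finalizes MPI on all ranks.
MPI.stop_main_loop(manager)
```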

src/window-io.jl

Lines changed: 4 additions & 2 deletions
@@ -97,16 +97,16 @@ function has_data_available(w::WindowIO)
     end

     # Check if we need to grow the buffer
+    MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win)
     MPI.Win_sync(w.header_cwin) # CWin version doesn't allocate
     if w.header.needed_length > w.header.length
-        MPI.Win_lock(MPI.LOCK_EXCLUSIVE, w.myrank, 0, w.header_win)
         MPI.Win_detach(w.win, w.buffer)
         resize!(w.buffer, w.header.needed_length)
         MPI.Win_attach(w.win, w.buffer)
         w.header.address = MPI.Get_address(w.buffer)
         w.header.length = w.header.needed_length
-        MPI.Win_unlock(w.myrank, w.header_win)
     end
+    MPI.Win_unlock(w.myrank, w.header_win)

     return w.header.count > w.ptr
 end

@@ -128,7 +128,9 @@ end
 function wait_nb_available(w, nb)
     nb_found = wait_nb_available(w)
     while nb_found < nb && w.is_open
+        MPI.Win_lock(MPI.LOCK_SHARED, w.myrank, 0, w.header_win)
         MPI.Win_sync(w.header_cwin) # sync every loop, to make sure we get updates
+        MPI.Win_unlock(w.myrank, w.header_win)
         nb_found = wait_nb_available(w)
     end
     return nb_found
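For context, `WindowIO` and `WindowWriter` form the stream pair whose window synchronization these hunks tighten. A heavily hedged sketch of using that pair directly, assuming (as the `connect`/`redirect_to_mpi` code above suggests) that `WindowWriter(w, target)` delivers data into `target`'s local buffer of the collectively created `w`:

```julia
# Run with: mpirun -np 2 julia windowio_roundtrip.jl   (hypothetical script name)
using MPI

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)

w = MPI.WindowIO(comm)                 # created collectively on every rank

if rank == 1
    writer = MPI.WindowWriter(w, 0)    # writes land in rank 0's buffer of `w`
    println(writer, "hello from rank 1")
    flush(writer)                      # flush pushes the line through the window
elseif rank == 0
    println(readline(w))               # blocks until rank 1's line has arrived
end

MPI.Barrier(comm)
close(w)                               # collective teardown of the MPI windows
MPI.Finalize()
```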

test/test_reduce.jl

Lines changed: 31 additions & 0 deletions
@@ -26,4 +26,35 @@ sum_mesg = MPI.Reduce(mesg, MPI.SUM, root, comm)
 sum_mesg = rank == root ? sum_mesg : size*mesg
 @test isapprox(norm(sum_mesg-size*mesg), 0.0)

+# For comparison with the clustermanager version
+const ARRSIZE = 1024^2*100
+@test ARRSIZE % size == 0
+const my_arr = fill(1*(rank+1),ARRSIZE ÷ size)
+
+function mpi_sum(arr)::Int
+    mysum = 0
+    for x in arr
+        mysum += x
+    end
+    totalsum = MPI.Reduce(mysum, +, 0, comm)
+    return rank == 0 ? totalsum[1] : 0
+end
+
+const sumresult = mpi_sum(my_arr)
+const expected = sum((ARRSIZE ÷ size) * (1:size))
+if rank == 0
+    @test sumresult == expected
+end
+if rank == 0
+    println("Timings for MPI reduce:")
+    @time expected == mpi_sum(my_arr)
+    @time expected == mpi_sum(my_arr)
+    @time expected == mpi_sum(my_arr)
+else
+    mpi_sum(my_arr)
+    mpi_sum(my_arr)
+    mpi_sum(my_arr)
+end
+
+
 MPI.Finalize()
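The "for comparison with the clustermanager version" comment refers to an equivalent reduction driven through Distributed workers. A hypothetical stand-alone counterpart (not the actual `test_windowcman.jl` code) could look like this, here with plain `addprocs` workers instead of the window manager:

```julia
# On Julia versions where Distributed is a stdlib, add `using Distributed` first.
addprocs(4)                       # with the window manager these workers would
                                  # instead come from mpirun + MPI_WINDOW_IO

const ARRSIZE = 1024^2 * 100
@everywhere function partial_sum(value, len)
    s = 0
    for _ in 1:len                # same hand-written loop as mpi_sum above
        s += value
    end
    return s
end

function cluster_sum(chunk)
    total = 0
    for w in workers()            # fetch each worker's partial result and accumulate
        total += @fetchfrom w partial_sum(w, chunk)
    end
    return total
end

chunk = ARRSIZE ÷ nworkers()
println(cluster_sum(chunk) == chunk * sum(2:nworkers()+1))
```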
