23
23
_mp_fork_set = False
24
24
if not _mp_fork_set :
25
25
try :
26
- if platform == ' win32' :
27
- set_start_method (' spawn' )
26
+ if platform == " win32" :
27
+ set_start_method (" spawn" )
28
28
else :
29
- set_start_method (' fork' )
29
+ set_start_method (" fork" )
30
30
_mp_fork_set = True
31
31
except Exception as e :
32
- logger .info (f' error when setting multiprocessing.set_start_method - maybe the context is set { e .args } ' )
32
+ logger .info (f" error when setting multiprocessing.set_start_method - maybe the context is set { e .args } " )
33
33
if platform == "darwin" :
34
- os .environ [' no_proxy' ] = '*'
34
+ os .environ [" no_proxy" ] = "*"
35
35
36
36
def register_decorated_fn (name : str , poll_interval : int , domain : str , worker_id : str , func ):
37
- logger .info (f' decorated { name } ' )
37
+ logger .info (f" decorated { name } " )
38
38
_decorated_functions [(name , domain )] = {
39
- ' func' : func ,
40
- ' poll_interval' : poll_interval ,
41
- ' domain' : domain ,
42
- ' worker_id' : worker_id
39
+ " func" : func ,
40
+ " poll_interval" : poll_interval ,
41
+ " domain" : domain ,
42
+ " worker_id" : worker_id
43
43
}
44
44
45
45
@@ -56,11 +56,11 @@ def __init__(
56
56
self .logger_process , self .queue = _setup_logging_queue (configuration )
57
57
58
58
# imports
59
- importlib .import_module (' conductor.client.http.models.task' )
60
- importlib .import_module (' conductor.client.worker.worker_task' )
59
+ importlib .import_module (" conductor.client.http.models.task" )
60
+ importlib .import_module (" conductor.client.worker.worker_task" )
61
61
if import_modules is not None :
62
62
for module in import_modules :
63
- logger .info (f' loading module { module } ' )
63
+ logger .info (f" loading module { module } " )
64
64
importlib .import_module (module )
65
65
66
66
elif not isinstance (workers , list ):
@@ -77,12 +77,12 @@ def __init__(
77
77
worker_id = worker_id ,
78
78
domain = domain ,
79
79
poll_interval = poll_interval )
80
- logger .info (f' created worker with name={ task_def_name } and domain={ domain } ' )
80
+ logger .info (f" created worker with name={ task_def_name } and domain={ domain } " )
81
81
workers .append (worker )
82
82
83
83
self .__create_task_runner_processes (workers , configuration , metrics_settings )
84
84
self .__create_metrics_provider_process (metrics_settings )
85
- logger .info (' TaskHandler initialized' )
85
+ logger .info (" TaskHandler initialized" )
86
86
87
87
def __enter__ (self ):
88
88
return self
@@ -93,24 +93,24 @@ def __exit__(self, exc_type, exc_value, traceback):
93
93
def stop_processes (self ) -> None :
94
94
self .__stop_task_runner_processes ()
95
95
self .__stop_metrics_provider_process ()
96
- logger .info (' Stopped worker processes...' )
96
+ logger .info (" Stopped worker processes..." )
97
97
self .queue .put (None )
98
98
self .logger_process .terminate ()
99
99
100
100
def start_processes (self ) -> None :
101
- logger .info (' Starting worker processes...' )
101
+ logger .info (" Starting worker processes..." )
102
102
freeze_support ()
103
103
self .__start_task_runner_processes ()
104
104
self .__start_metrics_provider_process ()
105
- logger .info (' Started all processes' )
105
+ logger .info (" Started all processes" )
106
106
107
107
def join_processes (self ) -> None :
108
108
try :
109
109
self .__join_task_runner_processes ()
110
110
self .__join_metrics_provider_process ()
111
- logger .info (' Joined all processes' )
111
+ logger .info (" Joined all processes" )
112
112
except KeyboardInterrupt :
113
- logger .info (' KeyboardInterrupt: Stopping all processes' )
113
+ logger .info (" KeyboardInterrupt: Stopping all processes" )
114
114
self .stop_processes ()
115
115
116
116
def __create_metrics_provider_process (self , metrics_settings : MetricsSettings ) -> None :
@@ -121,7 +121,7 @@ def __create_metrics_provider_process(self, metrics_settings: MetricsSettings) -
121
121
target = MetricsCollector .provide_metrics ,
122
122
args = (metrics_settings ,)
123
123
)
124
- logger .info (' Created MetricsProvider process' )
124
+ logger .info (" Created MetricsProvider process" )
125
125
126
126
def __create_task_runner_processes (
127
127
self ,
@@ -149,25 +149,25 @@ def __start_metrics_provider_process(self):
149
149
if self .metrics_provider_process is None :
150
150
return
151
151
self .metrics_provider_process .start ()
152
- logger .info (' Started MetricsProvider process' )
152
+ logger .info (" Started MetricsProvider process" )
153
153
154
154
def __start_task_runner_processes (self ):
155
155
n = 0
156
156
for task_runner_process in self .task_runner_processes :
157
157
task_runner_process .start ()
158
158
n = n + 1
159
- logger .info (f' Started { n } TaskRunner process' )
159
+ logger .info (f" Started { n } TaskRunner process" )
160
160
161
161
def __join_metrics_provider_process (self ):
162
162
if self .metrics_provider_process is None :
163
163
return
164
164
self .metrics_provider_process .join ()
165
- logger .info (' Joined MetricsProvider processes' )
165
+ logger .info (" Joined MetricsProvider processes" )
166
166
167
167
def __join_task_runner_processes (self ):
168
168
for task_runner_process in self .task_runner_processes :
169
169
task_runner_process .join ()
170
- logger .info (' Joined TaskRunner processes' )
170
+ logger .info (" Joined TaskRunner processes" )
171
171
172
172
def __stop_metrics_provider_process (self ):
173
173
self .__stop_process (self .metrics_provider_process )
@@ -180,12 +180,12 @@ def __stop_process(self, process: Process):
180
180
if process is None :
181
181
return
182
182
try :
183
- logger .debug (f' Terminating process: { process .pid } ' )
183
+ logger .debug (f" Terminating process: { process .pid } " )
184
184
process .terminate ()
185
185
except Exception as e :
186
- logger .debug (f' Failed to terminate process: { process .pid } , reason: { e } ' )
186
+ logger .debug (f" Failed to terminate process: { process .pid } , reason: { e } " )
187
187
process .kill ()
188
- logger .debug (f' Killed process: { process .pid } ' )
188
+ logger .debug (f" Killed process: { process .pid } " )
189
189
190
190
191
191
# Setup centralized logging queue
0 commit comments