Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
prophesier
GitHub Repository: prophesier/diff-svc
Path: blob/main/utils/multiprocess_utils.py
694 views
1
import os
2
import traceback
3
from multiprocessing import Queue, Process
4
5
6
def chunked_worker(worker_id, map_func, args, results_queue=None, init_ctx_func=None):
7
ctx = init_ctx_func(worker_id) if init_ctx_func is not None else None
8
for job_idx, arg in args:
9
try:
10
if ctx is not None:
11
res = map_func(*arg, ctx=ctx)
12
else:
13
res = map_func(*arg)
14
results_queue.put((job_idx, res))
15
except:
16
traceback.print_exc()
17
results_queue.put((job_idx, None))
18
19
def chunked_multiprocess_run(map_func, args, num_workers=None, ordered=True, init_ctx_func=None, q_max_size=1000):
20
args = zip(range(len(args)), args)
21
args = list(args)
22
n_jobs = len(args)
23
if num_workers is None:
24
num_workers = int(os.getenv('N_PROC', os.cpu_count()))
25
results_queues = []
26
if ordered:
27
for i in range(num_workers):
28
results_queues.append(Queue(maxsize=q_max_size // num_workers))
29
else:
30
results_queue = Queue(maxsize=q_max_size)
31
for i in range(num_workers):
32
results_queues.append(results_queue)
33
workers = []
34
for i in range(num_workers):
35
args_worker = args[i::num_workers]
36
p = Process(target=chunked_worker, args=(
37
i, map_func, args_worker, results_queues[i], init_ctx_func), daemon=True)
38
workers.append(p)
39
p.start()
40
for n_finished in range(n_jobs):
41
results_queue = results_queues[n_finished % num_workers]
42
job_idx, res = results_queue.get()
43
assert job_idx == n_finished or not ordered, (job_idx, n_finished)
44
yield res
45
for w in workers:
46
w.join()
47
w.close()
48
49