Book a Demo!
CoCalc Logo Icon
Store Features Docs Share Support News About Policies Sign Up Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/docs/source/src/python/user-guide/misc/multiprocess.py
7890 views
r"""
Code snippets for the multiprocessing page of the user guide.

The docs pull each section in through its ``--8<--`` start/end markers.
Because the ``recommendation`` and ``example1`` sections live inside this
string, running this file does not execute them; ``example2`` below is the
only live code in the module. The string is raw so that escapes such as the
``"\n"`` separator in ``example1`` stay verbatim instead of turning into real
newlines.

# --8<-- [start:recommendation]
from multiprocessing import get_context


def my_fun(s):
    print(s)


if __name__ == "__main__":
    # Required with "spawn": every worker re-imports the main module, so
    # without this guard each worker would try to start its own Pool.
    with get_context("spawn").Pool() as pool:
        pool.map(my_fun, ["input1", "input2", ...])

# --8<-- [end:recommendation]

# --8<-- [start:example1]
import multiprocessing

import polars as pl


def test_sub_process(df: pl.DataFrame, job_id):
    df_filtered = df.filter(pl.col("a") > 0)
    print(f"Filtered (job_id: {job_id})", df_filtered, sep="\n")


def create_dataset():
    return pl.DataFrame({"a": [0, 2, 3, 4, 5], "b": [0, 4, 5, 56, 4]})


def setup():
    # some setup work
    df = create_dataset()
    df.write_parquet("/tmp/test.parquet")


def main():
    test_df = pl.read_parquet("/tmp/test.parquet")

    for i in range(0, 5):
        proc = multiprocessing.get_context("spawn").Process(
            target=test_sub_process, args=(test_df, i)
        )
        proc.start()
        proc.join()

        print(f"Executed sub process {i}")


if __name__ == "__main__":
    setup()
    main()

# --8<-- [end:example1]
"""

# --8<-- [start:example2]
import multiprocessing

import polars as pl


def test_sub_process(df: pl.DataFrame, job_id):
    """Print the rows of ``df`` where column ``a`` > 0, labelled by ``job_id``."""
    positive_rows = df.filter(pl.col("a") > 0)
    print(f"Filtered (job_id: {job_id})", positive_rows, sep="\n")


def create_dataset():
    """Build the small two-column demo frame used by ``main``."""
    columns = {"a": [0, 2, 3, 4, 5], "b": [0, 4, 5, 56, 4]}
    return pl.DataFrame(columns)


def main():
    """Run ``test_sub_process`` in five "fork" children, one after another."""
    test_df = create_dataset()
    # Unlike the "spawn" examples, each child here is forked from the parent.
    fork_ctx = multiprocessing.get_context("fork")

    for job_id in range(5):
        worker = fork_ctx.Process(target=test_sub_process, args=(test_df, job_id))
        worker.start()
        worker.join()  # wait for this child before launching the next one

        print(f"Executed sub process {job_id}")


if __name__ == "__main__":
    main()

# --8<-- [end:example2]