Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
giswqs
GitHub Repository: giswqs/geemap
Path: blob/master/tests/test_ml.py
2313 views
1
import multiprocessing
2
import pathlib
3
import tempfile
4
import unittest
5
from unittest import mock
6
7
# scikit-learn (and numpy) are optional for this test module: record whether
# they imported so the tests that build estimators can be skipped otherwise.
try:
    import numpy as np
    import sklearn.ensemble
    import sklearn.tree

    HAS_SKLEARN = True
except ImportError:
    HAS_SKLEARN = False
15
16
import ee
17
18
from geemap import ml
19
20
21
class TestML(unittest.TestCase):
    """Unit tests for ``geemap.ml``.

    Earth Engine entry points and ``multiprocessing.Pool`` are replaced with
    mocks, and every test that builds a scikit-learn estimator is skipped when
    ``HAS_SKLEARN`` is false.
    """

    # Kept as plain lists and converted with ``np.array`` at call time, so that
    # defining the class never touches numpy (it may be missing when
    # HAS_SKLEARN is False).
    _FOUR_SAMPLES = [[0, 0], [1, 1], [0, 1], [1, 0]]
    _TWO_SAMPLES = [[0, 0], [1, 1]]

    @staticmethod
    def _fit_tree_classifier(samples, labels, max_depth=2):
        """Returns a DecisionTreeClassifier (random_state=42) fitted on the data."""
        clf = sklearn.tree.DecisionTreeClassifier(
            max_depth=max_depth, random_state=42
        )
        clf.fit(np.array(samples), np.array(labels))
        return clf

    @staticmethod
    def _fit_tree_regressor(samples, targets):
        """Returns a depth-2 DecisionTreeRegressor (random_state=42) fitted on the data."""
        reg = sklearn.tree.DecisionTreeRegressor(max_depth=2, random_state=42)
        reg.fit(np.array(samples), np.array(targets))
        return reg

    @staticmethod
    def _fit_forest(forest_cls, samples, targets, **kwargs):
        """Returns a fitted two-tree ``forest_cls`` instance (random_state=42)."""
        forest = forest_cls(n_estimators=2, random_state=42, **kwargs)
        forest.fit(np.array(samples), np.array(targets))
        return forest

    @staticmethod
    def _stub_pool(mock_pool, tree_strings):
        """Makes the patched Pool's ``map_async(...).get()`` return ``tree_strings``.

        The pool is used as a context manager, so the instance that matters is
        the value returned by ``__enter__``; no real processes are started.
        """
        async_result = mock.MagicMock()
        async_result.get.return_value = tree_strings
        pool_instance = mock_pool.return_value.__enter__.return_value
        pool_instance.map_async.return_value = async_result

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_classification(self):
        """A classifier tree serializes to a string starting with "1) root"."""
        clf = self._fit_tree_classifier(self._FOUR_SAMPLES, [0, 1, 1, 0])

        tree_str = ml.tree_to_string(
            clf, feature_names=["f1", "f2"], output_mode="CLASSIFICATION"
        )
        self.assertIsInstance(tree_str, str)
        self.assertTrue(tree_str.startswith("1) root"))

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_regression(self):
        """A regressor tree serializes to a string starting with "1) root"."""
        reg = self._fit_tree_regressor(self._FOUR_SAMPLES, [0.1, 0.9, 0.8, 0.2])

        tree_str = ml.tree_to_string(
            reg, feature_names=["f1", "f2"], output_mode="REGRESSION"
        )
        self.assertIsInstance(tree_str, str)
        self.assertTrue(tree_str.startswith("1) root"))

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    @mock.patch.object(multiprocessing, "Pool")
    def test_rf_to_strings(self, mock_pool):
        """rf_to_strings returns whatever the (mocked) pool produced."""
        # multiprocessing is mocked because we don't want to actually spin up
        # processes.
        self._stub_pool(mock_pool, ["tree1_str", "tree2_str"])

        rf = self._fit_forest(
            sklearn.ensemble.RandomForestClassifier,
            self._FOUR_SAMPLES,
            [0, 1, 1, 0],
            max_depth=2,
        )

        # Mock classes_ to avoid issues if output_mode INFER logic needs it.
        rf.classes_ = np.array([0, 1])
        # Set criterion to gini so INFER mode knows it's a classifier.
        rf.criterion = "gini"

        trees = ml.rf_to_strings(
            rf, feature_names=["f1", "f2"], processes=1, output_mode="CLASSIFICATION"
        )
        self.assertEqual(len(trees), 2)
        self.assertEqual(trees[0], "tree1_str")
        self.assertEqual(trees[1], "tree2_str")

    @mock.patch.object(ee.Classifier, "decisionTreeEnsemble")
    @mock.patch.object(ee, "String")
    def test_strings_to_classifier(self, mock_ee_string, mock_ensemble):
        """strings_to_classifier wraps trees in ee.String and builds one ensemble."""
        mock_ensemble.return_value = "mocked_classifier"
        # Identity stand-in for ee.String so no Earth Engine object is created.
        mock_ee_string.side_effect = lambda x: x

        trees = ["tree1", "tree2"]
        classifier = ml.strings_to_classifier(trees)
        self.assertEqual(classifier, "mocked_classifier")
        mock_ensemble.assert_called_once()
        mock_ee_string.assert_any_call("tree1")

    @mock.patch.object(ee.Classifier, "decisionTreeEnsemble")
    def test_fc_to_classifier(self, mock_ensemble):
        """fc_to_classifier aggregates the "tree" property and builds one ensemble."""
        mock_ensemble.return_value = "mocked_classifier"

        # Mock ee.FeatureCollection and its aggregate_array method.
        mock_fc = mock.MagicMock()
        mock_aggregate = mock.MagicMock()
        # The map function should return a list-like of ee.Strings; a plain
        # list is enough here.
        mock_aggregate.map.return_value = ["tree1\n", "tree2\n"]
        mock_fc.aggregate_array.return_value = mock_aggregate

        classifier = ml.fc_to_classifier(mock_fc)
        self.assertEqual(classifier, "mocked_classifier")
        mock_ensemble.assert_called_once()
        mock_fc.aggregate_array.assert_called_with("tree")

    @mock.patch.object(ee.batch.Export.table, "toAsset")
    @mock.patch.object(ee, "FeatureCollection")
    @mock.patch.object(ee, "Feature")
    @mock.patch.object(ee.Geometry, "Point")
    def test_export_trees_to_fc(self, mock_point, mock_feature, mock_fc, mock_to_asset):
        """export_trees_to_fc starts exactly one toAsset export task."""
        mock_task = mock.MagicMock()
        mock_to_asset.return_value = mock_task
        mock_point.return_value = "mocked_point"
        mock_feature.return_value = "mocked_feature"
        mock_fc.return_value = "mocked_fc"

        trees = ["tree1\n", "tree2\n"]
        ml.export_trees_to_fc(trees, asset_id="users/test/test_rf")

        mock_to_asset.assert_called_once_with(
            collection="mocked_fc",
            description="geemap_rf_export",
            assetId="users/test/test_rf",
        )
        mock_task.start.assert_called_once()

    def test_trees_to_csv(self):
        """trees_to_csv writes a file containing "tree1#" and "tree2#"."""
        trees = ["tree1\n", "tree2\n"]
        with tempfile.TemporaryDirectory() as temp_dir:
            out_csv = pathlib.Path(temp_dir) / "test_trees.csv"
            ml.trees_to_csv(trees, str(out_csv))
            self.assertTrue(out_csv.exists())

            content = out_csv.read_text(encoding="utf-8")
            self.assertIn("tree1#", content)
            self.assertIn("tree2#", content)

    @mock.patch.object(ml, "fc_to_classifier")
    @mock.patch.object(ee, "FeatureCollection")
    @mock.patch.object(ee, "Feature")
    @mock.patch.object(ee.Geometry, "Point")
    def test_csv_to_classifier(
        self, mock_point, mock_feature, mock_fc, mock_fc_to_classifier
    ):
        """csv_to_classifier returns the result of a single fc_to_classifier call."""
        mock_fc_to_classifier.return_value = "mocked_classifier"

        trees = ["tree1", "tree2"]
        with tempfile.TemporaryDirectory() as temp_dir:
            out_csv = pathlib.Path(temp_dir) / "test_trees.csv"
            out_csv.write_text("\n".join(trees), encoding="utf-8")

            classifier = ml.csv_to_classifier(str(out_csv))
            self.assertEqual(classifier, "mocked_classifier")
            mock_fc_to_classifier.assert_called_once()

    def test_csv_to_classifier_file_not_found(self):
        """csv_to_classifier returns None for a path that does not exist."""
        # Point inside a fresh temporary directory so the path is guaranteed
        # to be missing regardless of what the current directory contains.
        with tempfile.TemporaryDirectory() as temp_dir:
            missing_csv = pathlib.Path(temp_dir) / "non_existent_file.csv"
            classifier = ml.csv_to_classifier(str(missing_csv))
        self.assertIsNone(classifier)

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_infer_classification(self):
        """INFER mode yields a string for a classifier tree."""
        clf = self._fit_tree_classifier(self._FOUR_SAMPLES, [0, 1, 1, 0])
        tree_str = ml.tree_to_string(
            clf, feature_names=["f1", "f2"], output_mode="INFER"
        )
        self.assertIsInstance(tree_str, str)

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_infer_regression(self):
        """INFER mode yields a string for a regressor tree."""
        reg = self._fit_tree_regressor(self._FOUR_SAMPLES, [0.1, 0.9, 0.8, 0.2])
        tree_str = ml.tree_to_string(
            reg, feature_names=["f1", "f2"], output_mode="INFER"
        )
        self.assertIsInstance(tree_str, str)

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_infer_runtime_error(self):
        """INFER mode raises RuntimeError when the squeezed values are 3-D."""
        clf = self._fit_tree_classifier(self._TWO_SAMPLES, [0, 1])
        # Manually mess up the shape of raw_vals by making np.squeeze return
        # an object that reports three dimensions.
        with mock.patch.object(np, "squeeze") as mock_squeeze:
            mock_arr = mock.MagicMock()
            mock_arr.ndim = 3
            mock_squeeze.return_value = mock_arr
            with self.assertRaisesRegex(
                RuntimeError, "Could not infer the output type from the estimator"
            ):
                ml.tree_to_string(clf, feature_names=["f1", "f2"], output_mode="INFER")

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_classification_labels(self):
        """Explicit labels are accepted in CLASSIFICATION mode."""
        clf = self._fit_tree_classifier(self._TWO_SAMPLES, [0, 1])
        tree_str = ml.tree_to_string(
            clf,
            feature_names=["f1", "f2"],
            output_mode="CLASSIFICATION",
            labels=[10, 20],
        )
        self.assertIsInstance(tree_str, str)

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_probability(self):
        """PROBABILITY mode yields a string for a two-class tree."""
        clf = self._fit_tree_classifier(self._FOUR_SAMPLES, [0, 1, 1, 0])
        tree_str = ml.tree_to_string(
            clf, feature_names=["f1", "f2"], output_mode="PROBABILITY"
        )
        self.assertIsInstance(tree_str, str)

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_probability_value_error(self):
        """PROBABILITY mode raises ValueError for a three-class tree."""
        # Trigger ValueError if raw_vals.shape[-1] != 2: the labels below
        # contain 3 classes.
        clf = self._fit_tree_classifier(self._FOUR_SAMPLES, [0, 1, 2, 0])
        with self.assertRaisesRegex(ValueError, "shape mismatch: outputs from trees"):
            ml.tree_to_string(
                clf, feature_names=["f1", "f2"], output_mode="PROBABILITY"
            )

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_multiprobability(self):
        """MULTIPROBABILITY mode raises NotImplementedError."""
        clf = self._fit_tree_classifier(self._TWO_SAMPLES, [0, 1])
        with self.assertRaisesRegex(
            NotImplementedError, "Currently multiprobability output is not support"
        ):
            ml.tree_to_string(
                clf, feature_names=["f1", "f2"], output_mode="MULTIPROBABILITY"
            )

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_unknown_output_mode(self):
        """An unrecognized output mode raises RuntimeError."""
        clf = self._fit_tree_classifier(self._TWO_SAMPLES, [0, 1])
        with self.assertRaisesRegex(
            RuntimeError,
            "Could not understand estimator type and parse out the values.",
        ):
            ml.tree_to_string(clf, feature_names=["f1", "f2"], output_mode="UNKNOWN")

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_tree_to_string_left_right_leaves(self):
        """Unbounded-depth trees with a single odd sample still serialize."""
        # Trees structured specifically to hit the branch coverage for
        # left/right nodes being leaves (the original note cites lines
        # 211-215 and 227-231 of the module under test; those numbers may
        # have drifted).

        # Left leaf tree.
        clf1 = self._fit_tree_classifier(
            [[0, 0], [0, 1], [1, 0], [1, 1]], [0, 0, 1, 0], max_depth=None
        )
        tree_str1 = ml.tree_to_string(
            clf1, feature_names=["f1", "f2"], output_mode="CLASSIFICATION"
        )
        self.assertIsInstance(tree_str1, str)

        # Right leaf tree.
        clf2 = self._fit_tree_classifier(
            [[0, 0], [0, 1], [1, 0], [1, 1]], [0, 1, 0, 0], max_depth=None
        )
        tree_str2 = ml.tree_to_string(
            clf2, feature_names=["f1", "f2"], output_mode="CLASSIFICATION"
        )
        self.assertIsInstance(tree_str2, str)

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_rf_to_strings_invalid_output_mode(self):
        """rf_to_strings raises ValueError for an unavailable output mode."""
        rf = self._fit_forest(
            sklearn.ensemble.RandomForestClassifier, self._TWO_SAMPLES, [0, 1]
        )
        with self.assertRaisesRegex(
            ValueError, "The provided output_mode is not available."
        ):
            ml.rf_to_strings(rf, feature_names=["f1", "f2"], output_mode="INVALID")

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    @mock.patch.object(multiprocessing, "Pool")
    def test_rf_to_strings_infer_classification(self, mock_pool):
        """INFER mode works for a random forest classifier."""
        self._stub_pool(mock_pool, ["tree1", "tree2"])

        rf = self._fit_forest(
            sklearn.ensemble.RandomForestClassifier, self._TWO_SAMPLES, [0, 1]
        )

        trees = ml.rf_to_strings(rf, feature_names=["f1", "f2"], output_mode="INFER")
        self.assertEqual(len(trees), 2)

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    @mock.patch.object(multiprocessing, "Pool")
    def test_rf_to_strings_infer_regression(self, mock_pool):
        """INFER mode works for a random forest regressor with criterion "mse"."""
        self._stub_pool(mock_pool, ["tree1", "tree2"])

        rf = self._fit_forest(
            sklearn.ensemble.RandomForestRegressor, self._TWO_SAMPLES, [0.1, 0.9]
        )
        # Force the legacy criterion name "mse" (the original note says
        # squared_error used to be called mse).
        rf.criterion = "mse"

        trees = ml.rf_to_strings(rf, feature_names=["f1", "f2"], output_mode="INFER")
        self.assertEqual(len(trees), 2)

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    def test_rf_to_strings_infer_error(self):
        """INFER mode raises RuntimeError for an unrecognized criterion."""
        rf = self._fit_forest(
            sklearn.ensemble.RandomForestClassifier, self._TWO_SAMPLES, [0, 1]
        )
        rf.criterion = "unknown_criterion"
        with self.assertRaisesRegex(
            RuntimeError, "Could not infer the output type from the estimator."
        ):
            ml.rf_to_strings(rf, feature_names=["f1", "f2"], output_mode="INFER")

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    @mock.patch.object(multiprocessing, "Pool")
    def test_rf_to_strings_probability(self, mock_pool):
        """PROBABILITY mode returns one string per (mocked) tree."""
        self._stub_pool(mock_pool, ["tree1", "tree2"])

        rf = self._fit_forest(
            sklearn.ensemble.RandomForestClassifier, self._TWO_SAMPLES, [0, 1]
        )

        trees = ml.rf_to_strings(
            rf, feature_names=["f1", "f2"], output_mode="PROBABILITY"
        )
        self.assertEqual(len(trees), 2)

    @unittest.skipIf(not HAS_SKLEARN, "sklearn not installed")
    @mock.patch.object(multiprocessing, "cpu_count")
    @mock.patch.object(multiprocessing, "Pool")
    def test_rf_to_strings_processes_limit(self, mock_pool, mock_cpu_count):
        """Requesting more processes than CPUs results in Pool(1) here."""
        mock_cpu_count.return_value = 2
        self._stub_pool(mock_pool, ["tree1", "tree2"])

        rf = self._fit_forest(
            sklearn.ensemble.RandomForestClassifier, self._TWO_SAMPLES, [0, 1]
        )
        rf.criterion = "gini"

        # Requesting 10 processes, but cpu_count is 2, so it should cap at 1.
        trees = ml.rf_to_strings(
            rf, feature_names=["f1", "f2"], output_mode="CLASSIFICATION", processes=10
        )
        self.assertEqual(len(trees), 2)
        # Ensure Pool was called with processes=1.
        mock_pool.assert_called_with(1)
384
385
386
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
388
389