"""Tests for the common module."""
import base64
import builtins
import io
import math
import os
import pathlib
import shutil
import sys
import tempfile
import unittest
from unittest import mock
import zipfile
import ee
import ipywidgets
from PIL import Image
import psutil
import requests
from geemap import colormaps
from geemap import common
from tests import fake_ee
class CommonTest(unittest.TestCase):
def _create_zip_with_tif(self, tif_name: str, content: bytes) -> bytes:
    """Returns the bytes of an in-memory zip archive holding a single member.

    Args:
        tif_name: Name of the archive member to create.
        content: Raw bytes stored under `tif_name`.

    Returns:
        The serialized, deflate-compressed zip archive.
    """
    payload = io.BytesIO()
    with zipfile.ZipFile(
        payload, mode="w", compression=zipfile.ZIP_DEFLATED
    ) as archive:
        archive.writestr(tif_name, content)
    # The archive must be closed before reading so the central directory is
    # flushed into the buffer.
    return payload.getvalue()
@mock.patch.object(requests, "get")
def test_ee_export_image_unzip(self, mock_get):
    """Tests that ee_export_image extracts the tif and removes the zip."""
    archive_bytes = self._create_zip_with_tif("test.tif", b"tif content")

    response = mock.Mock()
    response.status_code = 200
    # The whole archive is served as a single streamed chunk.
    response.iter_content.return_value = [archive_bytes]
    mock_get.return_value = response

    image = mock.MagicMock(spec=ee.Image)
    image.getDownloadURL.return_value = "http://example.com/image.zip"
    image.geometry.return_value = fake_ee.Geometry()

    with tempfile.TemporaryDirectory() as tmpdir:
        out_dir = pathlib.Path(tmpdir)
        tif_path = out_dir / "test.tif"

        common.ee_export_image(image, str(tif_path), unzip=True, verbose=False)

        image.getDownloadURL.assert_called_once()
        mock_get.assert_called_once_with(
            "http://example.com/image.zip", stream=True, timeout=300, proxies=None
        )

        # The extracted tif is present with the archived content ...
        self.assertTrue(tif_path.exists())
        self.assertEqual(tif_path.read_bytes(), b"tif content")
        # ... and the intermediate zip is gone.
        self.assertFalse((out_dir / "test.zip").exists())
@mock.patch.object(requests, "get")
def test_ee_export_image_no_unzip(self, mock_get):
    """Tests that ee_export_image keeps the zip archive when unzip=False."""
    archive_bytes = self._create_zip_with_tif("test.tif", b"tif content")

    response = mock.Mock()
    response.status_code = 200
    response.iter_content.return_value = [archive_bytes]
    mock_get.return_value = response

    image = mock.MagicMock(spec=ee.Image)
    image.getDownloadURL.return_value = "http://example.com/image.zip"
    image.geometry.return_value = fake_ee.Geometry()

    with tempfile.TemporaryDirectory() as tmpdir:
        out_dir = pathlib.Path(tmpdir)
        tif_path = out_dir / "test.tif"
        zip_path = out_dir / "test.zip"

        common.ee_export_image(image, str(tif_path), unzip=False, verbose=False)

        image.getDownloadURL.assert_called_once()
        mock_get.assert_called_once_with(
            "http://example.com/image.zip", stream=True, timeout=300, proxies=None
        )

        # The downloaded archive is left untouched next to the requested
        # tif path, and nothing is extracted.
        self.assertTrue(zip_path.exists())
        self.assertEqual(zip_path.read_bytes(), archive_bytes)
        self.assertFalse(tif_path.exists())
@mock.patch.object(requests, "get")
def test_ee_export_geojson(self, mock_get):
    """Tests ee_export_geojson for success, invalid input and HTTP failure."""
    ok_response = mock.Mock()
    ok_response.status_code = 200
    ok_response.iter_content.return_value = [b"geojson content"]
    mock_get.return_value = ok_response

    features = mock.MagicMock(spec=ee.FeatureCollection)
    features.first().propertyNames().getInfo.return_value = ["prop1", "prop2"]
    features.getDownloadURL.return_value = "http://example.com/data.geojson"

    with tempfile.TemporaryDirectory() as tmpdir:
        out_dir = pathlib.Path(tmpdir)
        filename = str(out_dir / "test.geojson")

        # Default selectors.
        self.assertEqual(
            common.ee_export_geojson(features, filename), "geojson content"
        )
        mock_get.assert_called_with(
            "http://example.com/data.geojson",
            stream=True,
            timeout=300,
            proxies=None,
        )

        # Explicit, valid selectors.
        exported = common.ee_export_geojson(
            features, filename, selectors=["prop1"]
        )
        self.assertEqual(exported, "geojson content")
        features.getDownloadURL.assert_called_with(
            filetype="geojson", selectors=["prop1"], filename="test"
        )

        # Invalid inputs each yield None.
        self.assertIsNone(common.ee_export_geojson("not a collection", filename))
        self.assertIsNone(
            common.ee_export_geojson(features, str(out_dir / "test.shp"))
        )
        self.assertIsNone(
            common.ee_export_geojson(features, filename, selectors="invalid")
        )
        self.assertIsNone(
            common.ee_export_geojson(
                features, filename, selectors=["invalid_prop"]
            )
        )

        # A non-200 response also yields None.
        failed_response = mock.Mock()
        failed_response.status_code = 404
        failed_response.json.return_value = {"error": {"message": "Not Found"}}
        mock_get.return_value = failed_response
        features.map.return_value.getDownloadURL.return_value = (
            "http://example.com/data2.geojson"
        )
        self.assertIsNone(common.ee_export_geojson(features, filename))
@mock.patch.object(zipfile, "ZipFile")
@mock.patch.object(requests, "get")
def test_ee_export_vector(self, mock_get, mock_zip):
    """Tests ee_export_vector for several formats and for invalid arguments."""
    response = mock.Mock()
    response.status_code = 200
    response.iter_content.return_value = [b"vector content"]
    mock_get.return_value = response

    features = mock.MagicMock(spec=ee.FeatureCollection)
    features.first().propertyNames().getInfo.return_value = ["prop1", "prop2"]
    features.getDownloadURL.return_value = "http://example.com/data.shp"

    with tempfile.TemporaryDirectory() as tmpdir:
        out_dir = pathlib.Path(tmpdir)
        shp_name = str(out_dir / "test.shp")

        # Shapefile export: downloads and goes through zipfile.ZipFile.
        common.ee_export_vector(features, shp_name)
        mock_get.assert_called_with(
            "http://example.com/data.shp", stream=True, timeout=300, proxies=None
        )
        self.assertTrue(mock_zip.called)

        # CSV export selects every property.
        features.select.return_value = features
        common.ee_export_vector(features, str(out_dir / "test.csv"))
        features.select.assert_called_with([".*"], None, False)

        # GeoJSON export only has to complete without raising.
        common.ee_export_vector(features, str(out_dir / "test.geojson"))

        with self.assertRaisesRegex(
            ValueError, "ee_object must be an ee.FeatureCollection"
        ):
            common.ee_export_vector("not a collection", shp_name)

        with self.assertRaisesRegex(
            ValueError, "The file type must be one of the following:"
        ):
            common.ee_export_vector(features, str(out_dir / "test.invalid"))

        with self.assertRaisesRegex(ValueError, "selectors must be a list"):
            common.ee_export_vector(features, shp_name, selectors="invalid")

        with self.assertRaisesRegex(
            ValueError, "Attributes must be one chosen from:"
        ):
            common.ee_export_vector(
                features, shp_name, selectors=["invalid_prop"]
            )

        # Failed download: stdout is captured and the call must not raise.
        response.status_code = 404
        response.json.return_value = {"error": {"message": "Not Found"}}
        features.map.return_value.getDownloadURL.return_value = (
            "http://example.com/data2.shp"
        )
        with mock.patch("sys.stdout", new_callable=io.StringIO):
            common.ee_export_vector(features, shp_name)
@mock.patch.object(ee.batch.Export.table, "toDrive")
def test_ee_export_vector_to_drive(self, mock_to_drive):
    """Tests ee_export_vector_to_drive argument forwarding and validation."""
    task = mock.MagicMock()
    mock_to_drive.return_value = task
    features = mock.MagicMock(spec=ee.FeatureCollection)

    common.ee_export_vector_to_drive(
        features,
        description="test_task",
        folder="test_folder",
        fileFormat="CSV",
    )

    # Arguments are forwarded positionally; options left unset arrive as None.
    expected_args = (features, "test_task", "test_folder", None, "CSV", None, None)
    mock_to_drive.assert_called_once_with(*expected_args)
    task.start.assert_called_once()

    with self.assertRaisesRegex(
        ValueError, "The collection must be an ee.FeatureCollection"
    ):
        common.ee_export_vector_to_drive("not a collection", fileFormat="CSV")

    with self.assertRaisesRegex(ValueError, "The file type must be one"):
        common.ee_export_vector_to_drive(features, fileFormat="INVALID")
@mock.patch.object(ee.batch.Export.table, "toAsset")
@mock.patch.object(common, "ee_user_id")
def test_ee_export_vector_to_asset(self, mock_ee_user_id, mock_to_asset):
    """Tests ee_export_vector_to_asset argument forwarding and validation."""
    mock_ee_user_id.return_value = "projects/test-project"
    task = mock.MagicMock()
    mock_to_asset.return_value = task
    features = mock.MagicMock(spec=ee.FeatureCollection)

    common.ee_export_vector_to_asset(
        features, description="test_task", assetId="my_asset"
    )

    # The asset id is expected to be prefixed with the mocked user id.
    expected_args = (
        features,
        "test_task",
        "projects/test-project/my_asset",
        None,
    )
    mock_to_asset.assert_called_once_with(*expected_args)
    task.start.assert_called_once()

    with self.assertRaisesRegex(
        ValueError, "The collection must be an ee.FeatureCollection"
    ):
        common.ee_export_vector_to_asset("not a collection")
@mock.patch.object(ee.batch.Export.table, "toCloudStorage")
def test_ee_export_vector_to_cloud_storage(self, mock_to_cloud_storage):
    """Tests ee_export_vector_to_cloud_storage forwarding and validation."""
    task = mock.MagicMock()
    mock_to_cloud_storage.return_value = task
    features = mock.MagicMock(spec=ee.FeatureCollection)

    common.ee_export_vector_to_cloud_storage(
        features,
        description="test_task",
        bucket="test_bucket",
        fileFormat="CSV",
    )

    # Arguments are forwarded positionally; options left unset arrive as None.
    expected_args = (features, "test_task", "test_bucket", None, "CSV", None, None)
    mock_to_cloud_storage.assert_called_once_with(*expected_args)
    task.start.assert_called_once()

    with self.assertRaisesRegex(
        ValueError, "The collection must be an ee.FeatureCollection"
    ):
        common.ee_export_vector_to_cloud_storage(
            "not a collection", fileFormat="CSV"
        )

    with self.assertRaisesRegex(ValueError, "The file type must be one"):
        common.ee_export_vector_to_cloud_storage(features, fileFormat="INVALID")
@mock.patch.object(ee.batch.Export.table, "toFeatureView")
def test_ee_export_vector_to_feature_view(self, mock_to_feature_view):
    """Tests ee_export_vector_to_feature_view forwarding and validation."""
    task = mock.MagicMock()
    mock_to_feature_view.return_value = task
    features = mock.MagicMock(spec=ee.FeatureCollection)

    common.ee_export_vector_to_feature_view(
        features, description="test_task", assetId="test_asset"
    )

    expected_args = (features, "test_task", "test_asset", None)
    mock_to_feature_view.assert_called_once_with(*expected_args)
    task.start.assert_called_once()

    with self.assertRaisesRegex(
        ValueError, "The collection must be an ee.FeatureCollection"
    ):
        common.ee_export_vector_to_feature_view("not a collection")
@mock.patch.object(ee.batch.Export.video, "toDrive")
def test_ee_export_video_to_drive(self, mock_to_drive):
    """Tests ee_export_video_to_drive argument forwarding and validation."""
    task = mock.MagicMock()
    mock_to_drive.return_value = task
    images = mock.MagicMock(spec=ee.ImageCollection)

    common.ee_export_video_to_drive(
        images,
        description="test_task",
        folder="test_folder",
        framesPerSecond=30,
    )

    # Five explicit positional values followed by seven unset (None) options.
    expected_args = (images, "test_task", "test_folder", None, 30) + (None,) * 7
    mock_to_drive.assert_called_once_with(*expected_args)
    task.start.assert_called_once()

    with self.assertRaisesRegex(
        TypeError, "collection must be an ee.ImageCollection"
    ):
        common.ee_export_video_to_drive("not a collection")
@mock.patch.object(ee.batch.Export.video, "toCloudStorage")
def test_ee_export_video_to_cloud_storage(self, mock_to_cloud_storage):
    """Tests ee_export_video_to_cloud_storage forwarding and validation."""
    task = mock.MagicMock()
    mock_to_cloud_storage.return_value = task
    images = mock.MagicMock(spec=ee.ImageCollection)

    common.ee_export_video_to_cloud_storage(
        images,
        description="test_task",
        bucket="test_bucket",
        framesPerSecond=30,
    )

    # Five explicit positional values followed by seven unset (None) options.
    expected_args = (images, "test_task", "test_bucket", None, 30) + (None,) * 7
    mock_to_cloud_storage.assert_called_once_with(*expected_args)
    task.start.assert_called_once()

    with self.assertRaisesRegex(
        TypeError, "collection must be an ee.ImageCollection"
    ):
        common.ee_export_video_to_cloud_storage("not a collection")
@mock.patch.object(ee.batch.Export.map, "toCloudStorage")
def test_ee_export_map_to_cloud_storage(self, mock_to_cloud_storage):
    """Tests ee_export_map_to_cloud_storage forwarding and validation."""
    task = mock.MagicMock()
    mock_to_cloud_storage.return_value = task
    image = mock.MagicMock(spec=ee.Image)

    common.ee_export_map_to_cloud_storage(
        image,
        description="test_task",
        bucket="test_bucket",
        fileFormat="png",
    )

    # Four explicit positional values followed by eight unset (None) options.
    expected_args = (image, "test_task", "test_bucket", "png") + (None,) * 8
    mock_to_cloud_storage.assert_called_once_with(*expected_args)
    task.start.assert_called_once()

    with self.assertRaisesRegex(TypeError, "image must be an ee.Image"):
        common.ee_export_map_to_cloud_storage("not an image")
def test_check_titiler_endpoint(self):
    """Tests check_titiler_endpoint.

    Covers the built-in default, explicit endpoint arguments, and the
    TITILER_ENDPOINT environment variable override.
    """
    # With a None argument the function consults TITILER_ENDPOINT (see the
    # override cases below), so the default-URL assertion is only valid when
    # that variable is unset. patch.dict snapshots os.environ and restores it
    # on exit, which keeps this test independent of the caller's environment.
    with mock.patch.dict(os.environ):
        os.environ.pop("TITILER_ENDPOINT", None)

        self.assertEqual(
            common.check_titiler_endpoint(None),
            "https://giswqs-titiler-endpoint.hf.space",
        )
        self.assertEqual(common.check_titiler_endpoint("some_url"), "some_url")
        self.assertIsInstance(
            common.check_titiler_endpoint("pc"),
            common.PlanetaryComputerEndpoint,
        )
        self.assertIsInstance(
            common.check_titiler_endpoint("planetary-computer"),
            common.PlanetaryComputerEndpoint,
        )

    with mock.patch.dict(os.environ, {"TITILER_ENDPOINT": "planetary-computer"}):
        self.assertIsInstance(
            common.check_titiler_endpoint(None),
            common.PlanetaryComputerEndpoint,
        )

    with mock.patch.dict(os.environ, {"TITILER_ENDPOINT": "some_other_url"}):
        self.assertEqual(common.check_titiler_endpoint(None), "some_other_url")
@mock.patch.object(requests, "get")
def test_set_proxy(self, mock_get):
    """Tests set_proxy.

    Checks that HTTP_PROXY / HTTPS_PROXY are set from the given (or default)
    ip and port, that connectivity is probed through requests.get, and that a
    non-200 probe prints a failure message.
    """
    mock_response = mock.Mock()
    mock_response.status_code = 200
    mock_get.return_value = mock_response

    # set_proxy writes into os.environ. patch.dict snapshots the environment
    # and restores it on exit, so the proxy variables cannot leak into other
    # tests when an assertion fails, and any proxy settings that existed
    # before the test are put back instead of being deleted.
    with mock.patch.dict(os.environ):
        common.set_proxy(port=8080, ip="192.168.1.1")
        self.assertEqual(os.environ["HTTP_PROXY"], "http://192.168.1.1:8080")
        self.assertEqual(os.environ["HTTPS_PROXY"], "http://192.168.1.1:8080")
        mock_get.assert_called_with("https://earthengine.google.com/", timeout=300)

        common.set_proxy(port=8080, ip="192.168.1.2")
        self.assertEqual(os.environ["HTTP_PROXY"], "http://192.168.1.2:8080")
        self.assertEqual(os.environ["HTTPS_PROXY"], "http://192.168.1.2:8080")

        # Defaults.
        common.set_proxy()
        self.assertEqual(os.environ["HTTP_PROXY"], "http://127.0.0.1:1080")
        self.assertEqual(os.environ["HTTPS_PROXY"], "http://127.0.0.1:1080")

        # A failed connectivity probe is reported on stdout.
        mock_response.status_code = 404
        mock_get.return_value = mock_response
        with mock.patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
            common.set_proxy()
            self.assertIn("Failed to connect", mock_stdout.getvalue())
@mock.patch.object(os.path, "exists")
def test_is_drive_mounted(self, mock_exists):
    """Tests is_drive_mounted for a present and an absent mount point."""
    drive_root = "/content/drive/My Drive"
    scenarios = ((True, self.assertTrue), (False, self.assertFalse))
    for path_exists, check in scenarios:
        mock_exists.return_value = path_exists
        check(common.is_drive_mounted())
        mock_exists.assert_called_with(drive_root)
@mock.patch.object(os.path, "exists")
def test_credentials_in_drive(self, mock_exists):
    """Tests credentials_in_drive for a present and an absent file."""
    credentials_path = "/content/drive/My Drive/.config/earthengine/credentials"
    scenarios = ((True, self.assertTrue), (False, self.assertFalse))
    for path_exists, check in scenarios:
        mock_exists.return_value = path_exists
        check(common.credentials_in_drive())
        mock_exists.assert_called_with(credentials_path)
@mock.patch.object(os.path, "exists")
def test_credentials_in_colab(self, mock_exists):
    """Tests credentials_in_colab for a present and an absent file."""
    credentials_path = "/root/.config/earthengine/credentials"
    scenarios = ((True, self.assertTrue), (False, self.assertFalse))
    for path_exists, check in scenarios:
        mock_exists.return_value = path_exists
        check(common.credentials_in_colab())
        mock_exists.assert_called_with(credentials_path)
@mock.patch.object(shutil, "copyfile")
@mock.patch.object(os, "makedirs")
@mock.patch.object(os.path, "exists")
def test_copy_credentials_to_drive_exists(
    self, mock_exists, mock_makedirs, mock_copyfile
):
    """Tests copy_credentials_to_drive when the target directory exists."""
    target_dir = "/content/drive/My Drive/.config/earthengine"
    source = "/root/.config/earthengine/credentials"
    target = "/content/drive/My Drive/.config/earthengine/credentials"
    mock_exists.return_value = True

    common.copy_credentials_to_drive()

    mock_exists.assert_called_with(target_dir)
    # Nothing to create: the directory is reported as already present.
    mock_makedirs.assert_not_called()
    mock_copyfile.assert_called_with(source, target)
@mock.patch.object(shutil, "copyfile")
@mock.patch.object(os, "makedirs")
@mock.patch.object(os.path, "exists")
def test_copy_credentials_to_drive_does_not_exist(
    self, mock_exists, mock_makedirs, mock_copyfile
):
    """Tests copy_credentials_to_drive when the target directory is missing."""
    target_dir = "/content/drive/My Drive/.config/earthengine"
    source = "/root/.config/earthengine/credentials"
    target = "/content/drive/My Drive/.config/earthengine/credentials"
    mock_exists.return_value = False

    common.copy_credentials_to_drive()

    mock_exists.assert_called_with(target_dir)
    # The missing directory is created before the copy.
    mock_makedirs.assert_called_once_with(target_dir)
    mock_copyfile.assert_called_with(source, target)
@mock.patch.object(shutil, "copyfile")
@mock.patch.object(os, "makedirs")
@mock.patch.object(os.path, "exists")
def test_copy_credentials_to_colab_exists(
    self, mock_exists, mock_makedirs, mock_copyfile
):
    """Tests copy_credentials_to_colab when the target directory exists."""
    target_dir = "/root/.config/earthengine"
    source = "/content/drive/My Drive/.config/earthengine/credentials"
    target = "/root/.config/earthengine/credentials"
    mock_exists.return_value = True

    common.copy_credentials_to_colab()

    mock_exists.assert_called_with(target_dir)
    # Nothing to create: the directory is reported as already present.
    mock_makedirs.assert_not_called()
    mock_copyfile.assert_called_with(source, target)
@mock.patch.object(shutil, "copyfile")
@mock.patch.object(os, "makedirs")
@mock.patch.object(os.path, "exists")
def test_copy_credentials_to_colab_does_not_exist(
    self, mock_exists, mock_makedirs, mock_copyfile
):
    """Tests copy_credentials_to_colab when the target directory is missing."""
    target_dir = "/root/.config/earthengine"
    source = "/content/drive/My Drive/.config/earthengine/credentials"
    target = "/root/.config/earthengine/credentials"
    mock_exists.return_value = False

    common.copy_credentials_to_colab()

    mock_exists.assert_called_with(target_dir)
    # The missing directory is created before the copy.
    mock_makedirs.assert_called_once_with(target_dir)
    mock_copyfile.assert_called_with(source, target)
def test_bbox_to_geojson(self):
    """Tests bbox_to_geojson with list and tuple bounds."""
    bounds = [-10, -20, 10, 20]
    # Closed ring: first and last vertex coincide.
    ring = [[-10, 20], [-10, -20], [10, -20], [10, 20], [-10, 20]]
    expected = {
        "geometry": {"type": "Polygon", "coordinates": [ring]},
        "type": "Feature",
    }

    for candidate in (bounds, tuple(bounds)):
        self.assertEqual(common.bbox_to_geojson(candidate), expected)
def test_coords_to_geojson(self):
    """Tests coords_to_geojson with two bounding boxes."""

    def polygon_feature(ring):
        # Wraps one closed ring in the Feature structure the test expects.
        return {
            "geometry": {"type": "Polygon", "coordinates": [ring]},
            "type": "Feature",
        }

    coords = [[-10, -20, 10, 20], [-100, -80, 100, 80]]
    small_ring = [[-10, 20], [-10, -20], [10, -20], [10, 20], [-10, 20]]
    large_ring = [[-100, 80], [-100, -80], [100, -80], [100, 80], [-100, 80]]
    expected = {
        "type": "FeatureCollection",
        "features": [polygon_feature(small_ring), polygon_feature(large_ring)],
    }

    self.assertEqual(common.coords_to_geojson(coords), expected)
def test_explode(self):
    """Tests explode on coordinate lists nested zero, one and two levels deep."""
    point = [1.0, 2.0]
    pair = [[1.0, 2.0], [3.0, 4.0]]

    flat_result = list(common.explode(point))
    self.assertEqual(flat_result, [[1.0, 2.0]])

    line_result = list(common.explode(pair))
    self.assertEqual(line_result, [[1.0, 2.0], [3.0, 4.0]])

    nested_result = list(common.explode([[[1.0, 2.0], [3.0, 4.0]]]))
    self.assertEqual(nested_result, [[1.0, 2.0], [3.0, 4.0]])
def test_num_round(self):
    """Tests num_round with default and explicit decimal places."""
    default_places = common.num_round(1.2345)
    self.assertEqual(default_places, 1.23)

    three_places = common.num_round(1.2345, 3)
    self.assertEqual(three_places, 1.234)

    negative = common.num_round(-1.2, 3)
    self.assertEqual(negative, -1.2)
def test_delete_shp(self):
    """Tests that delete_shp removes the shapefile and its sidecar files."""
    with tempfile.TemporaryDirectory() as tmpdir:
        shp_path = os.path.join(tmpdir, "test.shp")
        sidecars = [
            os.path.join(tmpdir, "test" + suffix)
            for suffix in (".shp", ".shx", ".dbf", ".prj")
        ]
        for sidecar in sidecars:
            with open(sidecar, "w") as handle:
                handle.write("test")

        common.delete_shp(shp_path)

        for sidecar in sidecars:
            self.assertFalse(os.path.exists(sidecar))
def test_list_vars(self):
    """Tests list_vars function.

    Two marker variables are temporarily added to the `common` module and
    must be reported by list_vars, filtered by type when var_type is given.
    """
    common.list_vars_test_int = 1
    common.list_vars_test_str = "test"
    try:
        vars_all = common.list_vars()
        vars_int = common.list_vars(var_type=int)
        vars_str = common.list_vars(var_type=str)
    finally:
        # Always remove the markers so a failure inside list_vars cannot
        # leave stray attributes on the module for later tests.
        del common.list_vars_test_int
        del common.list_vars_test_str

    self.assertIn("list_vars_test_int", vars_all)
    self.assertIn("list_vars_test_str", vars_all)
    self.assertIn("list_vars_test_int", vars_int)
    self.assertNotIn("list_vars_test_str", vars_int)
    self.assertIn("list_vars_test_str", vars_str)
    self.assertNotIn("list_vars_test_int", vars_str)
def test_create_contours(self):
    """Tests create_contours with and without a clipping region."""
    with (
        mock.patch.object(ee.Kernel, "gaussian") as mock_gaussian,
        mock.patch.object(ee.List, "sequence") as mock_sequence,
        mock.patch.object(ee, "ImageCollection") as mock_ic,
        mock.patch.object(ee.Image, "constant") as mock_constant,
    ):
        image = mock.MagicMock(spec=ee.Image)
        geometry_region = mock.MagicMock(spec=ee.Geometry)
        collection_region = mock.MagicMock(spec=ee.FeatureCollection)

        # Stub the Earth Engine building blocks patched above.
        constant_image = mock.MagicMock()
        constant_image.toFloat.return_value = mock.MagicMock()
        mock_constant.return_value = constant_image
        mock_gaussian.return_value = mock.MagicMock()
        levels = mock.MagicMock()
        levels.map.return_value = "contours"
        mock_sequence.return_value = levels

        # The mosaic of the image collection is what gets returned/clipped.
        mosaic = mock.MagicMock()
        mosaic.clip.return_value = "clip_geom_result"
        mosaic.clipToCollection.return_value = "clip_fc_result"
        mock_ic.return_value.mosaic.return_value = mosaic

        unclipped = common.create_contours(image, 0.0, 1.0, 0.5)
        self.assertEqual(unclipped, mosaic)

        clipped_to_geometry = common.create_contours(
            image, 0.0, 1.0, 0.5, region=geometry_region
        )
        self.assertEqual(clipped_to_geometry, "clip_geom_result")

        clipped_to_collection = common.create_contours(
            image, 0.0, 1.0, 0.5, region=collection_region
        )
        self.assertEqual(clipped_to_collection, "clip_fc_result")

        with self.assertRaisesRegex(TypeError, r"image must be an ee\.Image"):
            common.create_contours("not an image", 0, 1, 0.5)

        region_error = r"region must be an ee\.Geometry or ee\.FeatureCollection"
        with self.assertRaisesRegex(TypeError, region_error):
            common.create_contours(image, 0, 1, 0.5, region="not a geometry")
def test_check_dir(self):
    """Tests check_dir with and without directory creation."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # make_dirs=True creates the missing directory.
        created_dir = os.path.join(tmpdir, "subdir1")
        resolved_created = common.check_dir(created_dir, make_dirs=True)
        self.assertTrue(os.path.exists(resolved_created))
        self.assertEqual(resolved_created, os.path.abspath(created_dir))

        # make_dirs=False raises for a missing directory ...
        manual_dir = os.path.join(tmpdir, "subdir2")
        with self.assertRaises(FileNotFoundError):
            common.check_dir(manual_dir, make_dirs=False)

        # ... and succeeds once the directory exists.
        os.makedirs(manual_dir)
        resolved_manual = common.check_dir(manual_dir, make_dirs=False)
        self.assertTrue(os.path.exists(resolved_manual))
        self.assertEqual(resolved_manual, os.path.abspath(manual_dir))

    with self.assertRaises(TypeError):
        common.check_dir(123)
def test_check_file_path(self):
    """Tests check_file_path directory creation, '~' expansion and type check."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # make_dirs=True creates the parent directory.
        with_parent = os.path.join(tmpdir, "subdir1", "file1.txt")
        resolved_with_parent = common.check_file_path(with_parent, make_dirs=True)
        self.assertTrue(os.path.exists(os.path.dirname(resolved_with_parent)))
        self.assertEqual(resolved_with_parent, os.path.abspath(with_parent))

        # make_dirs=False leaves the parent directory missing.
        without_parent = os.path.join(tmpdir, "subdir2", "file2.txt")
        resolved_without_parent = common.check_file_path(
            without_parent, make_dirs=False
        )
        self.assertFalse(os.path.exists(os.path.dirname(resolved_without_parent)))
        self.assertEqual(resolved_without_parent, os.path.abspath(without_parent))

    home_relative = "~/some_dir/file3.txt"
    resolved_home = common.check_file_path(home_relative, make_dirs=False)
    expected_home = os.path.abspath(os.path.expanduser(home_relative))
    self.assertEqual(resolved_home, expected_home)

    with self.assertRaises(TypeError):
        common.check_file_path(123)
def test_get_palette_colors(self):
    """Tests get_palette_colors class count, hashtag prefix and bad names."""

    def is_bare_hex(color):
        # Six-character string without a leading '#'.
        return isinstance(color, str) and len(color) == 6

    five_colors = common.get_palette_colors("viridis", n_class=5)
    self.assertEqual(len(five_colors), 5)
    self.assertTrue(all(is_bare_hex(color) for color in five_colors))
    self.assertEqual(
        five_colors, ["440154", "3b528b", "21918c", "5ec962", "fde725"]
    )

    prefixed_colors = common.get_palette_colors("viridis", 5, hashtag=True)
    self.assertTrue(all(color.startswith("#") for color in prefixed_colors))
    self.assertEqual(
        prefixed_colors, ["#440154", "#3b528b", "#21918c", "#5ec962", "#fde725"]
    )

    default_colors = common.get_palette_colors("viridis")
    self.assertGreater(len(default_colors), 1)
    self.assertTrue(all(is_bare_hex(color) for color in default_colors))

    with self.assertRaises(ValueError):
        common.get_palette_colors("invalid_cmap_name")
@mock.patch("geemap.common.IFrame")
@mock.patch("geemap.common.display")
def test_display_html(self, mock_display, mock_iframe):
    """Tests display_html with an existing file and with a missing file."""
    mock_iframe.return_value = "iframe_object"

    # delete=False keeps the file on disk after the handle is closed; it is
    # removed explicitly in the finally clause below.
    with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as handle:
        html_path = handle.name
    try:
        common.display_html(html_path, width=800, height=400)
        mock_iframe.assert_called_once_with(src=html_path, width=800, height=400)
        mock_display.assert_called_once_with(mock_iframe.return_value)
    finally:
        os.remove(html_path)

    with self.assertRaisesRegex(ValueError, "is not a valid file path"):
        common.display_html("non_existent_file.html")
@mock.patch("geemap.common.requests.head")
def test_get_direct_url(self, mock_head):
    """Tests get_direct_url resolution and input validation."""
    final_url = "https://example.com/direct_url"
    response = mock.Mock()
    response.url = final_url
    mock_head.return_value = response

    # A redirecting URL resolves to the URL reported by the HEAD response.
    resolved = common.get_direct_url("https://example.com/redirect")
    self.assertEqual(resolved, "https://example.com/direct_url")
    mock_head.assert_called_with(
        "https://example.com/redirect", allow_redirects=True
    )

    # An already-direct URL resolves to itself.
    resolved = common.get_direct_url("https://example.com/direct_url")
    self.assertEqual(resolved, "https://example.com/direct_url")
    mock_head.assert_called_with(
        "https://example.com/direct_url", allow_redirects=True
    )

    with self.assertRaisesRegex(ValueError, "url must start with http."):
        common.get_direct_url("ftp://example.com/file")

    with self.assertRaisesRegex(ValueError, "url must be a string."):
        common.get_direct_url(123)
def test_html_to_gradio(self):
    """Tests html_to_gradio iframe wrapping and line filtering."""
    dropped_line = ' function(e) { console.log("foo"); }'
    kept_line = ' "should be kept";'
    html_lines = [
        "<!DOCTYPE html>",
        "<html>",
        "<body>",
        " <script>",
        ' L.tileLayer("url", {',
        ' "attribution": "..."',
        " }).addTo(map);",
        dropped_line,
        kept_line,
        " </script>",
        "</body>",
        "</html>",
    ]

    rendered = common.html_to_gradio(html_lines, width="800px", height="400px")

    self.assertIn('<iframe style="width: 800px; height: 400px"', rendered)
    self.assertNotIn('function(e) { console.log("foo"); }', rendered)
    self.assertIn('"should be kept";', rendered)
def test_find_files(self):
    """Tests find_files with extension, fullpath and recursive options."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Layout: two files at the top level and two inside a subdirectory.
        root = pathlib.Path(tmpdir)
        nested = root / "subdir"
        nested.mkdir()
        top_txt = root / "file1.txt"
        top_csv = root / "file2.csv"
        nested_txt = nested / "file3.txt"
        nested_py = nested / "file4.py"
        for created in (top_txt, top_csv, nested_txt, nested_py):
            created.touch()

        found = common.find_files(tmpdir, fullpath=True, recursive=True)
        self.assertCountEqual(
            found, [str(top_txt), str(top_csv), str(nested_txt), str(nested_py)]
        )

        # The extension is accepted with a leading dot ...
        found = common.find_files(
            tmpdir, ext=".txt", fullpath=True, recursive=True
        )
        self.assertCountEqual(found, [str(top_txt), str(nested_txt)])

        # ... and without one.
        found = common.find_files(tmpdir, ext="txt", fullpath=True, recursive=True)
        self.assertCountEqual(found, [str(top_txt), str(nested_txt)])

        found = common.find_files(
            tmpdir, ext="txt", fullpath=True, recursive=False
        )
        self.assertCountEqual(found, [str(top_txt)])

        found = common.find_files(
            tmpdir, ext="txt", fullpath=False, recursive=True
        )
        self.assertCountEqual(found, ["file1.txt", "file3.txt"])

        found = common.find_files(tmpdir, fullpath=False, recursive=False)
        self.assertCountEqual(found, ["file1.txt", "file2.csv"])
def test_zoom_level_resolution(self):
    """Tests zoom_level_resolution at several zoom levels and latitudes."""
    # NOTE(review): the non-zero latitudes are multiples of pi, so the
    # function is presumably fed radians here; confirm against the
    # implementation.
    cases = (
        # (zoom, latitude, expected resolution)
        (0, 0, 156543.04),
        (10, 0, 152.8740625),
        (10, math.pi / 3, 76.43703125),
        (10, -math.pi / 3, 76.43703125),
    )
    for zoom, latitude, expected in cases:
        resolution = common.zoom_level_resolution(zoom=zoom, latitude=latitude)
        self.assertAlmostEqual(resolution, expected, places=4)
def test_lnglat_to_meters(self):
    """Tests lnglat_to_meters at the origin, at a known point and round-trip."""
    origin_x, origin_y = common.lnglat_to_meters(longitude=0, latitude=0)
    self.assertAlmostEqual(origin_x, 0)
    self.assertAlmostEqual(origin_y, 0)

    easting, northing = common.lnglat_to_meters(longitude=1, latitude=2)
    self.assertAlmostEqual(easting, 111319.4908, places=4)
    self.assertAlmostEqual(northing, 222684.2085, places=4)

    # Converting to meters and back returns the original coordinates.
    projected = common.lnglat_to_meters(10, 20)
    round_trip_lng, round_trip_lat = common.meters_to_lnglat(*projected)
    self.assertAlmostEqual(round_trip_lng, 10, places=4)
    self.assertAlmostEqual(round_trip_lat, 20, places=4)
def test_meters_to_lnglat(self):
    """Tests meters_to_lnglat at the origin, at a known point and round-trip."""
    origin_lng, origin_lat = common.meters_to_lnglat(x=0, y=0)
    self.assertAlmostEqual(origin_lng, 0)
    self.assertAlmostEqual(origin_lat, 0)

    known_lng, known_lat = common.meters_to_lnglat(x=111319.4908, y=222684.2085)
    self.assertAlmostEqual(known_lng, 1.0, places=4)
    self.assertAlmostEqual(known_lat, 2.0, places=4)

    # Converting to meters and back returns the original coordinates.
    projected = common.lnglat_to_meters(10, 20)
    round_trip_lng, round_trip_lat = common.meters_to_lnglat(*projected)
    self.assertAlmostEqual(round_trip_lng, 10, places=4)
    self.assertAlmostEqual(round_trip_lat, 20, places=4)
def test_center_zoom_to_xy_range(self):
    """Tests center_zoom_to_xy_range at two zoom levels around the origin."""
    cases = (
        # (zoom, expected x half-extent, expected y half-extent)
        (2, 19926188.8520, 11068715.6594),
        (3, 9963094.4260, 4163881.1441),
    )
    for zoom, x_extent, y_extent in cases:
        x_range, y_range = common.center_zoom_to_xy_range(center=(0, 0), zoom=zoom)
        # Ranges are symmetric about the (0, 0) center.
        self.assertAlmostEqual(x_range[0], -x_extent, places=4)
        self.assertAlmostEqual(x_range[1], x_extent, places=4)
        self.assertAlmostEqual(y_range[0], -y_extent, places=4)
        self.assertAlmostEqual(y_range[1], y_extent, places=4)
def test_landsat_scaling(self):
    """Tests landsat_scaling for every thermal_bands / apply_fmask combination."""
    image = mock.MagicMock(spec=ee.Image)
    optical = mock.MagicMock(name="optical_bands_mock")
    thermal = mock.MagicMock(name="thermal_bands_mock")
    qa_pixel = mock.MagicMock(name="qa_pixel_mock")

    selector_to_bands = (
        ("SR_B.", optical),
        ("ST_B.*", thermal),
        ("QA_PIXEL", qa_pixel),
    )

    def select_side_effect(band_selector):
        # Hands out the dedicated mock for each known selector so the
        # per-band-group calls can be asserted independently.
        for selector, bands in selector_to_bands:
            if band_selector == selector:
                return bands
        raise ValueError(f"Unexpected band selector: {band_selector}")

    image.select.side_effect = select_side_effect
    # addBands returns the image itself so chained calls stay on one mock.
    image.addBands.return_value = image

    scaled_optical = mock.MagicMock(name="scaled_optical")
    optical.multiply.return_value.add.return_value = scaled_optical
    scaled_thermal = mock.MagicMock(name="scaled_thermal")
    thermal.multiply.return_value.add.return_value = scaled_thermal
    qa_mask = mock.MagicMock(name="qa_mask")
    qa_pixel.bitwiseAnd.return_value.eq.return_value = qa_mask

    def reset_mocks():
        # Clears recorded calls; configured return values and side effects
        # are kept by reset_mock().
        for tracked in (image, optical, thermal, qa_pixel):
            tracked.reset_mock()

    def assert_optical_scaled():
        optical.multiply.assert_called_with(0.0000275)
        optical.multiply().add.assert_called_with(-0.2)

    def assert_thermal_scaled():
        thermal.multiply.assert_called_with(0.00341802)
        thermal.multiply().add.assert_called_with(149)

    def assert_qa_mask_built():
        qa_pixel.bitwiseAnd.assert_called_once_with(31)
        qa_pixel.bitwiseAnd().eq.assert_called_once_with(0)

    both_bands_added = [
        mock.call(scaled_thermal, None, True),
        mock.call(scaled_optical, None, True),
    ]

    # Thermal bands, no mask.
    reset_mocks()
    common.landsat_scaling(image, thermal_bands=True, apply_fmask=False)
    image.select.assert_has_calls([mock.call("SR_B."), mock.call("ST_B.*")])
    self.assertEqual(image.select.call_count, 2)
    assert_optical_scaled()
    assert_thermal_scaled()
    image.addBands.assert_has_calls(both_bands_added)
    image.updateMask.assert_not_called()

    # Optical bands only, no mask.
    reset_mocks()
    common.landsat_scaling(image, thermal_bands=False, apply_fmask=False)
    image.select.assert_called_once_with("SR_B.")
    assert_optical_scaled()
    thermal.multiply.assert_not_called()
    image.addBands.assert_called_once_with(scaled_optical, None, True)
    image.updateMask.assert_not_called()

    # Thermal bands with mask.
    reset_mocks()
    common.landsat_scaling(image, thermal_bands=True, apply_fmask=True)
    image.select.assert_has_calls(
        [mock.call("SR_B."), mock.call("ST_B.*"), mock.call("QA_PIXEL")],
        any_order=True,
    )
    self.assertEqual(image.select.call_count, 3)
    assert_optical_scaled()
    assert_thermal_scaled()
    assert_qa_mask_built()
    image.addBands.assert_has_calls(both_bands_added)
    image.updateMask.assert_called_once_with(qa_mask)

    # Optical bands only, with mask.
    reset_mocks()
    common.landsat_scaling(image, thermal_bands=False, apply_fmask=True)
    image.select.assert_has_calls(
        [mock.call("SR_B."), mock.call("QA_PIXEL")], any_order=True
    )
    self.assertEqual(image.select.call_count, 2)
    assert_optical_scaled()
    thermal.multiply.assert_not_called()
    assert_qa_mask_built()
    image.addBands.assert_called_once_with(scaled_optical, None, True)
    image.updateMask.assert_called_once_with(qa_mask)
def test_jslink_slider_label(self):
    """Tests that a linked label shows the slider's new value as text."""
    # (slider class, initial value, initial label text, new value, expected)
    scenarios = (
        (ipywidgets.IntSlider, 5, "0", 10, "10"),
        (ipywidgets.FloatSlider, 5.5, "0.0", 10.1, "10.1"),
    )
    for slider_cls, initial, label_text, updated, expected in scenarios:
        slider = slider_cls(value=initial)
        label = ipywidgets.Label(value=label_text)
        common.jslink_slider_label(slider, label)
        # Changing the slider after linking should update the label.
        slider.value = updated
        self.assertEqual(label.value, expected)
def test_check_basemap(self):
    """Tests check_basemap with Google aliases, other names and non-strings."""
    google_aliases = {
        "ROADMAP": "Google Maps",
        "SATELLITE": "Google Satellite",
        "TERRAIN": "Google Terrain",
        "HYBRID": "Google Hybrid",
    }
    # All upper-case aliases are checked first, then all lower-case ones.
    for recase in (str.upper, str.lower):
        for alias, expected in google_aliases.items():
            self.assertEqual(common.check_basemap(recase(alias)), expected)

    # Inputs that are not aliases are expected to come back unchanged.
    self.assertEqual(common.check_basemap("OpenStreetMap"), "OpenStreetMap")
    self.assertIsNone(common.check_basemap(None))
    self.assertEqual(common.check_basemap(123), 123)
@mock.patch.object(os.path, "exists")
@mock.patch.object(
    builtins,
    "open",
    new_callable=mock.mock_open,
    read_data='{"access_token": "TOKEN"}',
)
def test_get_ee_token_exists(self, mock_file, mock_exists):
    """Tests get_ee_token when os.path.exists is True and open() yields JSON."""
    # The patched open() only needs to be installed; the handle is unused.
    del mock_file
    mock_exists.return_value = True
    expected_token = {"access_token": "TOKEN"}
    self.assertEqual(common.get_ee_token(), expected_token)
@mock.patch.object(os.path, "exists")
@mock.patch.object(sys, "stdout", new_callable=io.StringIO)
def test_get_ee_token_not_exists(self, mock_stdout, mock_exists):
    """Tests get_ee_token returns None and prints a notice when nothing exists."""
    mock_exists.return_value = False
    self.assertIsNone(common.get_ee_token())
    # stdout is swapped for a StringIO so the printed message can be inspected.
    printed = mock_stdout.getvalue()
    self.assertIn("credentials not found", printed)
@mock.patch.object(psutil, "Process")
def test_is_studio_lab(self, mock_process):
    """Tests is_studio_lab against a mocked parent-process command line."""
    # psutil.Process().parent().cmdline() is the call chain being faked.
    parent_cmdline = mock_process.return_value.parent.return_value.cmdline

    parent_cmdline.return_value = ["python", "/path/to/studiolab/bin/jupyter"]
    self.assertTrue(common.is_studio_lab())

    parent_cmdline.return_value = ["python", "/home/user/run.py"]
    self.assertFalse(common.is_studio_lab())
@mock.patch.object(psutil, "Process")
def test_is_on_aws(self, mock_process):
    """Tests is_on_aws against several mocked parent-process command lines."""
    # psutil.Process().parent().cmdline() is the call chain being faked.
    parent_cmdline = mock_process.return_value.parent.return_value.cmdline
    scenarios = (
        (["python", "/home/ec2-user/run.py"], True),
        (["python", "script.aws"], True),
        (["python", "/home/user/run.py"], False),
    )
    for argv, on_aws in scenarios:
        parent_cmdline.return_value = argv
        check = self.assertTrue if on_aws else self.assertFalse
        check(common.is_on_aws())
def test_hex_to_rgba(self):
    """Tests hex_to_rgba with and without a leading '#' at several opacities."""
    # (hex colour, opacity, expected rgba string)
    scenarios = (
        ("#000000", 1.0, "rgba(0,0,0,1.0)"),
        ("000000", 1.0, "rgba(0,0,0,1.0)"),
        ("#FF0000", 0.5, "rgba(255,0,0,0.5)"),
        ("00FF00", 0.0, "rgba(0,255,0,0.0)"),
        ("0000FF", 1.0, "rgba(0,0,255,1.0)"),
        ("#FFFFFF", 0.7, "rgba(255,255,255,0.7)"),
    )
    for hex_color, opacity, expected in scenarios:
        self.assertEqual(common.hex_to_rgba(hex_color, opacity), expected)
def test_replace_top_level_hyphens(self):
    """Tests that only top-level dict keys have their hyphens replaced."""
    # (input, expected); nested keys, lists and scalars stay untouched.
    scenarios = (
        ({"a-b": 1, "c": 2}, {"a_b": 1, "c": 2}),
        ({"a-b": {"c-d": 1}}, {"a_b": {"c-d": 1}}),
        ([{"a-b": 1}, {"c-d": 2}], [{"a-b": 1}, {"c-d": 2}]),
        (1, 1),
        ("test", "test"),
        ({"a_b": 1}, {"a_b": 1}),
    )
    for value, expected in scenarios:
        self.assertEqual(common.replace_top_level_hyphens(value), expected)
def test_replace_hyphens_in_keys(self):
    """Tests that hyphens are replaced in dict keys at every nesting level."""
    # (input, expected); nested dicts and dicts inside lists are rewritten too.
    scenarios = (
        ({"a-b": 1, "c": 2}, {"a_b": 1, "c": 2}),
        ({"a-b": {"c-d": 1}}, {"a_b": {"c_d": 1}}),
        ([{"a-b": 1}, {"c-d": 2}], [{"a_b": 1}, {"c_d": 2}]),
        (1, 1),
        ("test", "test"),
        ({"a_b": 1}, {"a_b": 1}),
    )
    for value, expected in scenarios:
        self.assertEqual(common.replace_hyphens_in_keys(value), expected)
def test_remove_port_from_string(self):
    """Tests that ports are stripped from http URLs and https is left alone."""
    # (input text, expected text)
    scenarios = (
        ("http://127.0.0.1:8080", "http://127.0.0.1"),
        ("http://127.0.0.1", "http://127.0.0.1"),
        # The https URL is expected to keep its port.
        ("https://127.0.0.1:8080", "https://127.0.0.1:8080"),
        ("ABCD http://10.0.0.1:1234 EFGH", "ABCD http://10.0.0.1 EFGH"),
        ("no url", "no url"),
        (
            '{"url": "http://127.0.0.1:8000/tile"}',
            '{"url": "http://127.0.0.1/tile"}',
        ),
    )
    for text, expected in scenarios:
        self.assertEqual(common.remove_port_from_string(text), expected)
@mock.patch.object(common, "pmtiles_metadata")
def test_pmtiles_style(self, mock_pmtiles_metadata):
    """Tests that two metadata layers produce six style layers by default."""
    fake_metadata = {
        "layer_names": ["layer1", "layer2"],
        "center": [0, 0, 0],
        "bounds": [0, 0, 0, 0],
    }
    mock_pmtiles_metadata.return_value = fake_metadata

    result = common.pmtiles_style("some_url")

    self.assertIn("layers", result)
    self.assertEqual(6, len(result["layers"]))
@mock.patch.object(common, "pmtiles_metadata")
def test_pmtiles_style_cmap_list(self, mock_pmtiles_metadata):
    """Tests that a list cmap assigns one colour to each layer's three styles."""
    mock_pmtiles_metadata.return_value = {
        "layer_names": ["layer1", "layer2"],
        "center": [0, 0, 0],
        "bounds": [0, 0, 0, 0],
    }

    style = common.pmtiles_style("some_url", cmap=["#1a9850", "#91cf60"])

    # Expected (paint key, colour) for style["layers"][0] through [5].
    expected_paint = (
        ("circle-color", "#1a9850"),
        ("line-color", "#1a9850"),
        ("fill-color", "#1a9850"),
        ("circle-color", "#91cf60"),
        ("line-color", "#91cf60"),
        ("fill-color", "#91cf60"),
    )
    for index, (paint_key, color) in enumerate(expected_paint):
        self.assertEqual(style["layers"][index]["paint"][paint_key], color)
@mock.patch.object(common, "pmtiles_metadata")
@mock.patch.object(colormaps, "get_palette")
def test_pmtiles_style_cmap_palette(self, mock_get_palette, mock_pmtiles_metadata):
    """Tests that a named cmap is resolved via get_palette and '#'-prefixed."""
    mock_pmtiles_metadata.return_value = {
        "layer_names": ["layer1", "layer2"],
        "center": [0, 0, 0],
        "bounds": [0, 0, 0, 0],
    }
    # The mocked palette has no leading '#'; the style is expected to add it.
    mock_get_palette.return_value = ["440154", "414287"]

    style = common.pmtiles_style("some_url", cmap="viridis")

    # Expected (paint key, colour) for style["layers"][0] through [5].
    expected_paint = (
        ("circle-color", "#440154"),
        ("line-color", "#440154"),
        ("fill-color", "#440154"),
        ("circle-color", "#414287"),
        ("line-color", "#414287"),
        ("fill-color", "#414287"),
    )
    for index, (paint_key, color) in enumerate(expected_paint):
        self.assertEqual(style["layers"][index]["paint"][paint_key], color)
@mock.patch.object(common, "pmtiles_metadata")
def test_pmtiles_style_layers_str(self, mock_pmtiles_metadata):
    """Tests that a single layer name given as a string limits the style."""
    fake_metadata = {
        "layer_names": ["layer1", "layer2"],
        "center": [0, 0, 0],
        "bounds": [0, 0, 0, 0],
    }
    mock_pmtiles_metadata.return_value = fake_metadata

    result = common.pmtiles_style("some_url", layers="layer1")

    # Only the requested layer contributes style entries.
    self.assertEqual(3, len(result["layers"]))
    self.assertEqual(result["layers"][0]["id"], "layer1_point")
@mock.patch.object(common, "pmtiles_metadata")
def test_pmtiles_style_layers_list(self, mock_pmtiles_metadata):
    """Tests that a list of layer names limits the style to those layers."""
    fake_metadata = {
        "layer_names": ["layer1", "layer2"],
        "center": [0, 0, 0],
        "bounds": [0, 0, 0, 0],
    }
    mock_pmtiles_metadata.return_value = fake_metadata

    result = common.pmtiles_style("some_url", layers=["layer2"])

    # Only the requested layer contributes style entries.
    self.assertEqual(3, len(result["layers"]))
    self.assertEqual(result["layers"][0]["id"], "layer2_point")
@mock.patch.object(common, "pmtiles_metadata")
def test_pmtiles_style_layers_invalid_list(self, mock_pmtiles_metadata):
    """Tests that a layer name missing from the metadata raises ValueError."""
    fake_metadata = {
        "layer_names": ["layer1", "layer2"],
        "center": [0, 0, 0],
        "bounds": [0, 0, 0, 0],
    }
    mock_pmtiles_metadata.return_value = fake_metadata

    requested = ["layer1", "invalid_layer"]
    self.assertRaises(
        ValueError, common.pmtiles_style, "some_url", layers=requested
    )
@mock.patch.object(common, "pmtiles_metadata")
def test_pmtiles_style_layers_invalid_type(self, mock_pmtiles_metadata):
    """Tests that an integer `layers` argument raises ValueError."""
    fake_metadata = {
        "layer_names": ["layer1", "layer2"],
        "center": [0, 0, 0],
        "bounds": [0, 0, 0, 0],
    }
    mock_pmtiles_metadata.return_value = fake_metadata

    self.assertRaises(ValueError, common.pmtiles_style, "some_url", layers=123)
def test_check_html_string(self):
    """Tests that a local image path in HTML is replaced by a base64 data URI."""
    with tempfile.TemporaryDirectory() as tmpdir:
        image_path = pathlib.Path(tmpdir) / "test_image.png"
        # Write a real 1x1 PNG so the bytes on disk can be encoded for comparison.
        Image.new("RGB", (1, 1), color="red").save(image_path)

        encoded = base64.b64encode(image_path.read_bytes()).decode("utf-8")
        data_uri = f"data:image/png;base64,{encoded}"
        source_html = f'<html><body><img src="{image_path}"></body></html>'

        converted = common.check_html_string(source_html)

        self.assertIn(data_uri, converted)
        self.assertNotIn(str(image_path), converted)
# Run the test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()