Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
laramies
GitHub Repository: laramies/theHarvester
Path: blob/master/tests/lib/test_core.py
893 views
1
from __future__ import annotations
2
3
import ast
4
from pathlib import Path
5
from typing import Any
6
from unittest import mock
7
8
import pytest
9
import yaml
10
11
import theHarvester.lib.core as core_module
12
from theHarvester.lib.core import CONFIG_DIRS, DATA_DIR, Core
13
14
15
@pytest.fixture(autouse=True)
def mock_environ(monkeypatch, tmp_path: Path):
    """Redirect the user's home directory to a per-test temporary directory.

    The fixture is ``autouse``, so every test in this module runs with ``~``
    resolving under ``tmp_path``.

    Args:
        monkeypatch: pytest fixture used to set environment variables for the
            duration of the test.
        tmp_path: pytest-provided temporary directory that stands in for the
            home directory.
    """
    home = str(tmp_path)
    # POSIX home-directory expansion consults HOME.
    monkeypatch.setenv("HOME", home)
    # Windows expansion consults USERPROFILE instead (HOME is ignored there on
    # Python 3.8+), so set it too to keep the redirection effective.
    monkeypatch.setenv("USERPROFILE", home)
18
19
20
def mock_read_text(mocked: dict[Path, str | Exception]):
    """Build a stand-in for ``Path.read_text`` driven by a lookup table.

    Args:
        mocked: Maps a path to either the text to return for that path or an
            exception instance to raise for it.

    Returns:
        A function taking ``(self, *args, **kwargs)`` like ``Path.read_text``.
        For a path listed in ``mocked`` it returns the mapped text or raises
        the mapped exception; for any other path it delegates to the
        ``Path.read_text`` that was in place when this factory was called.
    """
    # Capture the current implementation up front: once the returned function
    # is installed as a patch, ``Path.read_text`` no longer refers to it.
    real_read_text = Path.read_text

    def _read_text(self: Path, *args, **kwargs):
        # Decide by membership, not truthiness, so that a path mapped to the
        # empty string is still served from the table instead of hitting disk.
        if self not in mocked:
            return real_read_text(self, *args, **kwargs)

        result = mocked[self]
        if isinstance(result, Exception):
            raise result
        return result

    return _read_text
31
32
33
@pytest.mark.parametrize(
    ("name", "contents", "expected"),
    [
        ("api-keys", "apikeys: {}", {}),
        ("proxies", "http: [localhost:8080]", {"http": ["http://localhost:8080"], "socks5": []}),
    ],
)
@pytest.mark.parametrize("dir", CONFIG_DIRS)
def test_read_config_searches_config_dirs(
    name: str, contents: str, expected: Any, dir: Path, capsys
):
    """Check that a config file is picked up from each entry of ``CONFIG_DIRS``.

    For the directory under test, ``<name>.yaml`` is mocked to hold
    ``contents`` while the same file in every other config directory raises
    ``FileNotFoundError``. The matching ``Core`` accessor must return
    ``expected`` and print which file it read.
    """
    file = dir.expanduser() / f"{name}.yaml"

    # One table entry per config directory: only the directory under test
    # yields contents, the rest behave as if the file does not exist.
    table = {}
    for config_dir in CONFIG_DIRS:
        candidate = config_dir.expanduser() / file.name
        table[candidate] = contents if candidate == file else FileNotFoundError()
    fake_read_text = mock_read_text(table)

    with mock.patch("pathlib.Path.read_text", autospec=True, side_effect=fake_read_text):
        if name == "api-keys":
            got = Core.api_keys()
        else:
            got = Core.proxy_list()

    assert got == expected
    printed = capsys.readouterr().out
    assert f"Read {file.name} from {file}" in printed
55
56
57
@pytest.mark.parametrize("name", ("api-keys", "proxies"))
def test_read_config_copies_default_to_home(name: str, capsys):
    """Check the fallback used when no config directory holds ``<name>.yaml``.

    Every candidate under ``CONFIG_DIRS`` is mocked to raise
    ``FileNotFoundError``. The test then asserts that the matching ``Core``
    accessor returns the values from the default file under ``DATA_DIR``,
    prints a "Created default ..." message, and that
    ``~/.theHarvester/<name>.yaml`` exists afterwards.
    """
    file = Path(f"~/.theHarvester/{name}.yaml").expanduser()
    config_files = [d.expanduser() / file.name for d in CONFIG_DIRS]
    side_effect = mock_read_text({f: FileNotFoundError() for f in config_files})

    with mock.patch("pathlib.Path.read_text", autospec=True, side_effect=side_effect):
        got = Core.api_keys() if name == "api-keys" else Core.proxy_list()

    # Decode explicitly as UTF-8 (as the api-keys sync test in this module
    # does) so the expectation does not depend on the platform's locale.
    default = yaml.safe_load((DATA_DIR / file.name).read_text(encoding="utf-8"))
    if name == "api-keys":
        expected = default["apikeys"]
    else:
        # Proxy hosts are expected back prefixed with their scheme; a missing
        # or empty section is expected as an empty list.
        expected = {
            "http": [f"http://{h}" for h in default["http"]] if default.get("http") else [],
            "socks5": [f"socks5://{h}" for h in default["socks5"]] if default.get("socks5") else [],
        }

    assert got == expected
    assert f"Created default {file.name} at {file}" in capsys.readouterr().out
    assert file.exists()
78
79
80
def _extract_required_apikey_entries_from_core() -> dict[str, set[str]]:
    """Collect the API-key entries that ``class Core`` looks up by subscript.

    Parses the source file of ``theHarvester.lib.core`` and scans the ``Core``
    class body for expressions of the form ``Core.api_keys()["provider"]`` or
    ``Core.api_keys()["provider"]["field"]`` whose keys are string literals.

    Returns:
        A mapping from each provider key to the set of second-level field keys
        seen for it. The set is empty when only the provider is subscripted.
    """
    source = Path(core_module.__file__).read_text(encoding="utf-8")

    core_class = None
    for top_level in ast.parse(source).body:
        if isinstance(top_level, ast.ClassDef) and top_level.name == "Core":
            core_class = top_level
            break
    assert core_class is not None, "Unable to locate `class Core` in theHarvester.lib.core"

    required: dict[str, set[str]] = {}
    for node in ast.walk(core_class):
        if not isinstance(node, ast.Subscript):
            continue

        # Peel string-literal subscripts from the outermost inwards, stopping
        # at the first link that is not a ``[...]`` with a string constant.
        keys: list[str] = []
        base: ast.AST = node
        while (
            isinstance(base, ast.Subscript)
            and isinstance(base.slice, ast.Constant)
            and isinstance(base.slice.value, str)
        ):
            keys.append(base.slice.value)
            base = base.value

        # Only chains rooted directly at a ``Core.api_keys(...)`` call count.
        if not keys or not isinstance(base, ast.Call):
            continue
        callee = base.func
        if not isinstance(callee, ast.Attribute) or callee.attr != "api_keys":
            continue
        receiver = callee.value
        if not isinstance(receiver, ast.Name) or receiver.id != "Core":
            continue

        # ``keys`` was gathered outermost-first; reversed, it reads
        # provider, field, ... in source order.
        provider, *fields = reversed(keys)
        entry = required.setdefault(provider, set())
        if fields:
            entry.add(fields[0])

    return required
120
121
122
def test_api_keys_yaml_is_in_sync_with_core_accessors():
    """Check that the default ``api-keys.yaml`` covers every key ``Core`` reads.

    Each provider, and each provider field, that ``Core`` subscripts out of
    ``Core.api_keys()`` must be present under ``apikeys`` in the
    ``api-keys.yaml`` stored in ``DATA_DIR``.
    """
    required = _extract_required_apikey_entries_from_core()
    assert required, "No API-key references were detected in `Core`"

    yaml_text = (DATA_DIR / "api-keys.yaml").read_text(encoding="utf-8")
    apikeys = yaml.safe_load(yaml_text)["apikeys"]

    # Providers are checked first; the field lookup below indexes
    # ``apikeys[provider]`` and relies on every provider being present.
    missing_providers = sorted(p for p in required if p not in apikeys)
    assert not missing_providers, f"Missing providers in api-keys.yaml: {missing_providers}"

    missing_fields: dict[str, list[str]] = {}
    for provider, fields in required.items():
        absent = [field for field in sorted(fields) if field not in apikeys[provider]]
        if absent:
            missing_fields[provider] = absent

    assert not missing_fields, f"Missing fields in api-keys.yaml: {missing_fields}"
139
140