Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
laramies
GitHub Repository: laramies/theHarvester
Path: blob/master/tests/discovery/test_shodan_engine.py
904 views
1
import socket
2
import sys
3
from collections import OrderedDict
4
5
import pytest
6
7
8
class TestShodanEngine:
    """CLI-level test of the ``-b shodan`` source with DNS, storage and Shodan stubbed."""

    @pytest.mark.asyncio
    async def test_shodan_engine_processes_without_work_item_error_and_yields_hostnames(
        self,
        monkeypatch,
        capsys,
    ):
        """Invoke ``start()`` for the shodan source and inspect what it prints.

        The test expects ``start()`` to terminate through ``SystemExit`` with
        code 0, the captured stdout to be free of the "work item" error
        message, and both hostnames served by the stubbed Shodan client to
        appear in that output.
        """
        # Deliberately imported inside the test: the patches below are applied
        # to attributes reached through this module object.
        import theHarvester.__main__ as main_module

        def fixed_gethostbyname(_domain):
            # Constant answer keeps name resolution deterministic and offline.
            return "1.2.3.4"

        class NoOpStash:
            """Stand-in for the stash manager; avoids filesystem/sqlite side effects."""

            async def do_init(self) -> None:
                return None

            async def store_all(self, domain, all, res_type, source) -> None:  # noqa: A002
                return None

        class CannedShodan:
            """Stand-in for the Shodan client; needs neither network nor API key."""

            async def search_ip(self, ip):
                canned_hostnames = ["a.example.com", "b.example.com"]
                return OrderedDict({ip: {"hostnames": canned_hostnames}})

        monkeypatch.setattr(socket, "gethostbyname", fixed_gethostbyname, raising=True)
        monkeypatch.setattr(main_module.stash, "StashManager", NoOpStash, raising=True)
        monkeypatch.setattr(main_module.shodansearch, "SearchShodan", CannedShodan, raising=True)

        # Command line for the code path under test (`-b shodan`).
        cli_argv = ["theHarvester", "-d", "example.com", "-b", "shodan"]
        monkeypatch.setattr(sys, "argv", cli_argv, raising=True)

        with pytest.raises(SystemExit) as exit_info:
            await main_module.start()
        assert exit_info.value.code == 0

        captured = capsys.readouterr()
        stdout_text = captured.out
        assert 'A error occurred while processing a "work item"' not in stdout_text
        for expected_hostname in ("a.example.com", "b.example.com"):
            assert expected_hostname in stdout_text
45
46