|
| 1 | +import os |
| 2 | +import re |
| 3 | +import tempfile |
| 4 | +from pathlib import Path |
| 5 | + |
| 6 | +import pytest |
| 7 | +from fastapi.testclient import TestClient |
| 8 | + |
| 9 | +from theHarvester.__main__ import sanitize_filename, sanitize_for_xml |
| 10 | + |
| 11 | + |
class TestCORSConfiguration:
    """Test CORS security configuration."""

    @staticmethod
    def _cors_options():
        """
        Return the keyword options the API's CORS middleware was registered with.

        Fails the calling test when no middleware whose class name contains
        'CORSMiddleware' is registered on the app.
        """
        from theHarvester.lib.api.api import app

        cors_middleware = next(
            (middleware for middleware in app.user_middleware if 'CORSMiddleware' in str(middleware.cls)),
            None,
        )
        assert cors_middleware is not None, 'CORS middleware should be configured'
        return cors_middleware.kwargs

    def test_cors_does_not_allow_credentials_with_wildcard_origins(self):
        """
        Security Test: CORS should not allow credentials with wildcard origins.

        This prevents credential theft attacks where any origin can make
        authenticated requests to the API.
        """
        options = self._cors_options()
        allow_origins = options.get('allow_origins', [])
        allow_credentials = options.get('allow_credentials', False)

        # If allow_origins contains '*', allow_credentials must be False
        if '*' in allow_origins:
            assert (
                allow_credentials is False
            ), 'CRITICAL: CORS must not allow credentials with wildcard origins (CVE risk)'

    def test_cors_restricts_http_methods(self):
        """
        Security Test: CORS should restrict HTTP methods to only what's needed.

        Reduces attack surface by limiting available methods.
        """
        allow_methods = self._cors_options().get('allow_methods', [])

        # A bare string is one method name, not a sequence of one-letter methods
        if isinstance(allow_methods, str):
            allow_methods = [allow_methods]
        allowed_set = {method.upper() for method in allow_methods}

        # The wildcard must not appear anywhere, whether alone or next to
        # explicit methods, and regardless of the container type used
        assert '*' not in allowed_set, 'CORS should restrict HTTP methods, not allow all (*)'

        # Should only allow necessary methods (GET, POST for this API)
        dangerous_methods = {'DELETE', 'PUT', 'PATCH', 'TRACE', 'CONNECT'}
        unnecessary = allowed_set & dangerous_methods
        assert not unnecessary, f'Unnecessary HTTP methods detected: {unnecessary}'
| 73 | + |
| 74 | + |
| 75 | +class TestXMLInjectionPrevention: |
| 76 | + """Test XML injection prevention.""" |
| 77 | + |
| 78 | + def test_sanitize_for_xml_escapes_special_characters(self): |
| 79 | + """ |
| 80 | + Security Test: Verify XML special characters are properly escaped. |
| 81 | +
|
| 82 | + Prevents XML injection attacks. |
| 83 | + """ |
| 84 | + # Test all XML special characters |
| 85 | + test_cases = [ |
| 86 | + ('&', '&'), |
| 87 | + ('<', '<'), |
| 88 | + ('>', '>'), |
| 89 | + ('"', '"'), |
| 90 | + ("'", '''), |
| 91 | + ('<script>alert("XSS")</script>', '<script>alert("XSS")</script>'), |
| 92 | + ( '[email protected] & <test>', '[email protected] & <test>'), |
| 93 | + ('Normal text', 'Normal text'), |
| 94 | + ] |
| 95 | + |
| 96 | + for input_text, expected_output in test_cases: |
| 97 | + result = sanitize_for_xml(input_text) |
| 98 | + assert result == expected_output, f'Failed to properly escape: {input_text}' |
| 99 | + |
| 100 | + def test_sanitize_for_xml_prevents_xml_entity_injection(self): |
| 101 | + """ |
| 102 | + Security Test: Prevent XML entity injection attempts. |
| 103 | + """ |
| 104 | + malicious_inputs = [ |
| 105 | + '<?xml version="1.0"?><!DOCTYPE foo [<!ENTITY xxe SYSTEM "file:///etc/passwd">]>', |
| 106 | + '<!ENTITY xxe SYSTEM "file:///dev/random">', |
| 107 | + '<![CDATA[malicious]]>', |
| 108 | + '<script>', |
| 109 | + ] |
| 110 | + |
| 111 | + for malicious_input in malicious_inputs: |
| 112 | + result = sanitize_for_xml(malicious_input) |
| 113 | + # Ensure dangerous characters are escaped |
| 114 | + assert '<' in result or '&' in result, f'Failed to sanitize: {malicious_input}' |
| 115 | + assert '<' not in result or result == malicious_input.replace('<', '<'), f'XML tags not escaped: {malicious_input}' |
| 116 | + |
| 117 | + def test_command_line_args_are_sanitized_in_xml_output(self): |
| 118 | + """ |
| 119 | + Security Test: Command line arguments must be sanitized before XML output. |
| 120 | +
|
| 121 | + This test is a conceptual check - in real usage, ensure the XML writing |
| 122 | + code uses sanitize_for_xml() on all user-controlled data. |
| 123 | + """ |
| 124 | + # Simulate dangerous command line arguments |
| 125 | + dangerous_args = [ |
| 126 | + '--domain=test.com', |
| 127 | + "--source='<script>alert(1)</script>'", |
| 128 | + '--output="; rm -rf /', |
| 129 | + '--domain=example.com¶m=<injection>', |
| 130 | + ] |
| 131 | + |
| 132 | + for arg in dangerous_args: |
| 133 | + sanitized = sanitize_for_xml(arg) |
| 134 | + # Verify no unescaped XML special characters remain |
| 135 | + assert '<script>' not in sanitized, f'Script tag not escaped in: {arg}' |
| 136 | + assert '¶m=' not in sanitized or '&' in sanitized, f'Ampersand not escaped in: {arg}' |
| 137 | + |
| 138 | + |
class TestInformationDisclosure:
    """Test information disclosure prevention."""

    # Regexes for common internal-path leakage; defined once instead of being
    # rebuilt for every endpoint
    _PATH_LEAK_PATTERNS = (
        r'/home/\w+/',
        r'/usr/local/',
        r'C:\\Users\\',
        r'/var/www/',
        r'site-packages/',
        r'\.py:\d+',  # filename.py:123
    )

    @staticmethod
    def _error_payload(response):
        """
        Return the decoded JSON body of a response, or its raw text.

        An error body is not guaranteed to be JSON; falling back to the text
        keeps the leak checks running instead of erroring on the decode.
        """
        try:
            return response.json()
        except ValueError:  # JSON decode errors are ValueError subclasses
            return response.text

    @pytest.fixture
    def client(self):
        """Create a test client for API testing."""
        from theHarvester.lib.api.api import app

        return TestClient(app)

    def test_api_does_not_expose_traceback_in_error_responses(self, client):
        """
        Security Test: API should never expose stack traces to clients.

        Stack traces can reveal sensitive information about the system.
        """
        # Test the /sources endpoint with a simulated error condition
        response = client.get('/sources')

        # Even if there's an error, traceback should not be in response
        if response.status_code >= 400:
            response_data = self._error_payload(response)
            assert 'traceback' not in response_data, 'Traceback exposed in error response'
            assert 'Traceback' not in str(response_data), 'Traceback text found in response'
            assert 'File "' not in str(response_data), 'File paths exposed in response'

    def test_error_responses_do_not_leak_internal_paths(self, client):
        """
        Security Test: Error messages should not reveal internal file paths.
        """
        # Try various endpoints
        endpoints = ['/sources', '/dnsbrute?domain=test', '/query?domain=test&source=baidu']

        for endpoint in endpoints:
            response = client.get(endpoint)
            # Only non-200 bodies are inspected; a 200 body is treated as empty
            response_text = str(self._error_payload(response) if response.status_code != 200 else {})

            # Check for common path leakage patterns
            for pattern in self._PATH_LEAK_PATTERNS:
                matches = re.findall(pattern, response_text)
                assert not matches, f'Internal path leaked in {endpoint}: {matches}'

    def test_debug_mode_does_not_expose_sensitive_info(self, client, monkeypatch):
        """
        Security Test: Even with DEBUG=1, sensitive info should not be exposed to clients.
        """
        # Set DEBUG environment variable
        monkeypatch.setenv('DEBUG', '1')

        # Make request that might trigger an error
        response = client.get('/dnsbrute?domain=')  # Invalid request

        if response.status_code >= 400:
            response_data = self._error_payload(response)
            # Even with DEBUG=1, traceback should NOT be sent to client
            assert 'traceback' not in response_data, 'DEBUG mode exposes tracebacks to clients'
| 204 | + |
| 205 | + |
class TestPathTraversalPrevention:
    """Test path traversal prevention."""

    # Shell metacharacters that must never survive filename sanitization
    _DANGEROUS_CHARS = (';', '|', '&', '$', '`', '(', ')', '{', '}', '[', ']', '<', '>')

    def test_sanitize_filename_removes_path_components(self):
        """
        Security Test: Filenames should not contain path traversal sequences.
        """
        dangerous_filenames = [
            '../../../etc/passwd',
            '..\\..\\..\\windows\\system32\\config\\sam',
            '/etc/passwd',
            'C:\\Windows\\System32\\config\\sam',
            '../../sensitive_file.txt',
            './../hidden_file',
            'subdir/../../../etc/passwd',
        ]

        for dangerous_filename in dangerous_filenames:
            result = sanitize_filename(dangerous_filename)

            # Should not contain any path separators
            assert '/' not in result, f'Path separator found in sanitized filename: {result}'
            assert '\\' not in result, f'Windows path separator found: {result}'

            # Should not start with .. (parent directory reference at the beginning is most dangerous)
            assert not result.startswith('..'), f'Parent directory reference at start: {result}'

            # Should only be the basename
            assert os.path.dirname(result) == '', f'Path component remains: {result}'

    def test_sanitize_filename_removes_dangerous_characters(self):
        """
        Security Test: Filenames should only contain safe characters.
        """
        test_cases = [
            'file; rm -rf /',
            'file`whoami`.txt',
            'file$(malicious).txt',
            'file|cmd.txt',
            'file&background.txt',
            'normal-file_123.txt',
        ]

        for input_filename in test_cases:
            result = sanitize_filename(input_filename)

            # Should not be empty
            assert len(result) > 0, f'Sanitized filename is empty for: {input_filename}'

            # Should not contain shell special characters
            for char in self._DANGEROUS_CHARS:
                assert char not in result, f'Dangerous character {char} found in: {result}'

            # Should only contain alphanumeric, dash, underscore, and dot.
            # fullmatch is used because '$' with match() would also accept a
            # trailing newline.
            assert re.fullmatch(r'[a-zA-Z0-9._-]+', result), f'Invalid characters in sanitized filename: {result}'

    def test_sanitize_filename_prevents_hidden_files(self):
        """
        Security Test: Prevent creation of hidden files.
        """
        hidden_files = ['.bashrc', '.ssh_config', '.env', '..hidden', '.']

        for hidden_file in hidden_files:
            result = sanitize_filename(hidden_file)

            # Should not start with a dot; an empty result passes trivially
            assert not result.startswith('.'), f'Hidden file not prevented: {result}'

    def test_filename_sanitization_preserves_safe_filenames(self):
        """
        Security Test: Safe filenames should remain mostly unchanged.
        """
        safe_filenames = [
            'report.json',
            'results_2024-01-17.xml',
            'scan-output.txt',
            'data_file_v2.csv',
        ]

        for safe_filename in safe_filenames:
            result = sanitize_filename(safe_filename)

            # Safe filenames should be preserved (possibly with minor changes)
            assert len(result) > 0, 'Safe filename was completely removed'
            if '.' in safe_filename:
                assert '.' in result, 'File extension removed incorrectly'

    def test_path_traversal_in_file_operations(self):
        """
        Integration Test: Verify file operations don't allow path traversal.
        """
        # Simulate user input
        user_input = '../../../etc/passwd'
        sanitized = sanitize_filename(user_input)

        # Build the path a file with the sanitized name would be created at
        with tempfile.TemporaryDirectory() as tmpdir:
            # Resolve both sides first: commonpath does not collapse '..'
            # components, and the temp directory itself may sit behind a symlink
            base_dir = os.path.realpath(tmpdir)
            safe_path = os.path.realpath(os.path.join(base_dir, sanitized))

            # Ensure the resolved path is still within tmpdir
            assert os.path.commonpath([base_dir, safe_path]) == base_dir, 'Path traversal detected!'

            # Verify we can't escape the directory: the file must land directly inside it
            assert os.path.dirname(safe_path) == base_dir, 'File path escaped temporary directory'
| 314 | + |
| 315 | + |
class TestSecurityBestPractices:
    """Additional security best practices tests."""

    def test_no_hardcoded_secrets_in_code(self):
        """
        Security Test: Ensure no hardcoded secrets in main code files.

        Files are located relative to the project root rather than the current
        working directory, and the test is skipped (not silently passed) when
        none of them can be found.
        """
        # Check main application files for common secret patterns
        files_to_check = [
            'theHarvester/__main__.py',
            'theHarvester/lib/api/api.py',
            'theHarvester/lib/core.py',
        ]

        # Patterns that might indicate hardcoded secrets
        secret_patterns = [
            r'password\s*=\s*["\'][^"\']+["\']',
            r'api_key\s*=\s*["\'][a-zA-Z0-9]{20,}["\']',
            r'secret\s*=\s*["\'][^"\']+["\']',
            r'token\s*=\s*["\'][a-zA-Z0-9]{20,}["\']',
        ]

        # Walk up from this file to the directory holding the package; fall
        # back to the working directory when it is not an ancestor
        project_root = next(
            (
                parent
                for parent in Path(__file__).resolve().parents
                if (parent / 'theHarvester' / '__main__.py').is_file()
            ),
            Path.cwd(),
        )

        checked_files = 0
        for file_path in files_to_check:
            source_file = project_root / file_path
            if not source_file.is_file():
                continue
            checked_files += 1
            content = source_file.read_text(encoding='utf-8')

            for pattern in secret_patterns:
                matches = re.findall(pattern, content, re.IGNORECASE)
                # Filter out obvious non-secrets (like example values, empty strings, variable names)
                real_matches = [
                    m
                    for m in matches
                    if 'example' not in m.lower()
                    and 'your_' not in m.lower()
                    and '""' not in m
                    and "''" not in m
                ]
                assert not real_matches, f'Potential hardcoded secret in {file_path}: {real_matches}'

        if not checked_files:
            pytest.skip('None of the source files to scan were found')

    def test_api_has_rate_limiting(self):
        """
        Security Test: Verify API endpoints have rate limiting enabled.
        """
        from theHarvester.lib.api.api import app

        # Check that rate limiting is configured
        assert hasattr(app.state, 'limiter'), 'Rate limiter not configured'
        assert app.state.limiter is not None, 'Rate limiter is None'

    def test_sensitive_endpoints_require_validation(self):
        """
        Security Test: Ensure sensitive endpoints validate input.
        """
        from theHarvester.lib.api.api import app

        client = TestClient(app)

        # Requests that must be rejected with an error status.
        # Note: The /query endpoint requires 'source' as a list parameter
        invalid_requests = [
            '/dnsbrute?domain=',  # Empty domain should be rejected
        ]

        for endpoint in invalid_requests:
            response = client.get(endpoint)
            assert (
                response.status_code >= 400
            ), f'Endpoint {endpoint} should reject invalid input (got {response.status_code})'

        # Test query endpoint with proper parameter format but invalid domain
        response = client.get('/query?domain=a&source=baidu')  # Too short domain
        # This may or may not fail depending on validation, but we check it doesn't crash
        assert response.status_code in [200, 400, 422, 500], 'Unexpected status code'
| 392 | + |
| 393 | + |
if __name__ == '__main__':
    # Propagate pytest's exit status so a failing run is visible to the calling shell
    raise SystemExit(pytest.main([__file__, '-v']))
0 commit comments