Skip to content

Commit f180ecf

Browse files
Copilotsgerlach
andcommitted
Complete issue #66 - Remove duplicates and add focused repository tools
Co-authored-by: sgerlach <[email protected]>
1 parent b4c7dee commit f180ecf

File tree

2 files changed

+212
-144
lines changed

2 files changed

+212
-144
lines changed

stackhawk_mcp/server.py

Lines changed: 0 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -2102,68 +2102,7 @@ def _detect_project_language_and_frameworks(self) -> dict:
21022102
"frameworks": frameworks
21032103
}
21042104

2105-
async def _get_sensitive_data_report(self, org_id: str, data_type_filter: str = "All", time_range: str = "30d", include_details: bool = True, group_by: str = "data_type", **kwargs) -> Dict[str, Any]:
2106-
"""
2107-
Generate a grouped and summarized sensitive data report for an entire organization.
2108-
Use this for org-wide analytics, compliance, and reporting.
2109-
"""
2110-
try:
2111-
# For all-time reports, fetch all results to get complete picture
2112-
if time_range == "all":
2113-
findings_response = await self.client.list_sensitive_data_findings(org_id, all_results=True)
2114-
findings = findings_response.get("sensitiveDataFindings", [])
2115-
else:
2116-
# For time-limited reports, use pagination to get a reasonable sample
2117-
findings_params = {"pageSize": 1000}
2118-
findings_response = await self.client.list_sensitive_data_findings(org_id, **findings_params)
2119-
findings = findings_response.get("sensitiveDataFindings", [])
21202105

2121-
# Apply client-side filters
2122-
filtered_findings = findings
2123-
2124-
if data_type_filter != "All":
2125-
filtered_findings = [
2126-
f for f in filtered_findings
2127-
if f.get("dataType") == data_type_filter
2128-
]
2129-
2130-
if time_range != "all":
2131-
time_filter = {
2132-
"startDate": (datetime.now() - timedelta(days=int(time_range[:-1]))).isoformat(),
2133-
"endDate": datetime.now().isoformat()
2134-
}
2135-
filtered_findings = [
2136-
f for f in filtered_findings
2137-
if f.get("findingDate") >= time_filter["startDate"] and f.get("findingDate") <= time_filter["endDate"]
2138-
]
2139-
2140-
# Group findings
2141-
grouped_findings = {}
2142-
for finding in filtered_findings:
2143-
group_key = finding.get(group_by)
2144-
if group_key not in grouped_findings:
2145-
grouped_findings[group_key] = []
2146-
grouped_findings[group_key].append(finding)
2147-
2148-
# Format findings
2149-
formatted_findings = []
2150-
for group, findings in grouped_findings.items():
2151-
formatted_findings.append({
2152-
"group": group,
2153-
"findings": findings
2154-
})
2155-
2156-
return {
2157-
"organizationId": org_id,
2158-
"dataTypeFilter": data_type_filter,
2159-
"timeRange": time_range,
2160-
"report": formatted_findings,
2161-
"totalFindings": len(filtered_findings),
2162-
"timestamp": datetime.now().isoformat()
2163-
}
2164-
except Exception as e:
2165-
debug_print(f"Error in _get_sensitive_data_report: {e}")
2166-
raise
21672106

21682107
async def _analyze_sensitive_data_trends(self, org_id: str, analysis_period: str = "90d", include_applications: bool = True, include_repositories: bool = True, **kwargs) -> Dict[str, Any]:
21692108
"""
@@ -2248,90 +2187,7 @@ async def _analyze_sensitive_data_trends(self, org_id: str, analysis_period: str
22482187
debug_print(f"Error in _analyze_sensitive_data_trends: {e}")
22492188
raise
22502189

2251-
async def _get_critical_sensitive_data(self, org_id: str, data_types: List[str] = None, include_remediation: bool = True, max_results: int = 50, **kwargs) -> Dict[str, Any]:
2252-
"""Get critical sensitive data findings requiring immediate attention"""
2253-
try:
2254-
if data_types is None:
2255-
data_types = ["PII", "PCI", "PHI"]
2256-
2257-
# For critical findings, we want to see ALL critical findings, not just the first page
2258-
findings_response = await self.client.list_sensitive_data_findings(org_id, all_results=True)
2259-
findings = findings_response.get("sensitiveDataFindings", [])
22602190

2261-
# Filter findings based on data types
2262-
filtered_findings = [
2263-
f for f in findings
2264-
if f.get("dataType") in data_types
2265-
]
2266-
2267-
# Include remediation details
2268-
if include_remediation:
2269-
for finding in filtered_findings:
2270-
finding["remediation"] = finding.get("remediationDetails", "No remediation details available")
2271-
2272-
return {
2273-
"organizationId": org_id,
2274-
"dataTypes": data_types,
2275-
"findings": filtered_findings,
2276-
"totalFindings": len(filtered_findings),
2277-
"timestamp": datetime.now().isoformat()
2278-
}
2279-
except Exception as e:
2280-
debug_print(f"Error in _get_critical_sensitive_data: {e}")
2281-
raise
2282-
2283-
async def _generate_sensitive_data_summary(self, org_id: str, time_period: str = "30d", include_recommendations: bool = True, include_risk_assessment: bool = True, **kwargs) -> Dict[str, Any]:
2284-
"""Generate executive-level sensitive data summary and recommendations"""
2285-
try:
2286-
# For all-time reports, fetch all results to get complete picture
2287-
if time_period == "all":
2288-
findings_response = await self.client.list_sensitive_data_findings(org_id, all_results=True)
2289-
findings = findings_response.get("sensitiveDataFindings", [])
2290-
else:
2291-
# For time-limited reports, use pagination to get a reasonable sample
2292-
findings_params = {"pageSize": 1000}
2293-
findings_response = await self.client.list_sensitive_data_findings(org_id, **findings_params)
2294-
findings = findings_response.get("sensitiveDataFindings", [])
2295-
2296-
# Group findings by data type
2297-
data_type_findings = {"PII": [], "PCI": [], "PHI": [], "Other": []}
2298-
for finding in findings:
2299-
data_type = finding.get("dataType", "Other")
2300-
if data_type in data_type_findings:
2301-
data_type_findings[data_type].append(finding)
2302-
else:
2303-
data_type_findings["Other"].append(finding)
2304-
2305-
# Generate summary
2306-
summary = {
2307-
"totalFindings": len(findings),
2308-
"dataTypeBreakdown": data_type_findings
2309-
}
2310-
2311-
# Include recommendations
2312-
if include_recommendations:
2313-
summary["recommendations"] = [
2314-
{
2315-
"dataType": data_type,
2316-
"recommendation": f"Review and secure {data_type} data exposure"
2317-
}
2318-
for data_type, findings in data_type_findings.items()
2319-
if findings
2320-
]
2321-
2322-
# Include risk assessment
2323-
if include_risk_assessment:
2324-
summary["riskAssessment"] = self._calculate_sensitive_data_risk_score(findings)
2325-
2326-
return {
2327-
"organizationId": org_id,
2328-
"timePeriod": time_period,
2329-
"summary": summary,
2330-
"timestamp": datetime.now().isoformat()
2331-
}
2332-
except Exception as e:
2333-
debug_print(f"Error in _generate_sensitive_data_summary: {e}")
2334-
raise
23352191

23362192
async def _check_repository_attack_surface(self, repo_name: str = None, org_id: str = None, include_vulnerabilities: bool = True, include_apps: bool = True, **kwargs) -> Dict[str, Any]:
23372193
"""Check if a repository name exists in StackHawk attack surface and get security information"""

test_new_tools.py

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Test script for the new MCP tools implemented for issue #66
4+
5+
This script tests:
6+
1. check_repository_attack_surface - Check if repo is in attack surface
7+
2. check_repository_sensitive_data - Check if repo has sensitive data
8+
3. list_application_repository_connections - List app-repo connections
9+
4. get_sensitive_data_summary - Comprehensive sensitive data summary
10+
"""
11+
12+
import asyncio
13+
import json
14+
import os
15+
import sys
16+
from datetime import datetime
17+
18+
# Add the stackhawk_mcp directory to the path
19+
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '.')))
20+
21+
from stackhawk_mcp.server import StackHawkMCPServer
22+
23+
24+
async def test_new_tools():
25+
"""Test the new MCP tools implemented for issue #66"""
26+
27+
api_key = os.environ.get("STACKHAWK_API_KEY")
28+
if not api_key:
29+
print("❌ STACKHAWK_API_KEY environment variable is required")
30+
print("Please set it to test the new tools")
31+
return
32+
33+
print("🔍 Testing New MCP Tools for Issue #66")
34+
print("=" * 60)
35+
36+
server = StackHawkMCPServer(api_key)
37+
38+
try:
39+
# Get user info
40+
user_info = await server.client.get_user_info()
41+
org_id = user_info["user"]["external"]["organizations"][0]["organization"]["id"]
42+
org_name = user_info["user"]["external"]["organizations"][0]["organization"]["name"]
43+
44+
print(f"✅ Connected to organization: {org_name} (ID: {org_id})")
45+
print()
46+
47+
# Test 1: Check Repository Attack Surface
48+
print("1. Testing check_repository_attack_surface...")
49+
try:
50+
# Test with the current repository name
51+
current_repo = "stackhawk-mcp" # This repo
52+
result = await server._check_repository_attack_surface(
53+
repo_name=current_repo,
54+
include_vulnerabilities=True,
55+
include_apps=True
56+
)
57+
print("✅ Repository attack surface check completed!")
58+
print(f" Repository: {result['repository_name']}")
59+
print(f" Found in attack surface: {result.get('found_in_attack_surface', False)}")
60+
print(f" Total matching repos: {len(result.get('matching_repositories', []))}")
61+
if result.get('connected_applications'):
62+
print(f" Connected apps: {result['total_connected_apps']}")
63+
print(f" Recommendation: {result.get('recommendation', 'None')}")
64+
except Exception as e:
65+
print(f"❌ Failed to check repository attack surface: {e}")
66+
67+
print("\n" + "="*60 + "\n")
68+
69+
# Test 2: Check Repository Sensitive Data
70+
print("2. Testing check_repository_sensitive_data...")
71+
try:
72+
result = await server._check_repository_sensitive_data(
73+
repo_name=current_repo,
74+
data_type_filter="All",
75+
include_remediation=True
76+
)
77+
print("✅ Repository sensitive data check completed!")
78+
print(f" Repository: {result['repository_name']}")
79+
print(f" Found in StackHawk: {result.get('found_in_stackhawk', False)}")
80+
print(f" Has sensitive data: {result.get('has_sensitive_data', False)}")
81+
print(f" Total findings: {result.get('total_findings', 0)}")
82+
if result.get('data_type_breakdown'):
83+
print(f" Data type breakdown: {result['data_type_breakdown']}")
84+
print(f" Recommendation: {result.get('recommendation', 'None')}")
85+
except Exception as e:
86+
print(f"❌ Failed to check repository sensitive data: {e}")
87+
88+
print("\n" + "="*60 + "\n")
89+
90+
# Test 3: List Application Repository Connections
91+
print("3. Testing list_application_repository_connections...")
92+
try:
93+
result = await server._list_application_repository_connections(
94+
include_repo_details=True,
95+
include_app_details=True,
96+
filter_connected_only=False
97+
)
98+
print("✅ Application-repository connections listed!")
99+
print(f" Total applications: {result['total_applications']}")
100+
print(f" Total repositories: {result['total_repositories']}")
101+
print(f" Total connections: {result['total_connections']}")
102+
103+
coverage_stats = result.get('coverage_stats', {})
104+
print(f" Connected applications: {coverage_stats.get('connected_applications', 0)}")
105+
print(f" Orphaned applications: {coverage_stats.get('orphaned_applications', 0)}")
106+
print(f" Orphaned repositories: {coverage_stats.get('orphaned_repositories', 0)}")
107+
print(f" Connection coverage: {coverage_stats.get('connection_coverage', 0):.1f}%")
108+
109+
recommendations = result.get('recommendations', [])
110+
if recommendations:
111+
print(" Recommendations:")
112+
for i, rec in enumerate(recommendations[:3], 1):
113+
print(f" {i}. {rec}")
114+
except Exception as e:
115+
print(f"❌ Failed to list application-repository connections: {e}")
116+
117+
print("\n" + "="*60 + "\n")
118+
119+
# Test 4: Get Comprehensive Sensitive Data Summary
120+
print("4. Testing get_comprehensive_sensitive_data_summary...")
121+
try:
122+
result = await server._get_comprehensive_sensitive_data_summary(
123+
time_period="30d",
124+
include_trends=True,
125+
include_critical_only=False,
126+
include_recommendations=True,
127+
group_by="data_type"
128+
)
129+
print("✅ Comprehensive sensitive data summary generated!")
130+
print(f" Total findings: {result['total_findings']}")
131+
print(f" Analysis type: {result['analysis_type']}")
132+
print(f" Overall risk score: {result.get('overall_risk_score', 0):.1f}")
133+
print(f" Group by: {result['group_by']}")
134+
135+
grouped_summary = result.get('grouped_summary', {})
136+
print(f" Groups found: {len(grouped_summary)}")
137+
for group_name, group_data in list(grouped_summary.items())[:3]:
138+
print(f" {group_name}: {group_data['count']} findings, risk: {group_data['risk_score']:.1f}")
139+
140+
recommendations = result.get('recommendations', [])
141+
if recommendations:
142+
print(" Recommendations:")
143+
for i, rec in enumerate(recommendations[:3], 1):
144+
print(f" {i}. {rec}")
145+
except Exception as e:
146+
print(f"❌ Failed to generate comprehensive sensitive data summary: {e}")
147+
148+
print("\n" + "="*60 + "\n")
149+
150+
# Test 5: Test MCP Tool Interface
151+
print("5. Testing MCP tool interface...")
152+
try:
153+
# List available tools
154+
tools = await server._list_tools_handler()
155+
new_tool_names = [
156+
"check_repository_attack_surface",
157+
"check_repository_sensitive_data",
158+
"list_application_repository_connections",
159+
"get_sensitive_data_summary"
160+
]
161+
162+
found_tools = [tool.name for tool in tools if tool.name in new_tool_names]
163+
print(f"✅ Found {len(found_tools)}/{len(new_tool_names)} new tools in MCP interface")
164+
for tool_name in found_tools:
165+
print(f" ✓ {tool_name}")
166+
167+
missing_tools = [name for name in new_tool_names if name not in found_tools]
168+
if missing_tools:
169+
print(" Missing tools:")
170+
for tool_name in missing_tools:
171+
print(f" ✗ {tool_name}")
172+
except Exception as e:
173+
print(f"❌ Failed to test MCP tool interface: {e}")
174+
175+
print("\n" + "="*60 + "\n")
176+
177+
# Test 6: Demonstrate improved tool usage
178+
print("6. Testing tool call interface...")
179+
try:
180+
# Test calling the new tool through the MCP interface
181+
result = await server.call_tool(
182+
"check_repository_attack_surface",
183+
{"repo_name": "test-repo", "include_vulnerabilities": True}
184+
)
185+
print("✅ Tool call interface working!")
186+
print(f" Response type: {type(result)}")
187+
print(f" Response length: {len(result) if result else 0}")
188+
except Exception as e:
189+
print(f"⚠️ Tool call interface test failed: {e}")
190+
191+
print("\n" + "="*60)
192+
print("✅ All New MCP Tools Testing Complete!")
193+
print("\nSummary of Changes for Issue #66:")
194+
print("- ✅ Removed duplicate sensitive data tools")
195+
print("- ✅ Added attack surface lookup for current repository")
196+
print("- ✅ Added sensitive data lookup for current repository")
197+
print("- ✅ Added application/code repository connection mapping")
198+
print("- ✅ Consolidated sensitive data tools into single comprehensive tool")
199+
print("- ✅ All tools support auto-detection of current repository name")
200+
print("- ✅ All tools provide actionable recommendations")
201+
202+
except Exception as e:
203+
print(f"❌ Error during testing: {e}")
204+
import traceback
205+
traceback.print_exc()
206+
207+
finally:
208+
await server.cleanup()
209+
210+
211+
if __name__ == "__main__":
212+
asyncio.run(test_new_tools())

0 commit comments

Comments
 (0)