Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
833 changes: 833 additions & 0 deletions Backup/main.py

Large diffs are not rendered by default.

File renamed without changes.
5 changes: 5 additions & 0 deletions src/lg_sotf/api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""LG-SOTF API package."""

from .app import app, create_app

__all__ = ["app", "create_app"]
213 changes: 213 additions & 0 deletions src/lg_sotf/api/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
"""FastAPI application factory for LG-SOTF API."""

import asyncio
import logging
from typing import Optional

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from lg_sotf.app_initializer import LG_SOTFApplication
from lg_sotf.api.utils.websocket import WebSocketManager
from lg_sotf.api.routers import (
alerts,
correlations,
dashboard,
escalations,
ingestion,
metrics,
websocket,
)

logger = logging.getLogger(__name__)


def create_app(
config_path: str = "configs/development.yaml",
setup_signal_handlers: bool = False,
) -> FastAPI:
"""Create and configure the FastAPI application.

Args:
config_path: Path to configuration file
setup_signal_handlers: Whether to setup signal handlers (False for uvicorn)

Returns:
Configured FastAPI application instance
"""
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Create FastAPI app
app = FastAPI(
title="LG-SOTF Dashboard API",
description="Production-grade SOC Dashboard API",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)

# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

# Create application instances (will be initialized in startup event)
lg_sotf_app = LG_SOTFApplication(
config_path=config_path,
setup_signal_handlers=setup_signal_handlers
)
ws_manager = WebSocketManager()

# Store in app state for dependency injection
app.state.lg_sotf_app = lg_sotf_app
app.state.ws_manager = ws_manager

# Track background tasks for proper shutdown
app.state.background_tasks = []

# Register routers
app.include_router(metrics.router)
app.include_router(alerts.router)
app.include_router(ingestion.router)
app.include_router(dashboard.router)
app.include_router(correlations.router)
app.include_router(escalations.router)
app.include_router(websocket.router)

# Startup event handler
@app.on_event("startup")
async def startup():
"""Initialize application on startup."""
logger.info("Starting LG-SOTF API server...")

# Initialize LG-SOTF application
await lg_sotf_app.initialize()
logger.info("LG-SOTF application initialized")

# Start background tasks
_start_background_tasks(app)
logger.info("Background tasks started")

logger.info("✅ LG-SOTF API server ready")

# Shutdown event handler
@app.on_event("shutdown")
async def shutdown():
"""Cleanup on shutdown."""
logger.info("🛑 Shutting down API server...")

# Close all WebSocket connections
if ws_manager.active_connections:
logger.info(f"Closing {len(ws_manager.active_connections)} WebSocket connections...")
connections = list(ws_manager.active_connections.values())
for ws in connections:
try:
await ws.close()
except Exception as e:
logger.error(f"Error closing WebSocket: {e}")
ws_manager.active_connections.clear()
logger.info("✓ WebSocket connections closed")

# Cancel background tasks
if app.state.background_tasks:
logger.info(f"Cancelling {len(app.state.background_tasks)} background tasks...")
for task in app.state.background_tasks:
if not task.done():
task.cancel()

# Wait for tasks to complete with timeout
try:
await asyncio.wait_for(
asyncio.gather(*app.state.background_tasks, return_exceptions=True),
timeout=5.0
)
logger.info("✓ Background tasks cancelled")
except asyncio.TimeoutError:
logger.warning("⚠ Some background tasks did not complete within timeout")

# Shutdown LG-SOTF application
await lg_sotf_app.shutdown()
logger.info("✅ Shutdown complete")

return app


def _start_background_tasks(app: FastAPI):
"""Start background monitoring and update tasks."""
ws_manager: WebSocketManager = app.state.ws_manager
lg_sotf_app: LG_SOTFApplication = app.state.lg_sotf_app

async def metrics_updater():
"""Periodically collect and broadcast system metrics."""
while True:
try:
await asyncio.sleep(10)

# Import here to avoid circular dependency
from lg_sotf.api.routers.metrics import _collect_system_metrics

metrics = await _collect_system_metrics(lg_sotf_app)

await ws_manager.broadcast({
"type": "system_metrics",
"data": metrics.model_dump()
}, "system_metrics")

except asyncio.CancelledError:
logger.info("Metrics updater cancelled")
break
except Exception as e:
logger.error(f"Metrics updater error: {e}")

async def ingestion_monitor():
"""Monitor ingestion activity and broadcast updates."""
while True:
try:
await asyncio.sleep(5) # Check every 5 seconds

if not lg_sotf_app.workflow_engine:
continue

ingestion_agent = (
lg_sotf_app.workflow_engine.agents.get("ingestion_instance") or
lg_sotf_app.workflow_engine.agents.get("ingestion")
)

if not ingestion_agent:
continue

# Broadcast ingestion stats
await ws_manager.broadcast({
"type": "ingestion_stats",
"data": {
"total_ingested": ingestion_agent.ingestion_stats["total_ingested"],
"total_deduplicated": ingestion_agent.ingestion_stats["total_deduplicated"],
"total_errors": ingestion_agent.ingestion_stats["total_errors"],
"by_source": dict(ingestion_agent.ingestion_stats["by_source"]),
"enabled_sources": ingestion_agent.enabled_sources,
"last_poll": lg_sotf_app._last_ingestion_poll.isoformat() if lg_sotf_app._last_ingestion_poll else None
}
}, "ingestion_updates")

except asyncio.CancelledError:
logger.info("Ingestion monitor cancelled")
break
except Exception as e:
logger.error(f"Ingestion monitor error: {e}")

# Create and track background tasks
app.state.background_tasks.append(asyncio.create_task(ws_manager.heartbeat_loop()))
app.state.background_tasks.append(asyncio.create_task(metrics_updater()))
app.state.background_tasks.append(asyncio.create_task(ingestion_monitor()))


# Create app instance for uvicorn
app = create_app()
30 changes: 30 additions & 0 deletions src/lg_sotf/api/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Dependency injection for FastAPI routes."""

from fastapi import Depends, Request

from lg_sotf.app_initializer import LG_SOTFApplication
from lg_sotf.api.utils.websocket import WebSocketManager


def get_lg_sotf_app(request: Request) -> LG_SOTFApplication:
"""Get the LG-SOTF application instance.

Args:
request: FastAPI request object

Returns:
LG-SOTF application instance from app state
"""
return request.app.state.lg_sotf_app


def get_websocket_manager(request: Request) -> WebSocketManager:
"""Get the WebSocket manager instance.

Args:
request: FastAPI request object

Returns:
WebSocket manager from app state
"""
return request.app.state.ws_manager
Empty file.
19 changes: 19 additions & 0 deletions src/lg_sotf/api/models/alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Alert-related Pydantic models."""

from typing import Any, Dict, Optional
from pydantic import BaseModel


class AlertRequest(BaseModel):
"""Request model for processing a new alert."""
alert_data: Dict[str, Any]
priority: Optional[str] = "normal"


class AlertResponse(BaseModel):
"""Response model for alert processing."""
alert_id: str
status: str
workflow_instance_id: str
processing_started: bool
estimated_completion: Optional[str] = None
23 changes: 23 additions & 0 deletions src/lg_sotf/api/models/ingestion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Ingestion-related Pydantic models."""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel

class IngestionStatusResponse(BaseModel):
is_active: bool
last_poll_time: Optional[str]
next_poll_time: Optional[str]
polling_interval: int
sources_enabled: List[str]
sources_stats: Dict[str, Dict[str, int]]
total_ingested: int
total_deduplicated: int
total_errors: int

class IngestionControlRequest(BaseModel):
action: str
sources: Optional[List[str]] = None

class SourceConfigRequest(BaseModel):
source_name: str
enabled: bool
config: Optional[Dict[str, Any]] = None
29 changes: 29 additions & 0 deletions src/lg_sotf/api/models/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Metrics and health-related Pydantic models."""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel

class MetricsResponse(BaseModel):
timestamp: str
alerts_processed_today: int
alerts_in_progress: int
average_processing_time: float
success_rate: float
agent_health: Dict[str, bool]
system_health: bool

class DashboardStatsResponse(BaseModel):
total_alerts_today: int
high_priority_alerts: int
alerts_by_status: Dict[str, int]
alerts_by_severity: Dict[str, int]
top_threat_indicators: List[Dict[str, Any]]
recent_escalations: List[Dict[str, Any]]
processing_time_avg: float

class AgentStatusResponse(BaseModel):
agent_name: str
status: str
last_execution: Optional[str]
success_rate: float
average_execution_time: float
error_count: int
Empty file.
41 changes: 41 additions & 0 deletions src/lg_sotf/api/models/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Workflow-related Pydantic models."""
from typing import Any, Dict, List, Optional
from pydantic import BaseModel

class WorkflowStatusResponse(BaseModel):
alert_id: str
workflow_instance_id: str
current_node: str
triage_status: str
confidence_score: int
threat_score: Optional[int] = 0
processing_notes: List[str]
enriched_data: Optional[Dict[str, Any]] = {}
escalation_info: Optional[Dict[str, Any]] = None
response_execution: Optional[Dict[str, Any]] = None
fp_indicators: Optional[List[str]] = []
tp_indicators: Optional[List[str]] = []
correlations: Optional[List[Dict[str, Any]]] = []
correlation_score: Optional[int] = 0
analysis_conclusion: Optional[str] = None
recommended_actions: Optional[List[str]] = []
last_updated: str
progress_percentage: int

class CorrelationResponse(BaseModel):
alert_id: str
correlations: List[Dict[str, Any]]
correlation_score: int
attack_campaign_indicators: List[str]
threat_actor_patterns: List[str]

class FeedbackRequest(BaseModel):
analyst_username: str
decision: str
confidence: int
notes: str
actions_taken: Optional[List[str]] = None
actions_recommended: Optional[List[str]] = None
triage_correct: Optional[bool] = None
correlation_helpful: Optional[bool] = None
analysis_accurate: Optional[bool] = None
21 changes: 21 additions & 0 deletions src/lg_sotf/api/routers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""API routers for LG-SOTF Dashboard."""

from . import (
alerts,
correlations,
dashboard,
escalations,
ingestion,
metrics,
websocket,
)

__all__ = [
"alerts",
"correlations",
"dashboard",
"escalations",
"ingestion",
"metrics",
"websocket",
]
Loading
Loading