-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
160 lines (133 loc) · 5.17 KB
/
Copy pathapp.py
File metadata and controls
160 lines (133 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from agentwrit._transport import AgentWritTransport
from agentwrit.app_types import _AppSession
from agentwrit.models import HealthStatus, ValidateResult
from agentwrit.scope import validate as module_validate
if TYPE_CHECKING:
from cryptography.hazmat.primitives.asymmetric import ed25519
from agentwrit.agent import Agent
class AgentWritApp:
"""The developer's app container. Manages authentication internally,
creates agents, validates tokens, and gates tool access.
All agent authority flows from this app's scope ceiling.
"""
def __init__(
self,
broker_url: str,
client_id: str,
client_secret: str,
*,
timeout: float = 10.0,
user_agent: str | None = None,
) -> None:
"""Initialize the AgentWritApp.
Args:
broker_url: Base URL of the AgentWrit broker.
client_id: App client ID from operator.
client_secret: App client secret from operator.
timeout: HTTP request timeout in seconds.
user_agent: Optional User-Agent header.
"""
self.broker_url = broker_url.rstrip("/")
self.client_id = client_id
self.client_secret = client_secret
self.timeout = timeout
self.user_agent = user_agent
self._transport = AgentWritTransport(
broker_url=self.broker_url,
timeout=self.timeout,
user_agent=self.user_agent
)
self._session: _AppSession | None = None
def _ensure_app_authenticated(self) -> None:
"""Internal method to ensure the app has a valid JWT.
Business Logic:
Implements "Lazy Authentication". The app doesn't authenticate during
`__init__`. Instead, it waits until the first operation that requires
authorization (like `create_agent` or `health`).
If no session exists, or the current session JWT is expired (or close
to expiry), it performs a `POST /v1/app/auth` to obtain a new one.
"""
now = time.time()
# Re-authenticate if no session exists, or if the token is within
# a 60-second buffer of expiring.
if (
self._session is None or
self._session.expires_at is None or
(self._session.expires_at - now) < 60
):
self._authenticate()
def _authenticate(self) -> None:
"""Performs the actual authentication request to the broker.
Business Logic:
Exchange `client_id` and `client_secret` for an app JWT.
Updates the internal `_session` with the new JWT and expiry.
Raises:
AuthenticationError: If credentials are invalid.
TransportError: If the broker is unreachable.
"""
response = self._transport.request(
"POST",
"/v1/app/auth",
json={
"client_id": self.client_id,
"client_secret": self.client_secret,
}
)
data = response.json()
# Broker returns expires_in (seconds), not expires_at.
# Compute absolute expiry from wall clock + TTL.
self._session = _AppSession(
access_token=data["access_token"],
expires_at=time.time() + float(data["expires_in"]),
scopes=data.get("scopes", []),
)
def create_agent(
self,
orch_id: str,
task_id: str,
requested_scope: list[str],
*,
private_key: ed25519.Ed25519PrivateKey | None = None,
max_ttl: int = 300,
label: str | None = None,
) -> Agent:
"""Create an ephemeral agent under this app.
Implementation:
Uses the `AgentCreationOrchestrator` to perform the multi-step
challenge-response registration ceremony.
"""
from agentwrit.orchestrator import AgentCreationOrchestrator
orchestrator = AgentCreationOrchestrator(self)
return orchestrator.orchestrate(
orch_id=orch_id,
task_id=task_id,
requested_scope=requested_scope,
private_key=private_key,
max_ttl=max_ttl,
label=label
)
def validate(self, token: str) -> ValidateResult:
"""POST /v1/token/validate -- verify any token via the broker.
Convenience shortcut for `agentwrit.validate(self.broker_url, token)`.
"""
return module_validate(self.broker_url, token, timeout=self.timeout)
def health(self) -> HealthStatus:
"""GET /v1/health -- broker health check.
No auth required. Tests broker reachability and internal service
status (DB connectivity, audit event count).
"""
response = self._transport.request("GET", "/v1/health")
data = response.json()
return HealthStatus(
status=data["status"],
version=data["version"],
uptime=data["uptime"],
db_connected=data["db_connected"],
audit_events_count=data["audit_events_count"],
)
def close(self) -> None:
"""Closes the underlying transport client."""
self._transport.close()