Python: Fix A2AAgent to surface message content from in-progress TaskStatusUpdateEvents#4798
Python: Fix A2AAgent to surface message content from in-progress TaskStatusUpdateEvents#4798giles17 wants to merge 3 commits intomicrosoft:mainfrom
Conversation
…teEvents (microsoft#4783) _updates_from_task() returned [] for working-state tasks when background=False, silently discarding all intermediate message content from task.status.message. Now extracts and yields message parts from in-progress status updates during streaming. Also fixed MockA2AClient.send_message to yield all queued responses (enabling multi-event streaming tests) and added text parameter to add_in_progress_task_response for tests that need status messages. Co-authored-by: Copilot <[email protected]>
giles17
left a comment
There was a problem hiding this comment.
Automated Code Review
Reviewers: 4 | Confidence: 89%
✓ Correctness
The production code change correctly addresses the bug where in-progress streaming updates with message content were silently dropped when background=False. The new guard clause in _updates_from_task properly surfaces status.message content after the background/terminal checks. The mock change (yielding all responses instead of one) is required for the new multi-event tests and doesn't break existing tests since they each enqueue only a single response. The new tests cover the key scenarios well. The only minor issue is a stale docstring on _map_a2a_stream that still describes all in-progress updates as silently consumed in non-background mode.
✓ Security Reliability
The fix correctly surfaces in-progress streaming message content that was previously silently dropped. The new code block follows existing patterns for parsing A2A parts and mapping roles. The mock
send_messagechange from pop-one to yield-all is necessary to simulate realistic multi-event streams and doesn't break existing single-response tests. No security or reliability issues found — no injection risks, no resource leaks, and input validation (None checks, empty-parts checks, empty-contents checks) is thorough.
✗ Test Coverage
The three new tests cover the primary happy paths for the bug fix (multiple working updates with messages, single working update, and working update without message). However, there are meaningful gaps: the user-role mapping branch in the new production code is never exercised (all tests hardcode A2ARole.agent), the interaction between background=True and a status message is untested, and there is no test for the edge case where status.message exists but parts is empty.
✗ Design Approach
The fix correctly surfaces
task.status.messagecontent during in-progress streaming events and is well-grounded: the A2A SDK'sClientTaskManagercopiesevent.status(includingevent.status.message) intotask.status, so reading fromtask.status.messagein_updates_from_taskis equivalent to reading from the discarded_update_event. However, the SDK also appends each working-state message totask.history. Because_parse_messages_from_taskfalls back totask.history[-1]when a terminal task has no artifacts, the last working-state message already surfaced during streaming will be yielded a second time by the terminal-task handler — a duplication scenario introduced by this fix. All three new tests use artifact-based terminal responses, so they never exercise this fallback. The mock change frompop(0)to iterating-all-and-clearing is safe for the existing test suite (all tests callsend_messageonce) and is necessary to deliver multiple responses in a single streaming call.
Flagged Issues
- The role ternary (
"assistant" if ... == A2ARole.agent else "user") at line 385 of_agent.pyis only tested for theA2ARole.agentbranch. The"user"path is never exercised becauseadd_in_progress_task_responsehardcodesrole=A2ARole.agent. A test withrole=A2ARole.useris needed to cover the else branch. - Potential double-delivery of the last working-state message: the A2A SDK's
ClientTaskManagerappends eachevent.status.messagetotask.history. When the terminal event arrives without artifacts,_parse_messages_from_taskfalls back totask.history[-1], which is the last working-state message already surfaced during streaming. None of the new tests cover a no-artifact terminal response after working-state messages, so this duplication path is untested.
Suggestions
- The
_map_a2a_streamdocstring (lines 316-318) still says in-progress task updates are "silently consumed" whenbackground=False, which is no longer accurate. Update it to note that updates with message content are now yielded. - The new code block at line 379 does not guard on
task.status.state in IN_PROGRESS_TASK_STATES. While currently unreachable for unknown states, adding an explicit state guard would be more defensive against future A2A spec additions. - Add a test for
background=Truewith a working-state update carrying astatus.message, to verify the continuation-token path (line 366) takes priority and message content is not surfaced. This documents the intentional precedence and guards against regressions. - Consider adding a test where
task.status.messageis present butpartsis an empty list, to verify the guard on line 379 correctly falls through toreturn []. - When
background=Trueand the task is in-progress, the early-return at line 366 silently dropsstatus.messagecontent. Add a comment clarifying this is intentional, or consider whether the continuation-token update should also carry thecontentsfromtask.status.message.
Automated review by giles17's agents
There was a problem hiding this comment.
Pull request overview
Fixes A2A streaming behavior in the Python A2A integration so callers can receive intermediate message content emitted via in-progress TaskStatusUpdateEvents (e.g., working updates with status.message text parts), addressing #4783.
Changes:
- Update
A2AAgent._updates_from_task()to convert in-progresstask.status.message.partsintoAgentResponseUpdates (when not running inbackground=Truemode). - Enhance the test mock client to yield multiple queued SSE-like events per
send_message()call. - Add regression tests covering streaming of multiple in-progress status messages, a single in-progress message, and skipping in-progress updates without message content.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
python/packages/a2a/agent_framework_a2a/_agent.py |
Surfaces status.message content for in-progress task updates as AgentResponseUpdates. |
python/packages/a2a/tests/test_a2a_agent.py |
Adds streaming regression tests and updates the mock client to support multi-event streams. |
giles17
left a comment
There was a problem hiding this comment.
Automated Code Review
Reviewers: 4 | Confidence: 89%
✓ Correctness
The fix correctly addresses the bug where streaming in-progress TaskStatusUpdateEvents with message content were silently dropped. The new
emit_intermediateflag gates intermediate content emission onstream=True, preserving backward compatibility for non-streaming callers. The mocksend_messagechange from pop-one to yield-all is necessary for multi-event streaming tests and is backward-compatible since all existing tests add only one response. Tests are thorough, covering role mapping, background precedence, and edge cases. No correctness issues found.
✓ Security Reliability
This PR fixes a real bug where streaming in-progress status updates with message content were silently dropped. The implementation is clean: it adds an
emit_intermediateflag propagated fromstreaminrun(), and surfacestask.status.messageparts for non-background streaming callers. Thebackgroundcheck correctly takes precedence (returning a continuation token instead of content). The mocksend_messagechange from single-pop to yield-all is necessary for multi-event streaming tests and is safe since all existing tests add exactly one response. No security issues (no injection, deserialization, secrets, or resource leak risks). One minor defensive coding suggestion regarding an explicit state guard on the new intermediate block.
✓ Test Coverage
The new tests provide solid coverage for the
emit_intermediatefeature: multiple streaming updates, single update, no-message skip, role mapping, background precedence, non-streaming exclusion, and terminal-after-working. The mocksend_messagechange from yield-one to yield-all is backward-compatible since all existing tests add a single response. One minor gap: there is no test for the edge case wheretask.status.messagehas an emptypartslist (notNonebut[]), which exercises thetask.status.message.partstruthiness check. Additionally, theresponse_idfield on intermediate updates is never asserted.
✓ Design Approach
The fix correctly addresses the root cause: streaming
TaskStatusUpdateEventitems with non-terminal (working) states were silently dropped because_updates_from_taskhad no path to yield intermediate message content. Addingemit_intermediate(tied tostream=True) and readingtask.status.messageis a clean, minimal change. Two design concerns remain: (1) the_update_eventfrom the stream tuple is still unconditionally discarded — the fix reads the task snapshot (task.status.message) rather than the event's own payload (_update_event.status.message), which is the authoritative source for streaming content and works only because the client happens to embed the event data into the task snapshot; (2) the mock'ssend_messageis silently changed from 'pop-one-per-call' to 'yield-all-then-clear', which is necessary for multi-event tests but would silently break any future test that callsrun()twice against accumulated responses — the mock now behaves differently from the old design without documentation of that contract change.
Suggestions
- Consider whether
run(stream=True, background=True)should surface both the intermediate message content and the continuation token rather than silently discarding the message. The testtest_background_with_status_message_yields_continuation_tokenverifies that background takes precedence and content is lost, which is a reasonable default but may surprise callers who want both. - The
emit_intermediateblock (around line 447) does not verifytask.status.state in IN_PROGRESS_TASK_STATES, unlike thebackgroundblock above it. Adding an explicit state check would be more defensive and consistent: if the A2A protocol adds a new non-terminal, non-in-progress state in the future, content could be unexpectedly emitted. - The
_update_eventfromtask, _update_event = itemin_map_a2a_streamis still discarded. TheTaskStatusUpdateEventis the authoritative source for streamed content, not the task snapshot. If a real A2A client ever provides a non-None event whosestatus.messagediffers fromtask.status.message, intermediate content will be read from the wrong source. Consider reading from the event when available and falling back to the task snapshot. - The
send_messagemock change from pop-one-per-call to yield-all-then-clear is implicitly required by the new tests but isn't documented. Adding a docstring comment explaining the new contract ('all queued responses are consumed by a singlesend_messagecall') would prevent future test authors from relying on the removed per-call behavior. - Add a test where
task.status.messageexists but has an emptypartslist ([]), verifying the update is skipped. This exercises thetask.status.message.partsfalsiness branch distinctly frommessage=None. - Consider asserting
response_idon intermediate updates (e.g.,assert updates[0].response_id == "task-w") in at least one streaming test to verify task ID propagation. - In
test_terminal_no_artifacts_after_working_with_content, the assertionupdates[1].contents == []documents that a terminal task with no artifacts always yields an empty-content update. Consider whether streaming consumers should receive this empty sentinel at all; if not, the fix to suppress it is out of scope here but worth a follow-up issue.
Automated review by giles17's agents
| @@ -373,7 +379,7 @@ async def _map_a2a_stream( | |||
| yield update | |||
There was a problem hiding this comment.
The _update_event (TaskStatusUpdateEvent) is still discarded. The fix reads task.status.message as a proxy, but the event's own status.message is the authoritative payload for this streaming frame. If a real client provides a non-None event whose status diverges from the task snapshot (e.g., stale snapshot), intermediate content will be silently wrong. Prefer reading from the event when available: source = _update_event.status if _update_event is not None else task.status.
| yield update | |
| task, _update_event = item | |
| effective_status = _update_event.status if _update_event is not None else task.status | |
| for update in self._updates_from_task(task, background=background, emit_intermediate=emit_intermediate, status_override=effective_status): |
| return [ | ||
| AgentResponseUpdate( | ||
| contents=contents, | ||
| role="assistant" if task.status.message.role == A2ARole.agent else "user", |
There was a problem hiding this comment.
Defensive: Consider adding task.status.state in IN_PROGRESS_TASK_STATES to this condition, matching the pattern used by the background branch above. Without it, any unknown future non-terminal task state would emit intermediate content when streaming, which may not be intended.
| role="assistant" if task.status.message.role == A2ARole.agent else "user", | |
| if emit_intermediate and task.status.state in IN_PROGRESS_TASK_STATES and task.status.message is not None and task.status.message.parts: |
| @@ -102,9 +111,9 @@ async def send_message(self, message: Any) -> AsyncIterator[Any]: | |||
| """Mock send_message method that yields responses.""" | |||
There was a problem hiding this comment.
The semantics changed from 'pop one response per send_message call' to 'yield all queued responses and clear'. This is correct for multi-event streaming but silently removes support for multi-call test scenarios. A short comment documenting the new contract would prevent future surprises.
| """Mock send_message method that yields responses.""" | |
| # All queued responses are delivered as a single streaming batch per call. | |
| for response in self.responses: | |
| yield response | |
| self.responses.clear() |
| mock_a2a_client.add_in_progress_task_response("task-n", context_id="ctx-n") | ||
| mock_a2a_client.add_task_response("task-n", [{"id": "art-n", "content": "Result"}]) | ||
|
|
||
| updates: list[AgentResponseUpdate] = [] |
There was a problem hiding this comment.
This test verifies stream=False doesn't surface intermediate messages. Consider also adding a test for the case where status.message exists but parts is an empty list [], to confirm the truthiness guard in _updates_from_task works as expected (distinct from message=None).
…nd add missing test coverage - Add emit_intermediate parameter to _updates_from_task and _map_a2a_stream - Thread stream flag from run() so only streaming callers see intermediate updates - Add IN_PROGRESS_TASK_STATES guard to emit_intermediate condition - Add role parameter to test helper add_in_progress_task_response - Add clarifying comment on MockA2AClient.send_message batch semantics - Add tests for user role mapping, background precedence, non-streaming behavior, terminal task with no artifacts, and empty parts edge case Co-authored-by: Copilot <[email protected]>
517990e to
3f83814
Compare
Motivation and Context
When an A2A remote agent sent
TaskStatusUpdateEvents in a working/submitted state withstatus.messagecontent (e.g., intermediate streaming text),A2AAgent._parse_task_into_updatessilently discarded those messages. This caused callers to lose intermediate streaming content during non-background task execution.Fixes #4783
Description
The root cause was that
_parse_task_into_updatesonly handled terminal task states and background continuation tokens, returning an empty list for all other in-progress updates — even whentask.status.messagecarried meaningful text parts. The fix adds a check after the background-token branch: if the in-progress task's status has a non-empty message with parts, those parts are parsed into anAgentResponseUpdateand yielded to the caller. Three regression tests verify that working updates with message content are surfaced, that multiple intermediate messages stream correctly, and that working updates without messages remain silently skipped.Contribution Checklist