-
Notifications
You must be signed in to change notification settings - Fork 2k
Python: Add SSE keepalive interval to AG-UI FastAPI endpoint #6647
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
vaibhav-patel
wants to merge
3
commits into
microsoft:main
Choose a base branch
from
vaibhav-patel:fix/6611-agui-sse-keepalive
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+206
−2
Open
Changes from 1 commit
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,7 +2,9 @@ | |
|
|
||
| """Tests for FastAPI endpoint creation (_endpoint.py).""" | ||
|
|
||
| import asyncio | ||
| import json | ||
| from collections.abc import AsyncIterator | ||
| from typing import Any, cast | ||
|
|
||
| import pytest | ||
|
|
@@ -22,6 +24,7 @@ | |
|
|
||
| from agent_framework_ag_ui import InMemoryAGUIThreadSnapshotStore, add_agent_framework_fastapi_endpoint | ||
| from agent_framework_ag_ui._agent import AgentFrameworkAgent | ||
| from agent_framework_ag_ui._endpoint import _SSE_KEEPALIVE_COMMENT, _with_sse_keepalive | ||
| from agent_framework_ag_ui._workflow import AgentFrameworkWorkflow | ||
|
|
||
|
|
||
|
|
@@ -1844,3 +1847,117 @@ def factory(thread_id: str) -> Any: | |
|
|
||
| runner.clear_thread_workflow("thread-1") | ||
| assert runner._resolve_workflow("thread-1", "tenant-b") is not workflow_b | ||
|
|
||
|
|
||
| async def test_sse_keepalive_emitted_during_idle_gap_and_real_events_pass_through(): | ||
| """A silent gap between upstream events yields keepalive comments without dropping real events.""" | ||
| first_released = asyncio.Event() | ||
|
|
||
| async def upstream() -> AsyncIterator[str]: | ||
| # Emit immediately, then go silent until the test explicitly releases the next event, | ||
| # simulating a long-running tool that produces no AG-UI events for a while. | ||
| yield "data: A\n\n" | ||
| await first_released.wait() | ||
| yield "data: B\n\n" | ||
|
|
||
| # Use a tiny interval so the idle gap reliably trips several keepalives without slow tests. | ||
| wrapped = _with_sse_keepalive(upstream(), 0.01) | ||
|
|
||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Agreed — bumped the test keepalive interval to 50ms to avoid CI scheduling flakiness in b528fdf. |
||
| chunks: list[str] = [] | ||
| chunks.append(await wrapped.__anext__()) # real event A flushes immediately | ||
|
|
||
| # While upstream is idle, the wrapper must surface keepalive comments. | ||
| keepalive = await wrapped.__anext__() | ||
| assert keepalive == _SSE_KEEPALIVE_COMMENT | ||
| chunks.append(keepalive) | ||
|
|
||
| # Release the second real event and drain the rest of the stream. | ||
| first_released.set() | ||
| async for chunk in wrapped: | ||
| chunks.append(chunk) | ||
|
|
||
| data_lines = [chunk for chunk in chunks if chunk.startswith("data: ")] | ||
| assert data_lines == ["data: A\n\n", "data: B\n\n"] | ||
| assert _SSE_KEEPALIVE_COMMENT in chunks | ||
|
|
||
|
|
||
| async def test_sse_keepalive_not_emitted_when_events_flow_without_gaps(): | ||
| """Back-to-back events must pass through untouched with no keepalive comments inserted.""" | ||
|
|
||
| async def upstream() -> AsyncIterator[str]: | ||
| yield "data: X\n\n" | ||
| yield "data: Y\n\n" | ||
| yield "data: Z\n\n" | ||
|
|
||
| chunks = [chunk async for chunk in _with_sse_keepalive(upstream(), 0.05)] | ||
|
|
||
| assert chunks == ["data: X\n\n", "data: Y\n\n", "data: Z\n\n"] | ||
| assert _SSE_KEEPALIVE_COMMENT not in chunks | ||
|
|
||
|
|
||
| async def test_sse_keepalive_wrapper_handles_empty_stream(): | ||
| """An upstream that yields nothing terminates cleanly with no keepalives.""" | ||
|
|
||
| async def upstream() -> AsyncIterator[str]: | ||
| return | ||
| yield # pragma: no cover - present only to make this an async generator | ||
|
|
||
| chunks = [chunk async for chunk in _with_sse_keepalive(upstream(), 0.01)] | ||
|
|
||
| assert chunks == [] | ||
|
|
||
|
|
||
| async def test_sse_keepalive_wrapper_propagates_upstream_errors(): | ||
| """Errors raised by the upstream generator surface to the consumer rather than hanging.""" | ||
|
|
||
| async def upstream() -> AsyncIterator[str]: | ||
| yield "data: A\n\n" | ||
| raise RuntimeError("boom") | ||
|
|
||
| wrapped = _with_sse_keepalive(upstream(), 0.05) | ||
| assert await wrapped.__anext__() == "data: A\n\n" | ||
| with pytest.raises(RuntimeError, match="boom"): | ||
| await wrapped.__anext__() | ||
|
|
||
|
|
||
| async def test_endpoint_accepts_keepalive_interval_and_streams_events(build_chat_client): | ||
| """Endpoint accepts keepalive_interval_seconds and still streams the full event sequence.""" | ||
| app = FastAPI() | ||
| agent = Agent(name="test", instructions="Test agent", client=build_chat_client("Keepalive response")) | ||
|
|
||
| add_agent_framework_fastapi_endpoint(app, agent, path="/keepalive", keepalive_interval_seconds=0.05) | ||
|
|
||
| client = TestClient(app) | ||
| response = client.post("/keepalive", json={"messages": [{"role": "user", "content": "Hello"}]}) | ||
|
|
||
| assert response.status_code == 200 | ||
| assert response.headers["content-type"] == "text/event-stream; charset=utf-8" | ||
|
|
||
| event_types = [event.get("type") for event in _decode_sse_events(response)] | ||
| assert "RUN_STARTED" in event_types | ||
| assert "TEXT_MESSAGE_CONTENT" in event_types | ||
| assert "RUN_FINISHED" in event_types | ||
|
|
||
|
|
||
| async def test_endpoint_keepalive_can_be_disabled(build_chat_client): | ||
| """Passing keepalive_interval_seconds=None keeps the plain stream behavior.""" | ||
| app = FastAPI() | ||
| agent = Agent(name="test", instructions="Test agent", client=build_chat_client()) | ||
|
|
||
| add_agent_framework_fastapi_endpoint(app, agent, path="/no-keepalive", keepalive_interval_seconds=None) | ||
|
|
||
| client = TestClient(app) | ||
| response = client.post("/no-keepalive", json={"messages": [{"role": "user", "content": "Hello"}]}) | ||
|
|
||
| assert response.status_code == 200 | ||
| # No keepalive comment lines should appear when the feature is disabled. | ||
| assert ": keepalive" not in response.content.decode("utf-8") | ||
|
|
||
|
|
||
| async def test_endpoint_rejects_non_positive_keepalive_interval(build_chat_client): | ||
| """A non-positive keepalive interval is rejected at registration time.""" | ||
| app = FastAPI() | ||
| agent = Agent(name="test", instructions="Test agent", client=build_chat_client()) | ||
|
|
||
| with pytest.raises(ValueError, match="keepalive_interval_seconds must be a positive number or None"): | ||
| add_agent_framework_fastapi_endpoint(app, agent, path="/bad-keepalive", keepalive_interval_seconds=0) | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point — bounded the queue (maxsize=1) so the producer applies backpressure to the upstream generator; done in b528fdf.