From ef862bb59bd371e21c7384aafadc2d8d16dcdc04 Mon Sep 17 00:00:00 2001 From: Bofeng Huang Date: Mon, 1 Jun 2026 11:23:29 +0200 Subject: [PATCH 1/2] feat(agents): add include_sources for per-agent content source filtering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `include_sources: list[str] | None` to `LlmAgent` as an orthogonal axis to the existing `include_contents` temporal-window control. Where `include_contents` answers "how far back?", `include_sources` answers "from whom?" — allowing agents in a multi-agent pipeline to declare an allowlist of content sources rather than receiving every narrative-cast peer output. Reserved source names: 'user' (plain human messages), 'self' (this agent's own prior model turns), and any agent name matched directly against event.author before narrative casting occurs. Filtering runs at the event level inside _get_contents(), before _present_other_agent_message() converts authorship into embedded text, so source identity is read from structured metadata rather than parsed from "[agent_name] said:" strings. Function call/response pairing is preserved: FC responses for the current agent's own calls are tied to 'self' (dropped together with their calls when 'self' is absent), and another agent's FC responses are dropped when that agent's call is also filtered. Live-mode events are handled by mapping event.author == agent_name to the 'self' reserved name, since _is_other_agent_reply() returns True for all non-user events in live sessions. Raises ValueError when include_sources=[] (use None to disable). --- src/google/adk/agents/llm_agent.py | 32 ++ src/google/adk/flows/llm_flows/contents.py | 42 ++ .../agents/test_llm_agent_include_contents.py | 130 +++++- .../llm_flows/test_contents_source_filter.py | 393 ++++++++++++++++++ 4 files changed, 596 insertions(+), 1 deletion(-) create mode 100644 tests/unittests/flows/llm_flows/test_contents_source_filter.py diff --git a/src/google/adk/agents/llm_agent.py b/src/google/adk/agents/llm_agent.py index ee1b05c535..086d046e2c 100644 --- a/src/google/adk/agents/llm_agent.py +++ b/src/google/adk/agents/llm_agent.py @@ -352,6 +352,27 @@ class LlmAgent(BaseAgent, abc.ABC): instruction and input """ + include_sources: Optional[list[str]] = None + """Allowlist of content sources to include in model requests. + + Orthogonal to include_contents (temporal window); this controls which + sources are kept from within that window. + + Options: + None (default): all sources pass through — backward-compatible. + list[str]: only content from the listed sources is kept. + + Reserved source names: + 'user' — plain human user messages (not tool outputs) + 'self' — this agent's own prior model outputs + — any other string is matched against event.author (agent name) + + Example — keep full history but only user + this agent's turns: + include_contents='default', include_sources=['user', 'self'] + + Raises ValueError if set to [] (use None to disable filtering). + """ + # Controlled input/output configurations - Start input_schema: Optional[type[BaseModel]] = None """The input schema when agent is used as a tool.""" @@ -957,6 +978,17 @@ def __maybe_save_output_to_state(self, event: Event): def __model_validator_after(self) -> LlmAgent: return self + @field_validator('include_sources', mode='after') + @classmethod + def _validate_include_sources( + cls, v: Optional[list[str]] + ) -> Optional[list[str]]: + if v is not None and len(v) == 0: + raise ValueError( + "include_sources=[] keeps nothing. Use None to disable filtering." + ) + return v + @field_validator('generate_content_config', mode='after') @classmethod def validate_generate_content_config( diff --git a/src/google/adk/flows/llm_flows/contents.py b/src/google/adk/flows/llm_flows/contents.py index fab5afd2cd..c6f9afb747 100644 --- a/src/google/adk/flows/llm_flows/contents.py +++ b/src/google/adk/flows/llm_flows/contents.py @@ -68,6 +68,7 @@ async def run_async( instruction_related_contents = llm_request.contents is_single_turn = getattr(agent, 'mode', None) == 'single_turn' + source_filter = getattr(agent, 'include_sources', None) if agent.include_contents == 'default': # Include full conversation history llm_request.contents = _get_contents( @@ -78,6 +79,7 @@ async def run_async( isolation_scope=invocation_context.isolation_scope, is_single_turn=is_single_turn, user_content=invocation_context.user_content, + source_filter=source_filter, ) else: # Include current turn context only (no conversation history) @@ -89,6 +91,7 @@ async def run_async( isolation_scope=invocation_context.isolation_scope, is_single_turn=is_single_turn, user_content=invocation_context.user_content, + source_filter=source_filter, ) # Add instruction-related contents to proper position in conversation @@ -504,6 +507,7 @@ def _get_contents( isolation_scope: Optional[str] = None, is_single_turn: bool = False, user_content: Optional[types.Content] = None, + source_filter: Optional[list[str]] = None, ) -> list[types.Content]: """Get the contents for the LLM request. @@ -610,6 +614,7 @@ def _get_contents( accumulated_output_transcription = '' is_other_reply = _is_other_agent_reply(agent_name, event) + other_fc_author = None # set when is_other_reply via FC attribution # Check if it's a FunctionResponse for another agent if not is_other_reply and event.content: @@ -623,8 +628,43 @@ def _get_contents( and call_author != 'user' ): is_other_reply = True + other_fc_author = call_author break + if source_filter is not None: + if is_other_reply: + if event.author != 'user': + # In live mode the current agent's own events are also classified as + # other_reply (see _is_other_agent_reply). Map the actual agent name + # to the 'self' reserved name so source_filter=['self'] works. + effective_source = ( + 'self' if event.author == agent_name else event.author + ) + if effective_source not in source_filter: + continue + else: + # 'user'-authored FC response to another agent's call. + # other_fc_author was resolved above — no second iteration needed. + # _present_other_agent_message converts it to text, so no raw + # function_response survives — but drop it when its call author is + # filtered to avoid "[agent_b] returned X" with no visible preceding + # "[agent_b] called tool Y". + if other_fc_author and other_fc_author not in source_filter: + continue + elif event.content: + if event.content.role == 'model': + if 'self' not in source_filter: + continue + elif event.content.role == 'user': + if _content_contains_function_response(event.content): + # FC responses are paired with the current agent's own tool calls + # (role='model'). Tie them to 'self' so dropping 'self' drops both + # sides of the pair and avoids orphaned function_response parts. + if 'self' not in source_filter: + continue + elif 'user' not in source_filter: + continue + if is_other_reply: if converted_event := _present_other_agent_message(event): filtered_events.append(converted_event) @@ -677,6 +717,7 @@ def _get_current_turn_contents( is_single_turn: bool = False, isolation_scope: Optional[str] = None, user_content: Optional[types.Content] = None, + source_filter: Optional[list[str]] = None, ) -> list[types.Content]: """Get contents for the current turn only (no conversation history). @@ -712,6 +753,7 @@ def _get_current_turn_contents( isolation_scope=isolation_scope, is_single_turn=is_single_turn, user_content=user_content, + source_filter=source_filter, ) return [] diff --git a/tests/unittests/agents/test_llm_agent_include_contents.py b/tests/unittests/agents/test_llm_agent_include_contents.py index c24aab4ef0..3976146d17 100644 --- a/tests/unittests/agents/test_llm_agent_include_contents.py +++ b/tests/unittests/agents/test_llm_agent_include_contents.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Unit tests for LlmAgent include_contents field behavior.""" +"""Unit tests for LlmAgent include_contents and include_sources field behavior.""" from google.adk.agents.llm_agent import LlmAgent from google.adk.agents.sequential_agent import SequentialAgent @@ -241,3 +241,131 @@ async def test_include_contents_none_sequential_agents(): assert any( "Agent1 response" in str(content) for _, content in agent2_contents ) + + +# --------------------------------------------------------------------------- +# include_sources: field validation +# --------------------------------------------------------------------------- + + +def test_include_sources_empty_list_raises(): + """include_sources=[] must raise ValueError — use None to disable filtering.""" + with pytest.raises(ValueError, match='include_sources=\\[\\]'): + LlmAgent( + name='agent', + model='gemini-2.5-flash', + include_sources=[], + ) + + +def test_include_sources_none_is_accepted(): + """include_sources=None (default) must not raise.""" + agent = LlmAgent( + name='agent', model='gemini-2.5-flash', include_sources=None + ) + assert agent.include_sources is None + + +# --------------------------------------------------------------------------- +# include_sources: integration — user-only in sequential pipeline +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_include_sources_user_only_drops_upstream_agent_entries(): + """Downstream agent with include_sources=['user'] receives only the human user message.""" + agent1_model = testing_utils.MockModel.create( + responses=['Upstream agent reply'] + ) + agent1 = LlmAgent( + name='upstream', + model=agent1_model, + instruction='You are upstream', + ) + + agent2_model = testing_utils.MockModel.create( + responses=['Downstream response'] + ) + agent2 = LlmAgent( + name='downstream', + model=agent2_model, + include_sources=['user'], + instruction='You are downstream', + ) + + sequential = SequentialAgent( + name='pipeline', sub_agents=[agent1, agent2] + ) + runner = testing_utils.InMemoryRunner(sequential) + runner.run('Original user request') + + agent2_contents = testing_utils.simplify_contents( + agent2_model.requests[0].contents + ) + + # User message must be present + assert any( + 'Original user request' in str(c) for _, c in agent2_contents + ) + # Upstream agent's narrative entry must be absent + assert not any( + 'Upstream agent reply' in str(c) for _, c in agent2_contents + ) + assert not any('For context:' in str(c) for _, c in agent2_contents) + + +# --------------------------------------------------------------------------- +# include_sources: composing with include_contents='default' — multi-turn +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_include_sources_user_self_drops_upstream_across_turns(): + """include_sources=['user','self'] + include_contents='default' (full history): + downstream agent sees all user messages and its own prior turns, but no + narrative entries from the upstream agent across multiple invocations. + """ + agent1_model = testing_utils.MockModel.create( + responses=['Turn1 upstream reply', 'Turn2 upstream reply'] + ) + agent1 = LlmAgent( + name='upstream', + model=agent1_model, + instruction='You are upstream', + ) + + agent2_model = testing_utils.MockModel.create( + responses=['Turn1 downstream', 'Turn2 downstream'] + ) + agent2 = LlmAgent( + name='downstream', + model=agent2_model, + include_sources=['user', 'self'], + instruction='You are downstream', + ) + + sequential = SequentialAgent( + name='pipeline', sub_agents=[agent1, agent2] + ) + runner = testing_utils.InMemoryRunner(sequential) + runner.run('Turn 1 user message') + runner.run('Turn 2 user message') + + # Second invocation of downstream agent — should see user messages + own + # prior turn, but not upstream's narrative entries. + agent2_second_contents = testing_utils.simplify_contents( + agent2_model.requests[1].contents + ) + + # User messages must be present + assert any( + 'Turn 1 user message' in str(c) for _, c in agent2_second_contents + ) + assert any( + 'Turn 2 user message' in str(c) for _, c in agent2_second_contents + ) + # Upstream agent's narrative entries must be absent + assert not any( + 'upstream reply' in str(c).lower() for _, c in agent2_second_contents + ) + assert not any('For context:' in str(c) for _, c in agent2_second_contents) diff --git a/tests/unittests/flows/llm_flows/test_contents_source_filter.py b/tests/unittests/flows/llm_flows/test_contents_source_filter.py new file mode 100644 index 0000000000..33fe2d07f8 --- /dev/null +++ b/tests/unittests/flows/llm_flows/test_contents_source_filter.py @@ -0,0 +1,393 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for source_filter parameter in _get_contents / _get_current_turn_contents.""" + +from google.adk.events.event import Event +from google.adk.flows.llm_flows import contents +from google.genai import types +import pytest + + +def _user_event(text: str, invocation_id: str = 'inv') -> Event: + return Event( + invocation_id=invocation_id, + author='user', + content=types.Content( + role='user', parts=[types.Part(text=text)] + ), + ) + + +def _model_event( + text: str, author: str, invocation_id: str = 'inv' +) -> Event: + return Event( + invocation_id=invocation_id, + author=author, + content=types.Content( + role='model', parts=[types.Part(text=text)] + ), + ) + + +def _function_response_event( + name: str, response: dict, invocation_id: str = 'inv' +) -> Event: + return Event( + invocation_id=invocation_id, + author='user', + content=types.Content( + role='user', + parts=[ + types.Part.from_function_response(name=name, response=response) + ], + ), + ) + + +def _function_call_event( + name: str, args: dict, author: str, invocation_id: str = 'inv' +) -> Event: + return Event( + invocation_id=invocation_id, + author=author, + content=types.Content( + role='model', + parts=[types.Part.from_function_call(name=name, args=args)], + ), + ) + + +# --------------------------------------------------------------------------- +# Regression: source_filter=None is a no-op +# --------------------------------------------------------------------------- + + +def test_source_filter_none_is_no_op(): + """source_filter=None should produce identical output to omitting the param.""" + events = [ + _user_event('hello'), + _model_event('hi there', author='agent_a'), + _model_event('peer reply', author='agent_b'), + ] + without = contents._get_contents(None, events, agent_name='agent_a') + with_none = contents._get_contents( + None, events, agent_name='agent_a', source_filter=None + ) + assert without == with_none + + +# --------------------------------------------------------------------------- +# user only +# --------------------------------------------------------------------------- + + +def test_source_filter_user_keeps_user_drops_model_and_others(): + """['user'] keeps user messages, drops this agent's model turns and peers.""" + events = [ + _user_event('user msg 1'), + _model_event('self reply', author='agent_a'), + _model_event('peer reply', author='agent_b'), + _user_event('user msg 2'), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'user msg 1' in texts + assert 'user msg 2' in texts + assert 'self reply' not in texts + assert 'peer reply' not in texts + # No narrative "For context:" wrapper — other agent was dropped entirely + assert not any('For context:' in t for t in texts) + + +# --------------------------------------------------------------------------- +# self only +# --------------------------------------------------------------------------- + + +def test_source_filter_self_keeps_model_drops_user_and_others(): + """['self'] keeps this agent's model turns, drops user messages and peers.""" + events = [ + _user_event('user msg'), + _model_event('self turn 1', author='agent_a'), + _model_event('peer reply', author='agent_b'), + _model_event('self turn 2', author='agent_a'), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['self'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'self turn 1' in texts + assert 'self turn 2' in texts + assert 'user msg' not in texts + assert 'peer reply' not in texts + + +# --------------------------------------------------------------------------- +# user + self +# --------------------------------------------------------------------------- + + +def test_source_filter_user_and_self_drops_other_agents(): + """['user', 'self'] keeps user + this agent's turns, drops all other agents.""" + events = [ + _user_event('hi'), + _model_event('my answer', author='agent_a'), + _model_event('agent_b reply', author='agent_b'), + _model_event('agent_c reply', author='agent_c'), + _user_event('follow up'), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user', 'self'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'hi' in texts + assert 'follow up' in texts + assert 'my answer' in texts + assert 'agent_b reply' not in texts + assert 'agent_c reply' not in texts + assert not any('For context:' in t for t in texts) + + +# --------------------------------------------------------------------------- +# specific agent name +# --------------------------------------------------------------------------- + + +def test_source_filter_specific_agent_name(): + """['agent_b'] keeps only agent_b's entries, drops user, self, and agent_c.""" + events = [ + _user_event('user msg'), + _model_event('self reply', author='agent_a'), + _model_event('b says hi', author='agent_b'), + _model_event('c says bye', author='agent_c'), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['agent_b'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert any('b says hi' in t for t in texts) + assert 'user msg' not in texts + assert 'self reply' not in texts + assert 'c says bye' not in texts + + +def test_source_filter_user_and_specific_agent(): + """['user', 'agent_b'] keeps user + agent_b, drops self and agent_c.""" + events = [ + _user_event('user msg'), + _model_event('self reply', author='agent_a'), + _model_event('b says hi', author='agent_b'), + _model_event('c says bye', author='agent_c'), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user', 'agent_b'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'user msg' in texts + assert any('b says hi' in t for t in texts) + assert 'self reply' not in texts + assert 'c says bye' not in texts + + +# --------------------------------------------------------------------------- +# Function responses are never filtered +# --------------------------------------------------------------------------- + + +def test_source_filter_self_keeps_fc_call_and_response_together(): + """FC call and response are both tied to 'self': including 'self' keeps both.""" + events = [ + _user_event('user msg'), + _function_call_event('my_tool', {'x': 1}, author='agent_a'), + _function_response_event('my_tool', {'result': 'ok'}), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['self'] + ) + # FC call (role=model) and FC response (role=user) both belong to 'self' + roles = [c.role for c in result] + assert 'model' in roles # function call kept + assert 'user' in roles # function response kept (no orphan) + + +def test_source_filter_without_self_drops_fc_call_and_response_together(): + """Dropping 'self' drops both sides of the FC/FR pair to avoid orphaned responses.""" + events = [ + _user_event('plain user message'), + _function_call_event('tool', {}, author='agent_a'), + _function_response_event('tool', {'v': 1}), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'plain user message' in texts + # Both FC call and FC response are dropped — no orphaned function_response part + assert not any(c.role == 'model' for c in result) + assert not any( + p.function_response is not None + for c in result + for p in (c.parts or []) + ) + + +# --------------------------------------------------------------------------- +# Interaction with _get_current_turn_contents +# --------------------------------------------------------------------------- + + +def test_source_filter_propagates_to_current_turn(): + """source_filter is respected when include_contents='none' path is taken. + + Simulates the start of a new invocation where only the user message has + arrived; _get_current_turn_contents identifies it as the turn boundary. + With source_filter=['user'], prior self/peer history is excluded and only + the current user message survives. + """ + events = [ + _user_event('turn 1', invocation_id='inv1'), + _model_event('self turn 1', author='agent_a', invocation_id='inv1'), + _model_event('peer old', author='agent_b', invocation_id='inv1'), + # New invocation: only the user message has arrived so far + _user_event('turn 2', invocation_id='inv2'), + ] + result = contents._get_current_turn_contents( + None, events, agent_name='agent_a', source_filter=['user'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + # Only the current-turn user message survives; prior history is excluded + assert 'turn 2' in texts + assert 'turn 1' not in texts + assert 'self turn 1' not in texts + assert 'peer old' not in texts + + +def test_source_filter_drops_other_agent_fc_response_when_call_author_filtered(): + """When agent_b is not in filter, its FC call AND its 'user'-authored response are both dropped. + + Without this fix, the response would survive as + '[agent_b] tool returned X' text with no visible call — misleading context. + """ + events = [ + _user_event('user msg'), + # agent_b makes a function call (role=model, is_other_reply=True) + Event( + invocation_id='inv', + author='agent_b', + content=types.Content( + role='model', + parts=[types.Part.from_function_call(name='search', args={})], + ), + ), + # 'user'-authored response to agent_b's call (is_other_reply=True via fc_author_by_id) + _function_response_event('search', {'results': 'found it'}), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user', 'self'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'user msg' in texts + # Neither the call nor the response from agent_b should appear + assert not any('search' in t for t in texts) + assert not any('found it' in t for t in texts) + + +def test_source_filter_keeps_other_agent_fc_response_when_call_author_included(): + """When agent_b IS in filter, its FC response is kept and converted to context text.""" + events = [ + _user_event('user msg'), + Event( + invocation_id='inv', + author='agent_b', + content=types.Content( + role='model', + parts=[types.Part.from_function_call(name='lookup', args={})], + ), + ), + _function_response_event('lookup', {'value': 42}), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user', 'agent_b'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'user msg' in texts + # agent_b's call and response both present (as narrative text) + assert any('lookup' in t for t in texts) + + +def test_source_filter_self_matches_current_agent_in_live_mode(): + """In live mode, the current agent's events are classified as other_reply. + + source_filter=['self'] must still keep them by mapping event.author==agent_name + to the 'self' reserved name, not by literal string comparison. + """ + live_session_id = 'live-123' + events = [ + Event( + invocation_id='inv', + author='user', + live_session_id=live_session_id, + content=types.Content( + role='user', parts=[types.Part(text='user prompt')] + ), + ), + # In live mode, current agent's own turn has is_other_reply=True + Event( + invocation_id='inv', + author='agent_a', + live_session_id=live_session_id, + content=types.Content( + role='model', parts=[types.Part(text='my own reply')] + ), + ), + Event( + invocation_id='inv', + author='agent_b', + live_session_id=live_session_id, + content=types.Content( + role='model', parts=[types.Part(text='peer reply')] + ), + ), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user', 'self'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'user prompt' in texts + # Current agent's own turn must survive even though is_other_reply=True in live mode + assert any('my own reply' in t for t in texts) + # Peer agent must be filtered + assert not any('peer reply' in t for t in texts) + + +def test_source_filter_all_sources_is_same_as_none(): + """Filtering with all relevant source names present is equivalent to no filter.""" + agent_name = 'agent_a' + events = [ + _user_event('hello'), + _model_event('self reply', author=agent_name), + _model_event('peer reply', author='agent_b'), + ] + no_filter = contents._get_contents(None, events, agent_name=agent_name) + all_sources = contents._get_contents( + None, + events, + agent_name=agent_name, + source_filter=['user', 'self', 'agent_b'], + ) + assert no_filter == all_sources From 0bfc109592e348470e0b603a35277e73a6094630 Mon Sep 17 00:00:00 2001 From: Bofeng Huang Date: Tue, 2 Jun 2026 08:42:49 +0200 Subject: [PATCH 2/2] chore: apply pre-commit formatting and fix mypy untyped-decorator --- src/google/adk/agents/llm_agent.py | 4 +- .../agents/test_llm_agent_include_contents.py | 74 ++++++++----------- .../llm_flows/test_contents_source_filter.py | 18 ++--- 3 files changed, 37 insertions(+), 59 deletions(-) diff --git a/src/google/adk/agents/llm_agent.py b/src/google/adk/agents/llm_agent.py index 086d046e2c..2b38f9110d 100644 --- a/src/google/adk/agents/llm_agent.py +++ b/src/google/adk/agents/llm_agent.py @@ -978,14 +978,14 @@ def __maybe_save_output_to_state(self, event: Event): def __model_validator_after(self) -> LlmAgent: return self - @field_validator('include_sources', mode='after') + @field_validator('include_sources', mode='after') # type: ignore[misc] @classmethod def _validate_include_sources( cls, v: Optional[list[str]] ) -> Optional[list[str]]: if v is not None and len(v) == 0: raise ValueError( - "include_sources=[] keeps nothing. Use None to disable filtering." + 'include_sources=[] keeps nothing. Use None to disable filtering.' ) return v diff --git a/tests/unittests/agents/test_llm_agent_include_contents.py b/tests/unittests/agents/test_llm_agent_include_contents.py index 3976146d17..64acde4117 100644 --- a/tests/unittests/agents/test_llm_agent_include_contents.py +++ b/tests/unittests/agents/test_llm_agent_include_contents.py @@ -250,19 +250,17 @@ async def test_include_contents_none_sequential_agents(): def test_include_sources_empty_list_raises(): """include_sources=[] must raise ValueError — use None to disable filtering.""" - with pytest.raises(ValueError, match='include_sources=\\[\\]'): + with pytest.raises(ValueError, match="include_sources=\\[\\]"): LlmAgent( - name='agent', - model='gemini-2.5-flash', + name="agent", + model="gemini-2.5-flash", include_sources=[], ) def test_include_sources_none_is_accepted(): """include_sources=None (default) must not raise.""" - agent = LlmAgent( - name='agent', model='gemini-2.5-flash', include_sources=None - ) + agent = LlmAgent(name="agent", model="gemini-2.5-flash", include_sources=None) assert agent.include_sources is None @@ -275,43 +273,37 @@ def test_include_sources_none_is_accepted(): async def test_include_sources_user_only_drops_upstream_agent_entries(): """Downstream agent with include_sources=['user'] receives only the human user message.""" agent1_model = testing_utils.MockModel.create( - responses=['Upstream agent reply'] + responses=["Upstream agent reply"] ) agent1 = LlmAgent( - name='upstream', + name="upstream", model=agent1_model, - instruction='You are upstream', + instruction="You are upstream", ) agent2_model = testing_utils.MockModel.create( - responses=['Downstream response'] + responses=["Downstream response"] ) agent2 = LlmAgent( - name='downstream', + name="downstream", model=agent2_model, - include_sources=['user'], - instruction='You are downstream', + include_sources=["user"], + instruction="You are downstream", ) - sequential = SequentialAgent( - name='pipeline', sub_agents=[agent1, agent2] - ) + sequential = SequentialAgent(name="pipeline", sub_agents=[agent1, agent2]) runner = testing_utils.InMemoryRunner(sequential) - runner.run('Original user request') + runner.run("Original user request") agent2_contents = testing_utils.simplify_contents( agent2_model.requests[0].contents ) # User message must be present - assert any( - 'Original user request' in str(c) for _, c in agent2_contents - ) + assert any("Original user request" in str(c) for _, c in agent2_contents) # Upstream agent's narrative entry must be absent - assert not any( - 'Upstream agent reply' in str(c) for _, c in agent2_contents - ) - assert not any('For context:' in str(c) for _, c in agent2_contents) + assert not any("Upstream agent reply" in str(c) for _, c in agent2_contents) + assert not any("For context:" in str(c) for _, c in agent2_contents) # --------------------------------------------------------------------------- @@ -326,30 +318,28 @@ async def test_include_sources_user_self_drops_upstream_across_turns(): narrative entries from the upstream agent across multiple invocations. """ agent1_model = testing_utils.MockModel.create( - responses=['Turn1 upstream reply', 'Turn2 upstream reply'] + responses=["Turn1 upstream reply", "Turn2 upstream reply"] ) agent1 = LlmAgent( - name='upstream', + name="upstream", model=agent1_model, - instruction='You are upstream', + instruction="You are upstream", ) agent2_model = testing_utils.MockModel.create( - responses=['Turn1 downstream', 'Turn2 downstream'] + responses=["Turn1 downstream", "Turn2 downstream"] ) agent2 = LlmAgent( - name='downstream', + name="downstream", model=agent2_model, - include_sources=['user', 'self'], - instruction='You are downstream', + include_sources=["user", "self"], + instruction="You are downstream", ) - sequential = SequentialAgent( - name='pipeline', sub_agents=[agent1, agent2] - ) + sequential = SequentialAgent(name="pipeline", sub_agents=[agent1, agent2]) runner = testing_utils.InMemoryRunner(sequential) - runner.run('Turn 1 user message') - runner.run('Turn 2 user message') + runner.run("Turn 1 user message") + runner.run("Turn 2 user message") # Second invocation of downstream agent — should see user messages + own # prior turn, but not upstream's narrative entries. @@ -358,14 +348,10 @@ async def test_include_sources_user_self_drops_upstream_across_turns(): ) # User messages must be present - assert any( - 'Turn 1 user message' in str(c) for _, c in agent2_second_contents - ) - assert any( - 'Turn 2 user message' in str(c) for _, c in agent2_second_contents - ) + assert any("Turn 1 user message" in str(c) for _, c in agent2_second_contents) + assert any("Turn 2 user message" in str(c) for _, c in agent2_second_contents) # Upstream agent's narrative entries must be absent assert not any( - 'upstream reply' in str(c).lower() for _, c in agent2_second_contents + "upstream reply" in str(c).lower() for _, c in agent2_second_contents ) - assert not any('For context:' in str(c) for _, c in agent2_second_contents) + assert not any("For context:" in str(c) for _, c in agent2_second_contents) diff --git a/tests/unittests/flows/llm_flows/test_contents_source_filter.py b/tests/unittests/flows/llm_flows/test_contents_source_filter.py index 33fe2d07f8..b930f2a1fd 100644 --- a/tests/unittests/flows/llm_flows/test_contents_source_filter.py +++ b/tests/unittests/flows/llm_flows/test_contents_source_filter.py @@ -24,21 +24,15 @@ def _user_event(text: str, invocation_id: str = 'inv') -> Event: return Event( invocation_id=invocation_id, author='user', - content=types.Content( - role='user', parts=[types.Part(text=text)] - ), + content=types.Content(role='user', parts=[types.Part(text=text)]), ) -def _model_event( - text: str, author: str, invocation_id: str = 'inv' -) -> Event: +def _model_event(text: str, author: str, invocation_id: str = 'inv') -> Event: return Event( invocation_id=invocation_id, author=author, - content=types.Content( - role='model', parts=[types.Part(text=text)] - ), + content=types.Content(role='model', parts=[types.Part(text=text)]), ) @@ -222,7 +216,7 @@ def test_source_filter_self_keeps_fc_call_and_response_together(): # FC call (role=model) and FC response (role=user) both belong to 'self' roles = [c.role for c in result] assert 'model' in roles # function call kept - assert 'user' in roles # function response kept (no orphan) + assert 'user' in roles # function response kept (no orphan) def test_source_filter_without_self_drops_fc_call_and_response_together(): @@ -240,9 +234,7 @@ def test_source_filter_without_self_drops_fc_call_and_response_together(): # Both FC call and FC response are dropped — no orphaned function_response part assert not any(c.role == 'model' for c in result) assert not any( - p.function_response is not None - for c in result - for p in (c.parts or []) + p.function_response is not None for c in result for p in c.parts or [] )