diff --git a/src/google/adk/agents/llm_agent.py b/src/google/adk/agents/llm_agent.py index ee1b05c535..2b38f9110d 100644 --- a/src/google/adk/agents/llm_agent.py +++ b/src/google/adk/agents/llm_agent.py @@ -352,6 +352,27 @@ class LlmAgent(BaseAgent, abc.ABC): instruction and input """ + include_sources: Optional[list[str]] = None + """Allowlist of content sources to include in model requests. + + Orthogonal to include_contents (temporal window); this controls which + sources are kept from within that window. + + Options: + None (default): all sources pass through — backward-compatible. + list[str]: only content from the listed sources is kept. + + Reserved source names: + 'user' — plain human user messages (not tool outputs) + 'self' — this agent's own prior model outputs + — any other string is matched against event.author (agent name) + + Example — keep full history but only user + this agent's turns: + include_contents='default', include_sources=['user', 'self'] + + Raises ValueError if set to [] (use None to disable filtering). + """ + # Controlled input/output configurations - Start input_schema: Optional[type[BaseModel]] = None """The input schema when agent is used as a tool.""" @@ -957,6 +978,17 @@ def __maybe_save_output_to_state(self, event: Event): def __model_validator_after(self) -> LlmAgent: return self + @field_validator('include_sources', mode='after') # type: ignore[misc] + @classmethod + def _validate_include_sources( + cls, v: Optional[list[str]] + ) -> Optional[list[str]]: + if v is not None and len(v) == 0: + raise ValueError( + 'include_sources=[] keeps nothing. Use None to disable filtering.' + ) + return v + @field_validator('generate_content_config', mode='after') @classmethod def validate_generate_content_config( diff --git a/src/google/adk/flows/llm_flows/contents.py b/src/google/adk/flows/llm_flows/contents.py index fab5afd2cd..c6f9afb747 100644 --- a/src/google/adk/flows/llm_flows/contents.py +++ b/src/google/adk/flows/llm_flows/contents.py @@ -68,6 +68,7 @@ async def run_async( instruction_related_contents = llm_request.contents is_single_turn = getattr(agent, 'mode', None) == 'single_turn' + source_filter = getattr(agent, 'include_sources', None) if agent.include_contents == 'default': # Include full conversation history llm_request.contents = _get_contents( @@ -78,6 +79,7 @@ async def run_async( isolation_scope=invocation_context.isolation_scope, is_single_turn=is_single_turn, user_content=invocation_context.user_content, + source_filter=source_filter, ) else: # Include current turn context only (no conversation history) @@ -89,6 +91,7 @@ async def run_async( isolation_scope=invocation_context.isolation_scope, is_single_turn=is_single_turn, user_content=invocation_context.user_content, + source_filter=source_filter, ) # Add instruction-related contents to proper position in conversation @@ -504,6 +507,7 @@ def _get_contents( isolation_scope: Optional[str] = None, is_single_turn: bool = False, user_content: Optional[types.Content] = None, + source_filter: Optional[list[str]] = None, ) -> list[types.Content]: """Get the contents for the LLM request. @@ -610,6 +614,7 @@ def _get_contents( accumulated_output_transcription = '' is_other_reply = _is_other_agent_reply(agent_name, event) + other_fc_author = None # set when is_other_reply via FC attribution # Check if it's a FunctionResponse for another agent if not is_other_reply and event.content: @@ -623,8 +628,43 @@ def _get_contents( and call_author != 'user' ): is_other_reply = True + other_fc_author = call_author break + if source_filter is not None: + if is_other_reply: + if event.author != 'user': + # In live mode the current agent's own events are also classified as + # other_reply (see _is_other_agent_reply). Map the actual agent name + # to the 'self' reserved name so source_filter=['self'] works. + effective_source = ( + 'self' if event.author == agent_name else event.author + ) + if effective_source not in source_filter: + continue + else: + # 'user'-authored FC response to another agent's call. + # other_fc_author was resolved above — no second iteration needed. + # _present_other_agent_message converts it to text, so no raw + # function_response survives — but drop it when its call author is + # filtered to avoid "[agent_b] returned X" with no visible preceding + # "[agent_b] called tool Y". + if other_fc_author and other_fc_author not in source_filter: + continue + elif event.content: + if event.content.role == 'model': + if 'self' not in source_filter: + continue + elif event.content.role == 'user': + if _content_contains_function_response(event.content): + # FC responses are paired with the current agent's own tool calls + # (role='model'). Tie them to 'self' so dropping 'self' drops both + # sides of the pair and avoids orphaned function_response parts. + if 'self' not in source_filter: + continue + elif 'user' not in source_filter: + continue + if is_other_reply: if converted_event := _present_other_agent_message(event): filtered_events.append(converted_event) @@ -677,6 +717,7 @@ def _get_current_turn_contents( is_single_turn: bool = False, isolation_scope: Optional[str] = None, user_content: Optional[types.Content] = None, + source_filter: Optional[list[str]] = None, ) -> list[types.Content]: """Get contents for the current turn only (no conversation history). @@ -712,6 +753,7 @@ def _get_current_turn_contents( isolation_scope=isolation_scope, is_single_turn=is_single_turn, user_content=user_content, + source_filter=source_filter, ) return [] diff --git a/tests/unittests/agents/test_llm_agent_include_contents.py b/tests/unittests/agents/test_llm_agent_include_contents.py index c24aab4ef0..64acde4117 100644 --- a/tests/unittests/agents/test_llm_agent_include_contents.py +++ b/tests/unittests/agents/test_llm_agent_include_contents.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Unit tests for LlmAgent include_contents field behavior.""" +"""Unit tests for LlmAgent include_contents and include_sources field behavior.""" from google.adk.agents.llm_agent import LlmAgent from google.adk.agents.sequential_agent import SequentialAgent @@ -241,3 +241,117 @@ async def test_include_contents_none_sequential_agents(): assert any( "Agent1 response" in str(content) for _, content in agent2_contents ) + + +# --------------------------------------------------------------------------- +# include_sources: field validation +# --------------------------------------------------------------------------- + + +def test_include_sources_empty_list_raises(): + """include_sources=[] must raise ValueError — use None to disable filtering.""" + with pytest.raises(ValueError, match="include_sources=\\[\\]"): + LlmAgent( + name="agent", + model="gemini-2.5-flash", + include_sources=[], + ) + + +def test_include_sources_none_is_accepted(): + """include_sources=None (default) must not raise.""" + agent = LlmAgent(name="agent", model="gemini-2.5-flash", include_sources=None) + assert agent.include_sources is None + + +# --------------------------------------------------------------------------- +# include_sources: integration — user-only in sequential pipeline +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_include_sources_user_only_drops_upstream_agent_entries(): + """Downstream agent with include_sources=['user'] receives only the human user message.""" + agent1_model = testing_utils.MockModel.create( + responses=["Upstream agent reply"] + ) + agent1 = LlmAgent( + name="upstream", + model=agent1_model, + instruction="You are upstream", + ) + + agent2_model = testing_utils.MockModel.create( + responses=["Downstream response"] + ) + agent2 = LlmAgent( + name="downstream", + model=agent2_model, + include_sources=["user"], + instruction="You are downstream", + ) + + sequential = SequentialAgent(name="pipeline", sub_agents=[agent1, agent2]) + runner = testing_utils.InMemoryRunner(sequential) + runner.run("Original user request") + + agent2_contents = testing_utils.simplify_contents( + agent2_model.requests[0].contents + ) + + # User message must be present + assert any("Original user request" in str(c) for _, c in agent2_contents) + # Upstream agent's narrative entry must be absent + assert not any("Upstream agent reply" in str(c) for _, c in agent2_contents) + assert not any("For context:" in str(c) for _, c in agent2_contents) + + +# --------------------------------------------------------------------------- +# include_sources: composing with include_contents='default' — multi-turn +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_include_sources_user_self_drops_upstream_across_turns(): + """include_sources=['user','self'] + include_contents='default' (full history): + downstream agent sees all user messages and its own prior turns, but no + narrative entries from the upstream agent across multiple invocations. + """ + agent1_model = testing_utils.MockModel.create( + responses=["Turn1 upstream reply", "Turn2 upstream reply"] + ) + agent1 = LlmAgent( + name="upstream", + model=agent1_model, + instruction="You are upstream", + ) + + agent2_model = testing_utils.MockModel.create( + responses=["Turn1 downstream", "Turn2 downstream"] + ) + agent2 = LlmAgent( + name="downstream", + model=agent2_model, + include_sources=["user", "self"], + instruction="You are downstream", + ) + + sequential = SequentialAgent(name="pipeline", sub_agents=[agent1, agent2]) + runner = testing_utils.InMemoryRunner(sequential) + runner.run("Turn 1 user message") + runner.run("Turn 2 user message") + + # Second invocation of downstream agent — should see user messages + own + # prior turn, but not upstream's narrative entries. + agent2_second_contents = testing_utils.simplify_contents( + agent2_model.requests[1].contents + ) + + # User messages must be present + assert any("Turn 1 user message" in str(c) for _, c in agent2_second_contents) + assert any("Turn 2 user message" in str(c) for _, c in agent2_second_contents) + # Upstream agent's narrative entries must be absent + assert not any( + "upstream reply" in str(c).lower() for _, c in agent2_second_contents + ) + assert not any("For context:" in str(c) for _, c in agent2_second_contents) diff --git a/tests/unittests/flows/llm_flows/test_contents_source_filter.py b/tests/unittests/flows/llm_flows/test_contents_source_filter.py new file mode 100644 index 0000000000..b930f2a1fd --- /dev/null +++ b/tests/unittests/flows/llm_flows/test_contents_source_filter.py @@ -0,0 +1,385 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for source_filter parameter in _get_contents / _get_current_turn_contents.""" + +from google.adk.events.event import Event +from google.adk.flows.llm_flows import contents +from google.genai import types +import pytest + + +def _user_event(text: str, invocation_id: str = 'inv') -> Event: + return Event( + invocation_id=invocation_id, + author='user', + content=types.Content(role='user', parts=[types.Part(text=text)]), + ) + + +def _model_event(text: str, author: str, invocation_id: str = 'inv') -> Event: + return Event( + invocation_id=invocation_id, + author=author, + content=types.Content(role='model', parts=[types.Part(text=text)]), + ) + + +def _function_response_event( + name: str, response: dict, invocation_id: str = 'inv' +) -> Event: + return Event( + invocation_id=invocation_id, + author='user', + content=types.Content( + role='user', + parts=[ + types.Part.from_function_response(name=name, response=response) + ], + ), + ) + + +def _function_call_event( + name: str, args: dict, author: str, invocation_id: str = 'inv' +) -> Event: + return Event( + invocation_id=invocation_id, + author=author, + content=types.Content( + role='model', + parts=[types.Part.from_function_call(name=name, args=args)], + ), + ) + + +# --------------------------------------------------------------------------- +# Regression: source_filter=None is a no-op +# --------------------------------------------------------------------------- + + +def test_source_filter_none_is_no_op(): + """source_filter=None should produce identical output to omitting the param.""" + events = [ + _user_event('hello'), + _model_event('hi there', author='agent_a'), + _model_event('peer reply', author='agent_b'), + ] + without = contents._get_contents(None, events, agent_name='agent_a') + with_none = contents._get_contents( + None, events, agent_name='agent_a', source_filter=None + ) + assert without == with_none + + +# --------------------------------------------------------------------------- +# user only +# --------------------------------------------------------------------------- + + +def test_source_filter_user_keeps_user_drops_model_and_others(): + """['user'] keeps user messages, drops this agent's model turns and peers.""" + events = [ + _user_event('user msg 1'), + _model_event('self reply', author='agent_a'), + _model_event('peer reply', author='agent_b'), + _user_event('user msg 2'), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'user msg 1' in texts + assert 'user msg 2' in texts + assert 'self reply' not in texts + assert 'peer reply' not in texts + # No narrative "For context:" wrapper — other agent was dropped entirely + assert not any('For context:' in t for t in texts) + + +# --------------------------------------------------------------------------- +# self only +# --------------------------------------------------------------------------- + + +def test_source_filter_self_keeps_model_drops_user_and_others(): + """['self'] keeps this agent's model turns, drops user messages and peers.""" + events = [ + _user_event('user msg'), + _model_event('self turn 1', author='agent_a'), + _model_event('peer reply', author='agent_b'), + _model_event('self turn 2', author='agent_a'), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['self'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'self turn 1' in texts + assert 'self turn 2' in texts + assert 'user msg' not in texts + assert 'peer reply' not in texts + + +# --------------------------------------------------------------------------- +# user + self +# --------------------------------------------------------------------------- + + +def test_source_filter_user_and_self_drops_other_agents(): + """['user', 'self'] keeps user + this agent's turns, drops all other agents.""" + events = [ + _user_event('hi'), + _model_event('my answer', author='agent_a'), + _model_event('agent_b reply', author='agent_b'), + _model_event('agent_c reply', author='agent_c'), + _user_event('follow up'), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user', 'self'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'hi' in texts + assert 'follow up' in texts + assert 'my answer' in texts + assert 'agent_b reply' not in texts + assert 'agent_c reply' not in texts + assert not any('For context:' in t for t in texts) + + +# --------------------------------------------------------------------------- +# specific agent name +# --------------------------------------------------------------------------- + + +def test_source_filter_specific_agent_name(): + """['agent_b'] keeps only agent_b's entries, drops user, self, and agent_c.""" + events = [ + _user_event('user msg'), + _model_event('self reply', author='agent_a'), + _model_event('b says hi', author='agent_b'), + _model_event('c says bye', author='agent_c'), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['agent_b'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert any('b says hi' in t for t in texts) + assert 'user msg' not in texts + assert 'self reply' not in texts + assert 'c says bye' not in texts + + +def test_source_filter_user_and_specific_agent(): + """['user', 'agent_b'] keeps user + agent_b, drops self and agent_c.""" + events = [ + _user_event('user msg'), + _model_event('self reply', author='agent_a'), + _model_event('b says hi', author='agent_b'), + _model_event('c says bye', author='agent_c'), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user', 'agent_b'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'user msg' in texts + assert any('b says hi' in t for t in texts) + assert 'self reply' not in texts + assert 'c says bye' not in texts + + +# --------------------------------------------------------------------------- +# Function responses are never filtered +# --------------------------------------------------------------------------- + + +def test_source_filter_self_keeps_fc_call_and_response_together(): + """FC call and response are both tied to 'self': including 'self' keeps both.""" + events = [ + _user_event('user msg'), + _function_call_event('my_tool', {'x': 1}, author='agent_a'), + _function_response_event('my_tool', {'result': 'ok'}), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['self'] + ) + # FC call (role=model) and FC response (role=user) both belong to 'self' + roles = [c.role for c in result] + assert 'model' in roles # function call kept + assert 'user' in roles # function response kept (no orphan) + + +def test_source_filter_without_self_drops_fc_call_and_response_together(): + """Dropping 'self' drops both sides of the FC/FR pair to avoid orphaned responses.""" + events = [ + _user_event('plain user message'), + _function_call_event('tool', {}, author='agent_a'), + _function_response_event('tool', {'v': 1}), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'plain user message' in texts + # Both FC call and FC response are dropped — no orphaned function_response part + assert not any(c.role == 'model' for c in result) + assert not any( + p.function_response is not None for c in result for p in c.parts or [] + ) + + +# --------------------------------------------------------------------------- +# Interaction with _get_current_turn_contents +# --------------------------------------------------------------------------- + + +def test_source_filter_propagates_to_current_turn(): + """source_filter is respected when include_contents='none' path is taken. + + Simulates the start of a new invocation where only the user message has + arrived; _get_current_turn_contents identifies it as the turn boundary. + With source_filter=['user'], prior self/peer history is excluded and only + the current user message survives. + """ + events = [ + _user_event('turn 1', invocation_id='inv1'), + _model_event('self turn 1', author='agent_a', invocation_id='inv1'), + _model_event('peer old', author='agent_b', invocation_id='inv1'), + # New invocation: only the user message has arrived so far + _user_event('turn 2', invocation_id='inv2'), + ] + result = contents._get_current_turn_contents( + None, events, agent_name='agent_a', source_filter=['user'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + # Only the current-turn user message survives; prior history is excluded + assert 'turn 2' in texts + assert 'turn 1' not in texts + assert 'self turn 1' not in texts + assert 'peer old' not in texts + + +def test_source_filter_drops_other_agent_fc_response_when_call_author_filtered(): + """When agent_b is not in filter, its FC call AND its 'user'-authored response are both dropped. + + Without this fix, the response would survive as + '[agent_b] tool returned X' text with no visible call — misleading context. + """ + events = [ + _user_event('user msg'), + # agent_b makes a function call (role=model, is_other_reply=True) + Event( + invocation_id='inv', + author='agent_b', + content=types.Content( + role='model', + parts=[types.Part.from_function_call(name='search', args={})], + ), + ), + # 'user'-authored response to agent_b's call (is_other_reply=True via fc_author_by_id) + _function_response_event('search', {'results': 'found it'}), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user', 'self'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'user msg' in texts + # Neither the call nor the response from agent_b should appear + assert not any('search' in t for t in texts) + assert not any('found it' in t for t in texts) + + +def test_source_filter_keeps_other_agent_fc_response_when_call_author_included(): + """When agent_b IS in filter, its FC response is kept and converted to context text.""" + events = [ + _user_event('user msg'), + Event( + invocation_id='inv', + author='agent_b', + content=types.Content( + role='model', + parts=[types.Part.from_function_call(name='lookup', args={})], + ), + ), + _function_response_event('lookup', {'value': 42}), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user', 'agent_b'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'user msg' in texts + # agent_b's call and response both present (as narrative text) + assert any('lookup' in t for t in texts) + + +def test_source_filter_self_matches_current_agent_in_live_mode(): + """In live mode, the current agent's events are classified as other_reply. + + source_filter=['self'] must still keep them by mapping event.author==agent_name + to the 'self' reserved name, not by literal string comparison. + """ + live_session_id = 'live-123' + events = [ + Event( + invocation_id='inv', + author='user', + live_session_id=live_session_id, + content=types.Content( + role='user', parts=[types.Part(text='user prompt')] + ), + ), + # In live mode, current agent's own turn has is_other_reply=True + Event( + invocation_id='inv', + author='agent_a', + live_session_id=live_session_id, + content=types.Content( + role='model', parts=[types.Part(text='my own reply')] + ), + ), + Event( + invocation_id='inv', + author='agent_b', + live_session_id=live_session_id, + content=types.Content( + role='model', parts=[types.Part(text='peer reply')] + ), + ), + ] + result = contents._get_contents( + None, events, agent_name='agent_a', source_filter=['user', 'self'] + ) + texts = [p.text for c in result for p in c.parts if p.text] + assert 'user prompt' in texts + # Current agent's own turn must survive even though is_other_reply=True in live mode + assert any('my own reply' in t for t in texts) + # Peer agent must be filtered + assert not any('peer reply' in t for t in texts) + + +def test_source_filter_all_sources_is_same_as_none(): + """Filtering with all relevant source names present is equivalent to no filter.""" + agent_name = 'agent_a' + events = [ + _user_event('hello'), + _model_event('self reply', author=agent_name), + _model_event('peer reply', author='agent_b'), + ] + no_filter = contents._get_contents(None, events, agent_name=agent_name) + all_sources = contents._get_contents( + None, + events, + agent_name=agent_name, + source_filter=['user', 'self', 'agent_b'], + ) + assert no_filter == all_sources