diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java index 566dbd5a4..5d0145318 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java @@ -55,15 +55,19 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.genai.types.Content; +import com.google.genai.types.CustomMetadata; import com.google.genai.types.Part; import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Maybe; import java.io.IOException; import java.time.Duration; import java.time.Instant; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.logging.Level; import java.util.logging.Logger; @@ -496,6 +500,85 @@ public Maybe onEventCallback(InvocationContext invocationContext, Event e } } } + + // --- A2A interaction logging --- + if (event.customMetadata().isPresent()) { + Map a2aKeys = new HashMap<>(); + for (CustomMetadata cm : event.customMetadata().get()) { + if (cm.key().isPresent() && cm.key().get().startsWith(BigQueryUtils.A2A_PREFIX)) { + cm.stringValue().ifPresent(val -> a2aKeys.put(cm.key().get(), val)); + } + } + if (a2aKeys.containsKey(BigQueryUtils.A2A_REQUEST_KEY) + || a2aKeys.containsKey(BigQueryUtils.A2A_RESPONSE_KEY)) { + Object responsePayload = a2aKeys.get(BigQueryUtils.A2A_RESPONSE_KEY); + Object contentDict = null; + boolean contentTruncated = false; + if (responsePayload != null) { + TruncationResult tr = smartTruncate(responsePayload, config.maxContentLength()); + contentDict = toJavaObject(tr.node()); + contentTruncated = tr.isTruncated(); + } + + // Exclude a2a:response from a2a_metadata to save storage space and avoid duplication + Map a2aMetaKeys = new HashMap<>(a2aKeys); + a2aMetaKeys.remove(BigQueryUtils.A2A_RESPONSE_KEY); + TruncationResult a2aTruncated = smartTruncate(a2aMetaKeys, config.maxContentLength()); + + Map extraAttributes = new HashMap<>(); + Object a2aMeta = toJavaObject(a2aTruncated.node()); + if (a2aMeta != null) { + extraAttributes.put("a2a_metadata", a2aMeta); + } + + logCompletable = + logCompletable.andThen( + logEvent( + "A2A_INTERACTION", + invocationContext, + contentDict, + a2aTruncated.isTruncated() || contentTruncated, + Optional.of(EventData.builder().setExtraAttributes(extraAttributes).build()))); + } + } + + // --- Final agent response logging --- + boolean isAgentResponse = isFinalAgentResponse(event); + + if (isAgentResponse) { + List visibleParts = new ArrayList<>(); + for (Part p : event.content().get().parts().get()) { + if (p.text().isPresent() && !p.thought().orElse(false)) { + visibleParts.add(p); + } + } + if (!visibleParts.isEmpty()) { + Content visibleContent = + Content.builder() + .role(event.content().get().role().orElse("model")) + .parts(visibleParts) + .build(); + + Map extraAttributes = new HashMap<>(); + if (event.id() != null) { + extraAttributes.put("source_event_id", event.id()); + } + if (event.author() != null) { + extraAttributes.put("source_event_author", event.author()); + } + event.branch().ifPresent(branch -> extraAttributes.put("source_event_branch", branch)); + + logCompletable = + logCompletable.andThen( + logEvent( + "AGENT_RESPONSE", + invocationContext, + ImmutableMap.of("response", visibleContent), + false, + Optional.of(EventData.builder().setExtraAttributes(extraAttributes).build()))); + } + } + return logCompletable.andThen(Maybe.empty()); } @@ -635,6 +718,9 @@ public Maybe afterModelCallback( usage.promptTokenCount().ifPresent(c -> usageDict.put("prompt", c)); usage.candidatesTokenCount().ifPresent(c -> usageDict.put("completion", c)); usage.totalTokenCount().ifPresent(c -> usageDict.put("total", c)); + usage + .cachedContentTokenCount() + .ifPresent(c -> usageDict.put("cached_content_token_count", c)); }); InvocationContext invocationContext = callbackContext.invocationContext(); @@ -858,4 +944,22 @@ private String getToolOrigin(BaseTool tool) { } return "UNKNOWN"; } + + /** + * Returns true if the event represents a final agent response. + * + *

We verify finalResponse() along with empty checks for partial, function calls/responses, and + * long-running tool IDs. This is required because finalResponse() would otherwise return true + * even for thought-only, short-circuited skipSummarization() events (which ADK treats as + * invisible internal reasoning and should not be logged as agent responses). + */ + private boolean isFinalAgentResponse(Event event) { + return event.content().isPresent() + && event.content().get().parts().isPresent() + && event.finalResponse() + && !event.partial().orElse(false) + && event.functionCalls().isEmpty() + && event.functionResponses().isEmpty() + && event.longRunningToolIds().map(Set::isEmpty).orElse(true); + } } diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java index f0db45e12..946f244d4 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java @@ -47,6 +47,12 @@ final class BigQueryUtils { private static final Logger logger = Logger.getLogger(BigQueryUtils.class.getName()); + static final String A2A_PREFIX = "a2a:"; + static final String A2A_REQUEST_KEY = "a2a:request"; + static final String A2A_RESPONSE_KEY = "a2a:response"; + static final String A2A_TASK_ID_KEY = "a2a:task_id"; + static final String A2A_CONTEXT_ID_KEY = "a2a:context_id"; + private static final ImmutableList VIEW_COMMON_COLUMNS = ImmutableList.of( "timestamp", @@ -82,6 +88,12 @@ final class BigQueryUtils { "CAST(JSON_VALUE(content, '$.usage.completion') AS INT64) AS" + " usage_completion_tokens", "CAST(JSON_VALUE(content, '$.usage.total') AS INT64) AS usage_total_tokens", + "CAST(JSON_VALUE(attributes, '$.usage_metadata.cached_content_token_count') AS" + + " INT64) AS usage_cached_tokens", + "SAFE_DIVIDE(CAST(JSON_VALUE(attributes," + + " '$.usage_metadata.cached_content_token_count') AS INT64)," + + "CAST(JSON_VALUE(content, '$.usage.prompt') AS INT64)) AS" + + " context_cache_hit_rate", "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms", "CAST(JSON_VALUE(latency_ms, '$.time_to_first_token_ms') AS INT64) AS ttft_ms", "JSON_VALUE(attributes, '$.model_version') AS model_version", @@ -135,6 +147,33 @@ final class BigQueryUtils { ImmutableList.of( "JSON_VALUE(content, '$.tool') AS tool_name", "JSON_QUERY(content, '$.args') AS tool_args")) + .put( + "A2A_INTERACTION", + ImmutableList.of( + "content AS response_content", + "JSON_VALUE(attributes, '$.a2a_metadata.\"" + + A2A_TASK_ID_KEY + + "\"') AS" + + " a2a_task_id", + "JSON_VALUE(attributes, '$.a2a_metadata.\"" + + A2A_CONTEXT_ID_KEY + + "\"') AS" + + " a2a_context_id", + "JSON_QUERY(attributes, '$.a2a_metadata.\"" + + A2A_REQUEST_KEY + + "\"') AS" + + " a2a_request", + "JSON_QUERY(attributes, '$.a2a_metadata.\"" + + A2A_RESPONSE_KEY + + "\"') AS" + + " a2a_response")) + .put( + "AGENT_RESPONSE", + ImmutableList.of( + "JSON_VALUE(content, '$.response') AS response_text", + "JSON_VALUE(attributes, '$.source_event_id') AS source_event_id", + "JSON_VALUE(attributes, '$.source_event_author') AS source_event_author", + "JSON_VALUE(attributes, '$.source_event_branch') AS source_event_branch")) .buildOrThrow(); private static final String FRAMEWORK_PREFIX = "google-adk-bq-logger-java"; diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java index 6d65e2f15..5898fb772 100644 --- a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java +++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java @@ -65,6 +65,7 @@ import com.google.common.collect.ImmutableMap; import com.google.genai.types.Candidate; import com.google.genai.types.Content; +import com.google.genai.types.CustomMetadata; import com.google.genai.types.GenerateContentResponse; import com.google.genai.types.GenerateContentResponseUsageMetadata; import com.google.genai.types.Part; @@ -555,6 +556,169 @@ public void onEventCallback_populatesCorrectFields() throws Exception { assertEquals(false, row.get("is_truncated")); } + @Test + public void onEventCallback_withA2AMetadata_emitsA2AInteraction() throws Exception { + Event event = + Event.builder() + .author("agent_author") + .customMetadata( + ImmutableList.of( + CustomMetadata.builder().key("a2a:task_id").stringValue("task-123").build(), + CustomMetadata.builder() + .key("a2a:response") + .stringValue("a2a_payload") + .build())) + .build(); + + plugin.onEventCallback(mockInvocationContext, event).blockingSubscribe(); + CompletableFuture.allOf( + state + .getPendingTasksForInvocation("invocation_id") + .toArray(new CompletableFuture[0])) + .join(); + + Map stateDeltaRow = state.getBatchProcessor("invocation_id").queue.poll(); + assertNotNull(stateDeltaRow); + assertEquals("STATE_DELTA", stateDeltaRow.get("event_type")); + + Map a2aRow = state.getBatchProcessor("invocation_id").queue.poll(); + assertNotNull("A2A_INTERACTION row not found in queue", a2aRow); + assertEquals("A2A_INTERACTION", a2aRow.get("event_type")); + assertEquals("agent_name", a2aRow.get("agent")); + assertTrue(a2aRow.get("content").toString().contains("a2a_payload")); + ObjectNode attributes = (ObjectNode) a2aRow.get("attributes"); + ObjectNode a2aMetadata = (ObjectNode) attributes.get("a2a_metadata"); + assertEquals("task-123", a2aMetadata.get("a2a:task_id").asText()); + assertFalse( + "a2a:response should be excluded from a2a_metadata to avoid duplication", + a2aMetadata.has("a2a:response")); + } + + @Test + public void onEventCallback_agentResponse_emitsAgentResponse() throws Exception { + Event event = + Event.builder() + .id("evt-id") + .author("agent_author") + .branch("branch-val") + .content(Content.fromParts(Part.fromText("agent final answer"))) + .build(); + + plugin.onEventCallback(mockInvocationContext, event).blockingSubscribe(); + CompletableFuture.allOf( + state + .getPendingTasksForInvocation("invocation_id") + .toArray(new CompletableFuture[0])) + .join(); + + Map stateDeltaRow = state.getBatchProcessor("invocation_id").queue.poll(); + assertNotNull(stateDeltaRow); + assertEquals("STATE_DELTA", stateDeltaRow.get("event_type")); + + Map agentResponseRow = state.getBatchProcessor("invocation_id").queue.poll(); + assertNotNull("AGENT_RESPONSE row not found in queue", agentResponseRow); + assertEquals("AGENT_RESPONSE", agentResponseRow.get("event_type")); + assertEquals("agent_name", agentResponseRow.get("agent")); + String contentStr = String.valueOf(agentResponseRow.get("content")); + assertTrue( + "Expected content to contain 'agent final answer', but was: " + contentStr, + contentStr.contains("agent final answer")); + + ObjectNode attributes = (ObjectNode) agentResponseRow.get("attributes"); + assertEquals("evt-id", attributes.get("source_event_id").asText()); + assertEquals("agent_author", attributes.get("source_event_author").asText()); + assertEquals("branch-val", attributes.get("source_event_branch").asText()); + } + + @Test + public void onEventCallback_withA2ARequestOnlyMetadata_emitsA2AInteraction() throws Exception { + Event event = + Event.builder() + .author("agent_author") + .customMetadata( + ImmutableList.of( + CustomMetadata.builder().key("a2a:task_id").stringValue("task-456").build(), + CustomMetadata.builder().key("a2a:context_id").stringValue("ctx-789").build(), + CustomMetadata.builder().key("a2a:request").stringValue("req_payload").build())) + .build(); + + plugin.onEventCallback(mockInvocationContext, event).blockingSubscribe(); + CompletableFuture.allOf( + state + .getPendingTasksForInvocation("invocation_id") + .toArray(new CompletableFuture[0])) + .join(); + + Map stateDeltaRow = state.getBatchProcessor("invocation_id").queue.poll(); + assertNotNull(stateDeltaRow); + + Map a2aRow = state.getBatchProcessor("invocation_id").queue.poll(); + assertNotNull("A2A_INTERACTION row not found in queue", a2aRow); + assertEquals("A2A_INTERACTION", a2aRow.get("event_type")); + assertEquals("agent_name", a2aRow.get("agent")); + assertFalse( + "Content should not contain a2a_response payload since it was absent", + a2aRow.containsKey("content")); + ObjectNode attributes = (ObjectNode) a2aRow.get("attributes"); + ObjectNode a2aMetadata = (ObjectNode) attributes.get("a2a_metadata"); + assertEquals("task-456", a2aMetadata.get("a2a:task_id").asText()); + assertEquals("ctx-789", a2aMetadata.get("a2a:context_id").asText()); + assertEquals("req_payload", a2aMetadata.get("a2a:request").asText()); + } + + @Test + public void onEventCallback_agentResponse_filtersThoughtAndAppliesTruncation() throws Exception { + Event event = + Event.builder() + .author("agent_author") + .content( + Content.builder() + .parts( + Part.builder().text("internal reasoning process").thought(true).build(), + Part.fromText("this text is very long and will exceed the limit")) + .build()) + .build(); + + BigQueryLoggerConfig customConfig = config.toBuilder().maxContentLength(20).build(); + PluginState customState = + new PluginState(customConfig) { + @Override + protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) { + return mockWriteClient; + } + + @Override + protected StreamWriter createWriter() { + return mockWriter; + } + }; + BigQueryAgentAnalyticsPlugin customPlugin = + new BigQueryAgentAnalyticsPlugin(customConfig, mockBigQuery, customState); + + customPlugin.onEventCallback(mockInvocationContext, event).blockingSubscribe(); + CompletableFuture.allOf( + customState + .getPendingTasksForInvocation("invocation_id") + .toArray(new CompletableFuture[0])) + .join(); + + // Consume STATE_DELTA + Map stateDeltaRow = customState.getBatchProcessor("invocation_id").queue.poll(); + assertNotNull(stateDeltaRow); + + // Get AGENT_RESPONSE + Map agentResponseRow = + customState.getBatchProcessor("invocation_id").queue.poll(); + assertNotNull("AGENT_RESPONSE row not found in queue", agentResponseRow); + assertEquals("AGENT_RESPONSE", agentResponseRow.get("event_type")); + + // Check content and truncation behavior + String contentStr = String.valueOf(agentResponseRow.get("content")); + assertTrue("Content should be marked as truncated", contentStr.contains("truncated")); + assertFalse("Thought part should be filtered out", contentStr.contains("reasoning")); + assertEquals(true, agentResponseRow.get("is_truncated")); + } + @Test public void onModelErrorCallback_populatesCorrectFields() throws Exception { CallbackContext mockCallbackContext = mock(CallbackContext.class); @@ -592,6 +756,7 @@ public void afterModelCallback_populatesCorrectFields() throws Exception { .promptTokenCount(10) .candidatesTokenCount(20) .totalTokenCount(30) + .cachedContentTokenCount(5) .build(); GenerateContentResponse response = @@ -630,6 +795,8 @@ public void afterModelCallback_populatesCorrectFields() throws Exception { assertEquals("v1", attributes.get("model_version").asText()); ObjectNode usageAttr = (ObjectNode) attributes.get("usage_metadata"); assertEquals(10, usageAttr.get("prompt").asInt()); + assertEquals(5, usageAttr.get("cached_content_token_count").asInt()); + assertEquals(false, row.get("is_truncated")); assertNotNull(row.get("parent_span_id")); ObjectNode latencyMs = (ObjectNode) row.get("latency_ms"); @@ -952,6 +1119,14 @@ public void createAnalyticsViews_executesQueries() throws Exception { assertTrue( queries.stream() .anyMatch(q -> q.contains("CREATE OR REPLACE VIEW `project.dataset.v_llm_response`"))); + assertTrue( + queries.stream() + .anyMatch( + q -> q.contains("CREATE OR REPLACE VIEW `project.dataset.v_a2a_interaction`"))); + assertTrue( + queries.stream() + .anyMatch( + q -> q.contains("CREATE OR REPLACE VIEW `project.dataset.v_agent_response`"))); } @Test