Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,19 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.genai.types.Content;
import com.google.genai.types.CustomMetadata;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -496,6 +500,85 @@ public Maybe<Event> onEventCallback(InvocationContext invocationContext, Event e
}
}
}

// --- A2A interaction logging ---
if (event.customMetadata().isPresent()) {
Map<String, Object> a2aKeys = new HashMap<>();
for (CustomMetadata cm : event.customMetadata().get()) {
if (cm.key().isPresent() && cm.key().get().startsWith(BigQueryUtils.A2A_PREFIX)) {
cm.stringValue().ifPresent(val -> a2aKeys.put(cm.key().get(), val));
}
}
if (a2aKeys.containsKey(BigQueryUtils.A2A_REQUEST_KEY)
|| a2aKeys.containsKey(BigQueryUtils.A2A_RESPONSE_KEY)) {
Object responsePayload = a2aKeys.get(BigQueryUtils.A2A_RESPONSE_KEY);
Object contentDict = null;
boolean contentTruncated = false;
if (responsePayload != null) {
TruncationResult tr = smartTruncate(responsePayload, config.maxContentLength());
contentDict = toJavaObject(tr.node());
contentTruncated = tr.isTruncated();
}

// Exclude a2a:response from a2a_metadata to save storage space and avoid duplication
Map<String, Object> a2aMetaKeys = new HashMap<>(a2aKeys);
a2aMetaKeys.remove(BigQueryUtils.A2A_RESPONSE_KEY);
TruncationResult a2aTruncated = smartTruncate(a2aMetaKeys, config.maxContentLength());

Map<String, Object> extraAttributes = new HashMap<>();
Object a2aMeta = toJavaObject(a2aTruncated.node());
if (a2aMeta != null) {
extraAttributes.put("a2a_metadata", a2aMeta);
}

logCompletable =
logCompletable.andThen(
logEvent(
"A2A_INTERACTION",
invocationContext,
contentDict,
a2aTruncated.isTruncated() || contentTruncated,
Optional.of(EventData.builder().setExtraAttributes(extraAttributes).build())));
}
}

// --- Final agent response logging ---
boolean isAgentResponse = isFinalAgentResponse(event);

if (isAgentResponse) {
List<Part> visibleParts = new ArrayList<>();
for (Part p : event.content().get().parts().get()) {
if (p.text().isPresent() && !p.thought().orElse(false)) {
visibleParts.add(p);
}
}
if (!visibleParts.isEmpty()) {
Content visibleContent =
Content.builder()
.role(event.content().get().role().orElse("model"))
.parts(visibleParts)
.build();

Map<String, Object> extraAttributes = new HashMap<>();
if (event.id() != null) {
extraAttributes.put("source_event_id", event.id());
}
if (event.author() != null) {
extraAttributes.put("source_event_author", event.author());
}
event.branch().ifPresent(branch -> extraAttributes.put("source_event_branch", branch));

logCompletable =
logCompletable.andThen(
logEvent(
"AGENT_RESPONSE",
invocationContext,
ImmutableMap.of("response", visibleContent),
false,
Optional.of(EventData.builder().setExtraAttributes(extraAttributes).build())));
}
}

return logCompletable.andThen(Maybe.empty());
}

Expand Down Expand Up @@ -635,6 +718,9 @@ public Maybe<LlmResponse> afterModelCallback(
usage.promptTokenCount().ifPresent(c -> usageDict.put("prompt", c));
usage.candidatesTokenCount().ifPresent(c -> usageDict.put("completion", c));
usage.totalTokenCount().ifPresent(c -> usageDict.put("total", c));
usage
.cachedContentTokenCount()
.ifPresent(c -> usageDict.put("cached_content_token_count", c));
});

InvocationContext invocationContext = callbackContext.invocationContext();
Expand Down Expand Up @@ -858,4 +944,22 @@ private String getToolOrigin(BaseTool tool) {
}
return "UNKNOWN";
}

/**
* Returns true if the event represents a final agent response.
*
* <p>We verify finalResponse() along with empty checks for partial, function calls/responses, and
* long-running tool IDs. This is required because finalResponse() would otherwise return true
* even for thought-only, short-circuited skipSummarization() events (which ADK treats as
* invisible internal reasoning and should not be logged as agent responses).
*/
private boolean isFinalAgentResponse(Event event) {
return event.content().isPresent()
&& event.content().get().parts().isPresent()
&& event.finalResponse()
&& !event.partial().orElse(false)
&& event.functionCalls().isEmpty()
&& event.functionResponses().isEmpty()
&& event.longRunningToolIds().map(Set::isEmpty).orElse(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@
final class BigQueryUtils {
private static final Logger logger = Logger.getLogger(BigQueryUtils.class.getName());

static final String A2A_PREFIX = "a2a:";
static final String A2A_REQUEST_KEY = "a2a:request";
static final String A2A_RESPONSE_KEY = "a2a:response";
static final String A2A_TASK_ID_KEY = "a2a:task_id";
static final String A2A_CONTEXT_ID_KEY = "a2a:context_id";

private static final ImmutableList<String> VIEW_COMMON_COLUMNS =
ImmutableList.of(
"timestamp",
Expand Down Expand Up @@ -82,6 +88,12 @@ final class BigQueryUtils {
"CAST(JSON_VALUE(content, '$.usage.completion') AS INT64) AS"
+ " usage_completion_tokens",
"CAST(JSON_VALUE(content, '$.usage.total') AS INT64) AS usage_total_tokens",
"CAST(JSON_VALUE(attributes, '$.usage_metadata.cached_content_token_count') AS"
+ " INT64) AS usage_cached_tokens",
"SAFE_DIVIDE(CAST(JSON_VALUE(attributes,"
+ " '$.usage_metadata.cached_content_token_count') AS INT64),"
+ "CAST(JSON_VALUE(content, '$.usage.prompt') AS INT64)) AS"
+ " context_cache_hit_rate",
"CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms",
"CAST(JSON_VALUE(latency_ms, '$.time_to_first_token_ms') AS INT64) AS ttft_ms",
"JSON_VALUE(attributes, '$.model_version') AS model_version",
Expand Down Expand Up @@ -135,6 +147,33 @@ final class BigQueryUtils {
ImmutableList.of(
"JSON_VALUE(content, '$.tool') AS tool_name",
"JSON_QUERY(content, '$.args') AS tool_args"))
.put(
"A2A_INTERACTION",
ImmutableList.of(
"content AS response_content",
"JSON_VALUE(attributes, '$.a2a_metadata.\""
+ A2A_TASK_ID_KEY
+ "\"') AS"
+ " a2a_task_id",
"JSON_VALUE(attributes, '$.a2a_metadata.\""
+ A2A_CONTEXT_ID_KEY
+ "\"') AS"
+ " a2a_context_id",
"JSON_QUERY(attributes, '$.a2a_metadata.\""
+ A2A_REQUEST_KEY
+ "\"') AS"
+ " a2a_request",
"JSON_QUERY(attributes, '$.a2a_metadata.\""
+ A2A_RESPONSE_KEY
+ "\"') AS"
+ " a2a_response"))
.put(
"AGENT_RESPONSE",
ImmutableList.of(
"JSON_VALUE(content, '$.response') AS response_text",
"JSON_VALUE(attributes, '$.source_event_id') AS source_event_id",
"JSON_VALUE(attributes, '$.source_event_author') AS source_event_author",
"JSON_VALUE(attributes, '$.source_event_branch') AS source_event_branch"))
.buildOrThrow();

private static final String FRAMEWORK_PREFIX = "google-adk-bq-logger-java";
Expand Down
Loading