Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions core/src/main/java/com/google/adk/agents/BaseAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import com.google.adk.agents.Callbacks.BeforeAgentCallback;
import com.google.adk.events.Event;
import com.google.adk.plugins.Plugin;
import com.google.adk.telemetry.Instrumentation;
import com.google.adk.telemetry.Instrumentation.AgentInvocation;
import com.google.adk.telemetry.Tracing;
import com.google.adk.utils.AgentEnums.AgentOrigin;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
Expand Down Expand Up @@ -323,13 +322,11 @@ public Flowable<Event> runAsync(InvocationContext parentContext) {
private Flowable<Event> run(
InvocationContext parentContext,
Function<InvocationContext, Flowable<Event>> runImplementation) {
Context otelContext = Context.current();
return Flowable.using(
() ->
Instrumentation.recordAgentInvocation(
createInvocationContext(parentContext), this, otelContext),
agentInvocation -> {
InvocationContext invocationContext = agentInvocation.getCtx();
Context parentSpanContext = Context.current();
return Flowable.defer(
() -> {
InvocationContext invocationContext = createInvocationContext(parentContext);

Flowable<Event> mainAndAfterEvents =
Flowable.defer(() -> runImplementation.apply(invocationContext))
.concatWith(
Expand All @@ -353,10 +350,14 @@ private Flowable<Event> run(
return Flowable.just(beforeEvent).concatWith(mainAndAfterEvents);
})
.switchIfEmpty(mainAndAfterEvents)
.doOnNext(agentInvocation::addEvent)
.doOnError(agentInvocation::setError);
},
AgentInvocation::close);
.compose(
Tracing.<Event>trace("invoke_agent " + name())
.setParent(parentSpanContext)
.configure(
span ->
Tracing.traceAgentInvocation(
span, name(), description(), invocationContext)));
});
}

/**
Expand Down
24 changes: 0 additions & 24 deletions core/src/main/java/com/google/adk/agents/LoopAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,6 @@
*
* <p>The loop continues until a sub-agent escalates, or until the maximum number of iterations is
* reached (if specified).
*
* <p><b>Composition with {@link LlmAgent}s:</b> a {@code LoopAgent} does not transfer control back
* to a parent {@link LlmAgent}. To react to loop results, place the {@code LoopAgent} and the
* follow-up {@link LlmAgent} as siblings inside a {@link SequentialAgent}. Loop sub-agents publish
* via {@code outputKey} and the follow-up reads via {@code {key}} placeholders in its instruction:
*
* <pre>{@code
* var refiner =
* LlmAgent.builder()
* .name("refiner")
* .model("gemini-flash-latest")
* .instruction("Refine: {draft?}")
* .outputKey("draft")
* .build();
* var publisher =
* LlmAgent.builder()
* .name("publisher")
* .model("gemini-flash-latest")
* .instruction("Publish: {draft}")
* .build();
* var loop =
* LoopAgent.builder().name("loop").subAgents(refiner).maxIterations(3).build();
* var root = SequentialAgent.builder().name("root").subAgents(loop, publisher).build();
* }</pre>
*/
public class LoopAgent extends BaseAgent {
private static final Logger logger = LoggerFactory.getLogger(LoopAgent.class);
Expand Down
32 changes: 0 additions & 32 deletions core/src/main/java/com/google/adk/agents/ParallelAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,38 +34,6 @@
* <p>This approach is beneficial for scenarios requiring multiple perspectives or attempts on a
* single task, such as running different algorithms simultaneously or generating multiple responses
* for review by a subsequent evaluation agent.
*
* <p><b>Composition with {@link LlmAgent}s:</b> a {@code ParallelAgent} does not transfer control
* back to a parent {@link LlmAgent}. To follow a fan-out with an aggregation step, wrap both in a
* {@link SequentialAgent} (used as the root or transferred-to agent). Each parallel sub-agent
* publishes via {@code outputKey} and the aggregator reads via {@code {key}} placeholders in its
* instruction:
*
* <pre>{@code
* var contacts =
* LlmAgent.builder()
* .name("contacts")
* .model("gemini-flash-latest")
* .instruction("List contacts.")
* .outputKey("contacts")
* .build();
* var schedule =
* LlmAgent.builder()
* .name("schedule")
* .model("gemini-flash-latest")
* .instruction("List schedule.")
* .outputKey("schedule")
* .build();
* var writer =
* LlmAgent.builder()
* .name("writer")
* .model("gemini-flash-latest")
* .instruction("Write: contacts={contacts}, schedule={schedule}")
* .build();
* var gather =
* ParallelAgent.builder().name("gather").subAgents(contacts, schedule).build();
* var root = SequentialAgent.builder().name("root").subAgents(gather, writer).build();
* }</pre>
*/
public class ParallelAgent extends BaseAgent {

Expand Down
27 changes: 1 addition & 26 deletions core/src/main/java/com/google/adk/agents/SequentialAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An agent that runs its sub-agents sequentially.
*
* <p><b>Composition with {@link LlmAgent}s:</b> a {@code SequentialAgent} does not transfer control
* back to a parent {@link LlmAgent}. Use it as the root or transferred-to agent and place any
* follow-up {@link LlmAgent} as the next sibling. Upstream publishes via {@code outputKey} and
* downstream reads via {@code {key}} placeholders in its instruction:
*
* <pre>{@code
* var draft =
* LlmAgent.builder()
* .name("draft")
* .model("gemini-flash-latest")
* .instruction("Draft a summary.")
* .outputKey("draft")
* .build();
* var reviewer =
* LlmAgent.builder()
* .name("reviewer")
* .model("gemini-flash-latest")
* .instruction("Polish the draft: {draft}")
* .build();
* var pipeline =
* SequentialAgent.builder().name("pipeline").subAgents(draft, reviewer).build();
* }</pre>
*/
/** An agent that runs its sub-agents sequentially. */
public class SequentialAgent extends BaseAgent {

private static final Logger logger = LoggerFactory.getLogger(SequentialAgent.class);
Expand Down
90 changes: 25 additions & 65 deletions core/src/main/java/com/google/adk/flows/llmflows/BaseLlmFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@
import com.google.adk.models.LlmRequest;
import com.google.adk.models.LlmResponse;
import com.google.adk.telemetry.Tracing;
import com.google.adk.tools.BaseTool;
import com.google.adk.tools.BaseToolset;
import com.google.adk.tools.ToolContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.genai.types.FunctionResponse;
Expand All @@ -61,7 +58,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -100,8 +96,20 @@ private Flowable<Event> preprocess(
Context currentContext = Context.current();
LlmAgent agent = (LlmAgent) context.agent();

RequestProcessor toolsProcessor =
(ctx, req) -> {
LlmRequest.Builder builder = req.toBuilder();
return agent
.canonicalTools(new ReadonlyContext(ctx))
.concatMapCompletable(
tool -> tool.processLlmRequest(builder, ToolContext.builder(ctx).build()))
.andThen(
Single.fromCallable(
() -> RequestProcessingResult.create(builder.build(), ImmutableList.of())));
};

Iterable<RequestProcessor> allProcessors =
Iterables.concat(requestProcessors, ImmutableList.of(getRequestProcessorFromTools(agent)));
Iterables.concat(requestProcessors, ImmutableList.of(toolsProcessor));

return Flowable.fromIterable(allProcessors)
.concatMap(
Expand All @@ -113,58 +121,6 @@ private Flowable<Event> preprocess(
result -> result.events() != null ? result.events() : ImmutableList.of()));
}

/**
* Constructs a {@link RequestProcessor} that sequentially applies the {@code processLlmRequest}
* methods of all tools and toolsets associated with this agent to the incoming {@link
* LlmRequest}.
*
* @return A {@link RequestProcessor} that applies tool-specific modifications to LLM requests.
*/
@VisibleForTesting
RequestProcessor getRequestProcessorFromTools(LlmAgent agent) {
return (context, request) -> {
ReadonlyContext readonlyContext = new ReadonlyContext(context);
List<BiFunction<LlmRequest.Builder, ToolContext, Completable>> processors = new ArrayList<>();

for (Object toolOrToolset : agent.toolsUnion()) {
if (toolOrToolset instanceof BaseTool baseTool) {
processors.add(
(builder, ctx) -> {
Completable c = baseTool.processLlmRequest(builder, ctx);
return c == null ? Completable.complete() : c;
});
} else if (toolOrToolset instanceof BaseToolset baseToolset) {
// First apply the toolset's own request processor, then unwrap all tools from the toolset
// and apply each individual tool's request processor sequentially.
processors.add(
(builder, ctx) -> {
Completable c = baseToolset.processLlmRequest(builder, ctx);
Completable toolsetProcessor = c == null ? Completable.complete() : c;
return toolsetProcessor
.andThen(baseToolset.getTools(readonlyContext))
.concatMapCompletable(
b -> {
Completable tc = b.processLlmRequest(builder, ctx);
return tc == null ? Completable.complete() : tc;
});
});
} else {
throw new IllegalArgumentException(
"Object in tools list is not of a supported type: "
+ toolOrToolset.getClass().getName());
}
}

LlmRequest.Builder builder = request.toBuilder();
ToolContext toolContext = ToolContext.builder(context).build();
return Flowable.fromIterable(processors)
.concatMapCompletable(f -> f.apply(builder, toolContext))
.andThen(
Single.fromCallable(
() -> RequestProcessingResult.create(builder.build(), ImmutableList.of())));
};
}

/**
* Post-processes the LLM response after receiving it from the LLM. Executes all registered {@link
* ResponseProcessor} instances. Emits events for the model response and any subsequent function
Expand Down Expand Up @@ -479,10 +435,12 @@ private Flowable<Event> runOneStep(Context spanContext, InvocationContext contex
"Agent not found: " + agentToTransfer)));
}
return postProcessedEvents.concatWith(
nextAgent
.get()
.runAsync(context)
.compose(Tracing.withContext(spanContext)));
Flowable.defer(
() -> {
try (Scope s = spanContext.makeCurrent()) {
return nextAgent.get().runAsync(context);
}
}));
}
return postProcessedEvents;
});
Expand Down Expand Up @@ -664,10 +622,12 @@ public void onError(Throwable e) {
"Agent not found: " + event.actions().transferToAgent().get());
}
Flowable<Event> nextAgentEvents =
nextAgent
.get()
.runLive(invocationContext)
.compose(Tracing.withContext(spanContext));
Flowable.defer(
() -> {
try (Scope s = spanContext.makeCurrent()) {
return nextAgent.get().runLive(invocationContext);
}
});
events = Flowable.concat(events, nextAgentEvents);
}
return events;
Expand Down
36 changes: 14 additions & 22 deletions core/src/main/java/com/google/adk/flows/llmflows/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import com.google.adk.events.Event;
import com.google.adk.events.EventActions;
import com.google.adk.events.ToolConfirmation;
import com.google.adk.telemetry.Instrumentation;
import com.google.adk.telemetry.Instrumentation.ToolExecution;
import com.google.adk.telemetry.Tracing;
import com.google.adk.tools.BaseTool;
import com.google.adk.tools.FunctionTool;
Expand Down Expand Up @@ -432,25 +430,6 @@ private static Maybe<Event> postProcessFunctionResult(
ToolContext toolContext,
boolean isLive,
Context parentContext) {
return Maybe.using(
() ->
Instrumentation.recordToolExecution(
tool, invocationContext.agent(), functionArgs, parentContext),
toolExecution ->
processFunctionResult(
maybeFunctionResult, invocationContext, tool, functionArgs, toolContext, isLive)
.doOnSuccess(event -> toolExecution.context().setFunctionResponseEvent(event))
.doOnError(toolExecution::setError),
ToolExecution::close);
}

private static Maybe<Event> processFunctionResult(
Maybe<Map<String, Object>> maybeFunctionResult,
InvocationContext invocationContext,
BaseTool tool,
Map<String, Object> functionArgs,
ToolContext toolContext,
boolean isLive) {
return maybeFunctionResult
.map(Optional::of)
.defaultIfEmpty(Optional.empty())
Expand Down Expand Up @@ -488,7 +467,20 @@ private static Maybe<Event> processFunctionResult(
tool, finalFunctionResult, toolContext, invocationContext);
return Maybe.just(event);
});
});
})
.compose(
Tracing.<Event>trace("execute_tool [" + tool.name() + "]")
.setParent(parentContext)
.onSuccess(
(span, event) ->
Tracing.traceToolExecution(
span,
tool.name(),
tool.description(),
tool.getClass().getSimpleName(),
functionArgs,
event,
null)));
}

private static Optional<Event> mergeParallelFunctionResponseEvents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,7 @@ private Completable logEvent(
parseFuture =
state
.getParser()
.parse(
content,
traceIds.traceId(),
traceIds.spanId() != null ? traceIds.spanId() : "no_span")
.parse(content)
.thenAccept(
parsedContent -> {
row.put(
Expand Down
Loading