Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ jobs:
--dynamic-config-value activity.enableStandalone=true \
--dynamic-config-value nexusoperation.enableStandalone=true \
--dynamic-config-value history.enableChasm=true \
--dynamic-config-value history.enableCHASMSignalBacklinks=true \
--dynamic-config-value history.enableTransitionHistory=true &
sleep 10s

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ public WorkflowStartOutput start(WorkflowStartInput input) {
e);
}
}
// If this start is being issued from inside a Nexus operation handler, stash only the
// forward operation->workflow link from the start response so NexusStartWorkflowHelper can
// attach it to the WorkflowExecutionStarted event. Unlike signal/signalWithStart, start
// deliberately does NOT add a backlink here: the operation->workflow relationship is already
// captured by the forward link, so re-adding response.getLink() as a backlink would duplicate
// it on the caller's history event. Do not "restore symmetry" by calling addBacklink here.
if (CurrentNexusOperationContext.isNexusContext()) {
CurrentNexusOperationContext.get().setStartWorkflowResponseLink(response.getLink());
}
Expand All @@ -120,6 +126,13 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {
.setRequestId(UUID.randomUUID().toString())
.setHeader(HeaderUtils.toHeaderGrpc(input.getHeader(), null));

// If this signal is being issued from inside a Nexus operation handler, forward the inbound
// Nexus task links so the SignalWorkflowExecution history event links back to the caller.
boolean inNexusContext = CurrentNexusOperationContext.isNexusContext();
if (inNexusContext) {
request.addAllLinks(CurrentNexusOperationContext.get().getNexusOperationLinks());
}

DataConverter dataConverterWitSignalContext =
clientOptions
.getDataConverter()
Expand All @@ -129,7 +142,12 @@ public WorkflowSignalOutput signal(WorkflowSignalInput input) {

Optional<Payloads> inputArgs = dataConverterWitSignalContext.toPayloads(input.getArguments());
inputArgs.ifPresent(request::setInput);
genericClient.signal(request.build());
SignalWorkflowExecutionResponse response = genericClient.signal(request.build());
// Server >=1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the signal
// event; older servers leave it unset. Propagate when present.
if (inNexusContext && response.hasLink()) {
CurrentNexusOperationContext.get().addBacklink(response.getLink());
}
return new WorkflowSignalOutput();
}

Expand All @@ -148,17 +166,28 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu

Optional<Payloads> signalInput =
dataConverterWithWorkflowContext.toPayloads(input.getSignalArguments());
SignalWithStartWorkflowExecutionRequest request =
requestsHelper
.newSignalWithStartWorkflowExecutionRequest(
startRequest, input.getSignalName(), signalInput.orElse(null))
.build();
SignalWithStartWorkflowExecutionRequest.Builder requestBuilder =
requestsHelper.newSignalWithStartWorkflowExecutionRequest(
startRequest, input.getSignalName(), signalInput.orElse(null));
// If this signalWithStart is being issued from inside a Nexus operation handler, forward
// the inbound Nexus task links so both the WorkflowExecutionStarted and
// WorkflowExecutionSignaled events on the callee link back to the caller.
boolean inNexusContext = CurrentNexusOperationContext.isNexusContext();
if (inNexusContext) {
requestBuilder.addAllLinks(CurrentNexusOperationContext.get().getNexusOperationLinks());
}
SignalWithStartWorkflowExecutionRequest request = requestBuilder.build();
SignalWithStartWorkflowExecutionResponse response = genericClient.signalWithStart(request);
WorkflowExecution execution =
WorkflowExecution.newBuilder()
.setRunId(response.getRunId())
.setWorkflowId(request.getWorkflowId())
.build();
// Server >=1.31 with EnableCHASMSignalBacklinks returns a backlink pointing at the signal
// event; older servers leave it unset. Propagate when present.
if (inNexusContext && response.hasSignalLink()) {
CurrentNexusOperationContext.get().addBacklink(response.getSignalLink());
}
// TODO currently SignalWithStartWorkflowExecutionResponse doesn't have eagerWorkflowTask.
// We should wire it when it's implemented server-side.
return new WorkflowSignalWithStartOutput(new WorkflowStartOutput(execution));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface GenericWorkflowClient {

StartWorkflowExecutionResponse start(StartWorkflowExecutionRequest request);

void signal(SignalWorkflowExecutionRequest request);
SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request);

SignalWithStartWorkflowExecutionResponse signalWithStart(
SignalWithStartWorkflowExecutionRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ private static Map<String, String> tagsForStartWorkflow(StartWorkflowExecutionRe
}

@Override
public void signal(SignalWorkflowExecutionRequest request) {
public SignalWorkflowExecutionResponse signal(SignalWorkflowExecutionRequest request) {
Map<String, String> tags =
new ImmutableMap.Builder<String, String>(1)
.put(MetricsTag.SIGNAL_NAME, request.getSignalName())
.build();
Scope scope = metricsScope.tagged(tags);
grpcRetryer.retry(
return grpcRetryer.retryWithResult(
() ->
service
.blockingStub()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import io.temporal.common.interceptors.NexusOperationOutboundCallsInterceptor;
import io.temporal.nexus.NexusOperationContext;
import io.temporal.nexus.NexusOperationInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;

public class InternalNexusOperationContext {
private final String namespace;
Expand All @@ -14,7 +18,25 @@ public class InternalNexusOperationContext {
private final Scope metricScope;
private final WorkflowClient client;
NexusOperationOutboundCallsInterceptor outboundCalls;
// Link returned by the StartWorkflowExecution response when the operation is backed by a workflow
// (workflow-run operations). Read by NexusStartWorkflowHelper to attach the forward
// operation->workflow link, fabricating a WORKFLOW_EXECUTION_STARTED link when the server omits
// one. Distinct from the signal backlinks below.
Link startWorkflowResponseLink;
// Links extracted from the inbound Nexus task. Stored once at the task-handler boundary so the
// workflow client can attach them to the outgoing requests it issues (e.g. signal,
// signalWithStart) via the request's links field.
private List<Link> nexusOperationLinks = Collections.emptyList();
// Backlinks returned by outbound RPCs the operation handler issues (such as
// SignalWorkflowExecutionResponse.link or SignalWithStartWorkflowExecutionResponse.signal_link).
// One entry per outbound RPC that returned a link. Drained
// by the task handler when building StartOperationResponse so each RPC the handler issued gets a
// corresponding link on the caller workflow's history event.
//
// A handler may issue RPCs from multiple threads, so every read and write of this list is guarded
// by backlinksLock and getBacklinks() returns a defensive copy taken under the lock.
private final Object backlinksLock = new Object();
private final List<Link> responseBacklinks = new ArrayList<>();

public InternalNexusOperationContext(
String namespace,
Expand Down Expand Up @@ -60,6 +82,19 @@ public NexusOperationContext getUserFacingContext() {
return new NexusOperationContextImpl();
}

/**
* Set the {@code common.v1.Link}s extracted from the inbound Nexus task so they can be attached
* to RPCs issued by the operation handler.
*/
Comment thread
Evanthx marked this conversation as resolved.
public void setNexusOperationLinks(List<Link> links) {
this.nexusOperationLinks = links == null ? Collections.emptyList() : links;
}

/** Links from the inbound Nexus task; empty if none. */
public @Nonnull List<Link> getNexusOperationLinks() {
return Collections.unmodifiableList(nexusOperationLinks);
}

public void setStartWorkflowResponseLink(Link link) {
this.startWorkflowResponseLink = link;
}
Expand All @@ -68,6 +103,32 @@ public Link getStartWorkflowResponseLink() {
return startWorkflowResponseLink;
}

/**
* Append a backlink returned by an outbound RPC the operation handler issued (e.g. signal,
* signalWithStart, etc). The task handler drains the list when building the operation's
* StartOperationResponse.
*/
public void addBacklink(Link link) {
if (link != null) {
synchronized (backlinksLock) {
responseBacklinks.add(link);
}
}
}

/**
* Backlinks from every outbound RPC the handler issued. Returned as an unmodifiable view; callers
* must not attempt to mutate. Entries are accumulated while the operation handler runs (the call
* that flows through {@link
* io.temporal.common.interceptors.NexusOperationInboundCallsInterceptor#startOperation}) and are
* drained afterward by the task handler when building the StartOperationResponse.
*/
public @Nonnull List<Link> getBacklinks() {
synchronized (backlinksLock) {
return Collections.unmodifiableList(new ArrayList<>(responseBacklinks));
}
}

private class NexusOperationContextImpl implements NexusOperationContext {
@Override
public NexusOperationInfo getInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.TemporalFailure;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.common.LinkConverter;
import io.temporal.internal.common.NexusUtil;
import io.temporal.internal.worker.NexusTask;
import io.temporal.internal.worker.NexusTaskHandler;
Expand Down Expand Up @@ -284,6 +285,10 @@ private StartOperationResponse handleStartOperation(
.setCallbackUrl(task.getCallback())
.setRequestId(task.getRequestId());
task.getCallbackHeaderMap().forEach(operationStartDetails::putCallbackHeader);
// Stash the inbound links in common.v1.Link form on the operation context so the RPCs the
// handler issues (e.g. signal, signalWithStart, etc) can attach them to their
// request's links field.
List<io.temporal.api.common.v1.Link> inboundCommonLinks = new ArrayList<>();
task.getLinksList()
.forEach(
link -> {
Expand All @@ -296,7 +301,23 @@ private StartOperationResponse handleStartOperation(
"Invalid link URL: " + link.getUrl(),
e);
}
// LinkConverter only returns a WorkflowEvent-shaped common.v1.Link; nexus links of
// other shapes (e.g. non-temporal URLs) come back null and are intentionally not
// forwarded onto the RPCs the handler issues, which require the WorkflowEvent
// variant. Log so a debugging session can see what was dropped.
io.temporal.api.common.v1.Link commonLink =
LinkConverter.nexusLinkToWorkflowEvent(link);
if (commonLink != null) {
inboundCommonLinks.add(commonLink);
} else {
log.warn(
Comment thread
Evanthx marked this conversation as resolved.
"Dropping inbound Nexus link from outbound link propagation: type='{}',"
+ " url='{}' (not a parseable temporal WorkflowEvent link)",
link.getType(),
link.getUrl());
}
});
CurrentNexusOperationContext.get().setNexusOperationLinks(inboundCommonLinks);

HandlerInputContent.Builder input =
HandlerInputContent.newBuilder().setDataStream(task.getPayload().toByteString().newInput());
Expand All @@ -307,10 +328,28 @@ private StartOperationResponse handleStartOperation(
try {
OperationStartResult<HandlerResultContent> result =
startOperation(context, operationStartDetails.build(), input.build());
// If any RPCs the handler issued (e.g. signal, signalWithStart, etc) returned
// backlinks, propagate them to the caller so the caller workflow's history event links to
// each event on the callee. Same set of backlinks applies to both sync and async response
// variants.
List<io.temporal.api.nexus.v1.Link> backlinks = new ArrayList<>();
for (io.temporal.api.common.v1.Link backlink :
CurrentNexusOperationContext.get().getBacklinks()) {
if (!backlink.hasWorkflowEvent()) {
continue;
}
io.temporal.api.nexus.v1.Link converted =
LinkConverter.workflowEventToNexusLink(backlink.getWorkflowEvent());
if (converted != null) {
backlinks.add(converted);
}
}

if (result.isSync()) {
startResponseBuilder.setSyncSuccess(
StartOperationResponse.Sync.newBuilder()
.setPayload(Payload.parseFrom(result.getSyncResult().getDataBytes()))
.addAllLinks(backlinks)
.build());
} else {
startResponseBuilder.setAsyncSuccess(
Expand All @@ -326,6 +365,7 @@ private StartOperationResponse handleStartOperation(
.setUrl(link.getUri().toString())
.build())
.collect(Collectors.toList()))
.addAllLinks(backlinks)
.build());
}
} catch (OperationException e) {
Comment thread
Evanthx marked this conversation as resolved.
Expand Down
Loading
Loading