From 1d0c5fea03a73fdb63b7de74297934884addf4f5 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Jun 2026 14:05:57 -0700 Subject: [PATCH] Add backoff start for CAN --- .../internal/sync/SyncWorkflowContext.java | 4 ++ .../workflow/ContinueAsNewOptions.java | 45 ++++++++++++++++++ .../sync/SyncWorkflowContextTest.java | 46 +++++++++++++++++++ 3 files changed, 95 insertions(+) diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index f83d2bcd4a..1517c9257e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1376,6 +1376,10 @@ public void continueAsNew(ContinueAsNewInput input) { attributes.setWorkflowTaskTimeout( ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout())); } + if (options.getBackoffStartInterval() != null) { + attributes.setBackoffStartInterval( + ProtobufTimeUtils.toProtoDuration(options.getBackoffStartInterval())); + } if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) { attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue())); } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java index ab9e2b7c58..1a1f0e0b83 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java @@ -37,6 +37,7 @@ public static final class Builder { private String taskQueue; private RetryOptions retryOptions; private Duration workflowTaskTimeout; + private Duration backoffStartInterval; private Map memo; private Map searchAttributes; private SearchAttributes typedSearchAttributes; @@ -57,6 +58,7 @@ private Builder(ContinueAsNewOptions options) { this.taskQueue = options.taskQueue; this.retryOptions = options.retryOptions; this.workflowTaskTimeout = options.workflowTaskTimeout; + this.backoffStartInterval = options.backoffStartInterval; this.memo = options.getMemo(); this.searchAttributes = options.getSearchAttributes(); this.typedSearchAttributes = options.getTypedSearchAttributes(); @@ -85,6 +87,12 @@ public Builder setWorkflowTaskTimeout(Duration workflowTaskTimeout) { return this; } + /** Sets the delay before the first workflow task of the continued run is scheduled. */ + public Builder setBackoffStartInterval(Duration backoffStartInterval) { + this.backoffStartInterval = backoffStartInterval; + return this; + } + public Builder setMemo(Map memo) { this.memo = memo; return this; @@ -152,6 +160,7 @@ public ContinueAsNewOptions build() { taskQueue, retryOptions, workflowTaskTimeout, + backoffStartInterval, memo, searchAttributes, typedSearchAttributes, @@ -165,6 +174,7 @@ public ContinueAsNewOptions build() { private final @Nullable String taskQueue; private final @Nullable RetryOptions retryOptions; private final @Nullable Duration workflowTaskTimeout; + private final @Nullable Duration backoffStartInterval; private final @Nullable Map memo; private final @Nullable Map searchAttributes; private final @Nullable SearchAttributes typedSearchAttributes; @@ -186,10 +196,37 @@ public ContinueAsNewOptions( @Nullable List contextPropagators, @SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent, @Nullable InitialVersioningBehavior initialVersioningBehavior) { + this( + workflowRunTimeout, + taskQueue, + retryOptions, + workflowTaskTimeout, + null, + memo, + searchAttributes, + typedSearchAttributes, + contextPropagators, + versioningIntent, + initialVersioningBehavior); + } + + public ContinueAsNewOptions( + @Nullable Duration workflowRunTimeout, + @Nullable String taskQueue, + @Nullable RetryOptions retryOptions, + @Nullable Duration workflowTaskTimeout, + @Nullable Duration backoffStartInterval, + @Nullable Map memo, + @Nullable Map searchAttributes, + @Nullable SearchAttributes typedSearchAttributes, + @Nullable List contextPropagators, + @SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent, + @Nullable InitialVersioningBehavior initialVersioningBehavior) { this.workflowRunTimeout = workflowRunTimeout; this.taskQueue = taskQueue; this.retryOptions = retryOptions; this.workflowTaskTimeout = workflowTaskTimeout; + this.backoffStartInterval = backoffStartInterval; this.memo = memo; this.searchAttributes = searchAttributes; this.typedSearchAttributes = typedSearchAttributes; @@ -215,6 +252,14 @@ public RetryOptions getRetryOptions() { return workflowTaskTimeout; } + /** + * @return the delay before the first workflow task of the continued run is scheduled, or null if + * unset. + */ + public @Nullable Duration getBackoffStartInterval() { + return backoffStartInterval; + } + public @Nullable Map getMemo() { return memo; } diff --git a/temporal-sdk/src/test/java/io/temporal/internal/sync/SyncWorkflowContextTest.java b/temporal-sdk/src/test/java/io/temporal/internal/sync/SyncWorkflowContextTest.java index 9325f84207..bf719f0c05 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/sync/SyncWorkflowContextTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/sync/SyncWorkflowContextTest.java @@ -1,25 +1,50 @@ package io.temporal.internal.sync; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes; import io.temporal.api.common.v1.SearchAttributes; +import io.temporal.api.common.v1.WorkflowType; +import io.temporal.common.interceptors.Header; +import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor.ContinueAsNewInput; import io.temporal.internal.common.SearchAttributesUtil; import io.temporal.internal.replay.ReplayWorkflowContext; +import io.temporal.workflow.ContinueAsNewOptions; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; public class SyncWorkflowContextTest { SyncWorkflowContext context; ReplayWorkflowContext mockReplayWorkflowContext = mock(ReplayWorkflowContext.class); + ExecutorService threadPool; + DeterministicRunner runner; @Before public void setUp() { + this.threadPool = Executors.newCachedThreadPool(); this.context = DummySyncWorkflowContext.newDummySyncWorkflowContext(); this.context.setReplayContext(mockReplayWorkflowContext); + when(mockReplayWorkflowContext.getWorkflowType()) + .thenReturn(WorkflowType.newBuilder().setName("dummy-workflow").build()); + } + + @After + public void tearDown() { + if (runner != null) { + runner.close(); + } + threadPool.shutdown(); } @Test @@ -32,6 +57,27 @@ public void testUpsertSearchAttributes() { verify(mockReplayWorkflowContext, times(1)).upsertSearchAttributes(serializedAttr); } + @Test + public void testContinueAsNewBackoffStartInterval() { + Duration backoffStartInterval = Duration.ofSeconds(7); + ContinueAsNewOptions options = + ContinueAsNewOptions.newBuilder().setBackoffStartInterval(backoffStartInterval).build(); + runner = + DeterministicRunner.newRunner( + threadPool::submit, + context, + () -> + context.continueAsNew( + new ContinueAsNewInput(null, options, new Object[0], Header.empty()))); + + runner.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS); + + ArgumentCaptor attributes = + ArgumentCaptor.forClass(ContinueAsNewWorkflowExecutionCommandAttributes.class); + verify(mockReplayWorkflowContext).continueAsNewOnCompletion(attributes.capture()); + assertEquals(7, attributes.getValue().getBackoffStartInterval().getSeconds()); + } + @Test(expected = IllegalArgumentException.class) public void testUpsertSearchAttributesException() { Map attr = new HashMap<>();