Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1376,6 +1376,10 @@ public void continueAsNew(ContinueAsNewInput input) {
attributes.setWorkflowTaskTimeout(
ProtobufTimeUtils.toProtoDuration(options.getWorkflowTaskTimeout()));
}
if (options.getBackoffStartInterval() != null) {
attributes.setBackoffStartInterval(
ProtobufTimeUtils.toProtoDuration(options.getBackoffStartInterval()));
}
if (options.getTaskQueue() != null && !options.getTaskQueue().isEmpty()) {
attributes.setTaskQueue(TaskQueue.newBuilder().setName(options.getTaskQueue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public static final class Builder {
private String taskQueue;
private RetryOptions retryOptions;
private Duration workflowTaskTimeout;
private Duration backoffStartInterval;
private Map<String, Object> memo;
private Map<String, Object> searchAttributes;
private SearchAttributes typedSearchAttributes;
Expand All @@ -57,6 +58,7 @@ private Builder(ContinueAsNewOptions options) {
this.taskQueue = options.taskQueue;
this.retryOptions = options.retryOptions;
this.workflowTaskTimeout = options.workflowTaskTimeout;
this.backoffStartInterval = options.backoffStartInterval;
this.memo = options.getMemo();
this.searchAttributes = options.getSearchAttributes();
this.typedSearchAttributes = options.getTypedSearchAttributes();
Expand Down Expand Up @@ -85,6 +87,12 @@ public Builder setWorkflowTaskTimeout(Duration workflowTaskTimeout) {
return this;
}

/** Sets the delay before the first workflow task of the continued run is scheduled. */
public Builder setBackoffStartInterval(Duration backoffStartInterval) {
this.backoffStartInterval = backoffStartInterval;
return this;
}

public Builder setMemo(Map<String, Object> memo) {
this.memo = memo;
return this;
Expand Down Expand Up @@ -152,6 +160,7 @@ public ContinueAsNewOptions build() {
taskQueue,
retryOptions,
workflowTaskTimeout,
backoffStartInterval,
memo,
searchAttributes,
typedSearchAttributes,
Expand All @@ -165,6 +174,7 @@ public ContinueAsNewOptions build() {
private final @Nullable String taskQueue;
private final @Nullable RetryOptions retryOptions;
private final @Nullable Duration workflowTaskTimeout;
private final @Nullable Duration backoffStartInterval;
private final @Nullable Map<String, Object> memo;
private final @Nullable Map<String, Object> searchAttributes;
private final @Nullable SearchAttributes typedSearchAttributes;
Expand All @@ -186,10 +196,37 @@ public ContinueAsNewOptions(
@Nullable List<ContextPropagator> contextPropagators,
@SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent,
@Nullable InitialVersioningBehavior initialVersioningBehavior) {
this(
workflowRunTimeout,
taskQueue,
retryOptions,
workflowTaskTimeout,
null,
memo,
searchAttributes,
typedSearchAttributes,
contextPropagators,
versioningIntent,
initialVersioningBehavior);
}

public ContinueAsNewOptions(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I consider it a mistake that the original constructor is public; it should be deprecated with a comment directing users to use the builder, and the new constructor should be made private.

@Nullable Duration workflowRunTimeout,
@Nullable String taskQueue,
@Nullable RetryOptions retryOptions,
@Nullable Duration workflowTaskTimeout,
@Nullable Duration backoffStartInterval,
@Nullable Map<String, Object> memo,
@Nullable Map<String, Object> searchAttributes,
@Nullable SearchAttributes typedSearchAttributes,
@Nullable List<ContextPropagator> contextPropagators,
@SuppressWarnings("deprecation") @Nullable VersioningIntent versioningIntent,
@Nullable InitialVersioningBehavior initialVersioningBehavior) {
this.workflowRunTimeout = workflowRunTimeout;
this.taskQueue = taskQueue;
this.retryOptions = retryOptions;
this.workflowTaskTimeout = workflowTaskTimeout;
this.backoffStartInterval = backoffStartInterval;
this.memo = memo;
this.searchAttributes = searchAttributes;
this.typedSearchAttributes = typedSearchAttributes;
Expand All @@ -215,6 +252,14 @@ public RetryOptions getRetryOptions() {
return workflowTaskTimeout;
}

/**
* @return the delay before the first workflow task of the continued run is scheduled, or null if
* unset.
*/
public @Nullable Duration getBackoffStartInterval() {
return backoffStartInterval;
}

public @Nullable Map<String, Object> getMemo() {
return memo;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,50 @@
package io.temporal.internal.sync;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
import io.temporal.api.common.v1.SearchAttributes;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.common.interceptors.Header;
import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor.ContinueAsNewInput;
import io.temporal.internal.common.SearchAttributesUtil;
import io.temporal.internal.replay.ReplayWorkflowContext;
import io.temporal.workflow.ContinueAsNewOptions;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

public class SyncWorkflowContextTest {
SyncWorkflowContext context;
ReplayWorkflowContext mockReplayWorkflowContext = mock(ReplayWorkflowContext.class);
ExecutorService threadPool;
DeterministicRunner runner;

@Before
public void setUp() {
this.threadPool = Executors.newCachedThreadPool();
this.context = DummySyncWorkflowContext.newDummySyncWorkflowContext();
this.context.setReplayContext(mockReplayWorkflowContext);
when(mockReplayWorkflowContext.getWorkflowType())
.thenReturn(WorkflowType.newBuilder().setName("dummy-workflow").build());
}
Comment on lines 27 to +40

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

threadPool, runner and the call to when should be moved inside the test that uses them.


@After
public void tearDown() {
if (runner != null) {
runner.close();
}
threadPool.shutdown();
}

@Test
Expand All @@ -32,6 +57,27 @@ public void testUpsertSearchAttributes() {
verify(mockReplayWorkflowContext, times(1)).upsertSearchAttributes(serializedAttr);
}

@Test
public void testContinueAsNewBackoffStartInterval() {
Comment on lines +60 to +61

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this test should be moved down as to not split up the two upsert SA tests.

Duration backoffStartInterval = Duration.ofSeconds(7);
ContinueAsNewOptions options =
ContinueAsNewOptions.newBuilder().setBackoffStartInterval(backoffStartInterval).build();
runner =
DeterministicRunner.newRunner(
threadPool::submit,
context,
() ->
context.continueAsNew(
new ContinueAsNewInput(null, options, new Object[0], Header.empty())));

runner.runUntilAllBlocked(DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT_MS);

ArgumentCaptor<ContinueAsNewWorkflowExecutionCommandAttributes> attributes =
ArgumentCaptor.forClass(ContinueAsNewWorkflowExecutionCommandAttributes.class);
verify(mockReplayWorkflowContext).continueAsNewOnCompletion(attributes.capture());
assertEquals(7, attributes.getValue().getBackoffStartInterval().getSeconds());
}

@Test(expected = IllegalArgumentException.class)
public void testUpsertSearchAttributesException() {
Map<String, Object> attr = new HashMap<>();
Expand Down
Loading