diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java index 3afcfc7e83..e0a1f5ba2e 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java @@ -9,6 +9,10 @@ * called. It is OK to ignore the exception to let the activity complete. It assumes that {@link * WorkerFactory#awaitTermination(long, TimeUnit)} is called with a timeout larger than the activity * execution time. + * + *
If {@link io.temporal.worker.WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(
+ * boolean)} is enabled, this exception is not thrown during shutdown and heartbeats keep working as
+ * if the worker was not shutting down.
*/
public final class ActivityWorkerShutdownException extends ActivityCompletionException {
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java
index 5593707720..47e17e213b 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java
@@ -41,6 +41,7 @@ public static final class Builder {
private boolean usingVirtualThreads;
private WorkerDeploymentOptions deploymentOptions;
private String workerInstanceKey;
+ private boolean activityHeartbeatDuringShutdown;
private Builder() {}
@@ -66,6 +67,7 @@ private Builder(SingleWorkerOptions options) {
this.usingVirtualThreads = options.isUsingVirtualThreads();
this.deploymentOptions = options.getDeploymentOptions();
this.workerInstanceKey = options.getWorkerInstanceKey();
+ this.activityHeartbeatDuringShutdown = options.isActivityHeartbeatDuringShutdown();
}
public Builder setIdentity(String identity) {
@@ -162,6 +164,11 @@ public Builder setWorkerInstanceKey(String workerInstanceKey) {
return this;
}
+ public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) {
+ this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown;
+ return this;
+ }
+
public SingleWorkerOptions build() {
PollerOptions pollerOptions = this.pollerOptions;
if (pollerOptions == null) {
@@ -201,7 +208,8 @@ public SingleWorkerOptions build() {
drainStickyTaskQueueTimeout,
usingVirtualThreads,
this.deploymentOptions,
- this.workerInstanceKey);
+ this.workerInstanceKey,
+ this.activityHeartbeatDuringShutdown);
}
}
@@ -223,6 +231,7 @@ public SingleWorkerOptions build() {
private final boolean usingVirtualThreads;
private final WorkerDeploymentOptions deploymentOptions;
private final String workerInstanceKey;
+ private final boolean activityHeartbeatDuringShutdown;
private SingleWorkerOptions(
String identity,
@@ -242,7 +251,8 @@ private SingleWorkerOptions(
Duration drainStickyTaskQueueTimeout,
boolean usingVirtualThreads,
WorkerDeploymentOptions deploymentOptions,
- String workerInstanceKey) {
+ String workerInstanceKey,
+ boolean activityHeartbeatDuringShutdown) {
this.identity = identity;
this.binaryChecksum = binaryChecksum;
this.buildId = buildId;
@@ -261,6 +271,7 @@ private SingleWorkerOptions(
this.usingVirtualThreads = usingVirtualThreads;
this.deploymentOptions = deploymentOptions;
this.workerInstanceKey = workerInstanceKey;
+ this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown;
}
public String getIdentity() {
@@ -291,6 +302,10 @@ public Duration getDrainStickyTaskQueueTimeout() {
return drainStickyTaskQueueTimeout;
}
+ public boolean isActivityHeartbeatDuringShutdown() {
+ return activityHeartbeatDuringShutdown;
+ }
+
public DataConverter getDataConverter() {
return dataConverter;
}
diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java
index ecb24a736a..0e1c53ecbf 100644
--- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java
+++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java
@@ -26,6 +26,7 @@ public class SyncActivityWorker implements SuspendableWorker {
private final ScheduledExecutorService heartbeatExecutor;
private final ActivityTaskHandlerImpl taskHandler;
private final ActivityWorker worker;
+ private final boolean activityHeartbeatDuringShutdown;
public SyncActivityWorker(
WorkflowClient client,
@@ -38,6 +39,7 @@ public SyncActivityWorker(
this.identity = options.getIdentity();
this.namespace = namespace;
this.taskQueue = taskQueue;
+ this.activityHeartbeatDuringShutdown = options.isActivityHeartbeatDuringShutdown();
this.heartbeatExecutor =
Executors.newScheduledThreadPool(
@@ -89,16 +91,31 @@ public boolean start() {
@Override
public CompletableFuture Note that with this option enabled activities are no longer notified of the worker
+ * shutdown by an {@link io.temporal.client.ActivityWorkerShutdownException} thrown from {@code
+ * heartbeat}, so they are expected to complete within the termination grace period on their
+ * own.
+ *
+ * Defaults to false, meaning that after shutdown is requested, {@link
+ * io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} stops sending heartbeats and
+ * throws {@link io.temporal.client.ActivityWorkerShutdownException}.
+ */
+ @Experimental
+ public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) {
+ this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown;
+ return this;
+ }
+
public WorkerOptions build() {
return new WorkerOptions(
maxWorkerActivitiesPerSecond,
@@ -553,7 +578,8 @@ public WorkerOptions build() {
deploymentOptions,
workflowTaskPollersBehavior,
activityTaskPollersBehavior,
- nexusTaskPollersBehavior);
+ nexusTaskPollersBehavior,
+ activityHeartbeatDuringShutdown);
}
public WorkerOptions validateAndBuildWithDefaults() {
@@ -685,7 +711,8 @@ public WorkerOptions validateAndBuildWithDefaults() {
deploymentOptions,
workflowTaskPollersBehavior,
activityTaskPollersBehavior,
- nexusTaskPollersBehavior);
+ nexusTaskPollersBehavior,
+ activityHeartbeatDuringShutdown);
}
}
@@ -717,6 +744,7 @@ public WorkerOptions validateAndBuildWithDefaults() {
private final PollerBehavior workflowTaskPollersBehavior;
private final PollerBehavior activityTaskPollersBehavior;
private final PollerBehavior nexusTaskPollersBehavior;
+ private final boolean activityHeartbeatDuringShutdown;
private WorkerOptions(
double maxWorkerActivitiesPerSecond,
@@ -746,7 +774,8 @@ private WorkerOptions(
WorkerDeploymentOptions deploymentOptions,
PollerBehavior workflowTaskPollersBehavior,
PollerBehavior activityTaskPollersBehavior,
- PollerBehavior nexusTaskPollersBehavior) {
+ PollerBehavior nexusTaskPollersBehavior,
+ boolean activityHeartbeatDuringShutdown) {
this.maxWorkerActivitiesPerSecond = maxWorkerActivitiesPerSecond;
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowTaskExecutionSize;
@@ -775,6 +804,7 @@ private WorkerOptions(
this.workflowTaskPollersBehavior = workflowTaskPollersBehavior;
this.activityTaskPollersBehavior = activityTaskPollersBehavior;
this.nexusTaskPollersBehavior = nexusTaskPollersBehavior;
+ this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown;
}
public double getMaxWorkerActivitiesPerSecond() {
@@ -912,6 +942,14 @@ public PollerBehavior getNexusTaskPollersBehavior() {
return nexusTaskPollersBehavior;
}
+ /**
+ * @return true if activities keep heartbeating while the worker is shutting down
+ */
+ @Experimental
+ public boolean isActivityHeartbeatDuringShutdown() {
+ return activityHeartbeatDuringShutdown;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -944,7 +982,8 @@ && compare(maxTaskQueueActivitiesPerSecond, that.maxTaskQueueActivitiesPerSecond
&& Objects.equals(deploymentOptions, that.deploymentOptions)
&& Objects.equals(workflowTaskPollersBehavior, that.workflowTaskPollersBehavior)
&& Objects.equals(activityTaskPollersBehavior, that.activityTaskPollersBehavior)
- && Objects.equals(nexusTaskPollersBehavior, that.nexusTaskPollersBehavior);
+ && Objects.equals(nexusTaskPollersBehavior, that.nexusTaskPollersBehavior)
+ && activityHeartbeatDuringShutdown == that.activityHeartbeatDuringShutdown;
}
@Override
@@ -977,7 +1016,8 @@ public int hashCode() {
deploymentOptions,
workflowTaskPollersBehavior,
activityTaskPollersBehavior,
- nexusTaskPollersBehavior);
+ nexusTaskPollersBehavior,
+ activityHeartbeatDuringShutdown);
}
@Override
@@ -1040,6 +1080,8 @@ public String toString() {
+ activityTaskPollersBehavior
+ ", nexusTaskPollersBehavior="
+ nexusTaskPollersBehavior
+ + ", activityHeartbeatDuringShutdown="
+ + activityHeartbeatDuringShutdown
+ '}';
}
}
diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java
index 897600443d..ed013a0ad1 100644
--- a/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java
+++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java
@@ -56,6 +56,7 @@ public void verifyNewBuilderFromExistingWorkerOptions() {
.setBuildId("build-id")
.setStickyTaskQueueDrainTimeout(Duration.ofSeconds(15))
.setIdentity("worker-identity")
+ .setActivityHeartbeatDuringShutdown(true)
.build();
WorkerOptions w2 = WorkerOptions.newBuilder(w1).build();
@@ -89,6 +90,7 @@ public void verifyNewBuilderFromExistingWorkerOptions() {
assertEquals(w1.getBuildId(), w2.getBuildId());
assertEquals(w1.getStickyTaskQueueDrainTimeout(), w2.getStickyTaskQueueDrainTimeout());
assertEquals(w1.getIdentity(), w2.getIdentity());
+ assertEquals(w1.isActivityHeartbeatDuringShutdown(), w2.isActivityHeartbeatDuringShutdown());
}
@Test
diff --git a/temporal-sdk/src/test/java/io/temporal/worker/shutdown/HeartbeatDuringWorkerShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/HeartbeatDuringWorkerShutdownTest.java
new file mode 100644
index 0000000000..787bd25ac7
--- /dev/null
+++ b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/HeartbeatDuringWorkerShutdownTest.java
@@ -0,0 +1,122 @@
+package io.temporal.worker.shutdown;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import io.temporal.activity.Activity;
+import io.temporal.api.common.v1.Payloads;
+import io.temporal.api.common.v1.WorkflowExecution;
+import io.temporal.api.enums.v1.EventType;
+import io.temporal.api.history.v1.HistoryEvent;
+import io.temporal.client.ActivityCompletionException;
+import io.temporal.client.WorkflowClient;
+import io.temporal.common.converter.DefaultDataConverter;
+import io.temporal.testing.internal.SDKTestOptions;
+import io.temporal.testing.internal.SDKTestWorkflowRule;
+import io.temporal.worker.WorkerFactory;
+import io.temporal.worker.WorkerOptions;
+import io.temporal.workflow.Workflow;
+import io.temporal.workflow.shared.TestActivities.NoArgsReturnsStringActivity;
+import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnString;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class HeartbeatDuringWorkerShutdownTest {
+
+ private static final String EXPECTED_RESULT = "completed";
+ private static final CompletableFuture
* After the shutdown, calls to {@link
* io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} start throwing {@link
- * io.temporal.client.ActivityWorkerShutdownException}.
+ * io.temporal.client.ActivityWorkerShutdownException}, unless {@link
+ * WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case
+ * heartbeats keep working until the activity tasks finish executing.
* This method does not wait for the shutdown to complete. Use {@link #awaitTermination(long,
* TimeUnit)} to do that.
* Invocation has no additional effect if already shut down.
@@ -375,7 +377,9 @@ public synchronized void shutdown() {
* interrupts may never terminate.
* After the shutdownNow calls to {@link
* io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} start throwing {@link
- * io.temporal.client.ActivityWorkerShutdownException}.
+ * io.temporal.client.ActivityWorkerShutdownException}, unless {@link
+ * WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case
+ * heartbeats keep working until the activity tasks finish executing.
* This method does not wait for the shutdown to complete. Use {@link #awaitTermination(long,
* TimeUnit)} to do that.
* Invocation has no additional effect if already shut down.
diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java
index f8bfd2f442..6179f5d36b 100644
--- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java
+++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java
@@ -77,6 +77,7 @@ public static final class Builder {
private PollerBehavior workflowTaskPollersBehavior;
private PollerBehavior activityTaskPollersBehavior;
private PollerBehavior nexusTaskPollersBehavior;
+ private boolean activityHeartbeatDuringShutdown;
private Builder() {}
@@ -112,6 +113,7 @@ private Builder(WorkerOptions o) {
this.workflowTaskPollersBehavior = o.workflowTaskPollersBehavior;
this.activityTaskPollersBehavior = o.activityTaskPollersBehavior;
this.nexusTaskPollersBehavior = o.nexusTaskPollersBehavior;
+ this.activityHeartbeatDuringShutdown = o.activityHeartbeatDuringShutdown;
}
/**
@@ -524,6 +526,29 @@ public Builder setNexusTaskPollersBehavior(PollerBehavior pollerBehavior) {
return this;
}
+ /**
+ * If enabled, activities can keep heartbeating while the worker is shutting down. The activity
+ * heartbeat executor is closed only after all outstanding activity tasks have finished
+ * executing, so {@link io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} behaves
+ * exactly as it does while the worker is running: heartbeats are throttled and sent to the
+ * server, which keeps the server from timing the activity out during the {@link
+ * WorkerFactory#awaitTermination(long, java.util.concurrent.TimeUnit)} grace period.
+ *
+ *