diff --git a/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java b/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java index 3afcfc7e83..e0a1f5ba2e 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java +++ b/temporal-sdk/src/main/java/io/temporal/client/ActivityWorkerShutdownException.java @@ -9,6 +9,10 @@ * called. It is OK to ignore the exception to let the activity complete. It assumes that {@link * WorkerFactory#awaitTermination(long, TimeUnit)} is called with a timeout larger than the activity * execution time. + * + *

If {@link io.temporal.worker.WorkerOptions.Builder#setActivityHeartbeatDuringShutdown( + * boolean)} is enabled, this exception is not thrown during shutdown and heartbeats keep working as + * if the worker was not shutting down. */ public final class ActivityWorkerShutdownException extends ActivityCompletionException { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java index 5593707720..47e17e213b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SingleWorkerOptions.java @@ -41,6 +41,7 @@ public static final class Builder { private boolean usingVirtualThreads; private WorkerDeploymentOptions deploymentOptions; private String workerInstanceKey; + private boolean activityHeartbeatDuringShutdown; private Builder() {} @@ -66,6 +67,7 @@ private Builder(SingleWorkerOptions options) { this.usingVirtualThreads = options.isUsingVirtualThreads(); this.deploymentOptions = options.getDeploymentOptions(); this.workerInstanceKey = options.getWorkerInstanceKey(); + this.activityHeartbeatDuringShutdown = options.isActivityHeartbeatDuringShutdown(); } public Builder setIdentity(String identity) { @@ -162,6 +164,11 @@ public Builder setWorkerInstanceKey(String workerInstanceKey) { return this; } + public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) { + this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown; + return this; + } + public SingleWorkerOptions build() { PollerOptions pollerOptions = this.pollerOptions; if (pollerOptions == null) { @@ -201,7 +208,8 @@ public SingleWorkerOptions build() { drainStickyTaskQueueTimeout, usingVirtualThreads, this.deploymentOptions, - this.workerInstanceKey); + this.workerInstanceKey, + this.activityHeartbeatDuringShutdown); } } @@ -223,6 +231,7 @@ public 
SingleWorkerOptions build() { private final boolean usingVirtualThreads; private final WorkerDeploymentOptions deploymentOptions; private final String workerInstanceKey; + private final boolean activityHeartbeatDuringShutdown; private SingleWorkerOptions( String identity, @@ -242,7 +251,8 @@ private SingleWorkerOptions( Duration drainStickyTaskQueueTimeout, boolean usingVirtualThreads, WorkerDeploymentOptions deploymentOptions, - String workerInstanceKey) { + String workerInstanceKey, + boolean activityHeartbeatDuringShutdown) { this.identity = identity; this.binaryChecksum = binaryChecksum; this.buildId = buildId; @@ -261,6 +271,7 @@ private SingleWorkerOptions( this.usingVirtualThreads = usingVirtualThreads; this.deploymentOptions = deploymentOptions; this.workerInstanceKey = workerInstanceKey; + this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown; } public String getIdentity() { @@ -291,6 +302,10 @@ public Duration getDrainStickyTaskQueueTimeout() { return drainStickyTaskQueueTimeout; } + public boolean isActivityHeartbeatDuringShutdown() { + return activityHeartbeatDuringShutdown; + } + public DataConverter getDataConverter() { return dataConverter; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java index ecb24a736a..0e1c53ecbf 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java @@ -26,6 +26,7 @@ public class SyncActivityWorker implements SuspendableWorker { private final ScheduledExecutorService heartbeatExecutor; private final ActivityTaskHandlerImpl taskHandler; private final ActivityWorker worker; + private final boolean activityHeartbeatDuringShutdown; public SyncActivityWorker( WorkflowClient client, @@ -38,6 +39,7 @@ public SyncActivityWorker( this.identity = options.getIdentity(); 
this.namespace = namespace; this.taskQueue = taskQueue; + this.activityHeartbeatDuringShutdown = options.isActivityHeartbeatDuringShutdown(); this.heartbeatExecutor = Executors.newScheduledThreadPool( @@ -89,16 +91,31 @@ public boolean start() { @Override public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptTasks) { - return shutdownManager - // we want to shut down heartbeatExecutor before activity worker, so in-flight activities - // could get an ActivityWorkerShutdownException from their heartbeat - .shutdownExecutor(heartbeatExecutor, this + "#heartbeatExecutor", Duration.ofSeconds(5)) - .thenCompose(r -> worker.shutdown(shutdownManager, interruptTasks)) - .exceptionally( - e -> { - log.error("[BUG] Unexpected exception during shutdown", e); - return null; - }); + CompletableFuture shutdownFuture; + if (activityHeartbeatDuringShutdown) { + // we want to shut down heartbeatExecutor only after all outstanding activity tasks have + // finished executing, so in-flight activities can keep heartbeating during the shutdown + shutdownFuture = + worker + .shutdown(shutdownManager, interruptTasks) + .thenCompose(r -> shutdownHeartbeatExecutor(shutdownManager)); + } else { + // we want to shut down heartbeatExecutor before activity worker, so in-flight activities + // could get an ActivityWorkerShutdownException from their heartbeat + shutdownFuture = + shutdownHeartbeatExecutor(shutdownManager) + .thenCompose(r -> worker.shutdown(shutdownManager, interruptTasks)); + } + return shutdownFuture.exceptionally( + e -> { + log.error("[BUG] Unexpected exception during shutdown", e); + return null; + }); + } + + private CompletableFuture shutdownHeartbeatExecutor(ShutdownManager shutdownManager) { + return shutdownManager.shutdownExecutor( + heartbeatExecutor, this + "#heartbeatExecutor", Duration.ofSeconds(5)); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java 
b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index c846667d68..fd54b695d9 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -893,6 +893,7 @@ private static SingleWorkerOptions toActivityOptions( return toSingleWorkerOptions( factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey) .setUsingVirtualThreads(options.isUsingVirtualThreadsOnActivityWorker()) + .setActivityHeartbeatDuringShutdown(options.isActivityHeartbeatDuringShutdown()) .setPollerOptions( PollerOptions.newBuilder() .setMaximumPollRatePerSecond(options.getMaxWorkerActivitiesPerSecond()) diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index c0a949f825..a90443440c 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -358,7 +358,9 @@ public WorkflowClient getWorkflowClient() { * activity tasks are executed.
* After the shutdown, calls to {@link * io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} start throwing {@link - * io.temporal.client.ActivityWorkerShutdownException}.
+ * io.temporal.client.ActivityWorkerShutdownException}, unless {@link + * WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case + * heartbeats keep working until the activity tasks finish executing.
* This method does not wait for the shutdown to complete. Use {@link #awaitTermination(long, * TimeUnit)} to do that.
* Invocation has no additional effect if already shut down. @@ -375,7 +377,9 @@ public synchronized void shutdown() { * interrupts may never terminate.
* After the shutdownNow calls to {@link * io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} start throwing {@link - * io.temporal.client.ActivityWorkerShutdownException}.
+ * io.temporal.client.ActivityWorkerShutdownException}, unless {@link + * WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case + * heartbeats keep working until the activity tasks finish executing.
* This method does not wait for the shutdown to complete. Use {@link #awaitTermination(long, * TimeUnit)} to do that.
* Invocation has no additional effect if already shut down. diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java index f8bfd2f442..6179f5d36b 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerOptions.java @@ -77,6 +77,7 @@ public static final class Builder { private PollerBehavior workflowTaskPollersBehavior; private PollerBehavior activityTaskPollersBehavior; private PollerBehavior nexusTaskPollersBehavior; + private boolean activityHeartbeatDuringShutdown; private Builder() {} @@ -112,6 +113,7 @@ private Builder(WorkerOptions o) { this.workflowTaskPollersBehavior = o.workflowTaskPollersBehavior; this.activityTaskPollersBehavior = o.activityTaskPollersBehavior; this.nexusTaskPollersBehavior = o.nexusTaskPollersBehavior; + this.activityHeartbeatDuringShutdown = o.activityHeartbeatDuringShutdown; } /** @@ -524,6 +526,29 @@ public Builder setNexusTaskPollersBehavior(PollerBehavior pollerBehavior) { return this; } + /** + * If enabled, activities can keep heartbeating while the worker is shutting down. The activity + * heartbeat executor is closed only after all outstanding activity tasks have finished + * executing, so {@link io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} behaves + * exactly as it does while the worker is running: heartbeats are throttled and sent to the + * server, which keeps the server from timing the activity out during the {@link + * WorkerFactory#awaitTermination(long, java.util.concurrent.TimeUnit)} grace period. + * + *

<p>Note that with this option enabled, activities are no longer notified of the worker + * shutdown by an {@link io.temporal.client.ActivityWorkerShutdownException} thrown from {@code + * heartbeat}, so they are expected to complete within the termination grace period on their + * own. + * + *

Defaults to false, meaning that after shutdown is requested, {@link + * io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} stops sending heartbeats and + * throws {@link io.temporal.client.ActivityWorkerShutdownException}. + */ + @Experimental + public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) { + this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown; + return this; + } + public WorkerOptions build() { return new WorkerOptions( maxWorkerActivitiesPerSecond, @@ -553,7 +578,8 @@ public WorkerOptions build() { deploymentOptions, workflowTaskPollersBehavior, activityTaskPollersBehavior, - nexusTaskPollersBehavior); + nexusTaskPollersBehavior, + activityHeartbeatDuringShutdown); } public WorkerOptions validateAndBuildWithDefaults() { @@ -685,7 +711,8 @@ public WorkerOptions validateAndBuildWithDefaults() { deploymentOptions, workflowTaskPollersBehavior, activityTaskPollersBehavior, - nexusTaskPollersBehavior); + nexusTaskPollersBehavior, + activityHeartbeatDuringShutdown); } } @@ -717,6 +744,7 @@ public WorkerOptions validateAndBuildWithDefaults() { private final PollerBehavior workflowTaskPollersBehavior; private final PollerBehavior activityTaskPollersBehavior; private final PollerBehavior nexusTaskPollersBehavior; + private final boolean activityHeartbeatDuringShutdown; private WorkerOptions( double maxWorkerActivitiesPerSecond, @@ -746,7 +774,8 @@ private WorkerOptions( WorkerDeploymentOptions deploymentOptions, PollerBehavior workflowTaskPollersBehavior, PollerBehavior activityTaskPollersBehavior, - PollerBehavior nexusTaskPollersBehavior) { + PollerBehavior nexusTaskPollersBehavior, + boolean activityHeartbeatDuringShutdown) { this.maxWorkerActivitiesPerSecond = maxWorkerActivitiesPerSecond; this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize; this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowTaskExecutionSize; @@ -775,6 +804,7 @@ private 
WorkerOptions( this.workflowTaskPollersBehavior = workflowTaskPollersBehavior; this.activityTaskPollersBehavior = activityTaskPollersBehavior; this.nexusTaskPollersBehavior = nexusTaskPollersBehavior; + this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown; } public double getMaxWorkerActivitiesPerSecond() { @@ -912,6 +942,14 @@ public PollerBehavior getNexusTaskPollersBehavior() { return nexusTaskPollersBehavior; } + /** + * @return true if activities keep heartbeating while the worker is shutting down + */ + @Experimental + public boolean isActivityHeartbeatDuringShutdown() { + return activityHeartbeatDuringShutdown; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -944,7 +982,8 @@ && compare(maxTaskQueueActivitiesPerSecond, that.maxTaskQueueActivitiesPerSecond && Objects.equals(deploymentOptions, that.deploymentOptions) && Objects.equals(workflowTaskPollersBehavior, that.workflowTaskPollersBehavior) && Objects.equals(activityTaskPollersBehavior, that.activityTaskPollersBehavior) - && Objects.equals(nexusTaskPollersBehavior, that.nexusTaskPollersBehavior); + && Objects.equals(nexusTaskPollersBehavior, that.nexusTaskPollersBehavior) + && activityHeartbeatDuringShutdown == that.activityHeartbeatDuringShutdown; } @Override @@ -977,7 +1016,8 @@ public int hashCode() { deploymentOptions, workflowTaskPollersBehavior, activityTaskPollersBehavior, - nexusTaskPollersBehavior); + nexusTaskPollersBehavior, + activityHeartbeatDuringShutdown); } @Override @@ -1040,6 +1080,8 @@ public String toString() { + activityTaskPollersBehavior + ", nexusTaskPollersBehavior=" + nexusTaskPollersBehavior + + ", activityHeartbeatDuringShutdown=" + + activityHeartbeatDuringShutdown + '}'; } } diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java index 897600443d..ed013a0ad1 100644 --- 
a/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerOptionsTest.java @@ -56,6 +56,7 @@ public void verifyNewBuilderFromExistingWorkerOptions() { .setBuildId("build-id") .setStickyTaskQueueDrainTimeout(Duration.ofSeconds(15)) .setIdentity("worker-identity") + .setActivityHeartbeatDuringShutdown(true) .build(); WorkerOptions w2 = WorkerOptions.newBuilder(w1).build(); @@ -89,6 +90,7 @@ public void verifyNewBuilderFromExistingWorkerOptions() { assertEquals(w1.getBuildId(), w2.getBuildId()); assertEquals(w1.getStickyTaskQueueDrainTimeout(), w2.getStickyTaskQueueDrainTimeout()); assertEquals(w1.getIdentity(), w2.getIdentity()); + assertEquals(w1.isActivityHeartbeatDuringShutdown(), w2.isActivityHeartbeatDuringShutdown()); } @Test diff --git a/temporal-sdk/src/test/java/io/temporal/worker/shutdown/HeartbeatDuringWorkerShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/HeartbeatDuringWorkerShutdownTest.java new file mode 100644 index 0000000000..787bd25ac7 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/HeartbeatDuringWorkerShutdownTest.java @@ -0,0 +1,122 @@ +package io.temporal.worker.shutdown; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import io.temporal.activity.Activity; +import io.temporal.api.common.v1.Payloads; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.ActivityCompletionException; +import io.temporal.client.WorkflowClient; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerFactory; +import io.temporal.worker.WorkerOptions; +import io.temporal.workflow.Workflow; +import 
io.temporal.workflow.shared.TestActivities.NoArgsReturnsStringActivity; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnString; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.junit.Rule; +import org.junit.Test; + +public class HeartbeatDuringWorkerShutdownTest { + + private static final String EXPECTED_RESULT = "completed"; + private static final CompletableFuture started = new CompletableFuture<>(); + private static final HeartBeatingActivitiesImpl activitiesImpl = + new HeartBeatingActivitiesImpl(started); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkerOptions( + WorkerOptions.newBuilder().setActivityHeartbeatDuringShutdown(true).build()) + .setWorkflowTypes(TestWorkflowImpl.class) + .setActivityImplementations(activitiesImpl) + .build(); + + /** + * Tests that when {@link WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is + * enabled, {@code Activity#heartbeat} keeps working normally after {@link + * WorkerFactory#shutdown()} and the activity runs to completion instead of getting an {@code + * ActivityWorkerShutdownException}. 
+ */ + @Test + public void testHeartbeatingActivityCompletesDuringShutdown() + throws ExecutionException, InterruptedException { + TestWorkflowReturnString workflow = + testWorkflowRule.newWorkflowStub(TestWorkflowReturnString.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + started.get(); + testWorkflowRule.getTestEnvironment().shutdown(); + testWorkflowRule.getTestEnvironment().awaitTermination(10, TimeUnit.MINUTES); + List events = + testWorkflowRule + .getExecutionHistory(execution.getWorkflowId()) + .getHistory() + .getEventsList(); + boolean found = false; + for (HistoryEvent e : events) { + if (e.getEventType() == EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED) { + found = true; + Payloads ar = e.getActivityTaskCompletedEventAttributes().getResult(); + String r = + DefaultDataConverter.STANDARD_INSTANCE.fromPayloads( + 0, Optional.of(ar), String.class, String.class); + assertEquals(EXPECTED_RESULT, r); + } + } + assertTrue("Contains ActivityTaskCompleted", found); + } + + public static class TestWorkflowImpl implements TestWorkflowReturnString { + + private final NoArgsReturnsStringActivity activities = + Workflow.newActivityStub( + NoArgsReturnsStringActivity.class, + SDKTestOptions.newActivityOptions20sScheduleToClose()); + + @Override + public String execute() { + return activities.execute(); + } + } + + public static class HeartBeatingActivitiesImpl implements NoArgsReturnsStringActivity { + private final CompletableFuture started; + + public HeartBeatingActivitiesImpl(CompletableFuture started) { + this.started = started; + } + + @Override + public String execute() { + started.complete(true); + // Heartbeat through the worker shutdown that the main test thread initiates after + // `started` completes. With activityHeartbeatDuringShutdown enabled none of these + // calls may throw ActivityWorkerShutdownException. 
+ for (int i = 0; i < 4; i++) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail("Activity was interrupted during graceful shutdown"); + } + try { + Activity.getExecutionContext().heartbeat("progress-" + i); + } catch (ActivityCompletionException e) { + fail("Heartbeat threw during worker shutdown: " + e); + } + } + return EXPECTED_RESULT; + } + } +}