Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
* called. It is OK to ignore the exception to let the activity complete. It assumes that {@link
* WorkerFactory#awaitTermination(long, TimeUnit)} is called with a timeout larger than the activity
* execution time.
*
* <p>If {@link io.temporal.worker.WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(
* boolean)} is enabled, this exception is not thrown during shutdown and heartbeats keep working as
* if the worker were not shutting down.
*/
public final class ActivityWorkerShutdownException extends ActivityCompletionException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public static final class Builder {
private boolean usingVirtualThreads;
private WorkerDeploymentOptions deploymentOptions;
private String workerInstanceKey;
private boolean activityHeartbeatDuringShutdown;

private Builder() {}

Expand All @@ -66,6 +67,7 @@ private Builder(SingleWorkerOptions options) {
this.usingVirtualThreads = options.isUsingVirtualThreads();
this.deploymentOptions = options.getDeploymentOptions();
this.workerInstanceKey = options.getWorkerInstanceKey();
this.activityHeartbeatDuringShutdown = options.isActivityHeartbeatDuringShutdown();
}

public Builder setIdentity(String identity) {
Expand Down Expand Up @@ -162,6 +164,11 @@ public Builder setWorkerInstanceKey(String workerInstanceKey) {
return this;
}

/**
* Sets whether activities may keep heartbeating while the worker is shutting down.
*
* @param activityHeartbeatDuringShutdown value stored in this builder and handed to the {@code
*     SingleWorkerOptions} created by {@link #build()}
* @return this builder, to allow call chaining
*/
public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) {
this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown;
return this;
}

public SingleWorkerOptions build() {
PollerOptions pollerOptions = this.pollerOptions;
if (pollerOptions == null) {
Expand Down Expand Up @@ -201,7 +208,8 @@ public SingleWorkerOptions build() {
drainStickyTaskQueueTimeout,
usingVirtualThreads,
this.deploymentOptions,
this.workerInstanceKey);
this.workerInstanceKey,
this.activityHeartbeatDuringShutdown);
}
}

Expand All @@ -223,6 +231,7 @@ public SingleWorkerOptions build() {
private final boolean usingVirtualThreads;
private final WorkerDeploymentOptions deploymentOptions;
private final String workerInstanceKey;
private final boolean activityHeartbeatDuringShutdown;

private SingleWorkerOptions(
String identity,
Expand All @@ -242,7 +251,8 @@ private SingleWorkerOptions(
Duration drainStickyTaskQueueTimeout,
boolean usingVirtualThreads,
WorkerDeploymentOptions deploymentOptions,
String workerInstanceKey) {
String workerInstanceKey,
boolean activityHeartbeatDuringShutdown) {
this.identity = identity;
this.binaryChecksum = binaryChecksum;
this.buildId = buildId;
Expand All @@ -261,6 +271,7 @@ private SingleWorkerOptions(
this.usingVirtualThreads = usingVirtualThreads;
this.deploymentOptions = deploymentOptions;
this.workerInstanceKey = workerInstanceKey;
this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown;
}

public String getIdentity() {
Expand Down Expand Up @@ -291,6 +302,10 @@ public Duration getDrainStickyTaskQueueTimeout() {
return drainStickyTaskQueueTimeout;
}

/**
* Returns the heartbeat-during-shutdown flag these options were built with.
*
* @return the value of the {@code activityHeartbeatDuringShutdown} field
*/
public boolean isActivityHeartbeatDuringShutdown() {
return activityHeartbeatDuringShutdown;
}

/**
* Returns the data converter held by these options.
*
* @return the value of the {@code dataConverter} field
*/
public DataConverter getDataConverter() {
return dataConverter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class SyncActivityWorker implements SuspendableWorker {
private final ScheduledExecutorService heartbeatExecutor;
private final ActivityTaskHandlerImpl taskHandler;
private final ActivityWorker worker;
private final boolean activityHeartbeatDuringShutdown;

public SyncActivityWorker(
WorkflowClient client,
Expand All @@ -38,6 +39,7 @@ public SyncActivityWorker(
this.identity = options.getIdentity();
this.namespace = namespace;
this.taskQueue = taskQueue;
this.activityHeartbeatDuringShutdown = options.isActivityHeartbeatDuringShutdown();

this.heartbeatExecutor =
Executors.newScheduledThreadPool(
Expand Down Expand Up @@ -89,16 +91,31 @@ public boolean start() {

@Override
public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean interruptTasks) {
return shutdownManager
// we want to shut down heartbeatExecutor before activity worker, so in-flight activities
// could get an ActivityWorkerShutdownException from their heartbeat
.shutdownExecutor(heartbeatExecutor, this + "#heartbeatExecutor", Duration.ofSeconds(5))
.thenCompose(r -> worker.shutdown(shutdownManager, interruptTasks))
.exceptionally(
e -> {
log.error("[BUG] Unexpected exception during shutdown", e);
return null;
});
CompletableFuture<Void> shutdownFuture;
if (activityHeartbeatDuringShutdown) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When interruptTasks is true (shutdownNow was called instead of shutdown), it should behave as if heartbeat during shutdown was disabled.

Suggested change
if (activityHeartbeatDuringShutdown) {
if (allowActivityHeartbeatDuringShutdown && !interruptTasks) {

// we want to shut down heartbeatExecutor only after all outstanding activity tasks have
// finished executing, so in-flight activities can keep heartbeating during the shutdown
shutdownFuture =
worker
.shutdown(shutdownManager, interruptTasks)
.thenCompose(r -> shutdownHeartbeatExecutor(shutdownManager));
} else {
// we want to shut down heartbeatExecutor before activity worker, so in-flight activities
// could get an ActivityWorkerShutdownException from their heartbeat
shutdownFuture =
shutdownHeartbeatExecutor(shutdownManager)
.thenCompose(r -> worker.shutdown(shutdownManager, interruptTasks));
}
return shutdownFuture.exceptionally(
e -> {
log.error("[BUG] Unexpected exception during shutdown", e);
return null;
});
}

/**
 * Asks the given shutdown manager to shut down this worker's heartbeat executor.
 *
 * @param shutdownManager manager that performs the executor shutdown
 * @return the future returned by {@code shutdownManager.shutdownExecutor} for the heartbeat
 *     executor
 */
private CompletableFuture<Void> shutdownHeartbeatExecutor(ShutdownManager shutdownManager) {
  final String executorName = this + "#heartbeatExecutor";
  final Duration duration = Duration.ofSeconds(5);
  return shutdownManager.shutdownExecutor(heartbeatExecutor, executorName, duration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@ private static SingleWorkerOptions toActivityOptions(
return toSingleWorkerOptions(
factoryOptions, options, clientOptions, contextPropagators, workerInstanceKey)
.setUsingVirtualThreads(options.isUsingVirtualThreadsOnActivityWorker())
.setActivityHeartbeatDuringShutdown(options.isActivityHeartbeatDuringShutdown())
.setPollerOptions(
PollerOptions.newBuilder()
.setMaximumPollRatePerSecond(options.getMaxWorkerActivitiesPerSecond())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ public WorkflowClient getWorkflowClient() {
* activity tasks are executed. <br>
* After the shutdown, calls to {@link
* io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} start throwing {@link
* io.temporal.client.ActivityWorkerShutdownException}.<br>
* io.temporal.client.ActivityWorkerShutdownException}, unless {@link
* WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case
* heartbeats keep working until the activity tasks finish executing.<br>
* This method does not wait for the shutdown to complete. Use {@link #awaitTermination(long,
* TimeUnit)} to do that.<br>
* Invocation has no additional effect if already shut down.
Expand All @@ -375,7 +377,9 @@ public synchronized void shutdown() {
* interrupts may never terminate.<br>
* After the shutdownNow calls to {@link
* io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} start throwing {@link
* io.temporal.client.ActivityWorkerShutdownException}.<br>
* io.temporal.client.ActivityWorkerShutdownException}, unless {@link
* WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case
* heartbeats keep working until the activity tasks finish executing.<br>
Comment on lines +380 to +382

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdownNow behavior stays the same, see comment in SyncActivityWorker.

Suggested change
* io.temporal.client.ActivityWorkerShutdownException}, unless {@link
* WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case
* heartbeats keep working until the activity tasks finish executing.<br>
* io.temporal.client.ActivityWorkerShutdownException}.<br>

* This method does not wait for the shutdown to complete. Use {@link #awaitTermination(long,
* TimeUnit)} to do that.<br>
* Invocation has no additional effect if already shut down.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public static final class Builder {
private PollerBehavior workflowTaskPollersBehavior;
private PollerBehavior activityTaskPollersBehavior;
private PollerBehavior nexusTaskPollersBehavior;
private boolean activityHeartbeatDuringShutdown;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field should be named allowActivityHeartbeatDuringShutdown, the options getter should be named getAllowActivityHeartbeatDuringShutdown, and the builder setter should be named setAllowActivityHeartbeatDuringShutdown. Apply this change consistently throughout the PR.


private Builder() {}

Expand Down Expand Up @@ -112,6 +113,7 @@ private Builder(WorkerOptions o) {
this.workflowTaskPollersBehavior = o.workflowTaskPollersBehavior;
this.activityTaskPollersBehavior = o.activityTaskPollersBehavior;
this.nexusTaskPollersBehavior = o.nexusTaskPollersBehavior;
this.activityHeartbeatDuringShutdown = o.activityHeartbeatDuringShutdown;
}

/**
Expand Down Expand Up @@ -524,6 +526,29 @@ public Builder setNexusTaskPollersBehavior(PollerBehavior pollerBehavior) {
return this;
}

/**
* If enabled, activities can keep heartbeating while the worker is shutting down. The activity
* heartbeat executor is closed only after all outstanding activity tasks have finished
* executing, so {@link io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} behaves
* exactly as it does while the worker is running: heartbeats are throttled and sent to the
* server, which keeps the server from timing the activity out during the {@link
* WorkerFactory#awaitTermination(long, java.util.concurrent.TimeUnit)} grace period.
*
* <p>Note that with this option enabled activities are no longer notified of the worker
* shutdown by an {@link io.temporal.client.ActivityWorkerShutdownException} thrown from {@code
* heartbeat}, so they are expected to complete within the termination grace period on their
* own.
*
* <p>Defaults to false, meaning that after shutdown is requested, {@link
* io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} stops sending heartbeats and
* throws {@link io.temporal.client.ActivityWorkerShutdownException}.
*/
@Experimental
public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) {
Comment on lines +529 to +547

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to document implementation details.

Suggested change
/**
* If enabled, activities can keep heartbeating while the worker is shutting down. The activity
* heartbeat executor is closed only after all outstanding activity tasks have finished
* executing, so {@link io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} behaves
* exactly as it does while the worker is running: heartbeats are throttled and sent to the
* server, which keeps the server from timing the activity out during the {@link
* WorkerFactory#awaitTermination(long, java.util.concurrent.TimeUnit)} grace period.
*
* <p>Note that with this option enabled activities are no longer notified of the worker
* shutdown by an {@link io.temporal.client.ActivityWorkerShutdownException} thrown from {@code
* heartbeat}, so they are expected to complete within the termination grace period on their
* own.
*
* <p>Defaults to false, meaning that after shutdown is requested, {@link
* io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} stops sending heartbeats and
* throws {@link io.temporal.client.ActivityWorkerShutdownException}.
*/
@Experimental
public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) {
/**
* If true, activities can keep heartbeating during graceful worker shutdown (see {@link
* io.temporal.worker.WorkerFactory#shutdown WorkerFactory.shutdown}). Defaults to false,
* which means that after graceful shutdown is requested, calling {@link
* io.temporal.activity.ActivityExecutionContext#heartbeat ActivityExecutionContext.heartbeat}
* does not send a heartbeat and instead throws {@link
* io.temporal.client.ActivityWorkerShutdownException ActivityWorkerShutdownException}. This
* option is ignored by non-graceful shutdown (see {@link
* io.temporal.worker.WorkerFactory#shutdownNow WorkerFactory.shutdownNow}).
*
* <p>Note that with this option enabled, activities are no longer notified of the worker
* shutdown by the {@link io.temporal.client.ActivityWorkerShutdownException
* ActivityWorkerShutdownException} exception, so they are expected to complete within the
* termination grace period on their own.
*/
@Experimental
public Builder setAllowActivityHeartbeatDuringShutdown(boolean allowActivityHeartbeatDuringShutdown) {

this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown;
return this;
}

public WorkerOptions build() {
return new WorkerOptions(
maxWorkerActivitiesPerSecond,
Expand Down Expand Up @@ -553,7 +578,8 @@ public WorkerOptions build() {
deploymentOptions,
workflowTaskPollersBehavior,
activityTaskPollersBehavior,
nexusTaskPollersBehavior);
nexusTaskPollersBehavior,
activityHeartbeatDuringShutdown);
}

public WorkerOptions validateAndBuildWithDefaults() {
Expand Down Expand Up @@ -685,7 +711,8 @@ public WorkerOptions validateAndBuildWithDefaults() {
deploymentOptions,
workflowTaskPollersBehavior,
activityTaskPollersBehavior,
nexusTaskPollersBehavior);
nexusTaskPollersBehavior,
activityHeartbeatDuringShutdown);
}
}

Expand Down Expand Up @@ -717,6 +744,7 @@ public WorkerOptions validateAndBuildWithDefaults() {
private final PollerBehavior workflowTaskPollersBehavior;
private final PollerBehavior activityTaskPollersBehavior;
private final PollerBehavior nexusTaskPollersBehavior;
private final boolean activityHeartbeatDuringShutdown;

private WorkerOptions(
double maxWorkerActivitiesPerSecond,
Expand Down Expand Up @@ -746,7 +774,8 @@ private WorkerOptions(
WorkerDeploymentOptions deploymentOptions,
PollerBehavior workflowTaskPollersBehavior,
PollerBehavior activityTaskPollersBehavior,
PollerBehavior nexusTaskPollersBehavior) {
PollerBehavior nexusTaskPollersBehavior,
boolean activityHeartbeatDuringShutdown) {
this.maxWorkerActivitiesPerSecond = maxWorkerActivitiesPerSecond;
this.maxConcurrentActivityExecutionSize = maxConcurrentActivityExecutionSize;
this.maxConcurrentWorkflowTaskExecutionSize = maxConcurrentWorkflowTaskExecutionSize;
Expand Down Expand Up @@ -775,6 +804,7 @@ private WorkerOptions(
this.workflowTaskPollersBehavior = workflowTaskPollersBehavior;
this.activityTaskPollersBehavior = activityTaskPollersBehavior;
this.nexusTaskPollersBehavior = nexusTaskPollersBehavior;
this.activityHeartbeatDuringShutdown = activityHeartbeatDuringShutdown;
}

public double getMaxWorkerActivitiesPerSecond() {
Expand Down Expand Up @@ -912,6 +942,14 @@ public PollerBehavior getNexusTaskPollersBehavior() {
return nexusTaskPollersBehavior;
}

/**
* Tells whether activities are allowed to keep heartbeating while the worker is shutting down.
*
* @return true if activities keep heartbeating while the worker is shutting down; false
*     otherwise, which is the default
*/
@Experimental
public boolean isActivityHeartbeatDuringShutdown() {
return activityHeartbeatDuringShutdown;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down Expand Up @@ -944,7 +982,8 @@ && compare(maxTaskQueueActivitiesPerSecond, that.maxTaskQueueActivitiesPerSecond
&& Objects.equals(deploymentOptions, that.deploymentOptions)
&& Objects.equals(workflowTaskPollersBehavior, that.workflowTaskPollersBehavior)
&& Objects.equals(activityTaskPollersBehavior, that.activityTaskPollersBehavior)
&& Objects.equals(nexusTaskPollersBehavior, that.nexusTaskPollersBehavior);
&& Objects.equals(nexusTaskPollersBehavior, that.nexusTaskPollersBehavior)
&& activityHeartbeatDuringShutdown == that.activityHeartbeatDuringShutdown;
}

@Override
Expand Down Expand Up @@ -977,7 +1016,8 @@ public int hashCode() {
deploymentOptions,
workflowTaskPollersBehavior,
activityTaskPollersBehavior,
nexusTaskPollersBehavior);
nexusTaskPollersBehavior,
activityHeartbeatDuringShutdown);
}

@Override
Expand Down Expand Up @@ -1040,6 +1080,8 @@ public String toString() {
+ activityTaskPollersBehavior
+ ", nexusTaskPollersBehavior="
+ nexusTaskPollersBehavior
+ ", activityHeartbeatDuringShutdown="
+ activityHeartbeatDuringShutdown
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void verifyNewBuilderFromExistingWorkerOptions() {
.setBuildId("build-id")
.setStickyTaskQueueDrainTimeout(Duration.ofSeconds(15))
.setIdentity("worker-identity")
.setActivityHeartbeatDuringShutdown(true)
.build();

WorkerOptions w2 = WorkerOptions.newBuilder(w1).build();
Expand Down Expand Up @@ -89,6 +90,7 @@ public void verifyNewBuilderFromExistingWorkerOptions() {
assertEquals(w1.getBuildId(), w2.getBuildId());
assertEquals(w1.getStickyTaskQueueDrainTimeout(), w2.getStickyTaskQueueDrainTimeout());
assertEquals(w1.getIdentity(), w2.getIdentity());
assertEquals(w1.isActivityHeartbeatDuringShutdown(), w2.isActivityHeartbeatDuringShutdown());
}

@Test
Expand Down
Loading