From 55eea5ea5e0da1cf147546a8f37aa3786ea88a64 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Jun 2026 09:47:15 -0700 Subject: [PATCH 1/2] Add GZIP compression defaulting to on --- .../serviceclient/ChannelManager.java | 3 + .../serviceclient/GrpcCompression.java | 31 +++++++ .../GrpcCompressionInterceptor.java | 21 +++++ .../serviceclient/ServiceStubsOptions.java | 49 +++++++++- .../serviceclient/GrpcCompressionTest.java | 91 +++++++++++++++++++ .../ServiceStubsOptionsTest.java | 26 ++++++ 6 files changed, 216 insertions(+), 5 deletions(-) create mode 100644 temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompression.java create mode 100644 temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompressionInterceptor.java create mode 100644 temporal-serviceclient/src/test/java/io/temporal/serviceclient/GrpcCompressionTest.java diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java index da39c740f..2f5c87397 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ChannelManager.java @@ -161,6 +161,7 @@ private Channel applyHeadStandardInterceptors(Channel channel) { return ClientInterceptors.intercept( channel, + new GrpcCompressionInterceptor(options.getGrpcCompression()), MetadataUtils.newAttachHeadersInterceptor(headers), new SystemInfoInterceptor(serverCapabilitiesFuture)); } @@ -206,6 +207,8 @@ private ManagedChannel prepareChannel() { builder.useTransportSecurity(); } + builder.decompressorRegistry(options.getGrpcCompression().getDecompressorRegistry()); + // Disable built-in idleTimer until https://github.com/grpc/grpc-java/issues/8714 is resolved. // jsdk force-idles channels often anyway, so this is not needed until we stop doing // force-idling as a part of diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompression.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompression.java new file mode 100644 index 000000000..861c950cd --- /dev/null +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompression.java @@ -0,0 +1,31 @@ +package io.temporal.serviceclient; + +import io.grpc.Codec; +import io.grpc.DecompressorRegistry; +import javax.annotation.Nullable; + +/** Selects transport-level gRPC compression for service calls. */ +public enum GrpcCompression { + /** Do not compress requests or advertise support for compressed responses. */ + NONE(null, DecompressorRegistry.emptyInstance().with(Codec.Identity.NONE, false)), + + /** Gzip-compress outbound requests and accept gzip-compressed responses. */ + GZIP("gzip", DecompressorRegistry.getDefaultInstance()); + + private final @Nullable String compressorName; + private final DecompressorRegistry decompressorRegistry; + + GrpcCompression(@Nullable String compressorName, DecompressorRegistry decompressorRegistry) { + this.compressorName = compressorName; + this.decompressorRegistry = decompressorRegistry; + } + + @Nullable + String getCompressorName() { + return compressorName; + } + + DecompressorRegistry getDecompressorRegistry() { + return decompressorRegistry; + } +} diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompressionInterceptor.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompressionInterceptor.java new file mode 100644 index 000000000..4d605ab33 --- /dev/null +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/GrpcCompressionInterceptor.java @@ -0,0 +1,21 @@ +package io.temporal.serviceclient; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.MethodDescriptor; + +final class GrpcCompressionInterceptor implements ClientInterceptor { + private final GrpcCompression compression; + + GrpcCompressionInterceptor(GrpcCompression compression) { + this.compression = compression; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return next.newCall(method, callOptions.withCompression(compression.getCompressorName())); + } +} diff --git a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java index 0c3d8ae3a..36e3afcbd 100644 --- a/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java +++ b/temporal-serviceclient/src/main/java/io/temporal/serviceclient/ServiceStubsOptions.java @@ -114,6 +114,9 @@ public class ServiceStubsOptions { protected final Scope metricsScope; + /** Transport-level gRPC compression. */ + protected final GrpcCompression grpcCompression; + ServiceStubsOptions(ServiceStubsOptions that) { this.channel = that.channel; this.target = that.target; @@ -135,6 +138,7 @@ public class ServiceStubsOptions { this.grpcMetadataProviders = that.grpcMetadataProviders; this.grpcClientInterceptors = that.grpcClientInterceptors; this.metricsScope = that.metricsScope; + this.grpcCompression = that.grpcCompression; } ServiceStubsOptions( @@ -157,7 +161,8 @@ public class ServiceStubsOptions { Metadata headers, Collection grpcMetadataProviders, Collection grpcClientInterceptors, - Scope metricsScope) { + Scope metricsScope, + GrpcCompression grpcCompression) { this.channel = channel; this.target = target; this.channelInitializer = channelInitializer; @@ -178,6 +183,7 @@ public class ServiceStubsOptions { this.grpcMetadataProviders = grpcMetadataProviders; this.grpcClientInterceptors = grpcClientInterceptors; this.metricsScope = metricsScope; + this.grpcCompression = grpcCompression; } /** @@ -342,6 +348,15 @@ public Scope getMetricsScope() { return metricsScope; } + /** + * @return transport-level gRPC compression used for requests and response negotiation. + * @see Builder#setGrpcCompression(GrpcCompression) + */ + @Nonnull + public GrpcCompression getGrpcCompression() { + return grpcCompression; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -366,7 +381,8 @@ public boolean equals(Object o) { && Objects.equals(headers, that.headers) && Objects.equals(grpcMetadataProviders, that.grpcMetadataProviders) && Objects.equals(grpcClientInterceptors, that.grpcClientInterceptors) - && Objects.equals(metricsScope, that.metricsScope); + && Objects.equals(metricsScope, that.metricsScope) + && grpcCompression == that.grpcCompression; } @Override @@ -391,7 +407,8 @@ public int hashCode() { headers, grpcMetadataProviders, grpcClientInterceptors, - metricsScope); + metricsScope, + grpcCompression); } @Override @@ -436,6 +453,8 @@ public String toString() { + grpcClientInterceptors + ", metricsScope=" + metricsScope + + ", grpcCompression=" + + grpcCompression + '}'; } @@ -460,6 +479,7 @@ public static class Builder> { private Collection grpcClientInterceptors; private Scope metricsScope; private boolean apiKeyProvided; + private GrpcCompression grpcCompression = GrpcCompression.GZIP; protected Builder() {} @@ -491,6 +511,7 @@ protected Builder(ServiceStubsOptions options) { ? new ArrayList<>(options.grpcClientInterceptors) : null; this.metricsScope = options.metricsScope; + this.grpcCompression = options.grpcCompression; } /** @@ -720,6 +741,22 @@ public T setMetricsScope(Scope metricsScope) { return self(); } + /** + * Sets transport-level gRPC compression. Defaults to {@link GrpcCompression#GZIP}. Set to + * {@link GrpcCompression#NONE} to opt out. + * + *

For SDK-created channels, this controls both request compression and the {@code + * grpc-accept-encoding} response negotiation header. For user-supplied channels, the SDK still + * controls request compression, but response decompression negotiation is configured by the + * supplied channel. + * + * @return {@code this} + */ + public T setGrpcCompression(GrpcCompression grpcCompression) { + this.grpcCompression = Objects.requireNonNull(grpcCompression); + return self(); + } + /** * Set the time to wait between service responses on each health check. * @@ -853,7 +890,8 @@ public ServiceStubsOptions build() { this.headers, this.grpcMetadataProviders, this.grpcClientInterceptors, - this.metricsScope); + this.metricsScope, + this.grpcCompression); } public ServiceStubsOptions validateAndBuildWithDefaults() { @@ -916,7 +954,8 @@ public ServiceStubsOptions validateAndBuildWithDefaults() { headers, grpcMetadataProviders, grpcClientInterceptors, - metricsScope); + metricsScope, + this.grpcCompression); } } } diff --git a/temporal-serviceclient/src/test/java/io/temporal/serviceclient/GrpcCompressionTest.java b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/GrpcCompressionTest.java new file mode 100644 index 000000000..6c7d71acb --- /dev/null +++ b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/GrpcCompressionTest.java @@ -0,0 +1,91 @@ +package io.temporal.serviceclient; + +import static org.junit.Assert.*; + +import io.grpc.Metadata; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.temporal.api.workflowservice.v1.GetSystemInfoRequest; +import io.temporal.api.workflowservice.v1.GetSystemInfoResponse; +import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.Rule; +import org.junit.Test; + +public class GrpcCompressionTest { + private static final Metadata.Key GRPC_ENCODING = + Metadata.Key.of("grpc-encoding", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key GRPC_ACCEPT_ENCODING = + Metadata.Key.of("grpc-accept-encoding", Metadata.ASCII_STRING_MARSHALLER); + + @Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); + + @Test + public void gzipCompressionSendsAndAcceptsGzip() throws Exception { + Metadata headers = callGetSystemInfo(GrpcCompression.GZIP); + + assertEquals("gzip", headers.get(GRPC_ENCODING)); + assertTrue(headers.get(GRPC_ACCEPT_ENCODING).contains("gzip")); + } + + @Test + public void noneCompressionDoesNotSendOrAcceptGzip() throws Exception { + Metadata headers = callGetSystemInfo(GrpcCompression.NONE); + + assertNull(headers.get(GRPC_ENCODING)); + assertNull(headers.get(GRPC_ACCEPT_ENCODING)); + } + + private Metadata callGetSystemInfo(GrpcCompression compression) throws Exception { + AtomicReference capturedHeaders = new AtomicReference<>(); + ServerInterceptor captureHeadersInterceptor = + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + capturedHeaders.set(headers); + return next.startCall(call, headers); + } + }; + Server server = + grpcCleanupRule.register( + NettyServerBuilder.forPort(0) + .addService( + ServerInterceptors.intercept( + new TestWorkflowService(), captureHeadersInterceptor)) + .build() + .start()); + + WorkflowServiceStubs serviceStubs = + WorkflowServiceStubs.newServiceStubs( + WorkflowServiceStubsOptions.newBuilder() + .setTarget("127.0.0.1:" + server.getPort()) + .setEnableHttps(false) + .setGrpcCompression(compression) + .build()); + try { + serviceStubs.blockingStub().getSystemInfo(GetSystemInfoRequest.getDefaultInstance()); + } finally { + serviceStubs.shutdownNow(); + } + + assertNotNull(capturedHeaders.get()); + return capturedHeaders.get(); + } + + private static final class TestWorkflowService + extends WorkflowServiceGrpc.WorkflowServiceImplBase { + @Override + public void getSystemInfo( + GetSystemInfoRequest request, StreamObserver responseObserver) { + responseObserver.onNext(GetSystemInfoResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + } +} diff --git a/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ServiceStubsOptionsTest.java b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ServiceStubsOptionsTest.java index ccadbffe8..0c0e76d74 100644 --- a/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ServiceStubsOptionsTest.java +++ b/temporal-serviceclient/src/test/java/io/temporal/serviceclient/ServiceStubsOptionsTest.java @@ -151,4 +151,30 @@ public void testSpringBootStyleAutoTLSWithApiKey() { "TLS should be disabled when no API key and no explicit TLS setting", options3.getEnableHttps()); } + + @Test + public void testGrpcCompressionDefaultsToGzip() { + ServiceStubsOptions options = + WorkflowServiceStubsOptions.newBuilder() + .setTarget("localhost:7233") + .validateAndBuildWithDefaults(); + + assertEquals(GrpcCompression.GZIP, options.getGrpcCompression()); + } + + @Test + public void testGrpcCompressionNonePassesThroughBuilderCopy() { + ServiceStubsOptions options = + WorkflowServiceStubsOptions.newBuilder() + .setTarget("localhost:7233") + .setGrpcCompression(GrpcCompression.NONE) + .validateAndBuildWithDefaults(); + + assertEquals(GrpcCompression.NONE, options.getGrpcCompression()); + + ServiceStubsOptions copied = + WorkflowServiceStubsOptions.newBuilder(options).validateAndBuildWithDefaults(); + + assertEquals(GrpcCompression.NONE, copied.getGrpcCompression()); + } } From 3ee9a4162e01acf4cdb7a7d4ad59e382fcfea44e Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Jun 2026 10:10:39 -0700 Subject: [PATCH 2/2] Disable compression in message-too-large tests --- .../workflow/GrpcMessageTooLargeTest.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java index 89956c32b..3cb1675a8 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/GrpcMessageTooLargeTest.java @@ -13,6 +13,8 @@ import io.temporal.internal.replay.ReplayWorkflowTaskHandler; import io.temporal.internal.retryer.GrpcMessageTooLargeException; import io.temporal.internal.worker.PollerOptions; +import io.temporal.serviceclient.GrpcCompression; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; import io.temporal.testUtils.LoggerUtils; import io.temporal.testing.internal.SDKTestWorkflowRule; import io.temporal.workflow.shared.TestActivities; @@ -24,6 +26,8 @@ public class GrpcMessageTooLargeTest { private static final String QUERY_ERROR_MESSAGE = "Failed to send query response: RESOURCE_EXHAUSTED: grpc: received message larger than max"; + private static final WorkflowServiceStubsOptions GRPC_COMPRESSION_DISABLED = + WorkflowServiceStubsOptions.newBuilder().setGrpcCompression(GrpcCompression.NONE).build(); private static final String VERY_LARGE_DATA; static { @@ -39,21 +43,31 @@ public class GrpcMessageTooLargeTest { @Rule public SDKTestWorkflowRule activityStartWorkflowRule = SDKTestWorkflowRule.newBuilder() + .setWorkflowServiceStubsOptions(GRPC_COMPRESSION_DISABLED) .setWorkflowTypes(ActivityStartWorkflowImpl.class) .setActivityImplementations(new TestActivityImpl()) .build(); @Rule public SDKTestWorkflowRule failureWorkflowRule = - SDKTestWorkflowRule.newBuilder().setWorkflowTypes(FailureWorkflowImpl.class).build(); + SDKTestWorkflowRule.newBuilder() + .setWorkflowServiceStubsOptions(GRPC_COMPRESSION_DISABLED) + .setWorkflowTypes(FailureWorkflowImpl.class) + .build(); @Rule public SDKTestWorkflowRule querySuccessWorkflowRule = - SDKTestWorkflowRule.newBuilder().setWorkflowTypes(QuerySuccessWorkflowImpl.class).build(); + SDKTestWorkflowRule.newBuilder() + .setWorkflowServiceStubsOptions(GRPC_COMPRESSION_DISABLED) + .setWorkflowTypes(QuerySuccessWorkflowImpl.class) + .build(); @Rule public SDKTestWorkflowRule queryFailureWorkflowRule = - SDKTestWorkflowRule.newBuilder().setWorkflowTypes(QueryFailureWorkflowImpl.class).build(); + SDKTestWorkflowRule.newBuilder() + .setWorkflowServiceStubsOptions(GRPC_COMPRESSION_DISABLED) + .setWorkflowTypes(QueryFailureWorkflowImpl.class) + .build(); @Test public void workflowStartTooLarge() {