diff --git a/bom/pom.xml b/bom/pom.xml index dd76153a9b1..66f75e89490 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -78,7 +78,7 @@ com.datastax.oss native-protocol - 1.5.2 + 1.5.3-SNAPSHOT diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index 2900e897cce..93ce5c018f6 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -1041,7 +1041,15 @@ public enum DefaultDriverOption implements DriverOption { * *

Value-Type: boolean */ - ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES("advanced.address-translator.resolve-addresses"); + ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES("advanced.address-translator.resolve-addresses"), + /** + * Whether to register for GRACEFUL_DISCONNECT events from the server (CEP-59). When enabled and + * the server advertises support, the driver will gracefully drain connections when a node shuts + * down. + * + *

Value-type: boolean + */ + GRACEFUL_DISCONNECT_ENABLED("advanced.connection.graceful-disconnect-enabled"); private final String path; diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java index 98faf3e590c..7163dda0b43 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java @@ -272,6 +272,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) { map.put(TypedDriverOption.CONNECTION_MAX_REQUESTS, 1024); map.put(TypedDriverOption.CONNECTION_MAX_ORPHAN_REQUESTS, 256); map.put(TypedDriverOption.CONNECTION_WARN_INIT_ERROR, true); + map.put(TypedDriverOption.GRACEFUL_DISCONNECT_ENABLED, true); map.put(TypedDriverOption.RECONNECT_ON_INIT, false); map.put(TypedDriverOption.RECONNECTION_POLICY_CLASS, "ExponentialReconnectionPolicy"); map.put(TypedDriverOption.RECONNECTION_BASE_DELAY, Duration.ofSeconds(1)); diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java index 182753300e7..089984c40e3 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java @@ -914,6 +914,9 @@ public String toString() { new TypedDriverOption<>( DefaultDriverOption.ADDRESS_TRANSLATOR_RESOLVE_ADDRESSES, GenericType.BOOLEAN); + public static final TypedDriverOption GRACEFUL_DISCONNECT_ENABLED = + new TypedDriverOption<>(DefaultDriverOption.GRACEFUL_DISCONNECT_ENABLED, GenericType.BOOLEAN); + /** * Ordered preference list of remote dcs optionally supplied for automatic failover and included * in query plan. This feature is enabled only when max-nodes-per-remote-dc is greater than 0. diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultNodeMetric.java b/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultNodeMetric.java index 0e9934c7034..918621161f1 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultNodeMetric.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultNodeMetric.java @@ -51,6 +51,7 @@ public enum DefaultNodeMetric implements NodeMetric { SPECULATIVE_EXECUTIONS("speculative-executions"), CONNECTION_INIT_ERRORS("errors.connection.init"), AUTHENTICATION_ERRORS("errors.connection.auth"), + GRACEFUL_DISCONNECTS("pool.graceful-disconnects"), ; private static final Map BY_PATH = sortByPath(); diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultSessionMetric.java b/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultSessionMetric.java index 63027a23fe7..3dc9dc2b5fd 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultSessionMetric.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultSessionMetric.java @@ -32,6 +32,7 @@ public enum DefaultSessionMetric implements SessionMetric { THROTTLING_QUEUE_SIZE("throttling.queue-size"), THROTTLING_ERRORS("throttling.errors"), CQL_PREPARED_CACHE_SIZE("cql-prepared-cache-size"), + GRACEFUL_DISCONNECTS("graceful-disconnects"), ; private static final Map BY_PATH = sortByPath(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java index 66a5c4edc0e..370b7562617 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java @@ -106,6 +106,12 @@ public class ChannelFactory { */ @VisibleForTesting volatile String productType; + @VisibleForTesting volatile boolean serverSupportsGracefulDisconnect; + + public boolean isGracefulDisconnectSupported() { + return serverSupportsGracefulDisconnect; + } + public ChannelFactory(InternalDriverContext context) { this.logPrefix = context.getSessionName(); this.context = context; @@ -232,6 +238,12 @@ private void connect( ConsistencyLevel.LOCAL_QUORUM.name())); } } + if (!serverSupportsGracefulDisconnect && supportedOptions != null) { + List gdValues = supportedOptions.get(GracefulDisconnectEvent.EVENT_TYPE); + if (gdValues != null && gdValues.contains("true")) { + serverSupportsGracefulDisconnect = true; + } + } resultFuture.complete(driverChannel); } else { Throwable error = connectFuture.cause(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/GracefulDisconnectEvent.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/GracefulDisconnectEvent.java new file mode 100644 index 00000000000..7fc337be481 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/GracefulDisconnectEvent.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.channel; + +import com.datastax.oss.driver.api.core.metadata.Node; +import net.jcip.annotations.Immutable; + +/** + * This event indicates that the server is shutting down gracefully and the driver should: + * + *

+ * + *

This is part of CEP-59: Graceful Disconnect – In-Band Connection Draining for Node Shutdown. + */ +@Immutable +public class GracefulDisconnectEvent { + + /** The event type string as defined in the native protocol. */ + public static final String EVENT_TYPE = "GRACEFUL_DISCONNECT"; + + /** The node that sent the graceful disconnect event. */ + public final Node node; + + /** The channel that received the graceful disconnect event. */ + public final DriverChannel channel; + + public GracefulDisconnectEvent(Node node, DriverChannel channel) { + this.node = node; + this.channel = channel; + } + + @Override + public String toString() { + return "GracefulDisconnectEvent{node=" + node + ", channel=" + channel + '}'; + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java index 90b02f358cd..81a0a6c8094 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InFlightHandler.java @@ -33,6 +33,7 @@ import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.Message; import com.datastax.oss.protocol.internal.request.Query; +import com.datastax.oss.protocol.internal.response.Event; import com.datastax.oss.protocol.internal.response.result.SetKeyspace; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; @@ -218,6 +219,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (streamId < 0) { Message event = responseFrame.message; + if (event instanceof Event + && GracefulDisconnectEvent.EVENT_TYPE.equals(((Event) event).type)) { + LOG.debug("[{}] Received GRACEFUL_DISCONNECT, initiating graceful drain", logPrefix); + startGracefulShutdown(ctx); + if (eventCallback != null) { + eventCallback.onEvent(event); + } + return; + } if (eventCallback == null) { LOG.debug("[{}] Received event {} but no callback was registered", logPrefix, event); } else { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java index 8a426f7b368..f086ef70b6e 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java @@ -55,7 +55,9 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Objects; import net.jcip.annotations.NotThreadSafe; import org.slf4j.Logger; @@ -183,12 +185,36 @@ Message getRequest() { case AUTH_RESPONSE: return request = new AuthResponse(authResponseToken); case REGISTER: - return request = new Register(options.eventTypes); + return request = new Register(filterSupportedEventTypes()); default: throw new AssertionError("unhandled step: " + step); } } + /** + * Filters the requested event types to only include those supported by the server. + * + *

Specifically, GRACEFUL_DISCONNECT is only included if the server advertises support for it + * in the SUPPORTED message response. + */ + private List filterSupportedEventTypes() { + List filteredEventTypes = new ArrayList<>(options.eventTypes); + + // Check if GRACEFUL_DISCONNECT is in the requested event types + if (filteredEventTypes.contains(GracefulDisconnectEvent.EVENT_TYPE)) { + // Get the supported options from the channel attribute (set during OPTIONS step) + Map> supportedOptions = channel.attr(DriverChannel.OPTIONS_KEY).get(); + + // Only include GRACEFUL_DISCONNECT if the server supports it + if (supportedOptions == null + || !supportedOptions.containsKey(GracefulDisconnectEvent.EVENT_TYPE)) { + filteredEventTypes.remove(GracefulDisconnectEvent.EVENT_TYPE); + } + } + + return filteredEventTypes; + } + @Override void send() { stepNumber++; diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java index 5c29a9b704b..1b665120e60 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java @@ -30,6 +30,7 @@ import com.datastax.oss.driver.internal.core.channel.DriverChannel; import com.datastax.oss.driver.internal.core.channel.DriverChannelOptions; import com.datastax.oss.driver.internal.core.channel.EventCallback; +import com.datastax.oss.driver.internal.core.channel.GracefulDisconnectEvent; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.metadata.DefaultTopologyMonitor; import com.datastax.oss.driver.internal.core.metadata.DistanceEvent; @@ -178,8 +179,8 @@ public void onEvent(Message eventMessage) { if (!(eventMessage instanceof Event)) { LOG.warn("[{}] Unsupported event class: {}", logPrefix, eventMessage.getClass().getName()); } else { - LOG.debug("[{}] Processing incoming event {}", logPrefix, eventMessage); Event event = (Event) eventMessage; + LOG.debug("[{}] Processing incoming event {}", logPrefix, eventMessage); switch (event.type) { case ProtocolConstants.EventType.TOPOLOGY_CHANGE: processTopologyChange(event); @@ -190,6 +191,9 @@ public void onEvent(Message eventMessage) { case ProtocolConstants.EventType.SCHEMA_CHANGE: processSchemaChange(event); break; + case GracefulDisconnectEvent.EVENT_TYPE: + processGracefulDisconnect(); + break; default: LOG.warn("[{}] Unsupported event type: {}", logPrefix, event.type); } @@ -242,6 +246,28 @@ private void processSchemaChange(Event event) { }); } + private void processGracefulDisconnect() { + LOG.info( + "[{}] Received GRACEFUL_DISCONNECT event on control connection, " + + "the server is shutting down gracefully", + logPrefix); + // Fire an internal event to notify other components (particularly the ChannelPool) + DriverChannel currentChannel = channel; + if (currentChannel != null) { + context + .getMetadataManager() + .getMetadata() + .findNode(currentChannel.getEndPoint()) + .ifPresent( + node -> + context.getEventBus().fire(new GracefulDisconnectEvent(node, currentChannel))); + } + // The control connection will handle reconnection automatically when the channel closes. + // The ChannelPool will close all its channels when it receives the GracefulDisconnectEvent, + // which will cause the NodeStateManager to set the node to DOWN state and trigger the + // LoadBalancingPolicy to remove it from the live set. + } + private class SingleThreaded { private final InternalDriverContext context; private final DriverConfig config; @@ -292,7 +318,13 @@ private void init( } initWasCalled = true; try { - ImmutableList eventTypes = buildEventTypes(listenToClusterEvents); + boolean gracefulDisconnectEnabled = + context + .getConfig() + .getDefaultProfile() + .getBoolean(DefaultDriverOption.GRACEFUL_DISCONNECT_ENABLED, true); + ImmutableList eventTypes = + buildEventTypes(listenToClusterEvents, gracefulDisconnectEnabled); LOG.debug("[{}] Initializing with event types {}", logPrefix, eventTypes); channelOptions = DriverChannelOptions.builder() @@ -606,7 +638,8 @@ private boolean isAuthFailure(Throwable error) { return true; } - private static ImmutableList buildEventTypes(boolean listenClusterEvents) { + private static ImmutableList buildEventTypes( + boolean listenClusterEvents, boolean gracefulDisconnectEnabled) { ImmutableList.Builder builder = ImmutableList.builder(); builder.add(ProtocolConstants.EventType.SCHEMA_CHANGE); if (listenClusterEvents) { @@ -614,6 +647,9 @@ private static ImmutableList buildEventTypes(boolean listenClusterEvents .add(ProtocolConstants.EventType.STATUS_CHANGE) .add(ProtocolConstants.EventType.TOPOLOGY_CHANGE); } + if (gracefulDisconnectEnabled) { + builder.add(GracefulDisconnectEvent.EVENT_TYPE); + } return builder.build(); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java index 6b7d06045bd..418c364e951 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java @@ -33,6 +33,8 @@ import com.datastax.oss.driver.internal.core.channel.ClusterNameMismatchException; import com.datastax.oss.driver.internal.core.channel.DriverChannel; import com.datastax.oss.driver.internal.core.channel.DriverChannelOptions; +import com.datastax.oss.driver.internal.core.channel.EventCallback; +import com.datastax.oss.driver.internal.core.channel.GracefulDisconnectEvent; import com.datastax.oss.driver.internal.core.config.ConfigChangeEvent; import com.datastax.oss.driver.internal.core.context.EventBus; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; @@ -44,7 +46,10 @@ import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule; import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions; import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import com.datastax.oss.driver.shaded.guava.common.collect.Sets; +import com.datastax.oss.protocol.internal.Message; +import com.datastax.oss.protocol.internal.response.Event; import edu.umd.cs.findbugs.annotations.NonNull; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; @@ -228,11 +233,13 @@ private class SingleThreaded { private final DriverConfig config; private final ChannelFactory channelFactory; private final EventBus eventBus; + private final boolean gracefulDisconnectEnabled; // The channels that are currently connecting private final List> pendingChannels = new ArrayList<>(); private final Set closingChannels = new HashSet<>(); private final Reconnection reconnection; private final Object configListenerKey; + private final Object gracefulDisconnectListenerKey; private NodeDistance distance; private int wantedCount; @@ -252,6 +259,11 @@ private SingleThreaded( this.wantedCount = getConfiguredSize(distance); this.channelFactory = context.getChannelFactory(); this.eventBus = context.getEventBus(); + this.gracefulDisconnectEnabled = + config + .getDefaultProfile() + .getBoolean(DefaultDriverOption.GRACEFUL_DISCONNECT_ENABLED, true) + && channelFactory.isGracefulDisconnectSupported(); ReconnectionPolicy reconnectionPolicy = context.getReconnectionPolicy(); this.reconnection = new Reconnection( @@ -264,6 +276,10 @@ private SingleThreaded( this.configListenerKey = eventBus.register( ConfigChangeEvent.class, RunOrSchedule.on(adminExecutor, this::onConfigChanged)); + this.gracefulDisconnectListenerKey = + eventBus.register( + GracefulDisconnectEvent.class, + RunOrSchedule.on(adminExecutor, this::onGracefulDisconnect)); } private void connect() { @@ -291,13 +307,26 @@ private CompletionStage addMissingChannels() { int missing = wantedCount - channels.size(); LOG.debug("[{}] Trying to create {} missing channels", logPrefix, missing); - DriverChannelOptions options = - DriverChannelOptions.builder() - .withKeyspace(keyspaceName) - .withOwnerLogPrefix(sessionLogPrefix) - .build(); + for (int i = 0; i < missing; i++) { + DriverChannelOptions.Builder optionsBuilder = + DriverChannelOptions.builder() + .withKeyspace(keyspaceName) + .withOwnerLogPrefix(sessionLogPrefix); + + QueryConnectionEventCallback eventCallback = null; + if (gracefulDisconnectEnabled) { + eventCallback = new QueryConnectionEventCallback(); + optionsBuilder.withEvents( + ImmutableList.of(GracefulDisconnectEvent.EVENT_TYPE), eventCallback); + } + + DriverChannelOptions options = optionsBuilder.build(); CompletionStage channelFuture = channelFactory.connect(node, options); + if (eventCallback != null) { + QueryConnectionEventCallback cb = eventCallback; + channelFuture.thenAccept(cb::setChannel); + } pendingChannels.add(channelFuture); } return CompletableFutures.allDone(pendingChannels) @@ -474,6 +503,74 @@ private void onConfigChanged(@SuppressWarnings("unused") ConfigChangeEvent event resize(distance); } + private void onGracefulDisconnect(GracefulDisconnectEvent event) { + assert adminExecutor.inEventLoop(); + // Only handle events for channels belonging to this pool's node + if (!event.node.equals(node)) { + return; + } + DriverChannel affectedChannel = event.channel; + // Check if this channel belongs to this pool + boolean channelFound = false; + for (DriverChannel channel : channels) { + if (channel == affectedChannel) { + channelFound = true; + break; + } + } + if (channelFound) { + LOG.info( + "[{}] Received GRACEFUL_DISCONNECT on channel {}, closing all channels gracefully", + logPrefix, + affectedChannel); + // Close ALL channels in the pool gracefully to immediately stop accepting new requests. + // When all channels are closed, the NodeStateManager will automatically set the node to + // DOWN state, which will trigger the LoadBalancingPolicy to remove it from the live set. + // The graceful close allows in-flight requests to complete before channels are fully + // closed. + // Reconnection will start automatically once all channels are closed. + for (DriverChannel channel : channels) { + channel.close(); + } + } + } + + /** + * Event callback for query connections that handles GRACEFUL_DISCONNECT events. + * + *

This is called from the Netty I/O thread when an event is received on a query connection. + */ + private class QueryConnectionEventCallback implements EventCallback { + private volatile DriverChannel channel; + + void setChannel(DriverChannel channel) { + this.channel = channel; + } + + @Override + public void onEvent(Message eventMessage) { + if (!(eventMessage instanceof Event)) { + LOG.warn( + "[{}] Unsupported event class on query connection: {}", + logPrefix, + eventMessage.getClass().getName()); + return; + } + Event event = (Event) eventMessage; + if (GracefulDisconnectEvent.EVENT_TYPE.equals(event.type)) { + LOG.debug("[{}] Received GRACEFUL_DISCONNECT on query connection", logPrefix); + DriverChannel currentChannel = this.channel; + if (currentChannel != null) { + eventBus.fire(new GracefulDisconnectEvent(node, currentChannel)); + } else { + LOG.warn("[{}] Channel not yet set, cannot fire GracefulDisconnectEvent", logPrefix); + } + } else { + LOG.warn("[{}] Unexpected event type on query connection: {}", logPrefix, event.type); + } + } + } + private CompletionStage setKeyspace(CqlIdentifier newKeyspaceName) { assert adminExecutor.inEventLoop(); if (setKeyspaceFuture != null && !setKeyspaceFuture.isDone()) { @@ -533,6 +630,7 @@ private void close() { reconnection.stop(); eventBus.unregister(configListenerKey, ConfigChangeEvent.class); + eventBus.unregister(gracefulDisconnectListenerKey, GracefulDisconnectEvent.class); // Close all channels, the pool future completes when all the channels futures have completed int toClose = closingChannels.size() + channels.size(); diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 4ae83362e29..eb7ef870631 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -535,6 +535,15 @@ datastax-java-driver { # change. # Overridable in a profile: no warn-on-init-error = true + + # Whether to subscribe to GRACEFUL_DISCONNECT events (CEP-59). When the server supports it, + # the driver will drain in-flight requests before closing connections during a node shutdown, + # instead of failing them with a timeout or connection error. + # + # Required: yes + # Modifiable at runtime: no + # Overridable in a profile: no + graceful-disconnect-enabled = true } # Advanced options for the built-in load-balancing policies. diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java index 35049e99af1..ed244b3f8c0 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/InFlightHandlerTest.java @@ -33,6 +33,7 @@ import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.protocol.internal.request.Query; import com.datastax.oss.protocol.internal.response.Error; +import com.datastax.oss.protocol.internal.response.event.GracefulDisconnectEvent; import com.datastax.oss.protocol.internal.response.event.StatusChangeEvent; import com.datastax.oss.protocol.internal.response.result.SetKeyspace; import com.datastax.oss.protocol.internal.response.result.Void; @@ -644,6 +645,98 @@ private void addToPipeline() { addToPipelineWithEventCallback(null); } + @Test + public void should_initiate_graceful_drain_on_graceful_disconnect_event() { + // Given + EventCallback eventCallback = mock(EventCallback.class); + addToPipelineWithEventCallback(eventCallback); + when(streamIds.acquire()).thenReturn(42); + MockResponseCallback responseCallback = new MockResponseCallback(); + channel + .writeAndFlush( + new DriverChannel.RequestMessage(QUERY, false, Frame.NO_PAYLOAD, responseCallback)) + .awaitUninterruptibly(); + + // When + GracefulDisconnectEvent gracefulDisconnectEvent = + new com.datastax.oss.protocol.internal.response.event.GracefulDisconnectEvent(); + Frame eventFrame = + Frame.forResponse( + DefaultProtocolVersion.V4.getCode(), + -1, + null, + Collections.emptyMap(), + Collections.emptyList(), + gracefulDisconnectEvent); + writeInboundFrame(eventFrame); + + // Then + // channel not closed yet because there is a pending request + assertThat(channel.closeFuture()).isNotDone(); + // callback was still notified + verify(eventCallback).onEvent(gracefulDisconnectEvent); + // new writes should be refused + ChannelFuture otherWriteFuture = + channel.writeAndFlush( + new DriverChannel.RequestMessage( + QUERY, false, Frame.NO_PAYLOAD, new MockResponseCallback())); + assertThat(otherWriteFuture) + .isFailed(e -> assertThat(e).isInstanceOf(IllegalStateException.class)); + + // When the pending request completes + Frame requestFrame = readOutboundFrame(); + writeInboundFrame(requestFrame, Void.INSTANCE); + + // Then the channel closes + assertThat(channel.closeFuture()).isSuccess(); + } + + @Test + public void should_close_immediately_on_graceful_disconnect_if_no_pending() { + // Given + EventCallback eventCallback = mock(EventCallback.class); + addToPipelineWithEventCallback(eventCallback); + + // When + GracefulDisconnectEvent gracefulDisconnectEvent = + new com.datastax.oss.protocol.internal.response.event.GracefulDisconnectEvent(); + Frame eventFrame = + Frame.forResponse( + DefaultProtocolVersion.V4.getCode(), + -1, + null, + Collections.emptyMap(), + Collections.emptyList(), + gracefulDisconnectEvent); + writeInboundFrame(eventFrame); + + // Then + assertThat(channel.closeFuture()).isSuccess(); + verify(eventCallback).onEvent(gracefulDisconnectEvent); + } + + @Test + public void should_handle_graceful_disconnect_without_event_callback() { + // Given + addToPipeline(); // no event callback + + // When + GracefulDisconnectEvent gracefulDisconnectEvent = + new com.datastax.oss.protocol.internal.response.event.GracefulDisconnectEvent(); + Frame eventFrame = + Frame.forResponse( + DefaultProtocolVersion.V4.getCode(), + -1, + null, + Collections.emptyMap(), + Collections.emptyList(), + gracefulDisconnectEvent); + writeInboundFrame(eventFrame); + + // Then + assertThat(channel.closeFuture()).isSuccess(); + } + private void addToPipelineWithEventCallback(EventCallback eventCallback) { channel .pipeline() diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java index cb83b523ebe..200e7b8cfaf 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java @@ -20,18 +20,23 @@ import static com.datastax.oss.driver.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.metadata.Metadata; import com.datastax.oss.driver.internal.core.channel.DriverChannel; import com.datastax.oss.driver.internal.core.channel.DriverChannelOptions; import com.datastax.oss.driver.internal.core.channel.EventCallback; +import com.datastax.oss.driver.internal.core.channel.GracefulDisconnectEvent; import com.datastax.oss.driver.internal.core.metadata.TopologyEvent; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.protocol.internal.response.event.SchemaChangeEvent; import com.datastax.oss.protocol.internal.response.event.StatusChangeEvent; import com.datastax.oss.protocol.internal.response.event.TopologyChangeEvent; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -41,6 +46,8 @@ public class ControlConnectionEventsTest extends ControlConnectionTestBase { @Test public void should_register_for_all_events_if_topology_requested() { // Given + when(defaultProfile.getBoolean(DefaultDriverOption.GRACEFUL_DISCONNECT_ENABLED, true)) + .thenReturn(true); DriverChannel channel1 = newMockDriverChannel(1); ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(DriverChannelOptions.class); @@ -59,7 +66,8 @@ public void should_register_for_all_events_if_topology_requested() { .containsExactly( ProtocolConstants.EventType.SCHEMA_CHANGE, ProtocolConstants.EventType.STATUS_CHANGE, - ProtocolConstants.EventType.TOPOLOGY_CHANGE); + ProtocolConstants.EventType.TOPOLOGY_CHANGE, + GracefulDisconnectEvent.EVENT_TYPE); assertThat(channelOptions.eventCallback).isEqualTo(controlConnection); }); } @@ -67,6 +75,8 @@ public void should_register_for_all_events_if_topology_requested() { @Test public void should_register_for_schema_events_only_if_topology_not_requested() { // Given + when(defaultProfile.getBoolean(DefaultDriverOption.GRACEFUL_DISCONNECT_ENABLED, true)) + .thenReturn(false); DriverChannel channel1 = newMockDriverChannel(1); ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(DriverChannelOptions.class); @@ -87,9 +97,65 @@ public void should_register_for_schema_events_only_if_topology_not_requested() { }); } + @Test + public void should_not_register_for_graceful_disconnect_when_disabled() { + // Given + when(defaultProfile.getBoolean(DefaultDriverOption.GRACEFUL_DISCONNECT_ENABLED, true)) + .thenReturn(false); + DriverChannel channel1 = newMockDriverChannel(1); + ArgumentCaptor optionsCaptor = + ArgumentCaptor.forClass(DriverChannelOptions.class); + when(channelFactory.connect(eq(node1), optionsCaptor.capture())) + .thenReturn(CompletableFuture.completedFuture(channel1)); + + // When + controlConnection.init(true, false, false); + + // Then + await() + .untilAsserted( + () -> { + DriverChannelOptions channelOptions = optionsCaptor.getValue(); + assertThat(channelOptions.eventTypes) + .containsExactly( + ProtocolConstants.EventType.SCHEMA_CHANGE, + ProtocolConstants.EventType.STATUS_CHANGE, + ProtocolConstants.EventType.TOPOLOGY_CHANGE); + assertThat(channelOptions.eventCallback).isEqualTo(controlConnection); + }); + } + + @Test + public void should_process_graceful_disconnect_event() { + // Given + when(defaultProfile.getBoolean(DefaultDriverOption.GRACEFUL_DISCONNECT_ENABLED, true)) + .thenReturn(true); + DriverChannel channel1 = newMockDriverChannel(1); + Metadata metadata = mock(Metadata.class); + when(metadataManager.getMetadata()).thenReturn(metadata); + when(metadata.findNode(channel1.getEndPoint())).thenReturn(Optional.of(node1)); + ArgumentCaptor optionsCaptor = + ArgumentCaptor.forClass(DriverChannelOptions.class); + when(channelFactory.connect(eq(node1), optionsCaptor.capture())) + .thenReturn(CompletableFuture.completedFuture(channel1)); + controlConnection.init(true, false, false); + await().until(() -> optionsCaptor.getValue() != null); + EventCallback callback = optionsCaptor.getValue().eventCallback; + com.datastax.oss.protocol.internal.response.event.GracefulDisconnectEvent event = + new com.datastax.oss.protocol.internal.response.event.GracefulDisconnectEvent(); + + // When + callback.onEvent(event); + + // Then + verify(eventBus).fire(org.mockito.ArgumentMatchers.any(GracefulDisconnectEvent.class)); + } + @Test public void should_process_status_change_events() { // Given + when(defaultProfile.getBoolean(DefaultDriverOption.GRACEFUL_DISCONNECT_ENABLED, true)) + .thenReturn(true); DriverChannel channel1 = newMockDriverChannel(1); ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(DriverChannelOptions.class); @@ -111,6 +177,8 @@ public void should_process_status_change_events() { @Test public void should_process_topology_change_events() { // Given + when(defaultProfile.getBoolean(DefaultDriverOption.GRACEFUL_DISCONNECT_ENABLED, true)) + .thenReturn(true); DriverChannel channel1 = newMockDriverChannel(1); ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(DriverChannelOptions.class); @@ -132,6 +200,8 @@ public void should_process_topology_change_events() { @Test public void should_process_schema_change_events() { // Given + when(defaultProfile.getBoolean(DefaultDriverOption.GRACEFUL_DISCONNECT_ENABLED, true)) + .thenReturn(true); DriverChannel channel1 = newMockDriverChannel(1); ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(DriverChannelOptions.class); diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 00000000000..a85771578fb --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,79 @@ +services: + java-client: + image: maven:3.9-eclipse-temurin-8 + container_name: java-client + working_dir: /app + depends_on: + ccm-cluster: + condition: service_healthy + cap_add: + - NET_ADMIN + - NET_RAW + volumes: + - .:/app + - /Users/siyaohe/.m2/repository:/root/.m2/repository + networks: + cassandra-net: + ipv4_address: 172.20.0.10 + command: > + sh -c "mvn compile -DskipTests && mvn verify -pl integration-tests -Dtest=GracefulDisconnectIT" + + ccm-cluster: + image: apache.jfrog.io/cassan-docker/apache/cassandra-java-driver-testing-ubuntu2204:latest + container_name: ccm-cluster + hostname: ccm-cluster + user: root + ports: + - "9042:9042" + - "9043:9043" + - "9044:9044" + cap_add: + - NET_ADMIN + - NET_RAW + networks: + cassandra-net: + ipv4_address: 172.20.0.2 + environment: + - HEAP_NEWSIZE=128M + - MAX_HEAP_SIZE=512M + - HOST_UID=${HOST_UID:-1000} + - HOST_GID=${HOST_GID:-1000} + volumes: + - .:/home/docker/cassandra-java-driver + - /Users/siyaohe/Documents/DataStax/Repos/cassandra:/home/docker/cassandra + entrypoint: ["/bin/bash", "-l", "-c"] + command: + - | + bash /home/docker/cassandra-java-driver/ci/create-user.sh docker $${HOST_UID} $${HOST_GID} /home/docker/cassandra-java-driver && + . /home/docker/env.txt && + jabba use openjdk@1.11.0-9 && + pip install -U cqlsh && + sudo apt install iproute2 && + ccm create test -n 3 --install-dir=/home/docker/cassandra && + ccm node1 updateconf \ + "rpc_address: 0.0.0.0" \ + "broadcast_rpc_address: 172.20.0.2" \ + "native_transport_port: 9042" && + ccm node2 updateconf \ + "rpc_address: 0.0.0.0" \ + "broadcast_rpc_address: 172.20.0.2" \ + "native_transport_port: 9043" && + ccm node3 updateconf \ + "rpc_address: 0.0.0.0" \ + "broadcast_rpc_address: 172.20.0.2" \ + "native_transport_port: 9044" && + ccm start && + touch /home/docker/.setup_complete && + tail -f /dev/null + healthcheck: + test: ["CMD", "test", "-f", "/home/docker/.setup_complete"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 30s +networks: + cassandra-net: + driver: bridge + ipam: + config: + - subnet: 172.20.0.0/16 \ No newline at end of file diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/connection/GracefulDisconnectIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/connection/GracefulDisconnectIT.java new file mode 100644 index 00000000000..41bd9f5a871 --- /dev/null +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/connection/GracefulDisconnectIT.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.core.connection; + +import com.datastax.oss.driver.api.core.CqlSession; +import java.net.InetSocketAddress; +import org.junit.Test; + +public class GracefulDisconnectIT { + + @Test + public void should_opt_in_gracefully_disconnect() { + try (CqlSession session = + CqlSession.builder() + .addContactPoint(new InetSocketAddress("ccm-cluster", 9042)) + .withLocalDatacenter("datacenter1") + .build()) { + while (true) { + session.execute("SELECT * FROM system.local"); + Thread.sleep(10); + } + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/integration-tests/src/test/resources/logback-test.xml b/integration-tests/src/test/resources/logback-test.xml index a2179e4357b..1d41a3d1692 100644 --- a/integration-tests/src/test/resources/logback-test.xml +++ b/integration-tests/src/test/resources/logback-test.xml @@ -24,14 +24,17 @@ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - + - - - + + + + + +