RATIS-1240. Add input stream to DataStreamApi for read operations in Server #1469
szetszwo
left a comment
@peterxcli , thanks a lot for working on stream read! It is going to be very useful.
This change is very big and quite involved. In particular, it defines several new public APIs. We need to design the APIs carefully.
Could you split this into two or three subtasks? One usual way is to change the server first and then the client.
| private CompletableFuture<Void> submitReadOnlyRequest(DataStreamRequestByteBuf request, | ||
| RaftClientRequest raftClientRequest, ChannelHandlerContext ctx) { | ||
| try { | ||
| final StateMachine.ReadOnlyDataStream readOnlyDataStream = new StateMachine.ReadOnlyDataStream() { |
We should reuse StateMachine.DataChannel, which is a Java WritableByteChannel. Then we can use other Java APIs such as FileChannel.transferTo (a highly efficient, zero-copy data transfer operation).
final StateMachine.DataChannel readOnlyDataStream = new StateMachine.DataChannel() {
  private long streamOffset;
  private boolean closed = false;

  @Override
  public synchronized boolean isOpen() {
    return !closed;
  }

  @Override
  public synchronized void close() {
    closed = true;
  }

  @Override
  public synchronized void force(boolean metadata) throws AlreadyClosedException {
    if (!isOpen()) {
      throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
    }
    ctx.flush();
  }

  @Override
  public synchronized int write(ByteBuffer buffer) throws IOException {
    if (!isOpen()) {
      throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
    }
    final DataStreamReplyByteBuffer reply = newDataStreamReadOnlyReplyByteBuffer(request, streamOffset, buffer);
    final int length = buffer.remaining();
    final ChannelFuture future = ctx.write(reply);
    try {
      future.await();
    } catch (InterruptedException e) {
      throw new InterruptedIOException("Interrupted while writing " + length
          + " bytes at offset " + streamOffset);
    }
    streamOffset += length;
    return length;
  }
};
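To illustrate why a WritableByteChannel target is convenient, here is a minimal sketch of FileChannel.transferTo writing a file into an arbitrary WritableByteChannel. The class name TransferToSketch and the method transferAll are hypothetical and not part of Ratis. Whether the transfer is actually zero-copy depends on the target channel and the platform; with an in-memory target it is an ordinary copy.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch, not Ratis code. */
public class TransferToSketch {
  /** Transfers the whole file into the target channel and returns the number of bytes moved. */
  static long transferAll(Path file, WritableByteChannel target) throws IOException {
    try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
      final long size = source.size();
      long position = 0;
      while (position < size) {
        // transferTo may move fewer bytes than requested, so keep going until done.
        final long transferred = source.transferTo(position, size - position, target);
        if (transferred <= 0) {
          break; // nothing more could be transferred, e.g. the file was truncated
        }
        position += transferred;
      }
      return position;
    }
  }
}
```

A state machine that serves reads from a local file could, under this assumption, pass the DataChannel it receives as the target argument.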
Thanks for the review! Sure, I will split this into server and client patches!
Co-authored-by: Tsz-Wo Nicholas Sze <szetszwo@apache.org> Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@szetszwo thanks for the review. I addressed the data channel reuse suggestion and kept only the server-side code in this PR. Please take another look, thanks!
@szetszwo I integrated the Ratis stream read API into Ozone as a POC, and the results are promising. The no-MD5 sequential reads comparison:
The no-MD5 random reads comparison:
Ozone POC: peterxcli/ozone#16
szetszwo
left a comment
@peterxcli , thanks for working on this! Please see the comments inlined and also linearizable.
| default CompletableFuture<RaftClientReply> streamReadOnlyAsync( | ||
| RaftClientRequest request, StateMachine.DataChannel stream) throws IOException { | ||
| throw new UnsupportedOperationException("This method is NOT supported."); | ||
| } |
This new method seems not needed since we may:
- Phase 1: Directly call DataApi.streamReadOnly(..) and ignore all linearizable checks.
- Phase 2: Reuse RaftClientAsynchronousProtocol.submitClientRequestAsync(..) to submit a dummy read request for linearizable checks and then call DataApi.streamReadOnly(..).
Of course, we should start with Phase 1 for simplicity.
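The two phases can be sketched with plain CompletableFuture composition. This is only an illustration under stated assumptions: the class PhasedStreamReadSketch and both suppliers are hypothetical stand-ins, where dummyRead represents submitting a dummy read request for the linearizable check and streamRead represents the call into the state machine's streaming read.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch, not Ratis code. */
public class PhasedStreamReadSketch {
  /** Phase 1: start streaming directly, with no linearizable check. */
  static <R> CompletableFuture<R> phase1(Supplier<CompletableFuture<R>> streamRead) {
    return streamRead.get();
  }

  /** Phase 2: wait for the dummy read (the linearizable check) to succeed, then start streaming. */
  static <R> CompletableFuture<R> phase2(
      Supplier<CompletableFuture<Void>> dummyRead,
      Supplier<CompletableFuture<R>> streamRead) {
    // If the check fails, the returned future fails and streamRead is never invoked.
    return dummyRead.get().thenCompose(ignored -> streamRead.get());
  }
}
```

In this shape, moving from Phase 1 to Phase 2 only adds a step in front of the streaming call, which is one reason a separate server method may not be needed.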
Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
@szetszwo Thanks for the review!
I'm ok with this direction, but curious why use |
szetszwo
left a comment
@peterxcli , thanks for the update! Please see the comments inlined and also https://issues.apache.org/jira/secure/attachment/13082557/1469_review2.patch
Sorry that I forgot to mention 1469_review.patch (deleted) in my previous comment.
| <module name="SuppressionSingleFilter"> | ||
| <property name="checks" value="FileLength"/> | ||
| <property name="files" value="RaftServerImpl.java"/> | ||
| </module> |
Since this PR no longer changes RaftServerImpl, please revert the checkstyle file.
| static DataStreamReplyByteBuffer newDataStreamReadOnlyReplyByteBuffer(DataStreamRequestByteBuf request, | ||
| long streamOffset, ByteBuffer buffer) { | ||
| final ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer(); | ||
| return DataStreamReplyByteBuffer.newBuilder() | ||
| .setClientId(request.getClientId()) | ||
| .setType(Type.STREAM_DATA) | ||
| .setStreamId(request.getStreamId()) | ||
| .setStreamOffset(streamOffset) | ||
| .setBuffer(readOnlyBuffer) | ||
| .setSuccess(true) | ||
| .setBytesWritten(readOnlyBuffer.remaining()) | ||
| .build(); | ||
| } | ||
|
|
||
| private static CompletableFuture<Void> writeAndFlush(ChannelHandlerContext ctx, DataStreamReply reply) { | ||
| final CompletableFuture<Void> future = new CompletableFuture<>(); | ||
| ctx.writeAndFlush(reply).addListener(channelFuture -> { | ||
| if (channelFuture.isSuccess()) { | ||
| future.complete(null); | ||
| } else { | ||
| future.completeExceptionally(channelFuture.cause()); | ||
| } | ||
| }); | ||
| return future; | ||
| } |
Let's move all the new code to a new class, say ReadStreamManagement.
| if (request.getType() == Type.STREAM_HEADER) { | ||
| final RaftClientRequest raftClientRequest = toRaftClientRequest(request); | ||
| if (raftClientRequest.is(TypeCase.READ)) { | ||
| submitReadOnlyRequest(request, raftClientRequest, ctx).whenComplete((v, exception) -> { | ||
| try { | ||
| if (exception != null) { | ||
| replyDataStreamException(server, exception, raftClientRequest, request, ctx); | ||
| } | ||
| } finally { | ||
| request.release(); | ||
| channels.remove(channelId, key); | ||
| } | ||
| }); | ||
| return; | ||
| } | ||
| } |
The new read streams have nothing to do with the existing write streams. Let's do the check in NettyServerStreamRpc.
@@ -235,6 +237,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
       try(UncheckedAutoCloseable autoReset = requestRef.set(request)) {
+        if (reads.process(request, ctx)) {
+          return;
+        }
         requests.read(request, ctx, proxies.get(request)::getDataStreamOutput);
       }
     }
| request.release(); | ||
| channels.remove(channelId, key); |
- request can be released earlier; we only need the clientId and streamId in the read stream.
- channelId is not used. So, we don't need a ChannelMap in ReadStreamManagement.
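The early-release idea can be sketched as follows. All names here are hypothetical: PooledRequest stands in for a reference-counted request such as DataStreamRequestByteBuf, and ReadStreamKey for whatever small immutable value the read stream would keep. The sketch only shows the ordering: copy the two IDs, release the request, then start the asynchronous read.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/** Hypothetical sketch, not Ratis code. */
public class EarlyReleaseSketch {
  /** Hypothetical stand-in for a reference-counted request. */
  static final class PooledRequest {
    private final String clientId;
    private final long streamId;
    private final AtomicBoolean released = new AtomicBoolean();

    PooledRequest(String clientId, long streamId) {
      this.clientId = clientId;
      this.streamId = streamId;
    }

    String getClientId() { return clientId; }
    long getStreamId() { return streamId; }
    void release() { released.set(true); }
    boolean isReleased() { return released.get(); }
  }

  /** The only state the read stream keeps from the request. */
  static final class ReadStreamKey {
    final String clientId;
    final long streamId;

    ReadStreamKey(String clientId, long streamId) {
      this.clientId = clientId;
      this.streamId = streamId;
    }
  }

  /** Copies the IDs, releases the request immediately, then starts the asynchronous read. */
  static <R> CompletableFuture<R> startRead(
      PooledRequest request, Function<ReadStreamKey, CompletableFuture<R>> reader) {
    final ReadStreamKey key;
    try {
      key = new ReadStreamKey(request.getClientId(), request.getStreamId());
    } finally {
      request.release(); // released before the read completes, not in its completion callback
    }
    return reader.apply(key);
  }
}
```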
What changes were proposed in this pull request?
New Server and StateMachine API:
- Adds streamReadOnlyAsync(RaftClientRequest, StateMachine.DataChannel) to the RaftServer interface, allowing clients to submit read-only requests whose responses are streamed via the data stream RPC.
- Adds streamReadOnly(RaftClientRequest, DataChannel) to the StateMachine.Data interface, enabling state machines to implement custom streaming logic for read-only queries.
Server Implementation:
- Read-only stream handling in DataStreamManagement, including request parsing, channel management, and response streaming.
- Adds executeStreamReadOnlyAsync to RaftServerImpl to coordinate read-only stream execution and integrate with the read index logic.
What is the link to the Apache JIRA?
RATIS-1240
How was this patch tested?
(Please explain how this patch was tested. Ex: unit tests, manual tests)