Skip to content

benchmark-test: cancellation, memory-release, and reduce-path diagnos…#371

Open
mch2 wants to merge 3 commits into
mainfrom
benchmark-test
Open

benchmark-test: cancellation, memory-release, and reduce-path diagnos…#371
mch2 wants to merge 3 commits into
mainfrom
benchmark-test

Conversation

@mch2

@mch2 mch2 commented Jun 1, 2026

Copy link
Copy Markdown
Owner

…tics

Squashed work-in-progress for the benchmark-test branch:

Description

[Describe what this change achieves]

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

} else {
out = VectorStreamOutput.create(flightChannel.getAllocator(), flightChannel.getRoot());
task.response().writeTo(out);
logger.info("[flight-diag] producer requestId={} sending serialized batch, serverAllocator={}",

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the logging from this file.


VectorSchemaRoot streamRoot = flightStream.getRoot();
currentBatchSize = FlightUtils.calculateVectorSchemaRootSize(streamRoot);
logger.info("[flight-diag] correlationId={} batch received: rows={} size={} bytes, allocator={}",

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the logging from this file, in fact all flight files.


/**
* Whether this sink's downstream consumer has finished and will read no more batches — e.g.
* a coordinator reduce whose LimitExec satisfied its fetch and tore down the input receiver.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this more concise.

_concurrency_permit: Option<tokio::sync::OwnedSemaphorePermit>,
/// Cumulative bytes this stream has exported across the Arrow C-Data boundary.
/// Used by [`stream_next`] to reject a query before its exported batches
/// accumulate past the Java import pool cap (which would leak — see

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove all changes from opensearch-project#21887 from this PR.


/// Positive FFI sentinel returned by [`df_sender_send`] when the send was skipped because the
/// consumer already finished (the receiver was dropped, e.g. a LimitExec satisfied its fetch).
/// It rides the success half of the FFM return contract (`>= 0` = success, `< 0` = `-error_ptr`),

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this more concise.

let target_schema = crate::schema_coerce::coerce_inferred_schema(physical_plan.schema());
let physical_plan = crate::relabel_exec::wrap_if_relabel_needed(physical_plan, target_schema)?;
log_debug!("DataFusion physical plan (reduce):\n{}", displayable(physical_plan.as_ref()).indent(true));
log_debug!("DataFusion physical plan (reduce) BEFORE strip:\n{}", displayable(physical_plan.as_ref()).indent(true));

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reomve this log.

physical_plan,
crate::agg_mode::Mode::Final,
)?;
log_info!("DataFusion physical plan (reduce) AFTER agg-mode strip:\n{}", displayable(stripped.as_ref()).indent(true));

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already have a log here somewher efor the plan right? or is it before the strip? if so, replace that with this one, and make it the appropriate log level.

/// Outcome of a blocking send. `ReceiverDropped` is the benign terminal case — the
/// DataFusion consumer finished (e.g. a `LimitExec` satisfied its fetch) and tore down the
/// receiver while producers were still feeding. `mpsc::Sender::send` fails for exactly this
/// one reason, so the condition is structurally unambiguous here; surfacing it as a distinct

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make more concise.

pub struct PartitionStreamReceiver {
rx: mpsc::Receiver<Result<RecordBatch, DataFusionError>>,
schema: SchemaRef,
poll_count: AtomicU64,

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this

let count = self.poll_count.fetch_add(1, Ordering::Relaxed) + 1;
match &result {
Poll::Ready(None) => {
log_info!("[partition-stream] receiver got None (EOF) after {} polls", count);

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these logs

}
}
long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
logger.info("[drain-diag] drainOutputIntoDownstream complete: taskId={}, batches={}, rows={}, elapsed={}ms",

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove these logs rom this clas.

try {
assert lifecycle.isWriteLockedByCurrentThread() : "close must hold the write lock across super.close()";
super.close();
logger.info("[sender] close() complete - sender dropped, native EOF signalled");

@mch2 mch2 Jun 1, 2026

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this a trace log, and include task/query id.


@Override
public void close() {
logger.info("[sender] close() called, receiverDropped={}, thread={}",

@mch2 mch2 Jun 1, 2026

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this log.

*/
@Override
public boolean isConsumerDone() {
return sendersByChildStageId.size() == 1 && sendersByChildStageId.values().iterator().next().isReceiverDropped();

@mch2 mch2 Jun 1, 2026

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this not work for join early terminate? why have size check?

// Best-effort fast path — skip export work if already closed.
if (closed) {
long fc = feedCount.get();
logger.info("[feed-diag] feedToSender: taskId={} feedCount={} rows={} closed={} receiverDropped={}",

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this log

logger.info("[feed-diag] feedToSender: taskId={} feedCount={} rows={} closed={} receiverDropped={}",
ctx.taskId(), fc, batch.getRowCount(), closed, sender.isReceiverDropped());
if (closed || sender.isReceiverDropped()) {
logger.info("[feed-diag] feedToSender SKIPPING: taskId={} closed={} receiverDropped={} rows={} totalFed={}",

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this a trace log

// here (double-free). The sender latched the drop (see DatafusionPartitionSender),
// so subsequent feeds for this input short-circuit and the producer stream is
// cancelled by the shard listener via isConsumerDone().
logger.debug("[ReduceSink] receiver dropped before send (consumer finished), discarding batch");

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this a trace log.

@mch2 mch2 force-pushed the benchmark-test branch from 2724022 to 3d1680a Compare June 1, 2026 01:55
* @return the {@link QueryExecution} driving this query, for post-execution inspection
* (e.g. profiling). Callers that don't need it may ignore the return value.
*/
QueryExecution execute(QueryContext context, ActionListener<Iterable<VectorSchemaRoot>> listener);

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undo this - we should be in sync with opensearch-project#21917

@Override
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
if (getCollation().getFieldCollations().isEmpty()) {
if (getCollation().getFieldCollations().isEmpty() && fetch == null && offset == null) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undo this

OpenSearchSort sort = call.rel(0);
if (sort.getCollation().getFieldCollations().isEmpty()) {
return false; // pure LIMIT — skip
if (sort.getCollation().getFieldCollations().isEmpty() && sort.fetch == null && sort.offset == null) {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undo this

totalRows += rows;
// Per-batch log at TRACE only (extremely verbose under load)
if (logger.isTraceEnabled() && (batchCount <= 3 || batchCount % 500 == 0)) {
logger.debug("[shard-stream] shard feeding batch #{} rows={} cumRows={}", batchCount, rows, totalRows);

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this log

}
logger.debug("[shard-stream] shard stream complete: {} batches, {} totalRows", batchCount, totalRows);
} catch (Exception e) {
logger.debug("[shard-stream] shard stream failed after {} batches, {} rows: {}", batchCount, totalRows, e.getMessage());

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this log.

listener::onResponse,
e -> listener.onFailure(e instanceof Exception ex ? contextProvider.convertException(ex) : e)
);
ContextAwareExecutor.wrap(searchExecutor, threadPool).execute(() -> {

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we want to use ContextAwareExecutor - undo its removal here and elsewhere in the pr, we should still be wrapping.

pub custom_cache_manager: Option<CustomCacheManager>,
pub dynamic_limit_handle: DynamicLimitHandle,
/// Typed reference to the pool for diagnostic reporting (report_top).
pub tracked_pool: Arc<TrackConsumersPool<crate::memory::DynamicLimitPool>>,

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remvoe this


impl Drop for LocalSession {
fn drop(&mut self) {
let phantom = self._phantom_reservation.as_ref().map_or(0, |r| r.size());

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this excess logging.

mch2 added a commit that referenced this pull request Jun 1, 2026
…ctness fixes

Squashed benchmark-test work (PR #371). Excludes PR opensearch-project#21899 (flight backpressure)
and PR opensearch-project#21887 (export-allocation cap), both removed per review.

- Framework AnalyticsQueryTask for HTTP-disconnect cancellation
- Reduce-input early cancel: tolerate receiver-drop, cancel stream when consumer satisfied
- Fix streaming reduce deadlock: blocking_send instead of handle.block_on
- Keep LIMIT above coordinator reduce: SortSplitRule + Sort cost gate fix
- RowProducingSink truncates result set at 10k rows to prevent OOM
- Close session/senders/outStream on reduce teardown + child-FAILED to fix native leaks
- Configurable datafusion.reduce.target_partitions setting
- Diagnostic logs at TRACE level
@mch2 mch2 force-pushed the benchmark-test branch from ff891c0 to 0ba435d Compare June 1, 2026 02:36
mch2 and others added 2 commits May 31, 2026 19:37
…TP-disconnect cancellation (opensearch-project#21917)

Remove manual task register/unregister and rely on parent task in local transport call.
Also update the explain endpoint to use our local transport call.

Adds AnalyticsQueryTaskCleanupIT covering task cleanup on success and on cancel.

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
Co-authored-by: Sandesh Kumar <sandeshkr419@gmail.com>
…project#21920)

Routes PPL percentile_approx(field, p) through the analytics-engine to
DataFusion's builtin approx_percentile_cont as a single-stage gather-on-
coordinator aggregate. PplAggregateCallRewriter strips the PPL type-flag
arg and normalises SymbolFlag literals to VARCHAR (isthmus's
LiteralConverter rejects unregistered enum classes).
DataFusionFragmentConvertor's literal-arg normaliser rescales the
percentile from PPL's [0, 100] to DataFusion's [0, 1] convention at
substrait emission. OpenSearchAggregateRule skips backend-viability
checks on null-FieldType metadata arg columns. Adds
CoordinatorReduceIT.testPercentileApproxAcrossShards.

Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
…ctness fixes

Squashed benchmark-test work (PR #371), rebased on upstream/main and with review
feedback applied. Excludes PR opensearch-project#21899 (flight backpressure) and PR opensearch-project#21887
(export-allocation cap).

- Framework AnalyticsQueryTask for HTTP-disconnect cancellation
- Reduce-input early cancel: tolerate receiver-drop, cancel stream when consumer satisfied
- Fix streaming reduce deadlock: blocking_send instead of handle.block_on
- RowProducingSink truncates result set at 10k rows to prevent OOM
- Close session/senders/outStream on reduce teardown + child-FAILED to fix native leaks
- Configurable datafusion.reduce.target_partitions setting
@mch2 mch2 force-pushed the benchmark-test branch from 0ba435d to 7b7a65b Compare June 1, 2026 02:50
arrowSchema.release();
if (closed) {
logger.debug("[ReduceSink] send-after-close race caught, discarding batch");
logger.trace("[ReduceSink] send-after-close race caught, discarding batch");

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this log.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants