benchmark-test: cancellation, memory-release, and reduce-path diagnos…#371
benchmark-test: cancellation, memory-release, and reduce-path diagnos…#371mch2 wants to merge 3 commits into
Conversation
| } else { | ||
| out = VectorStreamOutput.create(flightChannel.getAllocator(), flightChannel.getRoot()); | ||
| task.response().writeTo(out); | ||
| logger.info("[flight-diag] producer requestId={} sending serialized batch, serverAllocator={}", |
There was a problem hiding this comment.
remove the logging from this file.
|
|
||
| VectorSchemaRoot streamRoot = flightStream.getRoot(); | ||
| currentBatchSize = FlightUtils.calculateVectorSchemaRootSize(streamRoot); | ||
| logger.info("[flight-diag] correlationId={} batch received: rows={} size={} bytes, allocator={}", |
There was a problem hiding this comment.
remove the logging from this file, in fact all flight files.
|
|
||
| /** | ||
| * Whether this sink's downstream consumer has finished and will read no more batches — e.g. | ||
| * a coordinator reduce whose LimitExec satisfied its fetch and tore down the input receiver. |
| _concurrency_permit: Option<tokio::sync::OwnedSemaphorePermit>, | ||
| /// Cumulative bytes this stream has exported across the Arrow C-Data boundary. | ||
| /// Used by [`stream_next`] to reject a query before its exported batches | ||
| /// accumulate past the Java import pool cap (which would leak — see |
There was a problem hiding this comment.
remove all changes from opensearch-project#21887 from this PR.
|
|
||
| /// Positive FFI sentinel returned by [`df_sender_send`] when the send was skipped because the | ||
| /// consumer already finished (the receiver was dropped, e.g. a LimitExec satisfied its fetch). | ||
| /// It rides the success half of the FFM return contract (`>= 0` = success, `< 0` = `-error_ptr`), |
| let target_schema = crate::schema_coerce::coerce_inferred_schema(physical_plan.schema()); | ||
| let physical_plan = crate::relabel_exec::wrap_if_relabel_needed(physical_plan, target_schema)?; | ||
| log_debug!("DataFusion physical plan (reduce):\n{}", displayable(physical_plan.as_ref()).indent(true)); | ||
| log_debug!("DataFusion physical plan (reduce) BEFORE strip:\n{}", displayable(physical_plan.as_ref()).indent(true)); |
| physical_plan, | ||
| crate::agg_mode::Mode::Final, | ||
| )?; | ||
| log_info!("DataFusion physical plan (reduce) AFTER agg-mode strip:\n{}", displayable(stripped.as_ref()).indent(true)); |
There was a problem hiding this comment.
we already have a log here somewher efor the plan right? or is it before the strip? if so, replace that with this one, and make it the appropriate log level.
| /// Outcome of a blocking send. `ReceiverDropped` is the benign terminal case — the | ||
| /// DataFusion consumer finished (e.g. a `LimitExec` satisfied its fetch) and tore down the | ||
| /// receiver while producers were still feeding. `mpsc::Sender::send` fails for exactly this | ||
| /// one reason, so the condition is structurally unambiguous here; surfacing it as a distinct |
| pub struct PartitionStreamReceiver { | ||
| rx: mpsc::Receiver<Result<RecordBatch, DataFusionError>>, | ||
| schema: SchemaRef, | ||
| poll_count: AtomicU64, |
| let count = self.poll_count.fetch_add(1, Ordering::Relaxed) + 1; | ||
| match &result { | ||
| Poll::Ready(None) => { | ||
| log_info!("[partition-stream] receiver got None (EOF) after {} polls", count); |
| } | ||
| } | ||
| long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000; | ||
| logger.info("[drain-diag] drainOutputIntoDownstream complete: taskId={}, batches={}, rows={}, elapsed={}ms", |
There was a problem hiding this comment.
remove these logs rom this clas.
| try { | ||
| assert lifecycle.isWriteLockedByCurrentThread() : "close must hold the write lock across super.close()"; | ||
| super.close(); | ||
| logger.info("[sender] close() complete - sender dropped, native EOF signalled"); |
There was a problem hiding this comment.
make this a trace log, and include task/query id.
|
|
||
| @Override | ||
| public void close() { | ||
| logger.info("[sender] close() called, receiverDropped={}, thread={}", |
| */ | ||
| @Override | ||
| public boolean isConsumerDone() { | ||
| return sendersByChildStageId.size() == 1 && sendersByChildStageId.values().iterator().next().isReceiverDropped(); |
There was a problem hiding this comment.
will this not work for join early terminate? why have size check?
| // Best-effort fast path — skip export work if already closed. | ||
| if (closed) { | ||
| long fc = feedCount.get(); | ||
| logger.info("[feed-diag] feedToSender: taskId={} feedCount={} rows={} closed={} receiverDropped={}", |
| logger.info("[feed-diag] feedToSender: taskId={} feedCount={} rows={} closed={} receiverDropped={}", | ||
| ctx.taskId(), fc, batch.getRowCount(), closed, sender.isReceiverDropped()); | ||
| if (closed || sender.isReceiverDropped()) { | ||
| logger.info("[feed-diag] feedToSender SKIPPING: taskId={} closed={} receiverDropped={} rows={} totalFed={}", |
| // here (double-free). The sender latched the drop (see DatafusionPartitionSender), | ||
| // so subsequent feeds for this input short-circuit and the producer stream is | ||
| // cancelled by the shard listener via isConsumerDone(). | ||
| logger.debug("[ReduceSink] receiver dropped before send (consumer finished), discarding batch"); |
| * @return the {@link QueryExecution} driving this query, for post-execution inspection | ||
| * (e.g. profiling). Callers that don't need it may ignore the return value. | ||
| */ | ||
| QueryExecution execute(QueryContext context, ActionListener<Iterable<VectorSchemaRoot>> listener); |
There was a problem hiding this comment.
undo this - we should be in sync with opensearch-project#21917
| @Override | ||
| public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { | ||
| if (getCollation().getFieldCollations().isEmpty()) { | ||
| if (getCollation().getFieldCollations().isEmpty() && fetch == null && offset == null) { |
| OpenSearchSort sort = call.rel(0); | ||
| if (sort.getCollation().getFieldCollations().isEmpty()) { | ||
| return false; // pure LIMIT — skip | ||
| if (sort.getCollation().getFieldCollations().isEmpty() && sort.fetch == null && sort.offset == null) { |
| totalRows += rows; | ||
| // Per-batch log at TRACE only (extremely verbose under load) | ||
| if (logger.isTraceEnabled() && (batchCount <= 3 || batchCount % 500 == 0)) { | ||
| logger.debug("[shard-stream] shard feeding batch #{} rows={} cumRows={}", batchCount, rows, totalRows); |
| } | ||
| logger.debug("[shard-stream] shard stream complete: {} batches, {} totalRows", batchCount, totalRows); | ||
| } catch (Exception e) { | ||
| logger.debug("[shard-stream] shard stream failed after {} batches, {} rows: {}", batchCount, totalRows, e.getMessage()); |
| listener::onResponse, | ||
| e -> listener.onFailure(e instanceof Exception ex ? contextProvider.convertException(ex) : e) | ||
| ); | ||
| ContextAwareExecutor.wrap(searchExecutor, threadPool).execute(() -> { |
There was a problem hiding this comment.
we want to use ContextAwareExecutor - undo its removal here and elsewhere in the pr, we should still be wrapping.
| pub custom_cache_manager: Option<CustomCacheManager>, | ||
| pub dynamic_limit_handle: DynamicLimitHandle, | ||
| /// Typed reference to the pool for diagnostic reporting (report_top). | ||
| pub tracked_pool: Arc<TrackConsumersPool<crate::memory::DynamicLimitPool>>, |
|
|
||
| impl Drop for LocalSession { | ||
| fn drop(&mut self) { | ||
| let phantom = self._phantom_reservation.as_ref().map_or(0, |r| r.size()); |
…ctness fixes Squashed benchmark-test work (PR #371). Excludes PR opensearch-project#21899 (flight backpressure) and PR opensearch-project#21887 (export-allocation cap), both removed per review. - Framework AnalyticsQueryTask for HTTP-disconnect cancellation - Reduce-input early cancel: tolerate receiver-drop, cancel stream when consumer satisfied - Fix streaming reduce deadlock: blocking_send instead of handle.block_on - Keep LIMIT above coordinator reduce: SortSplitRule + Sort cost gate fix - RowProducingSink truncates result set at 10k rows to prevent OOM - Close session/senders/outStream on reduce teardown + child-FAILED to fix native leaks - Configurable datafusion.reduce.target_partitions setting - Diagnostic logs at TRACE level
…TP-disconnect cancellation (opensearch-project#21917) Remove manual task register/unregister and rely on parent task in local transport call. Also update the explain endpoint to use our local transport call. Adds AnalyticsQueryTaskCleanupIT covering task cleanup on success and on cancel. Signed-off-by: Marc Handalian <marc.handalian@gmail.com> Co-authored-by: Sandesh Kumar <sandeshkr419@gmail.com>
…project#21920) Routes PPL percentile_approx(field, p) through the analytics-engine to DataFusion's builtin approx_percentile_cont as a single-stage gather-on- coordinator aggregate. PplAggregateCallRewriter strips the PPL type-flag arg and normalises SymbolFlag literals to VARCHAR (isthmus's LiteralConverter rejects unregistered enum classes). DataFusionFragmentConvertor's literal-arg normaliser rescales the percentile from PPL's [0, 100] to DataFusion's [0, 1] convention at substrait emission. OpenSearchAggregateRule skips backend-viability checks on null-FieldType metadata arg columns. Adds CoordinatorReduceIT.testPercentileApproxAcrossShards. Signed-off-by: Sandesh Kumar <sandeshkr419@gmail.com>
…ctness fixes Squashed benchmark-test work (PR #371), rebased on upstream/main and with review feedback applied. Excludes PR opensearch-project#21899 (flight backpressure) and PR opensearch-project#21887 (export-allocation cap). - Framework AnalyticsQueryTask for HTTP-disconnect cancellation - Reduce-input early cancel: tolerate receiver-drop, cancel stream when consumer satisfied - Fix streaming reduce deadlock: blocking_send instead of handle.block_on - RowProducingSink truncates result set at 10k rows to prevent OOM - Close session/senders/outStream on reduce teardown + child-FAILED to fix native leaks - Configurable datafusion.reduce.target_partitions setting
| arrowSchema.release(); | ||
| if (closed) { | ||
| logger.debug("[ReduceSink] send-after-close race caught, discarding batch"); | ||
| logger.trace("[ReduceSink] send-after-close race caught, discarding batch"); |
…tics
Squashed work-in-progress for the benchmark-test branch:
Description
[Describe what this change achieves]
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.