feat: metrics reporting for scan and commit #589
5fa02cb to 0a85180
|
Let me know when ready for review :) |
For sure! Most likely tomorrow. |
5241044 to d26fb96
|
Thanks for updating this! I still need to take some time to get familiar with the Java implementation and its API in the rest spec before reviewing it. |
wgtmac
left a comment
There was a problem hiding this comment.
Thanks for working on this! I have some general questions on the design, especially on the capability of customization. IMO we can focus on the API design for now and later integrate with other classes. Please let me know what you think.
| struct ICEBERG_EXPORT ScanReport { | ||
| /// \brief The fully qualified name of the table that was scanned. | ||
| std::string table_name; | ||
|
|
There was a problem hiding this comment.
Why do some metrics have blank lines between them while others don't? I think we can remove all these blank lines to keep it compact.
| int64_t snapshot_id = -1; | ||
|
|
||
| /// \brief Filter expression used in the scan, if any. | ||
| std::string filter; |
There was a problem hiding this comment.
Should this be std::shared_ptr<Expression>?
| namespace iceberg { | ||
|
|
||
| /// \brief Duration type for metrics reporting in milliseconds. | ||
| using DurationMs = std::chrono::milliseconds; |
There was a problem hiding this comment.
This hard-codes our reported duration unit to be milliseconds, which violates the spec I think?
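One way to avoid baking the unit into the type is to carry it with the result. A minimal sketch, assuming a hypothetical `TimerResult` with unit, count, and total-duration fields (all names here are illustrative, not the PR's API):

```cpp
#include <chrono>
#include <cstdint>
#include <string>

// Hypothetical result type: the unit travels with the value instead of being
// fixed by a type alias such as DurationMs.
struct TimerResult {
  std::string time_unit;       // e.g. "nanoseconds"
  int64_t count = 0;           // number of recorded intervals
  int64_t total_duration = 0;  // expressed in `time_unit`
};

// Keep full precision internally and choose the unit only when building the result.
inline TimerResult ToNanosResult(std::chrono::nanoseconds total, int64_t count) {
  return TimerResult{"nanoseconds", count, static_cast<int64_t>(total.count())};
}

// duration_cast truncates toward zero, so sub-millisecond precision is lost here.
inline TimerResult ToMillisResult(std::chrono::nanoseconds total, int64_t count) {
  auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(total);
  return TimerResult{"milliseconds", count, static_cast<int64_t>(ms.count())};
}
```

With this shape, a consumer reads the unit from the report rather than assuming milliseconds.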
| std::string filter; | ||
|
|
||
| /// \brief Schema ID. | ||
| int32_t schema_id = -1; |
There was a problem hiding this comment.
We are missing some fields like projectedFieldIds and projectedFieldNames from the Java implementation. I think they are required by the REST spec: https://github.com/apache/iceberg/blob/149cc464f9b7df800cc5718af725983473819504/open-api/rest-catalog-open-api.yaml#L3990-L4023
| std::string table_name; | ||
|
|
||
| /// \brief The snapshot ID created by this commit. | ||
| int64_t snapshot_id = -1; |
There was a problem hiding this comment.
Please use kInvalidSnapshotId defined in iceberg/constant.h.
| /// \brief Total number of data manifests. | ||
| int64_t total_data_manifests = 0; | ||
|
|
||
| /// \brief Number of data manifests that were skipped. |
| int64_t skipped_data_files = 0; | ||
|
|
||
| /// \brief Number of data manifests that were skipped. | ||
| int64_t skipped_delete_files = 0; |
| int32_t schema_id = -1; | ||
|
|
||
| /// \brief Total duration of the entire scan operation. | ||
| DurationMs total_duration{0}; |
There was a problem hiding this comment.
Should we remove this as this is not defined by the Java implementation?
| /// | ||
| /// This variant type allows handling both report types uniformly through | ||
| /// the MetricsReporter interface. | ||
| using MetricsReport = std::variant<ScanReport, CommitReport>; |
There was a problem hiding this comment.
If we define MetricsReport as a std::variant, we cannot support customizing the metrics report. For example, engines may have more metrics to report than those defined by the Java implementation. Even the REST spec does not explicitly define which keys are required.
Instead, should we define MetricsReport like below?
struct MetricsReport {
  std::string kind;  // can be "scan" or "commit", or whatever customized
  std::unordered_map<std::string, CounterResult> counter_results;
  std::unordered_map<std::string, TimerResult> timer_results;
};
What do you think?
There was a problem hiding this comment.
After thinking more on this, I'm fine to use your current approach to define ScanReport and CommitReport with explicit fields. MetricsReports are collected by this library so users do not have the flexibility to customize them. We only need to customize MetricsReporter.
There was a problem hiding this comment.
I like the bag-of-keys approach because it allows customization without necessarily requiring recompilation. Let me think a bit more about the entire thing; I might reach out on Slack for a quick sync.
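For comparison with the bag-of-keys proposal, here is a rough sketch of how a reporter could consume the `std::variant` form. The report structs are simplified stand-ins with a single illustrative field, and `Describe` is a hypothetical helper:

```cpp
#include <string>
#include <type_traits>
#include <variant>

// Simplified stand-ins for the PR's report types (fields are illustrative).
struct ScanReport { std::string table_name; };
struct CommitReport { std::string table_name; };
using MetricsReport = std::variant<ScanReport, CommitReport>;

// A reporter can branch on the concrete report type with std::visit.
inline std::string Describe(const MetricsReport& report) {
  return std::visit(
      [](const auto& r) -> std::string {
        using T = std::decay_t<decltype(r)>;
        if constexpr (std::is_same_v<T, ScanReport>) {
          return "scan:" + r.table_name;
        } else {
          return "commit:" + r.table_name;
        }
      },
      report);
}
```

The trade-off discussed above is visible here: adding a third report kind means changing the variant and every visitor, whereas a keyed map would accept new entries without an API change.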
| /// | ||
| /// \param reporter_type Case-insensitive type identifier (e.g., "noop"). | ||
| /// \param factory Factory function that produces the reporter. | ||
| static void Register(std::string_view reporter_type, MetricsReporterFactory factory); |
There was a problem hiding this comment.
How do we support an equivalent of Java's CompositeMetricsReporter? It would be useful if we want to report metrics to multiple sinks.
This is fair, I just wanted to show the end to end picture for ease of understanding. |
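On the CompositeMetricsReporter question, a minimal sketch of a fan-out reporter. The `Reporter` interface below is a hypothetical simplification that takes a string; the real interface would take a MetricsReport:

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal interface, not the PR's MetricsReporter.
class Reporter {
 public:
  virtual ~Reporter() = default;
  virtual void Report(const std::string& report) = 0;
};

// Fans each report out to every registered sink, in insertion order.
class CompositeReporter : public Reporter {
 public:
  // Null reporters are ignored so callers can pass optional sinks directly.
  void Add(std::shared_ptr<Reporter> reporter) {
    if (reporter) reporters_.push_back(std::move(reporter));
  }
  void Report(const std::string& report) override {
    for (const auto& r : reporters_) r->Report(report);
  }
  std::size_t size() const { return reporters_.size(); }

 private:
  std::vector<std::shared_ptr<Reporter>> reporters_;
};

// Test-style sink that remembers what it received.
class RecordingReporter : public Reporter {
 public:
  void Report(const std::string& report) override { seen.push_back(report); }
  std::vector<std::string> seen;
};
```

A catalog could then combine a user-configured reporter with a built-in one by adding both to the same composite.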
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| iceberg_install_all_headers(iceberg/metrics) |
There was a problem hiding this comment.
The meson.build equivalent for this subdirectory is missing.
wgtmac
left a comment
There was a problem hiding this comment.
Follow-up findings from a deeper design/parity pass over the metrics reporting changes.
| committed_ = true; | ||
| ctx_->table = std::move(commit_result.value()); | ||
|
|
||
| ReportCommitMetrics(); |
There was a problem hiding this comment.
Transaction::Commit() always calls ReportCommitMetrics() after a successful metadata commit, even for metadata-only transactions. That helper then reads current_snapshot() from the post-commit table and reports it as if this transaction created that snapshot, which can re-emit stale snapshot metrics and a misleading operation name. Java only reports commit metrics from snapshot-producing commits (CreateSnapshotEvent in SnapshotProducer), so the reporting hook should move closer to snapshot-producing updates or otherwise prove that this transaction actually created a new snapshot before emitting a CommitReport.
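One possible guard, sketched under the assumption that the transaction can capture the set of snapshot IDs present before the commit. `CreatedNewSnapshot` is a hypothetical helper, not part of the PR:

```cpp
#include <cstdint>
#include <optional>
#include <unordered_set>

// Hypothetical guard: emit a CommitReport only when the post-commit current
// snapshot did not exist before the commit. Comparing against the full set of
// prior IDs (rather than only the previous current snapshot) also filters out
// rollbacks that merely point the table at an older snapshot.
inline bool CreatedNewSnapshot(const std::unordered_set<int64_t>& ids_before,
                               std::optional<int64_t> current_after) {
  return current_after.has_value() && ids_before.count(*current_after) == 0;
}
```

A metadata-only transaction leaves the current snapshot inside `ids_before`, so no report would be emitted for it.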
|
|
||
| // Load metrics reporter from catalog properties | ||
| std::shared_ptr<MetricsReporter> reporter; | ||
| auto reporter_result = MetricsReporters::Load(final_config.configs()); |
There was a problem hiding this comment.
The REST catalog now loads a local reporter from catalog properties, but it never constructs the built-in REST reporter for the /metrics endpoint and never combines the two. As a result, scan/commit reports from tables loaded through RestCatalog are never POSTed back to the server even when the server advertises ReportMetrics support. Java's RESTSessionCatalog explicitly combines the catalog reporter with RESTMetricsReporter, so this leaves the feature only partially implemented on the C++ side.
| /// \brief Infer the reporter type from properties. | ||
| std::string InferReporterType( | ||
| const std::unordered_map<std::string, std::string>& properties) { | ||
| auto it = properties.find(std::string(kMetricsReporterImpl)); |
There was a problem hiding this comment.
This loader treats metrics-reporter-impl as a lowercase registry key and defaults to noop, while Java and the Iceberg docs treat the same property as a fully qualified class name whose instance is initialized with catalog properties. On top of that, the catalog constructors ignore Load() errors and Table falls back to noop, so a Java-style config string or typo quietly disables metrics instead of surfacing a configuration error. That is a public-contract drift, not just an implementation detail.
There was a problem hiding this comment.
> while Java and the Iceberg docs treat the same property as a fully qualified class name whose instance is initialized with catalog properties
The divergence exists because C++ does not support reflection. I decided to make the key lowercase to reduce the risk of typos caused by inconsistent capitalization.
> typo quietly disables metrics instead of surfacing a configuration error
I will update this to surface a configuration error.
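A rough sketch of what surfacing the error could look like with a lowercase registry. `LoadResult`, `Factory`, and this `Load` signature are assumptions for illustration; the PR's actual loader returns the library's own Result type:

```cpp
#include <algorithm>
#include <cctype>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>

struct Reporter { virtual ~Reporter() = default; };
struct NoopReporter : Reporter {};

using Factory = std::function<std::unique_ptr<Reporter>()>;

// Outcome of a load: exactly one of `reporter` or `error` is set.
struct LoadResult {
  std::unique_ptr<Reporter> reporter;
  std::string error;
  bool ok() const { return error.empty(); }
};

inline std::string ToLower(std::string s) {
  std::transform(s.begin(), s.end(), s.begin(),
                 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return s;
}

// Looks up `type` case-insensitively; an unknown name is an error, not a noop.
inline LoadResult Load(const std::unordered_map<std::string, Factory>& registry,
                       const std::string& type) {
  auto it = registry.find(ToLower(type));
  if (it == registry.end()) {
    return {nullptr, "unknown metrics reporter type: " + type};
  }
  return {it->second(), ""};
}
```

With this behavior, a Java-style class name such as `org.example.MyReporter` fails loudly instead of silently falling back to noop.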
| /// | ||
| /// Embedded in ScanReport and populated by DataTableScan after PlanFiles() | ||
| /// completes. Mirrors the fields in Java's ScanMetricsResult. | ||
| struct ICEBERG_EXPORT ScanMetrics { |
There was a problem hiding this comment.
ScanReport embeds a plain ScanMetrics struct of raw integers and std::chrono::nanoseconds, but Java's contract is layered: live counters/timers are collected in ScanMetrics, converted into ScanMetricsResult, and then wrapped by ScanReport. That layering preserves units, optional presence, noop semantics, and serialization behavior. In C++, every metric defaults to 0, so consumers cannot distinguish "not collected" from a real zero value, and the report shape is no longer compatible with Java's richer contract.
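To illustrate the "not collected" versus zero distinction, a minimal sketch using `std::optional`. The type and field names are hypothetical and only loosely modeled on the Java result classes mentioned above:

```cpp
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical result type: value plus the unit it was measured in.
struct CounterResult {
  std::string unit;  // e.g. "count" or "bytes"
  int64_t value = 0;
};

// An empty optional means "not collected", which stays distinguishable
// from a collected value of zero.
struct ScanMetricsResult {
  std::optional<CounterResult> result_data_files;
  std::optional<CounterResult> skipped_data_files;
};

// Render a counter for logging; absent metrics are reported as such.
inline std::string Render(const std::optional<CounterResult>& c) {
  if (!c) return "n/a";
  return std::to_string(c->value) + " " + c->unit;
}
```

A serializer could likewise omit absent metrics instead of writing zeros.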
| metric.total_equality_deletes = parse_int64(SnapshotSummaryFields::kTotalEqDeletes); | ||
| metric.added_dvs = parse_int64(SnapshotSummaryFields::kAddedDVs); | ||
| metric.removed_dvs = parse_int64(SnapshotSummaryFields::kRemovedDVs); | ||
| metric.created_manifest_count = parse_int64(SnapshotSummaryFields::kManifestsCreated); |
There was a problem hiding this comment.
CommitMetrics exposes replaced_manifest_count, but ReportCommitMetrics() never reads SnapshotSummaryFields::kManifestsReplaced into it. Java's CommitMetricsResult.from(...) does populate the corresponding manifestsReplaced field from the snapshot summary. Any consumer relying on this report will currently observe 0 even when the commit actually replaced manifests.
| for (const auto& task : tasks) { | ||
| report.scan_metrics.total_file_size_in_bytes += | ||
| task->data_file()->file_size_in_bytes; | ||
| for (const auto& del_file : task->delete_files()) { |
There was a problem hiding this comment.
The scan report counts equality and positional delete files, but it never increments indexed_delete_files and it never separates deletion vectors from ordinary positional deletes. Java's ScanMetricsUtil.indexedDeleteFile(...) increments all three counters (indexedDeleteFiles, positionalDeleteFiles/dvs, equalityDeleteFiles). Because the C++ report schema already exposes indexed_delete_files and dvs, leaving them unset produces parity drift and misleading metrics for DV-heavy tables.
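The counting rule described above can be sketched as follows. `DeleteFile`, `DeleteCounters`, and the `is_dv` flag are simplified stand-ins, not the PR's types:

```cpp
#include <cstdint>

enum class FileContent { kData, kPositionDeletes, kEqualityDeletes };

// Hypothetical stand-in for a delete file; `is_dv` marks a deletion vector.
struct DeleteFile {
  FileContent content;
  bool is_dv = false;
};

struct DeleteCounters {
  int64_t indexed_delete_files = 0;
  int64_t positional_delete_files = 0;
  int64_t dvs = 0;
  int64_t equality_delete_files = 0;
};

// Every delete file counts as indexed, then lands in exactly one of the
// positional / DV / equality buckets.
inline void IndexedDeleteFile(DeleteCounters& m, const DeleteFile& f) {
  ++m.indexed_delete_files;
  if (f.content == FileContent::kPositionDeletes) {
    if (f.is_dv) {
      ++m.dvs;
    } else {
      ++m.positional_delete_files;
    }
  } else if (f.content == FileContent::kEqualityDeletes) {
    ++m.equality_delete_files;
  }
}
```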
|
|
||
| namespace iceberg { | ||
|
|
||
| /// \brief Metrics collected during a table commit (snapshot creation). |
There was a problem hiding this comment.
CommitReport only carries summary-derived counters, but Java's commit reporting model also has live commit metrics (totalDuration, attempts) collected through CommitMetrics and serialized through CommitMetricsResult. Because those fields are absent from the C++ public model entirely, even a future implementation cannot fully mirror Java's commit report without another API change.
| /// \param io FileIO instance for reading manifests files. | ||
| /// \param reporter Optional metrics reporter for scan metrics. | ||
| /// \param table_name Optional table name for metrics reporting. | ||
| static Result<std::unique_ptr<TableScanBuilder<ScanType>>> Make( |
There was a problem hiding this comment.
The builder stores a reporter internally, but there is no public scan-level API to add an extra reporter the way Java's Scan.metricsReporter(...) does. That means C++ can only use the reporter injected from the table/catalog path, while Java supports composing an additional reporter for a particular scan. For observability integrations and tests, that is a meaningful flexibility gap.
There was a problem hiding this comment.
Good catch, will address.
|
I have a design question about the |
The plan is still to keep parity. As a next step, I was thinking of doing serialization and then introducing the utilities. I initially elected to do it this way because I wanted an end-to-end view. I can update the initial PR to cover just the utilities if that makes your review easier; I also don't want the change to be too big. |
|
@evindj Sorry I've been busy these days. I think it would be good to focus on the API design first. |
| std::shared_ptr<FileIO> file_io_; | ||
| std::string warehouse_location_; | ||
| std::unique_ptr<class InMemoryNamespace> root_namespace_; | ||
| std::shared_ptr<MetricsReporter> reporter_; |
There was a problem hiding this comment.
Please add a comment about this new member.
| namespace { | ||
|
|
||
| /// \brief Registry type for MetricsReporter factories. | ||
| using MetricsReporterRegistry = std::unordered_map<std::string, MetricsReporterFactory>; |
There was a problem hiding this comment.
Do we have any use case where std::string fits better?
| /// \param[in] metadata The metadata for the table. | ||
| /// \param[in] metadata_location The location of the table metadata file. | ||
| /// \param[in] io The FileIO to read and write table data and metadata files. | ||
| /// \param[in] catalog The catalog that this table belongs to. |
There was a problem hiding this comment.
Please update the parameter list in the comment.
| std::shared_ptr<TableMetadata> table_metadata, std::shared_ptr<FileIO> file_io, | ||
| std::shared_ptr<MetricsReporter> reporter, const std::string& table_name) | ||
| : metadata_(std::move(table_metadata)), io_(std::move(file_io)) { | ||
| context_.reporter = std::move(reporter); |
There was a problem hiding this comment.
should we also check port reporter like table.h above?
There was a problem hiding this comment.
I just want all usages of reporter to be consistent.
| report.scan_metrics.total_data_manifests = | ||
| static_cast<int64_t>(data_manifests.size()); | ||
| report.scan_metrics.total_delete_manifests = | ||
| static_cast<int64_t>(delete_manifests.size()); |
There was a problem hiding this comment.
If the source of the data is not int64_t, why do we declare these fields as int64_t?
It looks like unnecessary type casting.
|
@evindj Do you have a chance to revive this? |
Sorry for the long wait, I have been a bit busy lately. My schedule should clear toward the end of the week. |
56026ae to b5156af
f787939 to dbf02c0
wgtmac
left a comment
There was a problem hiding this comment.
I haven't finished my review yet. I'm just posting my findings so far and will finish within two days. I think this change looks great. Thank you for polishing the design!
| } | ||
|
|
||
| /// \brief Return a shared no-op timer singleton. | ||
| static Timer& Noop(); |
There was a problem hiding this comment.
Is it more flexible to return a std::shared_ptr<Timer>?
| virtual bool IsNoop() const { return false; } | ||
|
|
||
| /// \brief Return a shared no-op counter singleton. | ||
| static Counter& Noop(); |
There was a problem hiding this comment.
ditto, should we use std::shared_ptr<Counter> here?
| virtual void Increment(int64_t amount) = 0; | ||
|
|
||
| /// \brief Return the current count. | ||
| virtual int64_t Value() const = 0; |
There was a problem hiding this comment.
No strong opinion but by convention trivial getters usually use snake case like value() and unit().
| virtual ~Counter() = default; | ||
|
|
||
| /// \brief Increment the counter by 1. | ||
| virtual void Increment() = 0; |
There was a problem hiding this comment.
Does this have to be a pure virtual function? Can't we directly call Increment(1)?
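A sketch of the non-virtual overload idea, with simplified hypothetical classes. Note the `using` declaration: without it, the derived override of `Increment(int64_t)` would hide the no-argument overload when called on the derived type:

```cpp
#include <atomic>
#include <cstdint>

class Counter {
 public:
  virtual ~Counter() = default;
  // Non-virtual convenience overload: implementations only override
  // Increment(int64_t).
  void Increment() { Increment(1); }
  virtual void Increment(int64_t amount) = 0;
  virtual int64_t value() const = 0;
};

class DefaultCounter : public Counter {
 public:
  using Counter::Increment;  // keep the no-arg overload visible on this type
  void Increment(int64_t amount) override {
    count_.fetch_add(amount, std::memory_order_relaxed);
  }
  int64_t value() const override {
    return count_.load(std::memory_order_relaxed);
  }

 private:
  std::atomic<int64_t> count_{0};
};
```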
| auto Time(Callable&& fn) { | ||
| auto timed = Start(); | ||
| if constexpr (std::is_void_v<std::invoke_result_t<Callable>>) { | ||
| std::forward<Callable>(fn)(); | ||
| timed.Stop(); | ||
| } else { | ||
| auto result = std::forward<Callable>(fn)(); | ||
| timed.Stop(); | ||
| return result; | ||
| } | ||
| } |
There was a problem hiding this comment.
| - auto Time(Callable&& fn) { | |
| -   auto timed = Start(); | |
| -   if constexpr (std::is_void_v<std::invoke_result_t<Callable>>) { | |
| -     std::forward<Callable>(fn)(); | |
| -     timed.Stop(); | |
| -   } else { | |
| -     auto result = std::forward<Callable>(fn)(); | |
| -     timed.Stop(); | |
| -     return result; | |
| -   } | |
| - } | |
| + decltype(auto) Time(Callable&& fn) { | |
| +   auto timed = Start(); | |
| +   return std::forward<Callable>(fn)(); | |
| + } | |
How about this? It can support a wider range of return types.
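The suggestion relies on the guard returned by Start() stopping the timer in its destructor. A self-contained sketch of that pattern, assuming a simplified hypothetical `Timer` (not the PR's class) and C++17 for guaranteed copy elision:

```cpp
#include <chrono>
#include <cstdint>
#include <utility>

class Timer {
 public:
  // RAII guard: records the elapsed time when it goes out of scope,
  // including when the timed callable throws.
  class Timed {
   public:
    explicit Timed(Timer& timer)
        : timer_(timer), start_(std::chrono::steady_clock::now()) {}
    Timed(const Timed&) = delete;
    Timed& operator=(const Timed&) = delete;
    ~Timed() {
      timer_.Record(std::chrono::duration_cast<std::chrono::nanoseconds>(
          std::chrono::steady_clock::now() - start_));
    }

   private:
    Timer& timer_;
    std::chrono::steady_clock::time_point start_;
  };

  Timed Start() { return Timed(*this); }

  // decltype(auto) preserves void, value, and reference return types alike.
  template <typename Callable>
  decltype(auto) Time(Callable&& fn) {
    auto timed = Start();
    return std::forward<Callable>(fn)();
  }

  void Record(std::chrono::nanoseconds elapsed) {
    total_ += elapsed;
    ++count_;
  }
  int64_t count() const { return count_; }
  std::chrono::nanoseconds total() const { return total_; }

 private:
  std::chrono::nanoseconds total_{0};
  int64_t count_ = 0;
};
```

Because the guard is destroyed after the return value is initialized, the recorded interval also covers constructing the result, and a callable returning `int&` yields a reference rather than a copy.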
| class ICEBERG_EXPORT ScanMetrics { | ||
| public: | ||
| /// \brief Create a ScanMetrics instance backed by the given MetricsContext. | ||
| static ScanMetrics Of(MetricsContext& context); |
There was a problem hiding this comment.
Should we return std::unique_ptr<ScanMetrics> instead? Downstream is free to choose unique_ptr or shared_ptr then.
There was a problem hiding this comment.
Do we want to make the default ctor private?
|
|
||
| ScanMetrics ScanMetrics::Of(MetricsContext& context) { | ||
| ScanMetrics m; | ||
| m.total_planning_duration = context.GetTimer("totalPlanningDuration"); |
There was a problem hiding this comment.
Should it be total-planning-duration?
There was a problem hiding this comment.
Same for all below. CommitMetrics are aligned.
| #include "iceberg/metrics/scan_report.h" | ||
| #include "iceberg/result.h" | ||
|
|
||
| namespace iceberg { |
There was a problem hiding this comment.
Rename this file to json_serde_internal.h to not install it by accident.
wgtmac
left a comment
There was a problem hiding this comment.
I've finished reviewing the non-test files. Great work! Let me know what you think. Thanks!
|
|
||
| /// \brief MetricsContext backed by DefaultCounter and DefaultTimer instances. | ||
| /// | ||
| /// Thread-safe for metric *increments*; the unordered_map lookup/insert is NOT |
There was a problem hiding this comment.
Should we use a mutex to make it thread safe? It should not hurt too much.
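A minimal sketch of the mutex approach, assuming a hypothetical context that only hands out counters. The lock covers the map lookup and insert, while increments on the returned counter stay lock-free:

```cpp
#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <unordered_map>

// Simplified stand-in for a counter; increments are atomic on their own.
struct Counter {
  std::atomic<int64_t> value{0};
};

// Hypothetical context: the mutex guards only the map, so contention is
// limited to the (infrequent) name lookup.
class DefaultMetricsContext {
 public:
  std::shared_ptr<Counter> GetCounter(std::string_view name) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto& slot = counters_[std::string(name)];
    if (!slot) slot = std::make_shared<Counter>();
    return slot;
  }

 private:
  std::mutex mutex_;
  std::unordered_map<std::string, std::shared_ptr<Counter>> counters_;
};
```

Callers that cache the returned pointer pay for the lock once per metric name rather than once per increment.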
| std::shared_ptr<Timer> GetTimer(std::string_view name) override; | ||
|
|
||
| private: | ||
| std::unordered_map<std::string, std::shared_ptr<Counter>> counters_; |
There was a problem hiding this comment.
Should we use StringHash and StringEqual defined in string_util.h to enable heterogeneous lookup and avoid repeated string allocation?
| /// \brief Return the null (no-op) MetricsContext singleton. | ||
| /// | ||
| /// All metrics returned by the null context are noop; nothing is allocated. | ||
| static MetricsContext& Null(); |
There was a problem hiding this comment.
Should we change the return type to std::shared_ptr<MetricsContext>? It really depends on the downstream use case. A reference will not be helpful if any call site expects a shared_ptr.
| /// Called by MetricsReporters::Load() before the first Report() invocation. | ||
| /// The default implementation is a no-op. Override to perform property-based | ||
| /// setup (e.g., configure endpoints, credentials, sampling rates). | ||
| virtual void Initialize( |
There was a problem hiding this comment.
Do we want to return Status from both Initialize and Report, in case a reporter implementation actually needs to return an error?
| const std::unordered_set<std::shared_ptr<MetricsReporter>>& Reporters() const; | ||
|
|
||
| private: | ||
| std::unordered_set<std::shared_ptr<MetricsReporter>> reporters_; |
There was a problem hiding this comment.
Usually we won't have too many reporters. Perhaps std::vector is sufficient and more performant.
| /// \brief A MetricsReporter that delegates to multiple reporters. | ||
| /// | ||
| /// Combines several reporters so that every report is delivered to each of them. | ||
| /// Any exception thrown by an individual reporter is caught and swallowed; |
There was a problem hiding this comment.
Should we make it a contract that a MetricsReporter should not throw but return an error status instead?
|
|
||
| /// \brief Register a factory for a metrics reporter type. | ||
| /// | ||
| /// This method is not thread-safe. All registrations should be done during |
There was a problem hiding this comment.
How about making it thread safe?
Initial commit for addressing #533