Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .github/workflows/sanitizer_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,18 @@ jobs:
- name: Run Tests
working-directory: build
env:
ASAN_OPTIONS: log_path=out.log:detect_leaks=1:symbolize=1:strict_string_checks=1:halt_on_error=1:detect_container_overflow=0
ASAN_OPTIONS: log_path=${{ github.workspace }}/asan.log:detect_leaks=1:symbolize=1:strict_string_checks=1:halt_on_error=1:detect_container_overflow=0
LSAN_OPTIONS: suppressions=${{ github.workspace }}/.github/lsan-suppressions.txt
UBSAN_OPTIONS: log_path=out.log:halt_on_error=1:print_stacktrace=1:suppressions=${{ github.workspace }}/.github/ubsan-suppressions.txt
UBSAN_OPTIONS: log_path=${{ github.workspace }}/ubsan.log:halt_on_error=1:print_stacktrace=1:suppressions=${{ github.workspace }}/.github/ubsan-suppressions.txt
run: |
ctest --output-on-failure
- name: Save the test output
if: always()
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
name: test-output
path: build/test/out.log*
path: |
asan.log*
ubsan.log*
build/Testing/Temporary/LastTest.log
build/Testing/Temporary/LastTestsFailed.log
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ set(ICEBERG_SOURCES
type.cc
update/expire_snapshots.cc
update/fast_append.cc
update/merging_snapshot_update.cc
update/pending_update.cc
update/set_snapshot.cc
update/snapshot_manager.cc
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/manifest/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ struct ICEBERG_EXPORT ManifestEntry {
ManifestEntry AsAdded() const {
ManifestEntry copy = *this;
copy.status = ManifestStatus::kAdded;
if (copy.data_file->first_row_id.has_value()) {
if (copy.data_file != nullptr && copy.data_file->first_row_id.has_value()) {
copy.data_file = std::make_unique<DataFile>(*copy.data_file);
copy.data_file->first_row_id = std::nullopt;
}
Expand Down
139 changes: 113 additions & 26 deletions src/iceberg/manifest/manifest_filter_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,25 @@ Result<std::string> FormatPartitionPath(const PartitionSpecsById& specs_by_id,

} // namespace

size_t ManifestFilterManager::DeleteFileKeyHash::operator()(
const DeleteFileKey& key) const {
size_t hash = std::hash<std::string>{}(key.path);
auto combine = [&hash](const auto& value) {
size_t value_hash = value.has_value() ? std::hash<int64_t>{}(*value) : 0;
hash ^= value_hash + 0x9e3779b9 + (hash << 6) + (hash >> 2);
};
combine(key.content_offset);
combine(key.content_size_in_bytes);
return hash;
}

ManifestFilterManager::DeleteFileKey ManifestFilterManager::MakeDeleteFileKey(
const DataFile& file) {
return DeleteFileKey{.path = file.file_path,
.content_offset = file.content_offset,
.content_size_in_bytes = file.content_size_in_bytes};
}

ManifestFilterManager::ManifestFilterManager(ManifestContent content,
std::shared_ptr<FileIO> file_io)
: manifest_content_(content),
Expand Down Expand Up @@ -93,15 +112,19 @@ void ManifestFilterManager::DeleteFile(std::string_view path) {

Status ManifestFilterManager::DeleteFile(std::shared_ptr<DataFile> file) {
ICEBERG_PRECHECK(file != nullptr, "Cannot delete file: null");
delete_paths_.insert(file->file_path);
delete_files_.insert(std::move(file));
delete_file_keys_.insert(MakeDeleteFileKey(*file));
return {};
}

const DataFileSet& ManifestFilterManager::FilesToBeDeleted() const {
return delete_files_;
}

const std::vector<std::shared_ptr<DataFile>>& ManifestFilterManager::DeletedFiles()
const {
return deleted_files_;
}

void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues partition) {
drop_partitions_.add(spec_id, std::move(partition));
}
Expand All @@ -114,15 +137,24 @@ void ManifestFilterManager::FailAnyDelete() { fail_any_delete_ = true; }

bool ManifestFilterManager::ContainsDeletes() const {
return HasRowFilterExpression(delete_expr_) || !delete_paths_.empty() ||
!drop_partitions_.empty();
!delete_file_keys_.empty() || !drop_partitions_.empty();
}

void ManifestFilterManager::DropDeleteFilesOlderThan(int64_t sequence_number) {
min_sequence_number_ = sequence_number;
}

void ManifestFilterManager::RemoveDanglingDeletesFor(const DataFileSet& deleted_files) {
for (const auto& file : deleted_files) {
removed_data_file_paths_.insert(file->file_path);
}
}

Result<bool> ManifestFilterManager::CanContainDroppedFiles(const ManifestFile&) const {
// TODO(Guotao): Use the manifest descriptor to skip unrelated object-delete
// manifests once object-delete partitions are tracked separately.
// Currently, DeleteFile(std::shared_ptr<DataFile>) degrades to a path-based delete,
// which forces scanning all manifests.
return !delete_paths_.empty();
return !delete_paths_.empty() || !delete_file_keys_.empty() ||
!removed_data_file_paths_.empty();
}

Result<bool> ManifestFilterManager::CanContainDroppedPartitions(
Expand Down Expand Up @@ -208,8 +240,9 @@ Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
const DataFile& file = *entry.data_file;
int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);

// Path-based and partition-drop checks
// Path/object-based and partition-drop checks.
if (delete_paths_.count(file.file_path) ||
delete_file_keys_.count(MakeDeleteFileKey(file)) ||
drop_partitions_.contains(spec_id, file.partition)) {
if (fail_any_delete_) {
ICEBERG_ASSIGN_OR_RAISE(auto partition_path,
Expand All @@ -219,6 +252,25 @@ Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
return true;
}

// Delete-manifest-specific cleanup (only for ManifestContent::kDeletes).
if (manifest_content_ == ManifestContent::kDeletes) {
// Drop delete files whose data sequence number is older than the minimum
// retained by the table (they can no longer match any live data rows).
// seq == 0 (kInitialSequenceNumber / nullopt) is intentionally excluded:
// those entries predate sequence number assignment and must not be pruned.
int64_t seq = entry.sequence_number.value_or(0);
if (min_sequence_number_ > 0 && seq > 0 && seq < min_sequence_number_) {
return true;
}

// Drop DVs that reference a data file that has been removed (dangling DV).
if (!removed_data_file_paths_.empty() && file.IsDeletionVector() &&
file.referenced_data_file.has_value() &&
removed_data_file_paths_.count(*file.referenced_data_file)) {
return true;
}
}

if (HasRowFilterExpression(delete_expr_)) {
ICEBERG_ASSIGN_OR_RAISE(auto* residual_eval,
GetResidualEvaluator(schema, specs_by_id, spec_id));
Expand Down Expand Up @@ -265,8 +317,7 @@ bool ManifestFilterManager::CanTrustManifestReferences(
Result<ManifestFile> ManifestFilterManager::FilterManifest(
const std::shared_ptr<Schema>& schema, const PartitionSpecsById& specs_by_id,
const ManifestFile& manifest, bool trust_manifest_references,
const ManifestWriterFactory& writer_factory,
std::unordered_set<std::string>& found_paths) {
const ManifestWriterFactory& writer_factory, FoundDeletes& found_deletes) {
ICEBERG_ASSIGN_OR_RAISE(
auto can_contain_deleted_files,
CanContainDeletedFiles(manifest, schema, specs_by_id, trust_manifest_references));
Expand All @@ -287,7 +338,7 @@ Result<ManifestFile> ManifestFilterManager::FilterManifest(
}

return FilterManifestWithDeletedFiles(entries, spec_id, schema, specs_by_id,
writer_factory, found_paths);
writer_factory, found_deletes);
}

Result<bool> ManifestFilterManager::ManifestHasDeletedFiles(
Expand All @@ -306,21 +357,30 @@ Result<bool> ManifestFilterManager::ManifestHasDeletedFiles(
Result<ManifestFile> ManifestFilterManager::FilterManifestWithDeletedFiles(
const std::vector<ManifestEntry>& entries, int32_t manifest_spec_id,
const std::shared_ptr<Schema>& schema, const PartitionSpecsById& specs_by_id,
const ManifestWriterFactory& writer_factory,
std::unordered_set<std::string>& found_paths) {
const ManifestWriterFactory& writer_factory, FoundDeletes& found_deletes) {
ICEBERG_ASSIGN_OR_RAISE(auto writer,
writer_factory(manifest_spec_id, manifest_content_));
for (const auto& entry : entries) {
ICEBERG_ASSIGN_OR_RAISE(auto should_delete,
ShouldDelete(entry, schema, specs_by_id, manifest_spec_id));
if (should_delete) {
if (entry.data_file && delete_paths_.count(entry.data_file->file_path)) {
found_paths.insert(entry.data_file->file_path);
}
if (entry.data_file) {
// TODO(Guotao): Track duplicate deletes and avoid full DataFile copies when
// summary generation can use lighter records.
delete_files_.insert(std::make_shared<DataFile>(*entry.data_file));
const auto key = MakeDeleteFileKey(*entry.data_file);
if (delete_paths_.count(entry.data_file->file_path)) {
found_deletes.paths.insert(entry.data_file->file_path);
}
if (delete_file_keys_.count(key)) {
found_deletes.files.insert(key);
}

auto file = std::make_shared<DataFile>(*entry.data_file);
delete_files_.insert(file);
auto [_, inserted] = deleted_file_keys_.insert(key);
if (inserted) {
deleted_files_.push_back(std::move(file));
} else {
++duplicate_deletes_count_;
}
}
ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
} else {
Expand All @@ -333,18 +393,24 @@ Result<ManifestFile> ManifestFilterManager::FilterManifestWithDeletedFiles(
}

Status ManifestFilterManager::ValidateRequiredDeletes(
const std::unordered_set<std::string>& found_paths) const {
const FoundDeletes& found_deletes) const {
if (!fail_missing_delete_paths_) {
return {};
}

std::string missing;
for (const auto& path : delete_paths_) {
if (!found_paths.count(path)) {
if (!found_deletes.paths.count(path)) {
if (!missing.empty()) missing += ", ";
missing += path;
}
}
for (const auto& key : delete_file_keys_) {
if (!found_deletes.files.count(key)) {
if (!missing.empty()) missing += ", ";
missing += key.path;
}
}
if (!missing.empty()) {
return InvalidArgument("Missing delete paths: {}", missing);
}
Expand All @@ -354,8 +420,21 @@ Status ManifestFilterManager::ValidateRequiredDeletes(
Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
const TableMetadata& metadata, const std::shared_ptr<Snapshot>& base_snapshot,
const ManifestWriterFactory& writer_factory) {
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
return FilterManifests(schema, metadata, base_snapshot, writer_factory);
}

Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
const std::shared_ptr<Schema>& schema, const TableMetadata& metadata,
const std::shared_ptr<Snapshot>& base_snapshot,
const ManifestWriterFactory& writer_factory) {
delete_files_.clear();
deleted_files_.clear();
deleted_file_keys_.clear();
duplicate_deletes_count_ = 0;
replaced_manifests_count_ = 0;
if (!base_snapshot) {
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes({}));
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(FoundDeletes{}));
return std::vector<ManifestFile>{};
}

Expand All @@ -371,7 +450,6 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
manifests.push_back(&manifest);
}

ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
TableMetadataCache metadata_cache(&metadata);
ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, metadata_cache.GetPartitionSpecsById());

Expand All @@ -394,15 +472,21 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
}
}

std::unordered_set<std::string> found_paths;
FoundDeletes found_deletes;
delete_files_.clear();
deleted_files_.clear();
deleted_file_keys_.clear();
duplicate_deletes_count_ = 0;
if (manifests.empty()) {
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_paths));
replaced_manifests_count_ = 0;
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_deletes));
return std::vector<ManifestFile>{};
}

bool trust_manifest_references = CanTrustManifestReferences(manifests);
manifest_evaluator_cache_.clear();
residual_evaluator_cache_.clear();
replaced_manifests_count_ = 0;

// TODO(Guotao): Parallelize manifest filtering with per-manifest results, then
// merge found paths and deleted files after the loop.
Expand All @@ -412,11 +496,14 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
ICEBERG_ASSIGN_OR_RAISE(
auto filtered_manifest,
FilterManifest(schema, specs_by_id, *manifest_ptr, trust_manifest_references,
writer_factory, found_paths));
writer_factory, found_deletes));
if (filtered_manifest.manifest_path != manifest_ptr->manifest_path) {
++replaced_manifests_count_;
}
filtered.push_back(std::move(filtered_manifest));
}

ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_paths));
ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_deletes));
return filtered;
}

Expand Down
Loading
Loading