diff --git a/src/iceberg/avro/avro_data_util.cc b/src/iceberg/avro/avro_data_util.cc index 17bbb394b..fb2f58bd1 100644 --- a/src/iceberg/avro/avro_data_util.cc +++ b/src/iceberg/avro/avro_data_util.cc @@ -457,6 +457,11 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node, const SchemaField& projected_field, const arrow::MetadataColumnContext& metadata_context, ::arrow::ArrayBuilder* array_builder) { + if (projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull()); + return {}; + } + if (avro_node->type() == ::avro::AVRO_UNION) { size_t branch = avro_datum.unionBranch(); if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) { @@ -507,6 +512,9 @@ Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index, } if (array.IsNull(index)) { + if (datum->type() == ::avro::AVRO_NULL) { + return {}; + } if (!datum->isUnion()) [[unlikely]] { return InvalidSchema("Cannot extract null to non-union type: {}", ::avro::toString(datum->type())); diff --git a/src/iceberg/avro/avro_direct_decoder.cc b/src/iceberg/avro/avro_direct_decoder.cc index cb4e869cc..19ce77bbd 100644 --- a/src/iceberg/avro/avro_direct_decoder.cc +++ b/src/iceberg/avro/avro_direct_decoder.cc @@ -588,6 +588,12 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& d const SchemaField& projected_field, const arrow::MetadataColumnContext& metadata_context, ::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) { + if (projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node, decoder)); + ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull()); + return {}; + } + if (avro_node->type() == ::avro::AVRO_UNION) { const size_t branch_index = decoder.decodeUnionIndex(); diff --git a/src/iceberg/avro/avro_direct_encoder.cc b/src/iceberg/avro/avro_direct_encoder.cc index caab7f699..045535077 100644 --- a/src/iceberg/avro/avro_direct_encoder.cc +++ b/src/iceberg/avro/avro_direct_encoder.cc @@ -80,7 +80,7 @@ Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder& enco return EncodeArrowToAvro(branches.value_node, encoder, type, array, row_index, ctx); } - if (is_null) { + if (is_null && avro_node->type() != ::avro::AVRO_NULL) { return InvalidArgument("Null value in non-nullable field"); } diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 3d61d283f..5a72ee6df 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -237,6 +237,11 @@ Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) { return {}; } +Status ToAvroNodeVisitor::Visit(const UnknownType&, ::avro::NodePtr* node) { + *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL); + return {}; +} + Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) { *node = std::make_shared<::avro::NodeRecord>(); @@ -338,7 +343,7 @@ Status ToAvroNodeVisitor::Visit(const SchemaField& field, ::avro::NodePtr* node) field_ids_.push(field.field_id()); ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*field.type(), /*visitor=*/this, node)); - if (field.optional()) { + if (field.optional() && (*node)->type() != ::avro::AVRO_NULL) { ::avro::MultiLeaves union_types; union_types.add(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL)); union_types.add(std::move(*node)); @@ -383,8 +388,8 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) { case ::avro::AVRO_STRING: case ::avro::AVRO_BYTES: case ::avro::AVRO_FIXED: - return {}; case ::avro::AVRO_NULL: + return {}; case ::avro::AVRO_ENUM: default: return InvalidSchema("Unsupported Avro type: {}", static_cast(node->type())); @@ -512,6 +517,10 @@ Result GetFieldId(const ::avro::NodePtr& node, size_t field_idx) { Status ValidateAvroSchemaEvolution(const Type& expected_type, const ::avro::NodePtr& avro_node) { + if (avro_node->type() == ::avro::AVRO_NULL) { + return {}; + } + switch (expected_type.type_id()) { case TypeId::kBoolean: if (avro_node->type() == ::avro::AVRO_BOOL) { @@ -615,6 +624,8 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type, return {}; } break; + case TypeId::kUnknown: + return {}; default: break; } @@ -650,6 +661,35 @@ Result ProjectNested(const Type& expected_type, const ::avro::NodePtr& avro_node, bool prune_source); +Result ProjectField(const SchemaField& expected_field, + const ::avro::NodePtr& avro_node, + size_t source_index, bool prune_source) { + const Type& expected_type = *expected_field.type(); + ::avro::NodePtr field_node; + ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node, &field_node)); + + FieldProjection projection; + if (expected_type.type_id() == TypeId::kUnknown || + field_node->type() == ::avro::AVRO_NULL) { + if (!expected_field.optional()) { + return InvalidSchema("Cannot project required field with ID: {} as null", + expected_field.field_id()); + } + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + if (expected_type.is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(projection, + ProjectNested(expected_type, field_node, prune_source)); + } else { + ICEBERG_RETURN_UNEXPECTED(ValidateAvroSchemaEvolution(expected_type, field_node)); + } + projection.from = source_index; + projection.kind = FieldProjection::Kind::kProjected; + return projection; +} + Result ProjectStruct(const StructType& struct_type, const ::avro::NodePtr& avro_node, bool prune_source) { @@ -685,18 +725,9 @@ Result ProjectStruct(const StructType& struct_type, FieldProjection child_projection; if (auto iter = node_info_map.find(field_id); iter != node_info_map.cend()) { - ::avro::NodePtr field_node; - ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(iter->second.field_node, &field_node)); - if (expected_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE( - child_projection, - ProjectNested(*expected_field.type(), field_node, prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateAvroSchemaEvolution(*expected_field.type(), field_node)); - } - child_projection.from = iter->second.local_index; - child_projection.kind = FieldProjection::Kind::kProjected; + ICEBERG_ASSIGN_OR_RAISE(child_projection, + ProjectField(expected_field, iter->second.field_node, + iter->second.local_index, prune_source)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; } else if (expected_field.optional()) { @@ -733,20 +764,9 @@ Result ProjectList(const ListType& list_type, } FieldProjection element_projection; - ::avro::NodePtr element_node; - ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node->leafAt(0), &element_node)); - if (expected_element_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE( - element_projection, - ProjectNested(*expected_element_field.type(), element_node, prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node)); - } - - // Set the element projection metadata but preserve its children - element_projection.kind = FieldProjection::Kind::kProjected; - element_projection.from = size_t{0}; + ICEBERG_ASSIGN_OR_RAISE(element_projection, + ProjectField(expected_element_field, avro_node->leafAt(0), + size_t{0}, prune_source)); FieldProjection result; result.children.emplace_back(std::move(element_projection)); @@ -802,18 +822,10 @@ Result ProjectMap(const MapType& map_type, for (size_t i = 0; i < map_node->leaves(); ++i) { FieldProjection sub_projection; - ::avro::NodePtr sub_node; - ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(map_node->leafAt(i), &sub_node)); const auto& expected_sub_field = map_type.fields()[i]; - if (expected_sub_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectNested(*expected_sub_field.type(), - sub_node, prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateAvroSchemaEvolution(*expected_sub_field.type(), sub_node)); - } - sub_projection.kind = FieldProjection::Kind::kProjected; - sub_projection.from = i; + ICEBERG_ASSIGN_OR_RAISE( + sub_projection, + ProjectField(expected_sub_field, map_node->leafAt(i), i, prune_source)); result.children.emplace_back(std::move(sub_projection)); } @@ -1049,9 +1061,9 @@ Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original case ::avro::AVRO_STRING: case ::avro::AVRO_BYTES: case ::avro::AVRO_FIXED: + case ::avro::AVRO_NULL: // For primitive types, just return a copy return original_node; - case ::avro::AVRO_NULL: case ::avro::AVRO_ENUM: default: return InvalidSchema("Unsupported Avro type for field ID application: {}", diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index e3b7a7ffd..f5049e5cf 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -58,6 +58,7 @@ class ToAvroNodeVisitor { Status Visit(const UuidType& type, ::avro::NodePtr* node); Status Visit(const FixedType& type, ::avro::NodePtr* node); Status Visit(const BinaryType& type, ::avro::NodePtr* node); + Status Visit(const UnknownType&, ::avro::NodePtr*); Status Visit(const StructType& type, ::avro::NodePtr* node); Status Visit(const ListType& type, ::avro::NodePtr* node); Status Visit(const MapType& type, ::avro::NodePtr* node); diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index 63fc31462..d51725abd 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -20,7 +20,11 @@ #include "iceberg/avro/avro_writer.h" #include +#include +#include +#include +#include #include #include #include @@ -38,8 +42,12 @@ #include "iceberg/avro/avro_schema_util_internal.h" #include "iceberg/avro/avro_stream_internal.h" #include "iceberg/metrics_config.h" +#include "iceberg/parquet/parquet_data_util_internal.h" #include "iceberg/schema.h" #include "iceberg/schema_internal.h" +#include "iceberg/schema_util.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" namespace iceberg::avro { @@ -81,6 +89,131 @@ Result> ParseCodecLevel(const WriterProperties& propertie return level; } +enum class FieldContext { + kTopLevel, + kStruct, + kListElement, + kMapKey, + kMapValue, +}; + +Result> PruneUnknownField(const SchemaField& field, + FieldContext context) { + if (field.type()->type_id() == TypeId::kUnknown) { + ICEBERG_PRECHECK(field.optional(), "Unknown type field '{}' must be optional", + field.name()); + ICEBERG_PRECHECK(context != FieldContext::kListElement, + "Cannot write list element '{}' of unknown type because it has no " + "physical Avro representation", + field.name()); + ICEBERG_PRECHECK(context != FieldContext::kMapKey, + "Cannot write map key '{}' of unknown type because it has no " + "physical Avro representation", + field.name()); + ICEBERG_PRECHECK(context != FieldContext::kMapValue, + "Cannot write map value '{}' of unknown type because it has no " + "physical Avro representation", + field.name()); + return std::nullopt; + } + + switch (field.type()->type_id()) { + case TypeId::kStruct: { + const auto& struct_type = internal::checked_cast(*field.type()); + std::vector pruned_fields; + pruned_fields.reserve(struct_type.fields().size()); + bool changed = false; + for (const auto& child : struct_type.fields()) { + ICEBERG_ASSIGN_OR_RAISE(auto pruned_child, + PruneUnknownField(child, FieldContext::kStruct)); + if (pruned_child.has_value()) { + if (!(pruned_child.value() == child)) { + changed = true; + } + pruned_fields.push_back(std::move(pruned_child.value())); + } else { + changed = true; + } + } + + if (!changed) { + return field; + } + + ICEBERG_PRECHECK( + !pruned_fields.empty(), + "Cannot write struct field '{}' because all child fields are unknown and " + "would be omitted from Avro", + field.name()); + + return SchemaField(field.field_id(), field.name(), + std::make_shared(std::move(pruned_fields)), + field.optional(), field.doc()); + } + case TypeId::kList: { + const auto& list_type = internal::checked_cast(*field.type()); + const auto& element = list_type.element(); + ICEBERG_ASSIGN_OR_RAISE(auto pruned_element, + PruneUnknownField(element, FieldContext::kListElement)); + ICEBERG_PRECHECK(pruned_element.has_value(), + "Cannot write list field '{}' because its element has no " + "physical Avro representation", + field.name()); + if (pruned_element.value() == element) { + return field; + } + return SchemaField(field.field_id(), field.name(), + std::make_shared(std::move(pruned_element.value())), + field.optional(), field.doc()); + } + case TypeId::kMap: { + const auto& map_type = internal::checked_cast(*field.type()); + ICEBERG_ASSIGN_OR_RAISE(auto pruned_key, + PruneUnknownField(map_type.key(), FieldContext::kMapKey)); + ICEBERG_ASSIGN_OR_RAISE( + auto pruned_value, + PruneUnknownField(map_type.value(), FieldContext::kMapValue)); + ICEBERG_PRECHECK(pruned_key.has_value(), + "Cannot write map field '{}' because its key has no physical " + "Avro representation", + field.name()); + ICEBERG_PRECHECK(pruned_value.has_value(), + "Cannot write map field '{}' because its value has no physical " + "Avro representation", + field.name()); + if (pruned_key.value() == map_type.key() && + pruned_value.value() == map_type.value()) { + return field; + } + return SchemaField(field.field_id(), field.name(), + std::make_shared(std::move(pruned_key.value()), + std::move(pruned_value.value())), + field.optional(), field.doc()); + } + default: + return field; + } +} + +Result> PhysicalWriteSchema(const Schema& schema) { + std::vector pruned_fields; + pruned_fields.reserve(schema.fields().size()); + for (const auto& field : schema.fields()) { + ICEBERG_ASSIGN_OR_RAISE(auto pruned_field, + PruneUnknownField(field, FieldContext::kTopLevel)); + if (pruned_field.has_value()) { + pruned_fields.push_back(std::move(pruned_field.value())); + } + } + + ICEBERG_PRECHECK( + !pruned_fields.empty(), + "Cannot write schema because all fields are unknown and would be omitted from " + "Avro"); + + return std::make_shared(std::move(pruned_fields), schema.schema_id()); +} + // Abstract base class for Avro write backends. class AvroWriteBackend { public: @@ -178,17 +311,14 @@ class GenericDatumBackend : public AvroWriteBackend { class AvroWriter::Impl { public: - ~Impl() { - if (arrow_schema_.release != nullptr) { - ArrowSchemaRelease(&arrow_schema_); - } - } - Status Open(const WriterOptions& options) { - write_schema_ = options.schema; + schema_ = options.schema; + ICEBERG_ASSIGN_OR_RAISE(physical_schema_, PhysicalWriteSchema(*schema_)); + ICEBERG_ASSIGN_OR_RAISE(projection_, iceberg::Project(*physical_schema_, *schema_, + /*prune_source=*/false)); ::avro::NodePtr root; - ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root)); + ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*physical_schema_, &root)); if (const auto& schema_name = options.properties.Get(WriterProperties::kAvroSchemaName); !schema_name.empty()) { @@ -227,19 +357,43 @@ class AvroWriter::Impl { options.properties.Get(WriterProperties::kAvroSyncInterval), codec, compression_level, metadata)); - ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_)); + ArrowSchema input_arrow_c_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*schema_, &input_arrow_c_schema)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input_type, + ::arrow::ImportType(&input_arrow_c_schema)); + input_arrow_type_ = internal::checked_pointer_cast<::arrow::StructType>(input_type); + input_arrow_schema_ = ::arrow::schema(input_arrow_type_->fields()); + + ArrowSchema physical_arrow_c_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*physical_schema_, &physical_arrow_c_schema)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto physical_type, + ::arrow::ImportType(&physical_arrow_c_schema)); + write_arrow_type_ = + internal::checked_pointer_cast<::arrow::StructType>(physical_type); + write_arrow_schema_ = ::arrow::schema(write_arrow_type_->fields()); return {}; } Status Write(ArrowArray* data) { ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result, - ::arrow::ImportArray(data, &arrow_schema_)); + ::arrow::ImportArray(data, input_arrow_type_)); + auto input_struct_array = + internal::checked_pointer_cast<::arrow::StructArray>(result); + auto batch = ::arrow::RecordBatch::Make(input_arrow_schema_, result->length(), + input_struct_array->fields()); + ICEBERG_ASSIGN_OR_RAISE( + batch, iceberg::parquet::ProjectRecordBatch( + std::move(batch), write_arrow_schema_, *physical_schema_, projection_, + arrow::MetadataColumnContext{}, ::arrow::default_memory_pool())); + + auto write_array = std::make_shared<::arrow::StructArray>( + write_arrow_type_, batch->num_rows(), batch->columns()); - for (int64_t i = 0; i < result->length(); i++) { - ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *result, i)); + for (int64_t i = 0; i < write_array->length(); i++) { + ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*physical_schema_, *write_array, i)); } - num_records_ += result->length(); + num_records_ += write_array->length(); return {}; } @@ -267,19 +421,28 @@ class AvroWriter::Impl { if (!Closed()) { return Invalid("AvroWriter is not closed"); } - return AvroMetrics::GetMetrics(*write_schema_, num_records_, - *MetricsConfig::Default()); + return AvroMetrics::GetMetrics(*schema_, num_records_, *MetricsConfig::Default()); } private: - // The schema to write. - std::shared_ptr<::iceberg::Schema> write_schema_; + // Schema supplied by the caller. + std::shared_ptr<::iceberg::Schema> schema_; + // Schema used to write physical Avro fields after pruning unknown fields. + std::shared_ptr<::iceberg::Schema> physical_schema_; + // Arrow type used to import caller-provided ArrowArray data. + std::shared_ptr<::arrow::StructType> input_arrow_type_; + // Arrow schema used to project caller-provided data. + std::shared_ptr<::arrow::Schema> input_arrow_schema_; + // Arrow type used by the Avro writer backends. + std::shared_ptr<::arrow::StructType> write_arrow_type_; + // Arrow schema used to write physical Avro fields. + std::shared_ptr<::arrow::Schema> write_arrow_schema_; + // Projection from the logical Iceberg schema to the physical write schema. + SchemaProjection projection_; // The avro schema to write. std::shared_ptr<::avro::ValidSchema> avro_schema_; // Arrow output stream of the Avro file to write std::shared_ptr<::arrow::io::OutputStream> arrow_output_stream_; - // Arrow schema to write data. - ArrowSchema arrow_schema_; // Total length of the written Avro file. int64_t total_bytes_ = 0; // Number of records written. diff --git a/src/iceberg/json_serde.cc b/src/iceberg/json_serde.cc index 3944e510c..0902b7421 100644 --- a/src/iceberg/json_serde.cc +++ b/src/iceberg/json_serde.cc @@ -377,6 +377,8 @@ nlohmann::json ToJson(const Type& type) { } case TypeId::kUuid: return "uuid"; + case TypeId::kUnknown: + return "unknown"; } std::unreachable(); } @@ -441,12 +443,22 @@ Result> StructTypeFromJson(const nlohmann::json& json) { return std::make_unique(std::move(fields)); } +Status ValidateUnknownFieldOptional(const Type& type, bool optional, + std::string_view field_name) { + if (type.type_id() == TypeId::kUnknown && !optional) { + return JsonParseError("Unknown type field '{}' must be optional", field_name); + } + return {}; +} + Result> ListTypeFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(auto element_type, TypeFromJson(json[kElement])); ICEBERG_ASSIGN_OR_RAISE(auto element_id, GetJsonValue(json, kElementId)); ICEBERG_ASSIGN_OR_RAISE(auto element_required, GetJsonValue(json, kElementRequired)); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*element_type, !element_required, + ListType::kElementName)); return std::make_unique( SchemaField(element_id, std::string(ListType::kElementName), std::move(element_type), !element_required)); @@ -462,6 +474,11 @@ Result> MapTypeFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(auto value_id, GetJsonValue(json, kValueId)); ICEBERG_ASSIGN_OR_RAISE(auto value_required, GetJsonValue(json, kValueRequired)); + if (key_type->type_id() == TypeId::kUnknown) { + return JsonParseError("Map 'key' cannot be unknown type"); + } + ICEBERG_RETURN_UNEXPECTED( + ValidateUnknownFieldOptional(*value_type, !value_required, MapType::kValueName)); SchemaField key_field(key_id, std::string(MapType::kKeyName), std::move(key_type), /*optional=*/false); SchemaField value_field(value_id, std::string(MapType::kValueName), @@ -502,6 +519,8 @@ Result> TypeFromJson(const nlohmann::json& json) { return std::make_unique(); } else if (type_str == "uuid") { return std::make_unique(); + } else if (type_str == "unknown") { + return std::make_unique(); } else if (type_str.starts_with("fixed")) { std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])"); std::smatch match; @@ -548,6 +567,7 @@ Result> FieldFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(auto required, GetJsonValue(json, kRequired)); ICEBERG_ASSIGN_OR_RAISE(auto doc, GetJsonValueOrDefault(json, kDoc)); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldOptional(*type, !required, name)); return std::make_unique(field_id, std::move(name), std::move(type), !required, doc); } @@ -949,6 +969,7 @@ Result> ParseSchemas( for (const auto& schema_json : schema_array) { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr schema, SchemaFromJson(schema_json)); + ICEBERG_RETURN_UNEXPECTED(schema->Validate(format_version)); if (schema->schema_id() == current_schema_id) { current_schema = schema; } @@ -965,6 +986,7 @@ Result> ParseSchemas( ICEBERG_ASSIGN_OR_RAISE(auto schema_json, GetJsonValue(json, kSchema)); ICEBERG_ASSIGN_OR_RAISE(current_schema, SchemaFromJson(schema_json)); + ICEBERG_RETURN_UNEXPECTED(current_schema->Validate(format_version)); current_schema_id = current_schema->schema_id(); schemas.push_back(current_schema); } diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc index 43efd1cbd..0265f3327 100644 --- a/src/iceberg/parquet/parquet_data_util.cc +++ b/src/iceberg/parquet/parquet_data_util.cc @@ -166,7 +166,14 @@ Result> ProjectListArrayImpl( const auto& output_element_type = output_list_type->value_type(); std::shared_ptr<::arrow::Array> projected_values; - if (element_field.type()->is_nested()) { + if (element_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ASSIGN_OR_RAISE( + projected_values, + MakeNullArray(output_element_type, list_array->values()->length(), pool)); + } else if (element_projection.kind != FieldProjection::Kind::kProjected) { + return NotImplemented("Unsupported list element projection kind: {}", + ToString(element_projection.kind)); + } else if (element_field.type()->is_nested()) { const auto& nested_type = internal::checked_cast(*element_field.type()); ICEBERG_ASSIGN_OR_RAISE( @@ -219,7 +226,14 @@ Result> ProjectMapArray( // Project keys std::shared_ptr<::arrow::Array> projected_keys; - if (key_type->is_nested()) { + if (key_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ASSIGN_OR_RAISE( + projected_keys, + MakeNullArray(output_map_type->key_type(), map_array->keys()->length(), pool)); + } else if (key_projection.kind != FieldProjection::Kind::kProjected) { + return NotImplemented("Unsupported map key projection kind: {}", + ToString(key_projection.kind)); + } else if (key_type->is_nested()) { const auto& nested_type = internal::checked_cast(*key_type); ICEBERG_ASSIGN_OR_RAISE( projected_keys, @@ -233,7 +247,14 @@ Result> ProjectMapArray( // Project values std::shared_ptr<::arrow::Array> projected_items; - if (value_type->is_nested()) { + if (value_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ASSIGN_OR_RAISE( + projected_items, + MakeNullArray(output_map_type->item_type(), map_array->items()->length(), pool)); + } else if (value_projection.kind != FieldProjection::Kind::kProjected) { + return NotImplemented("Unsupported map value projection kind: {}", + ToString(value_projection.kind)); + } else if (value_type->is_nested()) { const auto& nested_type = internal::checked_cast(*value_type); ICEBERG_ASSIGN_OR_RAISE( projected_items, diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index 849bbd1f8..8a71ce0f6 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -59,10 +59,59 @@ std::optional GetFieldId(const ::parquet::arrow::SchemaField& parquet_f return FieldIdFromMetadata(parquet_field.field->metadata()); } -// TODO(gangwu): support v3 unknown type +bool IsNullPhysicalField(const ::parquet::arrow::SchemaField& parquet_field) { + return parquet_field.field->type()->id() == ::arrow::Type::NA; +} + +bool HasSelectedColumn(const FieldProjection& projection) { + if (projection.attributes) { + const auto& attributes = + internal::checked_cast(*projection.attributes); + if (attributes.column_id) { + return true; + } + } + return std::ranges::any_of(projection.children, HasSelectedColumn); +} + +std::optional FirstColumnIndex( + const ::parquet::arrow::SchemaField& parquet_field) { + if (parquet_field.column_index >= 0) { + return parquet_field.column_index; + } + for (const auto& child : parquet_field.children) { + if (auto column_index = FirstColumnIndex(child)) { + return column_index; + } + } + return std::nullopt; +} + +void SelectShapeColumnIfNeeded( + FieldProjection* projection, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (HasSelectedColumn(*projection)) { + return; + } + for (const auto& parquet_field : parquet_fields) { + if (auto column_index = FirstColumnIndex(parquet_field)) { + projection->attributes = + std::make_shared(column_index.value()); + return; + } + } +} + +} // namespace + Status ValidateParquetSchemaEvolution( const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { const auto& arrow_type = parquet_field.field->type(); + // Some Parquet files may contain null-only physical fields. Allow reading them as + // any optional projected field type. + if (arrow_type->id() == ::arrow::Type::NA) { + return {}; + } switch (expected_type.type_id()) { case TypeId::kBoolean: if (arrow_type->id() == ::arrow::Type::BOOL) { @@ -186,6 +235,8 @@ Status ValidateParquetSchemaEvolution( } } break; + case TypeId::kUnknown: + return {}; case TypeId::kStruct: if (arrow_type->id() == ::arrow::Type::STRUCT) { return {}; @@ -209,11 +260,41 @@ Status ValidateParquetSchemaEvolution( expected_type, arrow_type->ToString()); } +namespace { + // Forward declaration Result ProjectNested( const Type& nested_type, const std::vector<::parquet::arrow::SchemaField>& parquet_fields); +Result ProjectField(const SchemaField& expected_field, + const ::parquet::arrow::SchemaField& parquet_field, + size_t source_index) { + const Type& expected_type = *expected_field.type(); + ICEBERG_RETURN_UNEXPECTED(ValidateParquetSchemaEvolution(expected_type, parquet_field)); + + FieldProjection projection; + if (expected_type.type_id() == TypeId::kUnknown || IsNullPhysicalField(parquet_field)) { + if (!expected_field.optional()) { + return InvalidSchema("Cannot project required field with id {} as null", + expected_field.field_id()); + } + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + if (expected_type.is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(projection, + ProjectNested(expected_type, parquet_field.children)); + } else { + projection.attributes = + std::make_shared(parquet_field.column_index); + } + projection.from = source_index; + projection.kind = FieldProjection::Kind::kProjected; + return projection; +} + Result ProjectStruct( const StructType& struct_type, const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { @@ -248,17 +329,8 @@ Result ProjectStruct( if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { const auto& parquet_field = iter->second.parquet_field; - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*field.type(), parquet_field)); - if (field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(child_projection, - ProjectNested(*field.type(), parquet_field.children)); - } else { - child_projection.attributes = - std::make_shared(parquet_field.column_index); - } - child_projection.from = iter->second.local_index; - child_projection.kind = FieldProjection::Kind::kProjected; + ICEBERG_ASSIGN_OR_RAISE( + child_projection, ProjectField(field, parquet_field, iter->second.local_index)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; } else if (field.optional()) { @@ -270,6 +342,7 @@ Result ProjectStruct( result.children.emplace_back(std::move(child_projection)); } + SelectShapeColumnIfNeeded(&result, parquet_fields); PruneFieldProjection(result); return result; } @@ -294,23 +367,12 @@ Result ProjectList( element_field.field_id(), element_field_id.value()); } - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*element_field.type(), parquet_field)); - - FieldProjection element_projection; - if (element_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(element_projection, - ProjectNested(*element_field.type(), parquet_field.children)); - } else { - element_projection.attributes = - std::make_shared(parquet_field.column_index); - } - - element_projection.kind = FieldProjection::Kind::kProjected; - element_projection.from = size_t{0}; + ICEBERG_ASSIGN_OR_RAISE(auto element_projection, + ProjectField(element_field, parquet_field, size_t{0})); FieldProjection result; result.children.emplace_back(std::move(element_projection)); + SelectShapeColumnIfNeeded(&result, parquet_fields); return result; } @@ -346,23 +408,20 @@ Result ProjectMap( result.children.reserve(2); for (size_t i = 0; i < parquet_fields.size(); ++i) { - FieldProjection sub_projection; const auto& sub_node = parquet_fields[i]; const auto& sub_field = map_type.fields()[i]; - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*sub_field.type(), sub_node)); - if (sub_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(sub_projection, - ProjectNested(*sub_field.type(), sub_node.children)); - } else { - sub_projection.attributes = - std::make_shared(sub_node.column_index); + ICEBERG_ASSIGN_OR_RAISE(auto sub_projection, ProjectField(sub_field, sub_node, i)); + if (sub_projection.kind == FieldProjection::Kind::kNull && + !HasSelectedColumn(sub_projection)) { + if (auto column_index = FirstColumnIndex(sub_node)) { + sub_projection.attributes = + std::make_shared(column_index.value()); + } } - sub_projection.kind = FieldProjection::Kind::kProjected; - sub_projection.from = i; result.children.emplace_back(std::move(sub_projection)); } + SelectShapeColumnIfNeeded(&result, parquet_fields); return result; } diff --git a/src/iceberg/parquet/parquet_schema_util_internal.h b/src/iceberg/parquet/parquet_schema_util_internal.h index 8e06b0bcf..567069291 100644 --- a/src/iceberg/parquet/parquet_schema_util_internal.h +++ b/src/iceberg/parquet/parquet_schema_util_internal.h @@ -62,4 +62,8 @@ std::vector SelectedColumnIndices(const SchemaProjection& projection); /// \return True if the Parquet schema has field IDs, false otherwise. bool HasFieldIds(const ::parquet::schema::NodePtr& root_node); +/// \brief Validate whether a projected Iceberg type is compatible with a Parquet field. +Status ValidateParquetSchemaEvolution(const Type& expected_type, + const ::parquet::arrow::SchemaField& parquet_field); + } // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index da794cc3e..90c5df5f1 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -20,7 +20,9 @@ #include "iceberg/parquet/parquet_writer.h" #include +#include #include +#include #include #include @@ -34,8 +36,12 @@ #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/parquet/parquet_data_util_internal.h" #include "iceberg/parquet/parquet_metrics_internal.h" #include "iceberg/schema_internal.h" +#include "iceberg/schema_util.h" +#include "iceberg/type.h" +#include "iceberg/util/checked_cast.h" #include "iceberg/util/macros.h" namespace iceberg::parquet { @@ -47,6 +53,131 @@ Result> OpenOutputStream( return arrow::OpenArrowOutputStream(options.io, options.path); } +enum class FieldContext { + kTopLevel, + kStruct, + kListElement, + kMapKey, + kMapValue, +}; + +Result> PruneUnknownField(const SchemaField& field, + FieldContext context) { + if (field.type()->type_id() == TypeId::kUnknown) { + ICEBERG_PRECHECK(field.optional(), "Unknown type field '{}' must be optional", + field.name()); + ICEBERG_PRECHECK(context != FieldContext::kListElement, + "Cannot write list element '{}' of unknown type because it has no " + "physical Parquet representation", + field.name()); + ICEBERG_PRECHECK(context != FieldContext::kMapKey, + "Cannot write map key '{}' of unknown type because it has no " + "physical Parquet representation", + field.name()); + ICEBERG_PRECHECK(context != FieldContext::kMapValue, + "Cannot write map value '{}' of unknown type because it has no " + "physical Parquet representation", + field.name()); + return std::nullopt; + } + + switch (field.type()->type_id()) { + case TypeId::kStruct: { + const auto& struct_type = internal::checked_cast(*field.type()); + std::vector pruned_fields; + pruned_fields.reserve(struct_type.fields().size()); + bool changed = false; + for (const auto& child : struct_type.fields()) { + ICEBERG_ASSIGN_OR_RAISE(auto pruned_child, + PruneUnknownField(child, FieldContext::kStruct)); + if (pruned_child.has_value()) { + if (!(pruned_child.value() == child)) { + changed = true; + } + pruned_fields.push_back(std::move(pruned_child.value())); + } else { + changed = true; + } + } + + if (!changed) { + return field; + } + + ICEBERG_PRECHECK( + !pruned_fields.empty(), + "Cannot write struct field '{}' because all child fields are unknown and " + "would be omitted from Parquet", + field.name()); + + return SchemaField(field.field_id(), field.name(), + std::make_shared(std::move(pruned_fields)), + field.optional(), field.doc()); + } + case TypeId::kList: { + const auto& list_type = internal::checked_cast(*field.type()); + const auto& element = list_type.element(); + ICEBERG_ASSIGN_OR_RAISE(auto pruned_element, + PruneUnknownField(element, FieldContext::kListElement)); + ICEBERG_PRECHECK(pruned_element.has_value(), + "Cannot write list field '{}' because its element has no " + "physical Parquet representation", + field.name()); + if (pruned_element.value() == element) { + return field; + } + return SchemaField(field.field_id(), field.name(), + std::make_shared(std::move(pruned_element.value())), + field.optional(), field.doc()); + } + case TypeId::kMap: { + const auto& map_type = internal::checked_cast(*field.type()); + ICEBERG_ASSIGN_OR_RAISE(auto pruned_key, + PruneUnknownField(map_type.key(), FieldContext::kMapKey)); + ICEBERG_ASSIGN_OR_RAISE( + auto pruned_value, + PruneUnknownField(map_type.value(), FieldContext::kMapValue)); + ICEBERG_PRECHECK(pruned_key.has_value(), + "Cannot write map field '{}' because its key has no physical " + "Parquet representation", + field.name()); + ICEBERG_PRECHECK(pruned_value.has_value(), + "Cannot write map field '{}' because its value has no physical " + "Parquet representation", + field.name()); + if (pruned_key.value() == map_type.key() && + pruned_value.value() == map_type.value()) { + return field; + } + return SchemaField(field.field_id(), field.name(), + std::make_shared(std::move(pruned_key.value()), + std::move(pruned_value.value())), + field.optional(), field.doc()); + } + default: + return field; + } +} + +Result> PhysicalWriteSchema(const Schema& schema) { + std::vector pruned_fields; + pruned_fields.reserve(schema.fields().size()); + for (const auto& field : schema.fields()) { + ICEBERG_ASSIGN_OR_RAISE(auto pruned_field, + PruneUnknownField(field, FieldContext::kTopLevel)); + if (pruned_field.has_value()) { + pruned_fields.push_back(std::move(pruned_field.value())); + } + } + + ICEBERG_PRECHECK( + !pruned_fields.empty(), + "Cannot write schema because all fields are unknown and would be omitted from " + "Parquet"); + + return std::make_shared(std::move(pruned_fields), schema.schema_id()); +} + Result<::arrow::Compression::type> ParseCompression(const WriterProperties& properties) { const auto& compression_name = properties.Get(WriterProperties::kParquetCompression); if (compression_name == "uncompressed") { @@ -89,6 +220,9 @@ class ParquetWriter::Impl { public: Status Open(const WriterOptions& options) { schema_ = options.schema; + ICEBERG_ASSIGN_OR_RAISE(physical_schema_, PhysicalWriteSchema(*schema_)); + ICEBERG_ASSIGN_OR_RAISE(projection_, iceberg::Project(*physical_schema_, *schema_, + /*prune_source=*/false)); ICEBERG_ASSIGN_OR_RAISE(auto compression, ParseCompression(options.properties)); ICEBERG_ASSIGN_OR_RAISE(auto compression_level, ParseCodecLevel(options.properties)); @@ -108,10 +242,15 @@ class ParquetWriter::Impl { ArrowSchema c_schema; ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*schema_, &c_schema)); - ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(input_arrow_schema_, ::arrow::ImportSchema(&c_schema)); + + ArrowSchema physical_c_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*physical_schema_, &physical_c_schema)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(write_arrow_schema_, + ::arrow::ImportSchema(&physical_c_schema)); ICEBERG_ARROW_RETURN_NOT_OK( - ::parquet::arrow::ToParquetSchema(arrow_schema_.get(), *writer_properties, + ::parquet::arrow::ToParquetSchema(write_arrow_schema_.get(), *writer_properties, *arrow_writer_properties, &parquet_schema_)); auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>( parquet_schema_->schema_root()); @@ -123,9 +262,9 @@ class ParquetWriter::Impl { auto file_writer = ::parquet::ParquetFileWriter::Open( output_stream_, std::move(schema_node), std::move(writer_properties), std::make_shared<::arrow::KeyValueMetadata>(options.metadata)); - ICEBERG_ARROW_RETURN_NOT_OK( - ::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), arrow_schema_, - std::move(arrow_writer_properties), &writer_)); + ICEBERG_ARROW_RETURN_NOT_OK(::parquet::arrow::FileWriter::Make( + pool_, std::move(file_writer), write_arrow_schema_, + std::move(arrow_writer_properties), &writer_)); metrics_config_ = options.metrics_config; @@ -133,8 +272,12 @@ class ParquetWriter::Impl { } Status Write(ArrowArray* array) { - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch, - ::arrow::ImportRecordBatch(array, arrow_schema_)); + ICEBERG_ARROW_ASSIGN_OR_RETURN( + auto batch, ::arrow::ImportRecordBatch(array, input_arrow_schema_)); + ICEBERG_ASSIGN_OR_RAISE( + batch, + ProjectRecordBatch(std::move(batch), write_arrow_schema_, *physical_schema_, + projection_, arrow::MetadataColumnContext{}, pool_)); ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch)); @@ -189,8 +332,14 @@ class ParquetWriter::Impl { ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool(); // Schema to write from the Iceberg table. std::shared_ptr schema_; - // Schema to write from the Parquet file. - std::shared_ptr<::arrow::Schema> arrow_schema_; + // Schema used to write physical Parquet fields after pruning unknown fields. + std::shared_ptr physical_schema_; + // Schema used to import caller-provided ArrowArray data. + std::shared_ptr<::arrow::Schema> input_arrow_schema_; + // Schema used by the Parquet writer. + std::shared_ptr<::arrow::Schema> write_arrow_schema_; + // Projection from the logical Iceberg schema to the physical write schema. + SchemaProjection projection_; // Parquet schema descriptor generated from the Arrow schema. std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema_; // Metrics config for collecting metrics during write. diff --git a/src/iceberg/row/arrow_array_wrapper.cc b/src/iceberg/row/arrow_array_wrapper.cc index e97293bcd..0d6d48c15 100644 --- a/src/iceberg/row/arrow_array_wrapper.cc +++ b/src/iceberg/row/arrow_array_wrapper.cc @@ -44,6 +44,8 @@ Result ExtractValue(const ArrowSchema* schema, const ArrowArray* array, } switch (array_view->storage_type) { + case NANOARROW_TYPE_NA: + return std::monostate{}; case NANOARROW_TYPE_BOOL: return static_cast(ArrowArrayViewGetIntUnsafe(array_view, index)); case NANOARROW_TYPE_INT32: diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc index 5fdd47998..5f8431d8d 100644 --- a/src/iceberg/schema.cc +++ b/src/iceberg/schema.cc @@ -35,6 +35,45 @@ namespace iceberg { +namespace { + +Status ValidateUnknownFieldsOptional(const Type& type) { + switch (type.type_id()) { + case TypeId::kStruct: { + const auto& struct_type = static_cast(type); + for (const auto& field : struct_type.fields()) { + ICEBERG_PRECHECK(field.optional() || field.type()->type_id() != TypeId::kUnknown, + "Unknown type field '{}' must be optional", field.name()); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldsOptional(*field.type())); + } + return {}; + } + case TypeId::kList: { + const auto& list_type = static_cast(type); + const auto& element = list_type.element(); + ICEBERG_PRECHECK( + element.optional() || element.type()->type_id() != TypeId::kUnknown, + "Unknown type field '{}' must be optional", element.name()); + return ValidateUnknownFieldsOptional(*element.type()); + } + case TypeId::kMap: { + const auto& map_type = static_cast(type); + const auto& key = map_type.key(); + const auto& value = map_type.value(); + ICEBERG_PRECHECK(key.type()->type_id() != TypeId::kUnknown, + "Map 'key' cannot be unknown type"); + ICEBERG_PRECHECK(value.optional() || value.type()->type_id() != TypeId::kUnknown, + "Unknown type field '{}' must be optional", value.name()); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldsOptional(*key.type())); + return ValidateUnknownFieldsOptional(*value.type()); + } + default: + return {}; + } +} + +} // namespace + Schema::Schema(std::vector fields, int32_t schema_id) : StructType(std::move(fields)), schema_id_(schema_id), @@ -282,6 +321,8 @@ bool Schema::SameSchema(const Schema& other) const { } Status Schema::Validate(int32_t format_version) const { + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldsOptional(*this)); + // Get all fields including nested ones ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, cache_->GetIdToFieldMap()); diff --git a/src/iceberg/schema_internal.cc b/src/iceberg/schema_internal.cc index bdd5b859f..c32ceb2a6 100644 --- a/src/iceberg/schema_internal.cc +++ b/src/iceberg/schema_internal.cc @@ -150,6 +150,9 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName), ArrowCharView(kArrowUuidExtensionName))); } break; + case TypeId::kUnknown: + NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_NA)); + break; } if (!name.empty()) { @@ -217,6 +220,9 @@ Result> FromArrowSchema(const ArrowSchema& schema) { auto field_id = GetFieldId(schema); bool is_optional = (schema.flags & ARROW_FLAG_NULLABLE) != 0; + if (field_type->type_id() == TypeId::kUnknown && !is_optional) { + return InvalidSchema("Arrow null field '{}' must be nullable", schema.name); + } return std::make_unique(field_id, schema.name, std::move(field_type), is_optional); }; @@ -312,6 +318,8 @@ Result> FromArrowSchema(const ArrowSchema& schema) { } return iceberg::fixed(schema_view.fixed_size); } + case NANOARROW_TYPE_NA: + return iceberg::unknown(); default: return InvalidSchema("Unsupported Arrow type: {}", ArrowTypeString(schema_view.type)); diff --git a/src/iceberg/schema_util.cc b/src/iceberg/schema_util.cc index 4acdab631..68e18b03f 100644 --- a/src/iceberg/schema_util.cc +++ b/src/iceberg/schema_util.cc @@ -49,6 +49,9 @@ Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_typ if (expected_type == source_type) { return {}; } + if (source_type.type_id() == TypeId::kUnknown && expected_type.is_primitive()) { + return {}; + } switch (expected_type.type_id()) { case TypeId::kLong: { @@ -79,6 +82,49 @@ Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_typ return NotSupported("Cannot read {} from {}", expected_type, source_type); } +Result ProjectNested(const Type& expected_type, const Type& source_type, + bool prune_source); + +Result ProjectField(const SchemaField& expected_field, + const SchemaField& source_field, size_t source_index, + bool prune_source) { + FieldProjection projection; + + if (expected_field.type()->type_id() == TypeId::kUnknown) { + if (!expected_field.optional()) { + return InvalidSchema("Cannot project required field with id {} as null", + expected_field.field_id()); + } + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + if (source_field.type()->type_id() == TypeId::kUnknown && !expected_field.optional()) { + return InvalidSchema("Cannot project required field with id {} as null", + expected_field.field_id()); + } + + if (expected_field.type()->is_nested()) { + if (source_field.type()->type_id() == TypeId::kUnknown) { + ICEBERG_RETURN_UNEXPECTED( + ValidateSchemaEvolution(*expected_field.type(), *source_field.type())); + } + ICEBERG_ASSIGN_OR_RAISE( + projection, + ProjectNested(*expected_field.type(), *source_field.type(), prune_source)); + } else { + ICEBERG_RETURN_UNEXPECTED( + ValidateSchemaEvolution(*expected_field.type(), *source_field.type())); + } + + // If `prune_source` is false, all fields will be read so the local index is exactly + // the position to read data. Otherwise, the local index is computed by pruning all + // non-projected fields. + projection.from = source_index; + projection.kind = FieldProjection::Kind::kProjected; + return projection; +} + Result ProjectNested(const Type& expected_type, const Type& source_type, bool prune_source) { if (!expected_type.is_nested()) { @@ -120,19 +166,9 @@ Result ProjectNested(const Type& expected_type, const Type& sou FieldProjection child_projection; if (auto iter = source_field_map.find(field_id); iter != source_field_map.cend()) { - if (expected_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(child_projection, - ProjectNested(*expected_field.type(), - *iter->second.field->type(), prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateSchemaEvolution(*expected_field.type(), *iter->second.field->type())); - } - // If `prune_source` is false, all fields will be read so the local index - // is exactly the position to read data. Otherwise, the local index is computed - // by pruning all non-projected fields - child_projection.from = iter->second.local_index; - child_projection.kind = FieldProjection::Kind::kProjected; + ICEBERG_ASSIGN_OR_RAISE(child_projection, + ProjectField(expected_field, *iter->second.field, + iter->second.local_index, prune_source)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; } else if (expected_field.optional()) { diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index 2f0c7e181..335fedadc 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -80,6 +80,7 @@ struct ICEBERG_EXPORT TableMetadata { static inline const std::unordered_map kMinFormatVersions = { {TypeId::kTimestampNs, 3}, {TypeId::kTimestampTzNs, 3}, + {TypeId::kUnknown, 3}, }; /// An integer version number for the format diff --git a/src/iceberg/test/arrow_test.cc b/src/iceberg/test/arrow_test.cc index dcfdb6b56..2e2a80096 100644 --- a/src/iceberg/test/arrow_test.cc +++ b/src/iceberg/test/arrow_test.cc @@ -107,7 +107,9 @@ INSTANTIATE_TEST_SUITE_P( ToArrowSchemaParam{.iceberg_type = iceberg::uuid(), .arrow_type = ::arrow::extension::uuid()}, ToArrowSchemaParam{.iceberg_type = iceberg::fixed(20), - .arrow_type = ::arrow::fixed_size_binary(20)})); + .arrow_type = ::arrow::fixed_size_binary(20)}, + ToArrowSchemaParam{.iceberg_type = iceberg::unknown(), + .arrow_type = ::arrow::null()})); namespace { @@ -233,6 +235,81 @@ TEST(ToArrowSchemaTest, MapType) { /*nullable=*/true, kValueFieldId)); } +TEST(ToArrowSchemaTest, NestedUnknownFieldsRoundTrip) { + Schema schema( + { + SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "mystery", + iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/4, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/5, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/6, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/7, "value", + iceberg::unknown()))), + }, + /*schema_id=*/0); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(schema, &arrow_c_schema), IsOk()); + + auto imported_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); + ASSERT_EQ(imported_schema->num_fields(), 3); + + auto profile_type = + std::static_pointer_cast<::arrow::StructType>(imported_schema->field(0)->type()); + ASSERT_EQ(profile_type->num_fields(), 1); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*profile_type->field(0), ::arrow::Type::NA, + "mystery", /*nullable=*/true, + /*field_id=*/2)); + + auto mysteries_type = + std::static_pointer_cast<::arrow::ListType>(imported_schema->field(1)->type()); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*mysteries_type->value_field(), + ::arrow::Type::NA, "element", + /*nullable=*/true, /*field_id=*/4)); + + auto properties_type = + std::static_pointer_cast<::arrow::MapType>(imported_schema->field(2)->type()); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*properties_type->key_field(), + ::arrow::Type::STRING, "key", + /*nullable=*/false, /*field_id=*/6)); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*properties_type->item_field(), + ::arrow::Type::NA, "value", + /*nullable=*/true, /*field_id=*/7)); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*imported_schema, &exported_schema).ok()); + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ASSERT_THAT(schema_result, IsOk()); + ArrowSchemaRelease(&exported_schema); + + const auto& round_tripped_schema = *schema_result.value(); + ASSERT_EQ(round_tripped_schema.fields().size(), 3); + + const auto* profile = + dynamic_cast(round_tripped_schema.fields()[0].type().get()); + ASSERT_NE(profile, nullptr); + ASSERT_EQ(profile->fields()[0].type()->type_id(), TypeId::kUnknown); + + const auto* mysteries = + dynamic_cast(round_tripped_schema.fields()[1].type().get()); + ASSERT_NE(mysteries, nullptr); + ASSERT_EQ(mysteries->fields()[0].type()->type_id(), TypeId::kUnknown); + + const auto* properties = + dynamic_cast(round_tripped_schema.fields()[2].type().get()); + ASSERT_NE(properties, nullptr); + ASSERT_EQ(properties->value().type()->type_id(), TypeId::kUnknown); +} + struct FromArrowSchemaParam { std::shared_ptr arrow_type; bool optional = true; @@ -307,7 +384,51 @@ INSTANTIATE_TEST_SUITE_P( FromArrowSchemaParam{.arrow_type = ::arrow::extension::uuid(), .iceberg_type = iceberg::uuid()}, FromArrowSchemaParam{.arrow_type = ::arrow::fixed_size_binary(20), - .iceberg_type = iceberg::fixed(20)})); + .iceberg_type = iceberg::fixed(20)}, + FromArrowSchemaParam{.arrow_type = ::arrow::null(), + .iceberg_type = iceberg::unknown()})); + +TEST(FromArrowSchemaTest, RejectRequiredNullFieldAsUnknown) { + auto metadata = + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kParquetFieldIdKey), "1"}}); + auto arrow_schema = ::arrow::schema({::arrow::field( + "mystery", ::arrow::null(), /*nullable=*/false, std::move(metadata))}); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); + + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ArrowSchemaRelease(&exported_schema); + + ASSERT_THAT(schema_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(schema_result, + HasErrorMessage("Arrow null field 'mystery' must be nullable")); +} + +TEST(FromArrowSchemaTest, RejectRequiredNullListElementAsUnknown) { + auto list_metadata = + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kParquetFieldIdKey), "1"}}); + auto element_metadata = + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kParquetFieldIdKey), "2"}}); + auto element_field = ::arrow::field("element", ::arrow::null(), /*nullable=*/false, + std::move(element_metadata)); + auto arrow_schema = + ::arrow::schema({::arrow::field("mysteries", ::arrow::list(element_field), + /*nullable=*/true, std::move(list_metadata))}); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); + + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ArrowSchemaRelease(&exported_schema); + + ASSERT_THAT(schema_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(schema_result, + HasErrorMessage("Arrow null field 'element' must be nullable")); +} TEST(FromArrowSchemaTest, StructType) { constexpr int32_t kStructFieldId = 1; diff --git a/src/iceberg/test/avro_data_test.cc b/src/iceberg/test/avro_data_test.cc index 2979ad9bd..7731f58d3 100644 --- a/src/iceberg/test/avro_data_test.cc +++ b/src/iceberg/test/avro_data_test.cc @@ -1241,6 +1241,27 @@ TEST(ExtractDatumFromArrayTest, NullHandling) { EXPECT_EQ(record2.fieldAt(0).type(), ::avro::AVRO_NULL); } +TEST(ExtractDatumFromArrayTest, UnknownType) { + Schema iceberg_schema({SchemaField::MakeOptional(1, "a", unknown())}); + ::avro::NodePtr avro_node; + ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); + auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); + + auto arrow_array = + ::arrow::json::ArrayFromJSONString(arrow_struct_type, R"([{"a": null}])") + .ValueOrDie(); + + ::avro::GenericDatum datum(avro_node); + ASSERT_THAT(ExtractDatumFromArray(*arrow_array, 0, &datum), IsOk()); + + const auto& record = datum.value<::avro::GenericRecord>(); + EXPECT_EQ(record.fieldAt(0).type(), ::avro::AVRO_NULL); +} + struct RoundTripParam { std::string name; std::shared_ptr iceberg_schema; diff --git a/src/iceberg/test/avro_schema_test.cc b/src/iceberg/test/avro_schema_test.cc index dc2cb0a51..ffb668abc 100644 --- a/src/iceberg/test/avro_schema_test.cc +++ b/src/iceberg/test/avro_schema_test.cc @@ -250,6 +250,12 @@ TEST(ToAvroNodeVisitorTest, BinaryType) { EXPECT_EQ(node->type(), ::avro::AVRO_BYTES); } +TEST(ToAvroNodeVisitorTest, UnknownType) { + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(UnknownType{}, &node), IsOk()); + EXPECT_EQ(node->type(), ::avro::AVRO_NULL); +} + TEST(ToAvroNodeVisitorTest, StructType) { StructType struct_type{{SchemaField{/*field_id=*/1, "bool_field", iceberg::boolean(), /*optional=*/false}, @@ -276,6 +282,70 @@ TEST(ToAvroNodeVisitorTest, StructType) { EXPECT_EQ(node->leafAt(1)->leafAt(1)->type(), ::avro::AVRO_INT); } +TEST(ToAvroNodeVisitorTest, OptionalUnknownField) { + StructType struct_type{{SchemaField{/*field_id=*/1, "mystery", iceberg::unknown(), + /*optional=*/true}}}; + + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(struct_type, &node), IsOk()); + + ASSERT_EQ(node->leaves(), 1); + EXPECT_EQ(node->leafAt(0)->type(), ::avro::AVRO_NULL); + ASSERT_EQ(node->customAttributes(), 1); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(node, /*index=*/0, /*field_id=*/1)); +} + +TEST(ToAvroNodeVisitorTest, NestedUnknownFields) { + StructType struct_type{ + {SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "mystery", iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/4, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/5, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/6, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/7, "value", iceberg::unknown())))}}; + + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(struct_type, &node), IsOk()); + + ASSERT_EQ(node->leaves(), 3); + auto profile_union = node->leafAt(0); + ASSERT_EQ(profile_union->type(), ::avro::AVRO_UNION); + auto profile_node = profile_union->leafAt(1); + ASSERT_EQ(profile_node->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(profile_node->leaves(), 1); + EXPECT_EQ(profile_node->leafAt(0)->type(), ::avro::AVRO_NULL); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(profile_node, /*index=*/0, /*field_id=*/2)); + + auto list_union = node->leafAt(1); + ASSERT_EQ(list_union->type(), ::avro::AVRO_UNION); + auto list_node = list_union->leafAt(1); + ASSERT_EQ(list_node->type(), ::avro::AVRO_ARRAY); + ASSERT_EQ(list_node->leaves(), 1); + EXPECT_EQ(list_node->leafAt(0)->type(), ::avro::AVRO_NULL); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(list_node, /*index=*/0, /*field_id=*/4, + /*key=*/"element-id")); + + auto map_union = node->leafAt(2); + ASSERT_EQ(map_union->type(), ::avro::AVRO_UNION); + auto map_node = map_union->leafAt(1); + ASSERT_EQ(map_node->type(), ::avro::AVRO_MAP); + ASSERT_EQ(map_node->leaves(), 2); + EXPECT_EQ(map_node->leafAt(0)->type(), ::avro::AVRO_STRING); + EXPECT_EQ(map_node->leafAt(1)->type(), ::avro::AVRO_NULL); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(map_node, /*index=*/0, /*field_id=*/6, + /*key=*/"key-id")); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(map_node, /*index=*/0, /*field_id=*/7, + /*key=*/"value-id")); +} + TEST(ToAvroNodeVisitorTest, StructTypeWithFieldNames) { StructType struct_type{ {SchemaField{/*field_id=*/1, "user-name", iceberg::string(), @@ -480,6 +550,13 @@ TEST(HasIdVisitorTest, HasNoIds) { EXPECT_FALSE(visitor.AllHaveIds()); } +TEST(HasIdVisitorTest, NullType) { + HasIdVisitor visitor; + EXPECT_THAT(visitor.Visit(::avro::compileJsonSchemaFromString("\"null\"")), IsOk()); + EXPECT_TRUE(visitor.HasNoIds()); + EXPECT_FALSE(visitor.AllHaveIds()); +} + TEST(HasIdVisitorTest, RecordWithFieldIds) { const std::string schema_json = R"({ "type": "record", @@ -899,6 +976,146 @@ TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) { ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); } +TEST(AvroSchemaProjectionTest, ProjectUnknownExpectedFieldAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "mystery", iceberg::unknown()), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "mystery", "type": "int", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kNull); +} + +TEST(AvroSchemaProjectionTest, ProjectNestedUnknownExpectedFieldsAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/3, "mystery", iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/4, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/5, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/6, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/7, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/8, "value", iceberg::unknown()))), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "profile", "type": ["null", { + "type": "record", + "name": "profile_record", + "fields": [ + {"name": "name", "type": ["null", "string"], "field-id": 2}, + {"name": "mystery", "type": ["null", "int"], "field-id": 3} + ] + }], "field-id": 1}, + {"name": "mysteries", "type": ["null", { + "type": "array", + "items": ["null", "int"], + "element-id": 5 + }], "field-id": 4}, + {"name": "properties", "type": ["null", { + "type": "map", + "values": ["null", "int"], + "key-id": 7, + "value-id": 8 + }], "field-id": 6} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_EQ(projection.fields[0].children[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[0].children[1].kind, FieldProjection::Kind::kNull); + + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_EQ(projection.fields[1].children[0].kind, FieldProjection::Kind::kNull); + + ASSERT_EQ(projection.fields[2].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[2].children.size(), 2); + ASSERT_EQ(projection.fields[2].children[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[2].children[1].kind, FieldProjection::Kind::kNull); +} + +TEST(AvroSchemaProjectionTest, RejectNullLeafForRequiredField) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::int32()), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "value", "type": "null", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with ID: 1 as null")); +} + +TEST(AvroSchemaProjectionTest, RejectNullListElementForRequiredElement) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "numbers", + std::make_shared(SchemaField::MakeRequired( + /*field_id=*/101, "element", iceberg::int32()))), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "numbers", "type": ["null", { + "type": "array", + "items": "null", + "element-id": 101 + }], "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with ID: 101 as null")); +} + TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) { // Create iceberg schema expecting an int Schema expected_schema({ diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc index b74fe829b..7e12313b0 100644 --- a/src/iceberg/test/avro_test.cc +++ b/src/iceberg/test/avro_test.cc @@ -17,7 +17,10 @@ * under the License. */ +#include #include +#include +#include #include #include @@ -30,6 +33,7 @@ #include #include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/avro/avro_constants.h" #include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_stream_internal.h" #include "iceberg/avro/avro_writer.h" @@ -45,6 +49,35 @@ namespace iceberg::avro { +namespace { + +::avro::NodePtr UnwrapOptional(const ::avro::NodePtr& node) { + if (node->type() != ::avro::AVRO_UNION) { + return node; + } + + for (size_t i = 0; i < node->leaves(); ++i) { + if (node->leafAt(i)->type() != ::avro::AVRO_NULL) { + return node->leafAt(i); + } + } + return node; +} + +std::optional FieldIdAt(const ::avro::NodePtr& node, size_t index) { + if (index >= node->customAttributes()) { + return std::nullopt; + } + + auto field_id = node->customAttributesAt(index).getAttribute(std::string(kFieldIdProp)); + if (!field_id.has_value()) { + return std::nullopt; + } + return std::stoi(field_id.value()); +} + +} // namespace + class AvroReaderTest : public TempFileTestBase { protected: static void SetUpTestSuite() { RegisterAll(); } @@ -740,6 +773,28 @@ class AvroWriterTest : public ::testing::Test, ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } + ::avro::ValidSchema PhysicalAvroSchema() { + auto& mock_io = internal::checked_cast(*file_io_); + auto input = mock_io.fs()->OpenInputFile(temp_avro_file_).ValueOrDie(); + auto input_stream = std::make_unique(std::move(input), 1024 * 1024); + ::avro::DataFileReader<::avro::GenericDatum> avro_reader(std::move(input_stream)); + return avro_reader.dataSchema(); + } + + void ExpectOpenFails(std::shared_ptr schema, + std::string_view expected_message) { + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kAvroSkipDatum, skip_datum_); + auto writer = WriterFactoryRegistry::Open( + FileFormatType::kAvro, {.path = temp_avro_file_, + .schema = std::move(schema), + .io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(), + .properties = std::move(writer_properties)}); + + EXPECT_THAT(writer, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(writer, HasErrorMessage(std::string(expected_message))); + } + std::shared_ptr file_io_; std::string temp_avro_file_; bool skip_datum_{true}; @@ -890,6 +945,160 @@ TEST_P(AvroWriterTest, WriteOptionalFields) { VerifyWrittenData(test_data); } +TEST_P(AvroWriterTest, DoesNotMaterializeUnknownFieldsOnWrite) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "id", int32()), + SchemaField::MakeOptional(2, "mystery", unknown()), + SchemaField::MakeOptional(3, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(4, "name", string()), + SchemaField::MakeOptional(5, "secret", unknown()), + })), + }); + + std::string test_data = R"([ + [1, null, {"name": "Person0", "secret": null}], + [2, null, {"name": "Person1", "secret": null}] + ])"; + + WriteAvroFile(schema, test_data); + + auto avro_schema = PhysicalAvroSchema(); + auto root = avro_schema.root(); + ASSERT_EQ(root->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(root->leaves(), 2); + EXPECT_EQ(root->nameAt(0), "id"); + EXPECT_EQ(FieldIdAt(root, 0), std::make_optional(1)); + EXPECT_EQ(root->nameAt(1), "profile"); + EXPECT_EQ(FieldIdAt(root, 1), std::make_optional(3)); + + auto profile = UnwrapOptional(root->leafAt(1)); + ASSERT_EQ(profile->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(profile->leaves(), 1); + EXPECT_EQ(profile->nameAt(0), "name"); + EXPECT_EQ(FieldIdAt(profile, 0), std::make_optional(4)); + + VerifyWrittenData(test_data); +} + +TEST_P(AvroWriterTest, RejectsUnknownInsideListOrMapOnWrite) { + std::vector> schemas = { + std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "mysteries", + std::make_shared(SchemaField::MakeOptional( + 2, ListType::kElementName, iceberg::unknown()))), + }), + std::make_shared(std::vector{ + SchemaField::MakeOptional( + 1, "properties", + std::make_shared( + SchemaField::MakeRequired(2, MapType::kKeyName, iceberg::string()), + SchemaField::MakeOptional(3, MapType::kValueName, iceberg::unknown()))), + }), + }; + + for (const auto& schema : schemas) { + ExpectOpenFails(schema, "physical Avro representation"); + } +} + +TEST_P(AvroWriterTest, RejectsRequiredUnknownOnWrite) { + auto schema = std::make_shared( + std::vector{SchemaField(1, "mystery", unknown(), + /*optional=*/false)}); + + ExpectOpenFails(schema, "Unknown type field 'mystery' must be optional"); +} + +TEST_P(AvroWriterTest, RejectsUnknownOnlyStructOnWrite) { + std::vector> schemas = { + std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "secret", unknown()), + })), + }), + std::make_shared(std::vector{ + SchemaField::MakeOptional( + 1, "wrappers", + std::make_shared(SchemaField::MakeOptional( + 2, ListType::kElementName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(3, "secret", unknown()), + })))), + }), + std::make_shared(std::vector{ + SchemaField::MakeOptional( + 1, "properties", + std::make_shared( + SchemaField::MakeRequired(2, MapType::kKeyName, iceberg::string()), + SchemaField::MakeOptional( + 3, MapType::kValueName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(4, "secret", unknown()), + })))), + }), + }; + + for (const auto& schema : schemas) { + ExpectOpenFails(schema, "all child fields are unknown"); + } +} + +TEST_P(AvroWriterTest, WritesUnknownFieldsNestedInsideListOrMapStructs) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "id", int32()), + SchemaField::MakeOptional(2, "events", + std::make_shared(SchemaField::MakeOptional( + 3, ListType::kElementName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(4, "name", string()), + SchemaField::MakeOptional(5, "secret", unknown()), + })))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared( + SchemaField::MakeRequired(7, MapType::kKeyName, iceberg::string()), + SchemaField::MakeOptional( + 8, MapType::kValueName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(9, "label", string()), + SchemaField::MakeOptional(10, "secret", unknown()), + })))), + }); + + std::string test_data = R"([ + [1, [{"name": "open", "secret": null}, {"name": "close", "secret": null}], [["a", {"label": "A", "secret": null}]]], + [2, [], []] + ])"; + + WriteAvroFile(schema, test_data); + + auto avro_schema = PhysicalAvroSchema(); + auto root = avro_schema.root(); + ASSERT_EQ(root->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(root->leaves(), 3); + + auto events = UnwrapOptional(root->leafAt(1)); + ASSERT_EQ(events->type(), ::avro::AVRO_ARRAY); + auto event = UnwrapOptional(events->leafAt(0)); + ASSERT_EQ(event->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(event->leaves(), 1); + EXPECT_EQ(event->nameAt(0), "name"); + EXPECT_EQ(FieldIdAt(event, 0), std::make_optional(4)); + + auto properties = UnwrapOptional(root->leafAt(2)); + ASSERT_EQ(properties->type(), ::avro::AVRO_MAP); + ASSERT_EQ(properties->leaves(), 2); + auto value = UnwrapOptional(properties->leafAt(1)); + ASSERT_EQ(value->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(value->leaves(), 1); + EXPECT_EQ(value->nameAt(0), "label"); + EXPECT_EQ(FieldIdAt(value, 0), std::make_optional(9)); + + VerifyWrittenData(test_data); +} + TEST_P(AvroWriterTest, WriteLargeDataset) { auto schema = std::make_shared(std::vector{ SchemaField::MakeRequired(1, "id", std::make_shared()), diff --git a/src/iceberg/test/metadata_serde_test.cc b/src/iceberg/test/metadata_serde_test.cc index 0d3b5959b..2c2b2bd1e 100644 --- a/src/iceberg/test/metadata_serde_test.cc +++ b/src/iceberg/test/metadata_serde_test.cc @@ -21,7 +21,9 @@ #include #include +#include +#include "iceberg/json_serde_internal.h" #include "iceberg/partition_field.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" @@ -404,6 +406,57 @@ TEST(MetadataSerdeTest, DeserializeUnsupportedVersion) { "Cannot read unsupported version"); } +TEST(MetadataSerdeTest, DeserializeRejectsUnknownSchemaBeforeFormatV3) { + auto v1_metadata_json = nlohmann::json::parse(R"({ + "format-version": 1, + "location": "s3://bucket/test/location", + "last-column-id": 1, + "last-updated-ms": 1602638573874, + "schema": { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "mystery", "type": "unknown", "required": false} + ] + }, + "partition-spec": [] + })"); + + auto result = TableMetadataFromJson(v1_metadata_json); + ASSERT_THAT(result, IsError(ErrorKind::kInvalidSchema)); + EXPECT_THAT(result, HasErrorMessage( + "Invalid type for mystery: unknown is not supported until v3")); + + auto v2_metadata_json = nlohmann::json::parse(R"({ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 0, + "last-column-id": 1, + "last-updated-ms": 1602638573874, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "mystery", "type": "unknown", "required": false} + ] + } + ], + "current-schema-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "default-spec-id": 0, + "last-partition-id": 999, + "sort-orders": [{"order-id": 0, "fields": []}], + "default-sort-order-id": 0 + })"); + + result = TableMetadataFromJson(v2_metadata_json); + ASSERT_THAT(result, IsError(ErrorKind::kInvalidSchema)); + EXPECT_THAT(result, HasErrorMessage( + "Invalid type for mystery: unknown is not supported until v3")); +} + TEST(MetadataSerdeTest, DeserializeV1MissingSchemaType) { ReadTableMetadataExpectError("TableMetadataV1MissingSchemaType.json", "Missing 'type'"); } diff --git a/src/iceberg/test/parquet_data_test.cc b/src/iceberg/test/parquet_data_test.cc index 9ed28114e..606ad8ca5 100644 --- a/src/iceberg/test/parquet_data_test.cc +++ b/src/iceberg/test/parquet_data_test.cc @@ -316,6 +316,50 @@ TEST(ProjectRecordBatchTest, MapStringToInt) { VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json)); } +TEST(ProjectRecordBatchTest, NestedUnknownFields) { + Schema projected_schema({ + SchemaField::MakeRequired(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeRequired(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", unknown()), + })), + SchemaField::MakeRequired( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", unknown()))), + SchemaField::MakeRequired( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", unknown()))), + }); + + Schema source_schema({ + SchemaField::MakeRequired(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeRequired(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", int32()), + })), + SchemaField::MakeRequired( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", int32()))), + SchemaField::MakeRequired( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", int32()))), + }); + + const std::string input_json = R"([ + {"profile": {"name": "Person0", "mystery": 10}, "mysteries": [1, 2], "properties": [["a", 100], ["b", 200]]}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": []} + ])"; + const std::string expected_json = R"([ + {"profile": {"name": "Person0", "mystery": null}, "mysteries": [null, null], "properties": [["a", null], ["b", null]]}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": []} + ])"; + + ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema, + input_json, expected_json)); +} + TEST(ProjectRecordBatchTest, MapStringToStruct) { Schema iceberg_schema({ SchemaField::MakeRequired( diff --git a/src/iceberg/test/parquet_schema_test.cc b/src/iceberg/test/parquet_schema_test.cc index a9da3f9f7..40b06adbd 100644 --- a/src/iceberg/test/parquet_schema_test.cc +++ b/src/iceberg/test/parquet_schema_test.cc @@ -17,7 +17,12 @@ * under the License. */ +#include +#include +#include + #include +#include #include #include #include @@ -123,6 +128,30 @@ ::parquet::arrow::SchemaManifest MakeSchemaManifest( return manifest; } +::parquet::arrow::SchemaField MakeNullSchemaField(const std::string& name, int field_id) { + ::parquet::arrow::SchemaField schema_field; + schema_field.field = + ::arrow::field(name, ::arrow::null()) + ->WithMetadata(::arrow::key_value_metadata({std::string(kParquetFieldIdKey)}, + {std::to_string(field_id)})); + return schema_field; +} + +::parquet::arrow::SchemaField MakeListSchemaFieldWithNullElement(const std::string& name, + int field_id, + int element_field_id) { + ::parquet::arrow::SchemaField element_field = + MakeNullSchemaField("element", element_field_id); + + ::parquet::arrow::SchemaField schema_field; + schema_field.field = + ::arrow::field(name, ::arrow::list(element_field.field)) + ->WithMetadata(::arrow::key_value_metadata({std::string(kParquetFieldIdKey)}, + {std::to_string(field_id)})); + schema_field.children = {std::move(element_field)}; + return schema_field; +} + #define ASSERT_PROJECTED_FIELD(field_projection, index) \ ASSERT_EQ(field_projection.kind, FieldProjection::Kind::kProjected); \ ASSERT_EQ(std::get<1>(field_projection.from), index); @@ -303,6 +332,177 @@ TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) { ASSERT_PROJECTED_FIELD(projection.fields[0], 0); } +TEST(ParquetSchemaProjectionTest, ValidateSchemaEvolutionAllowsNullPhysicalType) { + ::parquet::arrow::SchemaField parquet_field; + parquet_field.field = ::arrow::field("value", ::arrow::null()); + + auto status = ValidateParquetSchemaEvolution(*iceberg::int32(), parquet_field); + ASSERT_THAT(status, IsOk()); +} + +TEST(ParquetSchemaProjectionTest, ProjectNullPhysicalFieldsAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "age", iceberg::int32()), + SchemaField::MakeOptional( + /*field_id=*/2, "profile", + std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/201, "name", iceberg::string()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "numbers", + std::make_shared(SchemaField::MakeRequired( + /*field_id=*/301, "element", iceberg::int32()))), + SchemaField::MakeOptional( + /*field_id=*/4, "counts", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/401, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/402, "value", iceberg::int32()))), + }); + + ::parquet::arrow::SchemaManifest schema_manifest; + schema_manifest.schema_fields = { + MakeNullSchemaField("age", /*field_id=*/1), + MakeNullSchemaField("profile", /*field_id=*/2), + MakeNullSchemaField("numbers", /*field_id=*/3), + MakeNullSchemaField("counts", /*field_id=*/4), + }; + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 4); + for (const auto& field_projection : projection.fields) { + ASSERT_PROJECTED_NULL_FIELD(field_projection); + ASSERT_TRUE(field_projection.children.empty()); + } + + ASSERT_TRUE(SelectedColumnIndices(projection).empty()); +} + +TEST(ParquetSchemaProjectionTest, RejectNullPhysicalFieldForRequiredField) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "age", iceberg::int32()), + }); + + ::parquet::arrow::SchemaManifest schema_manifest; + schema_manifest.schema_fields = { + MakeNullSchemaField("age", /*field_id=*/1), + }; + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 1 as null")); +} + +TEST(ParquetSchemaProjectionTest, RejectNullPhysicalListElementForRequiredElement) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "numbers", + std::make_shared(SchemaField::MakeRequired( + /*field_id=*/101, "element", iceberg::int32()))), + }); + + ::parquet::arrow::SchemaManifest schema_manifest; + schema_manifest.schema_fields = { + MakeListSchemaFieldWithNullElement("numbers", /*field_id=*/1, + /*element_field_id=*/101), + }; + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 101 as null")); +} + +TEST(ParquetSchemaProjectionTest, ProjectUnknownExpectedFieldAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "mystery", iceberg::unknown()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeInt32Node("mystery", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[0]); + ASSERT_TRUE(SelectedColumnIndices(projection).empty()); +} + +TEST(ParquetSchemaProjectionTest, ProjectNestedUnknownExpectedFieldsAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/3, "mystery", iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/4, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/5, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/6, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/7, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/8, "value", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/9, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/10, "mystery", iceberg::unknown()), + })), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeGroupNode("profile", + {MakeStringNode("name", /*field_id=*/2), + MakeInt32Node("mystery", /*field_id=*/3)}, + /*field_id=*/1), + MakeListNode("mysteries", MakeInt32Node("element", /*field_id=*/5), + /*field_id=*/4), + MakeMapNode("properties", + MakeStringNode("key", /*field_id=*/7, /*optional=*/false), + MakeInt32Node("value", /*field_id=*/8), + /*field_id=*/6), + MakeGroupNode("wrapper", {MakeInt32Node("mystery", /*field_id=*/10)}, + /*field_id=*/9), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 4); + + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[0], 0); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[0].children[1]); + + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[1].children[0]); + + ASSERT_PROJECTED_FIELD(projection.fields[2], 2); + ASSERT_EQ(projection.fields[2].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[2].children[0], 0); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[2].children[1]); + + ASSERT_PROJECTED_FIELD(projection.fields[3], 3); + ASSERT_EQ(projection.fields[3].children.size(), 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[3].children[0]); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 2, 3, 4, 5})); +} + TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) { Schema expected_schema({ SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::int32()), diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index 70fb9880f..5d9cfac83 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -33,6 +33,7 @@ #include #include #include +#include #include #include "iceberg/arrow/arrow_io_internal.h" @@ -461,6 +462,78 @@ TEST_F(ParquetReaderTest, ReadMetadataOnlyProjection) { ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, kExpectedJson)); } +TEST_F(ParquetReaderTest, ReadNestedUnknownProjection) { + temp_parquet_file_ = "nested_unknown.parquet"; + auto write_schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", int32()), + })), + SchemaField::MakeOptional( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", int32()))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", int32()))), + SchemaField::MakeOptional(9, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(10, "mystery", int32()), + })), + }); + auto read_schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", unknown()), + })), + SchemaField::MakeOptional( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", unknown()))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", unknown()))), + SchemaField::MakeOptional(9, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(10, "mystery", unknown()), + })), + }); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*write_schema, &arrow_c_schema), IsOk()); + auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + auto array = ::arrow::json::ArrayFromJSONString(arrow_type, + R"([ + {"profile": {"name": "Person0", "mystery": 10}, "mysteries": [1, 2], "properties": [["a", 100], ["b", 200]], "wrapper": {"mystery": 300}}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": [], "wrapper": {"mystery": null}} + ])") + .ValueOrDie(); + + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, + std::string("uncompressed")); + ASSERT_THAT(WriteArray(array, {.path = temp_parquet_file_, + .schema = write_schema, + .io = file_io_, + .properties = std::move(writer_properties)}), + IsOk()); + + ICEBERG_UNWRAP_OR_FAIL( + auto reader, + ReaderFactoryRegistry::Open( + FileFormatType::kParquet, + {.path = temp_parquet_file_, .io = file_io_, .projection = read_schema})); + + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, + R"([ + {"profile": {"name": "Person0", "mystery": null}, "mysteries": [null, null], "properties": [["a", null], ["b", null]], "wrapper": {"mystery": null}}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": [], "wrapper": {"mystery": null}} + ])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + class ParquetReadWrite : public ::testing::Test { protected: static void SetUpTestSuite() { parquet::RegisterAll(); } @@ -509,6 +582,220 @@ TEST_F(ParquetReadWrite, RejectsUnavailableCompressionCodec) { " is not available in the current build")); } +TEST_F(ParquetReadWrite, RejectsUnknownInsideListOrMapOnWrite) { + std::vector> schemas = { + std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "mysteries", + std::make_shared(SchemaField::MakeOptional( + 2, ListType::kElementName, iceberg::unknown()))), + }), + std::make_shared(std::vector{ + SchemaField::MakeOptional( + 1, "properties", + std::make_shared( + SchemaField::MakeRequired(2, MapType::kKeyName, iceberg::string()), + SchemaField::MakeOptional(3, MapType::kValueName, iceberg::unknown()))), + }), + }; + + for (const auto& schema : schemas) { + auto writer = WriterFactoryRegistry::Open( + FileFormatType::kParquet, {.path = "unknown_nested.parquet", + .schema = schema, + .io = arrow::ArrowFileSystemFileIO::MakeMockFileIO()}); + + EXPECT_THAT(writer, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(writer, HasErrorMessage("physical Parquet representation")); + } +} + +TEST_F(ParquetReadWrite, RejectsRequiredUnknownOnWrite) { + auto schema = std::make_shared( + std::vector{SchemaField(1, "mystery", unknown(), + /*optional=*/false)}); + + auto writer = WriterFactoryRegistry::Open( + FileFormatType::kParquet, {.path = "required_unknown.parquet", + .schema = schema, + .io = arrow::ArrowFileSystemFileIO::MakeMockFileIO()}); + + EXPECT_THAT(writer, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(writer, HasErrorMessage("Unknown type field 'mystery' must be optional")); +} + +TEST_F(ParquetReadWrite, RejectsUnknownOnlyStructOnWrite) { + std::vector> schemas = { + std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "secret", unknown()), + })), + }), + std::make_shared(std::vector{ + SchemaField::MakeOptional( + 1, "wrappers", + std::make_shared(SchemaField::MakeOptional( + 2, ListType::kElementName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(3, "secret", unknown()), + })))), + }), + std::make_shared(std::vector{ + SchemaField::MakeOptional( + 1, "properties", + std::make_shared( + SchemaField::MakeRequired(2, MapType::kKeyName, iceberg::string()), + SchemaField::MakeOptional( + 3, MapType::kValueName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(4, "secret", unknown()), + })))), + }), + }; + + for (const auto& schema : schemas) { + auto writer = WriterFactoryRegistry::Open( + FileFormatType::kParquet, {.path = "unknown_only_struct.parquet", + .schema = schema, + .io = arrow::ArrowFileSystemFileIO::MakeMockFileIO()}); + + EXPECT_THAT(writer, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(writer, HasErrorMessage("all child fields are unknown")); + } +} + +TEST_F(ParquetReadWrite, WritesUnknownFieldsNestedInsideListOrMapStructs) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "id", int32()), + SchemaField::MakeOptional(2, "events", + std::make_shared(SchemaField::MakeOptional( + 3, ListType::kElementName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(4, "name", string()), + SchemaField::MakeOptional(5, "secret", unknown()), + })))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared( + SchemaField::MakeRequired(7, MapType::kKeyName, iceberg::string()), + SchemaField::MakeOptional( + 8, MapType::kValueName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(9, "label", string()), + SchemaField::MakeOptional(10, "secret", unknown()), + })))), + }); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + auto array = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([ + {"id": 1, "events": [{"name": "open", "secret": null}, {"name": "close", "secret": null}], "properties": [["a", {"label": "A", "secret": null}]]}, + {"id": 2, "events": [], "properties": []} + ])") + .ValueOrDie(); + + std::shared_ptr file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + const std::string basePath = "nested_unknown_fields.parquet"; + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, + std::string("uncompressed")); + ASSERT_THAT(WriteArray(array, {.path = basePath, + .schema = schema, + .io = file_io, + .properties = std::move(writer_properties)}), + IsOk()); + + auto& arrow_file_io = internal::checked_cast(*file_io); + auto input_file = arrow_file_io.fs()->OpenInputFile(basePath).ValueOrDie(); + auto parquet_reader = ::parquet::ParquetFileReader::Open(input_file); + auto parquet_schema = parquet_reader->metadata()->schema(); + + std::vector field_ids; + for (int i = 0; i < parquet_schema->num_columns(); ++i) { + field_ids.push_back(parquet_schema->Column(i)->schema_node()->field_id()); + } + EXPECT_THAT(field_ids, ::testing::UnorderedElementsAre(1, 4, 7, 9)); + + std::shared_ptr<::arrow::Array> out; + ASSERT_THAT(ReadArray(out, {.path = basePath, .io = file_io, .projection = schema}, + /*metadata=*/nullptr), + IsOk()); + auto expected = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([ + {"id": 1, "events": [{"name": "open", "secret": null}, {"name": "close", "secret": null}], "properties": [["a", {"label": "A", "secret": null}]]}, + {"id": 2, "events": [], "properties": []} + ])") + .ValueOrDie(); + ASSERT_TRUE(out->Equals(*expected)) << "actual:\n" + << out->ToString() << "expected:\n" + << expected->ToString(); +} + +TEST_F(ParquetReadWrite, DoesNotMaterializeUnknownFieldsOnWrite) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "id", int32()), + SchemaField::MakeOptional(2, "mystery", unknown()), + SchemaField::MakeOptional(3, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(4, "name", string()), + SchemaField::MakeOptional(5, "secret", unknown()), + })), + }); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + auto array = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([ + [1, null, {"name": "Person0", "secret": null}], + [2, null, {"name": "Person1", "secret": null}] + ])") + .ValueOrDie(); + + std::shared_ptr file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + const std::string basePath = "unknown_fields.parquet"; + + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, + std::string("uncompressed")); + ASSERT_THAT(WriteArray(array, {.path = basePath, + .schema = schema, + .io = file_io, + .properties = std::move(writer_properties)}), + IsOk()); + + auto& arrow_file_io = internal::checked_cast(*file_io); + auto input_file = arrow_file_io.fs()->OpenInputFile(basePath).ValueOrDie(); + auto parquet_reader = ::parquet::ParquetFileReader::Open(input_file); + auto parquet_schema = parquet_reader->metadata()->schema(); + + ASSERT_EQ(parquet_schema->num_columns(), 2); + EXPECT_EQ(parquet_schema->Column(0)->schema_node()->field_id(), 1); + EXPECT_EQ(parquet_schema->Column(1)->schema_node()->field_id(), 4); + + std::shared_ptr<::arrow::Array> out; + ASSERT_THAT(ReadArray(out, {.path = basePath, .io = file_io, .projection = schema}, + /*metadata=*/nullptr), + IsOk()); + auto expected = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([ + [1, null, {"name": "Person0", "secret": null}], + [2, null, {"name": "Person1", "secret": null}] + ])") + .ValueOrDie(); + ASSERT_TRUE(out->Equals(*expected)) << "actual:\n" + << out->ToString() << "expected:\n" + << expected->ToString(); +} + TEST_F(ParquetReadWrite, SimpleStructRoundTrip) { auto schema = std::make_shared(std::vector{ SchemaField::MakeOptional(1, "a", diff --git a/src/iceberg/test/schema_json_test.cc b/src/iceberg/test/schema_json_test.cc index c9532eeb6..4fdd07da9 100644 --- a/src/iceberg/test/schema_json_test.cc +++ b/src/iceberg/test/schema_json_test.cc @@ -64,6 +64,7 @@ INSTANTIATE_TEST_SUITE_P( SchemaJsonParam{.json = "\"string\"", .type = iceberg::string()}, SchemaJsonParam{.json = "\"binary\"", .type = iceberg::binary()}, SchemaJsonParam{.json = "\"uuid\"", .type = iceberg::uuid()}, + SchemaJsonParam{.json = "\"unknown\"", .type = iceberg::unknown()}, SchemaJsonParam{.json = "\"fixed[8]\"", .type = iceberg::fixed(8)}, SchemaJsonParam{.json = "\"decimal(10,2)\"", .type = iceberg::decimal(10, 2)}, SchemaJsonParam{.json = "\"date\"", .type = iceberg::date()}, @@ -136,6 +137,127 @@ TEST(SchemaJsonTest, RoundTrip) { ASSERT_EQ(dumped_json, json); } +TEST(SchemaJsonTest, UnknownFieldRoundTrip) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mystery","required":false,"type":"unknown"}],"schema-id":1,"type":"struct"})"; + + ICEBERG_UNWRAP_OR_FAIL(auto schema, SchemaFromJson(nlohmann::json::parse(json))); + ASSERT_EQ(schema->fields().size(), 1); + + const auto& field = schema->fields()[0]; + ASSERT_EQ(field.field_id(), 1); + ASSERT_EQ(field.name(), "mystery"); + ASSERT_EQ(field.type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(field.optional()); + ASSERT_EQ(ToJson(*schema).dump(), json); +} + +TEST(SchemaJsonTest, NestedUnknownFieldsRoundTrip) { + constexpr std::string_view json = + R"({ + "fields": [ + { + "id": 1, + "name": "profile", + "required": false, + "type": { + "fields": [ + {"id": 2, "name": "mystery", "required": false, "type": "unknown"} + ], + "type": "struct" + } + }, + { + "id": 3, + "name": "mysteries", + "required": false, + "type": { + "element": "unknown", + "element-id": 4, + "element-required": false, + "type": "list" + } + }, + { + "id": 5, + "name": "properties", + "required": false, + "type": { + "key": "string", + "key-id": 6, + "type": "map", + "value": "unknown", + "value-id": 7, + "value-required": false + } + } + ], + "schema-id": 1, + "type": "struct" + })"; + const auto parsed_json = nlohmann::json::parse(json); + + ICEBERG_UNWRAP_OR_FAIL(auto schema, SchemaFromJson(parsed_json)); + ASSERT_EQ(schema->fields().size(), 3); + + const auto* profile = dynamic_cast(schema->fields()[0].type().get()); + ASSERT_NE(profile, nullptr); + ASSERT_EQ(profile->fields().size(), 1); + ASSERT_EQ(profile->fields()[0].type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(profile->fields()[0].optional()); + + const auto* mysteries = dynamic_cast(schema->fields()[1].type().get()); + ASSERT_NE(mysteries, nullptr); + ASSERT_EQ(mysteries->fields()[0].type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(mysteries->fields()[0].optional()); + + const auto* properties = dynamic_cast(schema->fields()[2].type().get()); + ASSERT_NE(properties, nullptr); + ASSERT_EQ(properties->value().type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(properties->value().optional()); + + ASSERT_EQ(ToJson(*schema), parsed_json); +} + +TEST(SchemaJsonTest, RejectRequiredUnknownField) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mystery","required":true,"type":"unknown"}],"schema-id":1,"type":"struct"})"; + + auto schema_result = SchemaFromJson(nlohmann::json::parse(json)); + ASSERT_THAT(schema_result, IsError(ErrorKind::kJsonParseError)); + ASSERT_THAT(schema_result, + HasErrorMessage("Unknown type field 'mystery' must be optional")); +} + +TEST(SchemaJsonTest, RejectRequiredUnknownListElement) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mysteries","required":false,"type":{"element":"unknown","element-id":2,"element-required":true,"type":"list"}}],"schema-id":1,"type":"struct"})"; + + auto schema_result = SchemaFromJson(nlohmann::json::parse(json)); + ASSERT_THAT(schema_result, IsError(ErrorKind::kJsonParseError)); + ASSERT_THAT(schema_result, + HasErrorMessage("Unknown type field 'element' must be optional")); +} + +TEST(SchemaJsonTest, RejectUnknownMapKey) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mysteries","required":false,"type":{"key":"unknown","key-id":2,"type":"map","value":"string","value-id":3,"value-required":false}}],"schema-id":1,"type":"struct"})"; + + auto schema_result = SchemaFromJson(nlohmann::json::parse(json)); + ASSERT_THAT(schema_result, IsError(ErrorKind::kJsonParseError)); + ASSERT_THAT(schema_result, HasErrorMessage("Map 'key' cannot be unknown type")); +} + +TEST(SchemaJsonTest, RejectRequiredUnknownMapValue) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mysteries","required":false,"type":{"key":"string","key-id":2,"type":"map","value":"unknown","value-id":3,"value-required":true}}],"schema-id":1,"type":"struct"})"; + + auto schema_result = SchemaFromJson(nlohmann::json::parse(json)); + ASSERT_THAT(schema_result, IsError(ErrorKind::kJsonParseError)); + ASSERT_THAT(schema_result, + HasErrorMessage("Unknown type field 'value' must be optional")); +} + TEST(SchemaJsonTest, IdentifierFieldIds) { // Test schema with identifier-field-ids constexpr std::string_view json_with_identifier_str = diff --git a/src/iceberg/test/schema_test.cc b/src/iceberg/test/schema_test.cc index 838b57600..8f1b20035 100644 --- a/src/iceberg/test/schema_test.cc +++ b/src/iceberg/test/schema_test.cc @@ -102,6 +102,8 @@ TEST(SchemaTest, ValidateRejectsV3TypesBeforeFormatV3) { {iceberg::SchemaField(1, "timestamp_ns", iceberg::timestamp_ns(), false)}); iceberg::Schema timestamptz_ns_schema( {iceberg::SchemaField(1, "timestamptz_ns", iceberg::timestamptz_ns(), false)}); + iceberg::Schema unknown_schema( + {iceberg::SchemaField(1, "unknown", iceberg::unknown(), true)}); auto status = timestamp_ns_schema.Validate(2); ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema)); @@ -115,12 +117,42 @@ TEST(SchemaTest, ValidateRejectsV3TypesBeforeFormatV3) { "Invalid type for timestamptz_ns: timestamptz_ns is not " "supported until v3")); + status = unknown_schema.Validate(2); + ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema)); + EXPECT_THAT(status, iceberg::HasErrorMessage( + "Invalid type for unknown: unknown is not supported until v3")); + EXPECT_THAT( timestamp_ns_schema.Validate(iceberg::TableMetadata::kSupportedTableFormatVersion), iceberg::IsOk()); EXPECT_THAT(timestamptz_ns_schema.Validate( iceberg::TableMetadata::kSupportedTableFormatVersion), iceberg::IsOk()); + EXPECT_THAT( + unknown_schema.Validate(iceberg::TableMetadata::kSupportedTableFormatVersion), + iceberg::IsOk()); +} + +TEST(SchemaTest, ValidateRejectsInvalidUnknownFields) { + iceberg::Schema required_unknown_schema( + {iceberg::SchemaField(1, "mystery", iceberg::unknown(), false)}); + auto status = required_unknown_schema.Validate( + iceberg::TableMetadata::kSupportedTableFormatVersion); + ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidArgument)); + EXPECT_THAT(status, + iceberg::HasErrorMessage("Unknown type field 'mystery' must be optional")); + + iceberg::Schema map_key_unknown_schema({iceberg::SchemaField::MakeOptional( + 1, "properties", + std::make_shared( + iceberg::SchemaField::MakeRequired(2, iceberg::MapType::kKeyName, + iceberg::unknown()), + iceberg::SchemaField::MakeOptional(3, iceberg::MapType::kValueName, + iceberg::string())))}); + status = map_key_unknown_schema.Validate( + iceberg::TableMetadata::kSupportedTableFormatVersion); + ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidArgument)); + EXPECT_THAT(status, iceberg::HasErrorMessage("Map 'key' cannot be unknown type")); } TEST(SchemaTest, IdentifierFields) { diff --git a/src/iceberg/test/schema_util_test.cc b/src/iceberg/test/schema_util_test.cc index fe6579ab3..b4e5286c5 100644 --- a/src/iceberg/test/schema_util_test.cc +++ b/src/iceberg/test/schema_util_test.cc @@ -226,6 +226,127 @@ TEST(SchemaUtilTest, ProjectSchemaEvolutionFloatToDouble) { AssertProjectedField(projection.fields[0], 0); } +TEST(SchemaUtilTest, ProjectSchemaEvolutionUnknownToPrimitive) { + Schema source_schema( + {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::unknown())}); + Schema expected_schema( + {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::string())}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + AssertProjectedField(projection.fields[0], 0); +} + +TEST(SchemaUtilTest, RejectSchemaEvolutionUnknownToRequiredPrimitive) { + Schema source_schema( + {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::unknown())}); + Schema expected_schema( + {SchemaField::MakeRequired(/*field_id=*/2, "value", iceberg::string())}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 2 as null")); +} + +TEST(SchemaUtilTest, ProjectSchemaEvolutionNestedFieldsToUnknown) { + Schema source_schema({ + SchemaField::MakeOptional( + /*field_id=*/2, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/201, "mystery", iceberg::int32()), + SchemaField::MakeOptional(/*field_id=*/202, "name", iceberg::string()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/301, "element", iceberg::int32()))), + SchemaField::MakeOptional( + /*field_id=*/4, "attributes", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/401, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/402, "value", iceberg::int32()))), + }); + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/2, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/201, "mystery", iceberg::unknown()), + SchemaField::MakeOptional(/*field_id=*/202, "name", iceberg::string()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/301, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/4, "attributes", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/401, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/402, "value", iceberg::unknown()))), + }); + + for (bool prune_source : {false, true}) { + auto projection_result = Project(expected_schema, source_schema, prune_source); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + AssertProjectedField(projection.fields[0], 0); + AssertProjectedField(projection.fields[1], 1); + AssertProjectedField(projection.fields[2], 2); + + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_EQ(projection.fields[0].children[0].kind, FieldProjection::Kind::kNull); + AssertProjectedField(projection.fields[0].children[1], prune_source ? 0 : 1); + + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_EQ(projection.fields[1].children[0].kind, FieldProjection::Kind::kNull); + + ASSERT_EQ(projection.fields[2].children.size(), 2); + AssertProjectedField(projection.fields[2].children[0], 0); + ASSERT_EQ(projection.fields[2].children[1].kind, FieldProjection::Kind::kNull); + } +} + +TEST(SchemaUtilTest, RejectSchemaEvolutionUnknownToNested) { + Schema source_schema({ + SchemaField::MakeOptional(/*field_id=*/2, "profile", iceberg::unknown()), + SchemaField::MakeOptional(/*field_id=*/3, "items", iceberg::unknown()), + SchemaField::MakeOptional(/*field_id=*/4, "attributes", iceberg::unknown()), + }); + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/2, "profile", CreateNestedStruct()), + SchemaField::MakeOptional(/*field_id=*/3, "items", CreateListOfStruct()), + SchemaField::MakeOptional(/*field_id=*/4, "attributes", CreateMapWithStructValue()), + }); + + for (bool prune_source : {false, true}) { + auto projection_result = Project(expected_schema, source_schema, prune_source); + ASSERT_THAT(projection_result, IsError(ErrorKind::kNotSupported)); + ASSERT_THAT(projection_result, HasErrorMessage("Cannot read")); + } +} + +TEST(SchemaUtilTest, RejectSchemaEvolutionUnknownToRequiredNested) { + Schema source_schema({ + SchemaField::MakeOptional(/*field_id=*/2, "profile", iceberg::unknown()), + }); + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/2, "profile", CreateNestedStruct()), + }); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 2 as null")); +} + TEST(SchemaUtilTest, ProjectSchemaEvolutionDecimalCompatible) { Schema source_schema( {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::decimal(9, 2))}); diff --git a/src/iceberg/test/type_test.cc b/src/iceberg/test/type_test.cc index e68843be4..d405cccc1 100644 --- a/src/iceberg/test/type_test.cc +++ b/src/iceberg/test/type_test.cc @@ -90,7 +90,7 @@ TEST_P(TypeTest, StdFormat) { ASSERT_EQ(test_case.repr, std::format("{}", *test_case.type)); } -const static std::array kPrimitiveTypes = {{ +const static std::array kPrimitiveTypes = {{ { .name = "boolean", .type = iceberg::boolean(), @@ -217,6 +217,13 @@ const static std::array kPrimitiveTypes = {{ .primitive = true, .repr = "uuid", }, + { + .name = "unknown", + .type = iceberg::unknown(), + .type_id = iceberg::TypeId::kUnknown, + .primitive = true, + .repr = "unknown", + }, }}; const static std::array kNestedTypes = {{ diff --git a/src/iceberg/test/update_schema_test.cc b/src/iceberg/test/update_schema_test.cc index 8550c8b56..5c612f454 100644 --- a/src/iceberg/test/update_schema_test.cc +++ b/src/iceberg/test/update_schema_test.cc @@ -20,6 +20,7 @@ #include "iceberg/update/update_schema.h" #include +#include #include @@ -29,6 +30,7 @@ #include "iceberg/test/update_test_base.h" #include "iceberg/type.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/type_util.h" namespace iceberg { @@ -1054,6 +1056,56 @@ TEST_F(UpdateSchemaTest, UpdateColumnFloatToDouble) { EXPECT_EQ(*field_opt->get().type(), *float64()); } +TEST_F(UpdateSchemaTest, UpdateColumnUnknownToPrimitive) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("mystery", unknown(), "A null-only placeholder"); + update->UpdateColumn("mystery", string()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + + ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("mystery")); + ASSERT_TRUE(field_opt.has_value()); + EXPECT_EQ(*field_opt->get().type(), *string()); + EXPECT_TRUE(field_opt->get().optional()); + EXPECT_EQ(field_opt->get().doc(), "A null-only placeholder"); +} + +TEST_F(UpdateSchemaTest, UpdateColumnPromotionToNestedTypesForbidden) { + std::vector> nested_types = { + struct_({SchemaField::MakeOptional(3, "name", string())}), + list(SchemaField::MakeOptional(4, "element", string())), + map(SchemaField::MakeRequired(5, "key", string()), + SchemaField::MakeOptional(6, "value", string())), + }; + + for (const auto& nested_type : nested_types) { + SCOPED_TRACE(nested_type->ToString()); + + EXPECT_FALSE(IsPromotionAllowed(unknown(), nested_type)); + EXPECT_FALSE(IsPromotionAllowed(int32(), nested_type)); + } +} + +TEST_F(UpdateSchemaTest, AddRequiredUnknownColumnFails) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AllowIncompatibleChanges().AddRequiredColumn("mystery", unknown()); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Unknown type field 'mystery' must be optional")); +} + +TEST_F(UpdateSchemaTest, AddColumnWithRequiredNestedUnknownFails) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("profile", struct_({ + SchemaField::MakeRequired(3, "mystery", unknown()), + })); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Unknown type field 'mystery' must be optional")); +} + TEST_F(UpdateSchemaTest, UpdateColumnSameType) { ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); update->AddColumn("id", int32()); diff --git a/src/iceberg/test/visit_type_test.cc b/src/iceberg/test/visit_type_test.cc index 7104581f5..f038f906f 100644 --- a/src/iceberg/test/visit_type_test.cc +++ b/src/iceberg/test/visit_type_test.cc @@ -53,7 +53,7 @@ std::string TypeTestCaseToString(const ::testing::TestParamInfo& i return info.param.name; } -const static std::array kPrimitiveTypes = {{ +const static std::array kPrimitiveTypes = {{ { .name = "boolean", .type = iceberg::boolean(), @@ -180,6 +180,13 @@ const static std::array kPrimitiveTypes = {{ .primitive = true, .repr = "uuid", }, + { + .name = "unknown", + .type = iceberg::unknown(), + .type_id = iceberg::TypeId::kUnknown, + .primitive = true, + .repr = "unknown", + }, }}; const static std::array kNestedTypes = {{ diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc index b5bee37e2..057dcf513 100644 --- a/src/iceberg/type.cc +++ b/src/iceberg/type.cc @@ -350,6 +350,10 @@ TypeId UuidType::type_id() const { return kTypeId; } std::string UuidType::ToString() const { return "uuid"; } bool UuidType::Equals(const Type& other) const { return other.type_id() == kTypeId; } +TypeId UnknownType::type_id() const { return kTypeId; } +std::string UnknownType::ToString() const { return "unknown"; } +bool UnknownType::Equals(const Type& other) const { return other.type_id() == kTypeId; } + FixedType::FixedType(int32_t length) : length_(length) { ICEBERG_CHECK_OR_DIE(length >= 0, "FixedType: length must be >= 0, was {}", length); } @@ -392,6 +396,7 @@ TYPE_FACTORY(timestamptz_ns, TimestampTzNsType) TYPE_FACTORY(binary, BinaryType) TYPE_FACTORY(string, StringType) TYPE_FACTORY(uuid, UuidType) +TYPE_FACTORY(unknown, UnknownType) #undef TYPE_FACTORY @@ -455,6 +460,8 @@ std::string_view ToString(TypeId id) { return "fixed"; case TypeId::kBinary: return "binary"; + case TypeId::kUnknown: + return "unknown"; } std::unreachable(); diff --git a/src/iceberg/type.h b/src/iceberg/type.h index 53237cdb5..c0966759e 100644 --- a/src/iceberg/type.h +++ b/src/iceberg/type.h @@ -503,6 +503,21 @@ class ICEBERG_EXPORT UuidType : public PrimitiveType { bool Equals(const Type& other) const override; }; +/// \brief A null-only placeholder type used when a more specific type is not known. +class ICEBERG_EXPORT UnknownType : public PrimitiveType { + public: + constexpr static const TypeId kTypeId = TypeId::kUnknown; + + UnknownType() = default; + ~UnknownType() override = default; + + TypeId type_id() const override; + std::string ToString() const override; + + protected: + bool Equals(const Type& other) const override; +}; + /// @} /// \defgroup type-factories Factory functions for creating primitive data types @@ -538,6 +553,8 @@ ICEBERG_EXPORT const std::shared_ptr& binary(); ICEBERG_EXPORT const std::shared_ptr& string(); /// \brief Return a UuidType instance. ICEBERG_EXPORT const std::shared_ptr& uuid(); +/// \brief Return an UnknownType instance. +ICEBERG_EXPORT const std::shared_ptr& unknown(); /// \brief Create a DecimalType with the given precision and scale. /// \param precision The number of decimal digits (max 38). diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 064ec285a..745c63acb 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -31,7 +31,7 @@ namespace iceberg { /// This is not a complete data type by itself because some types are nested /// and/or parameterized. /// -/// Iceberg V3 types are not currently supported. +/// Iceberg V3's `unknown` type is supported as a null-only placeholder type. enum class TypeId { kStruct, kList, @@ -52,6 +52,7 @@ enum class TypeId { kUuid, kFixed, kBinary, + kUnknown, }; /// \brief The time unit. In Iceberg V3 nanoseconds are also supported. @@ -83,6 +84,7 @@ class TimestampTzType; class TimestampNsType; class TimestampTzNsType; class Type; +class UnknownType; class UuidType; /// \brief Data values. diff --git a/src/iceberg/update/update_schema.cc b/src/iceberg/update/update_schema.cc index 1f35781fa..f4f660b42 100644 --- a/src/iceberg/update/update_schema.cc +++ b/src/iceberg/update/update_schema.cc @@ -49,6 +49,44 @@ namespace iceberg { namespace { constexpr int32_t kTableRootId = -1; +Status ValidateUnknownFieldsOptional(const Type& type) { + switch (type.type_id()) { + case TypeId::kStruct: { + const auto& struct_type = internal::checked_cast(type); + for (const auto& field : struct_type.fields()) { + ICEBERG_CHECK(field.optional() || field.type()->type_id() != TypeId::kUnknown, + "Unknown type field '{}' must be optional", field.name()); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldsOptional(*field.type())); + } + break; + } + case TypeId::kList: { + const auto& list_type = internal::checked_cast(type); + const auto& element = list_type.element(); + ICEBERG_CHECK(element.optional() || element.type()->type_id() != TypeId::kUnknown, + "Unknown type field '{}' must be optional", element.name()); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldsOptional(*element.type())); + break; + } + case TypeId::kMap: { + const auto& map_type = internal::checked_cast(type); + const auto& key = map_type.key(); + const auto& value = map_type.value(); + ICEBERG_CHECK(key.type()->type_id() != TypeId::kUnknown, + "Map 'key' cannot be unknown type"); + ICEBERG_CHECK(value.optional() || value.type()->type_id() != TypeId::kUnknown, + "Unknown type field '{}' must be optional", value.name()); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldsOptional(*key.type())); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldsOptional(*value.type())); + break; + } + default: + break; + } + + return {}; +} + /// \brief Visitor for applying schema changes recursively to nested types class ApplyChangesVisitor { public: @@ -578,6 +616,8 @@ Result UpdateSchema::Apply() { auto new_struct_type = internal::checked_pointer_cast(new_type); auto temp_schema = new_struct_type->ToSchema(); + ICEBERG_RETURN_UNEXPECTED(ValidateUnknownFieldsOptional(*temp_schema)); + std::vector fresh_identifier_ids; for (const auto& name : identifier_field_names_) { ICEBERG_ASSIGN_OR_RAISE(auto field_opt, diff --git a/src/iceberg/util/struct_like_set.cc b/src/iceberg/util/struct_like_set.cc index 433cfa681..12648ea5e 100644 --- a/src/iceberg/util/struct_like_set.cc +++ b/src/iceberg/util/struct_like_set.cc @@ -263,6 +263,8 @@ Status ValidateScalarAgainstType(const Scalar& scalar, const Type& type) { } switch (type.type_id()) { + case TypeId::kUnknown: + return InvalidArgument("Expected unknown but got {}", ScalarTypeName(scalar)); case TypeId::kBoolean: ICEBERG_PRECHECK(std::holds_alternative(scalar), "Expected boolean but got {}", ScalarTypeName(scalar)); diff --git a/src/iceberg/util/type_util.cc b/src/iceberg/util/type_util.cc index c6b9bb3ed..e22b9b148 100644 --- a/src/iceberg/util/type_util.cc +++ b/src/iceberg/util/type_util.cc @@ -426,6 +426,10 @@ bool IsPromotionAllowed(const std::shared_ptr& from_type, TypeId from_id = from_type->type_id(); TypeId to_id = to_type->type_id(); + if (from_id == TypeId::kUnknown) { + return true; + } + // int -> long if (from_id == TypeId::kInt && to_id == TypeId::kLong) { return true; diff --git a/src/iceberg/util/visitor_generate.h b/src/iceberg/util/visitor_generate.h index 7a3648546..a5b0c2ced 100644 --- a/src/iceberg/util/visitor_generate.h +++ b/src/iceberg/util/visitor_generate.h @@ -38,6 +38,7 @@ namespace iceberg { ACTION(Uuid); \ ACTION(Fixed); \ ACTION(Binary); \ + ACTION(Unknown); \ ACTION(Struct); \ ACTION(List); \ ACTION(Map);