From 9190a30223407ec1289caa7c57664282d5ad74b5 Mon Sep 17 00:00:00 2001 From: Zhuo Wang Date: Thu, 25 Dec 2025 10:08:04 +0800 Subject: [PATCH] feat: InMemoryCatalog::CreateTable & StageCreateTable api --- src/iceberg/CMakeLists.txt | 1 + .../catalog/memory/in_memory_catalog.cc | 71 +++++++++-- src/iceberg/meson.build | 1 + src/iceberg/table_identifier.h | 32 +++++ src/iceberg/table_metadata.cc | 117 +++++++++++++++++- src/iceberg/table_metadata.h | 6 + src/iceberg/table_properties.cc | 7 ++ src/iceberg/table_properties.h | 3 + src/iceberg/table_requirements.cc | 22 ++++ src/iceberg/table_requirements.h | 4 + src/iceberg/test/in_memory_catalog_test.cc | 15 +++ src/iceberg/util/meson.build | 1 + src/iceberg/util/property_util.cc | 52 ++++++++ src/iceberg/util/property_util.h | 36 ++++++ 14 files changed, 356 insertions(+), 12 deletions(-) create mode 100644 src/iceberg/util/property_util.cc create mode 100644 src/iceberg/util/property_util.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 519757d26..57142091f 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -84,6 +84,7 @@ set(ICEBERG_SOURCES util/decimal.cc util/gzip_internal.cc util/murmurhash3_internal.cc + util/property_util.cc util/snapshot_util.cc util/temporal_util.cc util/timepoint.cc diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc index b3fd0060a..ef56b2e89 100644 --- a/src/iceberg/catalog/memory/in_memory_catalog.cc +++ b/src/iceberg/catalog/memory/in_memory_catalog.cc @@ -26,7 +26,9 @@ #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" #include "iceberg/table_requirement.h" +#include "iceberg/table_requirements.h" #include "iceberg/table_update.h" +#include "iceberg/transaction.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -318,7 +320,7 @@ Result InMemoryNamespace::GetTableMetadataLocation( ICEBERG_RETURN_UNEXPECTED(ns); const auto it = ns.value()->table_metadata_locations_.find(table_ident.name); if (it == ns.value()->table_metadata_locations_.end()) { - return NotFound("{} does not exist", table_ident.name); + return NotFound("Table does not exist: {}", table_ident); } return it->second; } @@ -405,7 +407,24 @@ Result> InMemoryCatalog::CreateTable( const std::string& location, const std::unordered_map& properties) { std::unique_lock lock(mutex_); - return NotImplemented("create table"); + if (root_namespace_->TableExists(identifier).value_or(false)) { + return AlreadyExists("Table already exists: {}", identifier); + } + + std::string base_location = + location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location; + + ICEBERG_ASSIGN_OR_RAISE(auto table_metadata, TableMetadata::Make(*schema, *spec, *order, + location, properties)); + + ICEBERG_ASSIGN_OR_RAISE( + auto metadata_file_location, + TableMetadataUtil::Write(*file_io_, nullptr, "", *table_metadata)); + ICEBERG_RETURN_UNEXPECTED( + root_namespace_->UpdateTableMetadataLocation(identifier, metadata_file_location)); + return Table::Make(identifier, std::move(table_metadata), + std::move(metadata_file_location), file_io_, + std::static_pointer_cast(shared_from_this())); } Result> InMemoryCatalog::UpdateTable( @@ -413,24 +432,41 @@ Result> InMemoryCatalog::UpdateTable( const std::vector>& requirements, const std::vector>& updates) { std::unique_lock lock(mutex_); - ICEBERG_ASSIGN_OR_RAISE(auto base_metadata_location, - root_namespace_->GetTableMetadataLocation(identifier)); - - ICEBERG_ASSIGN_OR_RAISE(auto base, - TableMetadataUtil::Read(*file_io_, base_metadata_location)); + auto base_metadata_location = root_namespace_->GetTableMetadataLocation(identifier); + std::unique_ptr base; + std::unique_ptr builder; + ICEBERG_ASSIGN_OR_RAISE(auto is_create, TableRequirements::IsCreate(requirements)); + if (is_create) { + if (base_metadata_location.has_value()) { + return AlreadyExists("Table already exists: {}", identifier); + } + int8_t format_version = TableMetadata::kDefaultTableFormatVersion; + for (const auto& update : updates) { + if (update->kind() == TableUpdate::Kind::kUpgradeFormatVersion) { + format_version = + dynamic_cast(*update).format_version(); + } + } + builder = TableMetadataBuilder::BuildFromEmpty(format_version); + } else { + ICEBERG_RETURN_UNEXPECTED(base_metadata_location); + ICEBERG_ASSIGN_OR_RAISE( + base, TableMetadataUtil::Read(*file_io_, base_metadata_location.value())); + builder = TableMetadataBuilder::BuildFrom(base.get()); + } for (const auto& requirement : requirements) { ICEBERG_RETURN_UNEXPECTED(requirement->Validate(base.get())); } - auto builder = TableMetadataBuilder::BuildFrom(base.get()); for (const auto& update : updates) { update->ApplyTo(*builder); } ICEBERG_ASSIGN_OR_RAISE(auto updated, builder->Build()); ICEBERG_ASSIGN_OR_RAISE( auto new_metadata_location, - TableMetadataUtil::Write(*file_io_, base.get(), base_metadata_location, *updated)); + TableMetadataUtil::Write(*file_io_, base.get(), base_metadata_location.value(), + *updated)); ICEBERG_RETURN_UNEXPECTED( root_namespace_->UpdateTableMetadataLocation(identifier, new_metadata_location)); TableMetadataUtil::DeleteRemovedMetadataFiles(*file_io_, base.get(), *updated); @@ -445,7 +481,20 @@ Result> InMemoryCatalog::StageCreateTable( const std::string& location, const std::unordered_map& properties) { std::unique_lock lock(mutex_); - return NotImplemented("stage create table"); + if (root_namespace_->TableExists(identifier).value_or(false)) { + return AlreadyExists("Table already exists: {}", identifier); + } + + std::string base_location = + location.empty() ? warehouse_location_ + "/" + identifier.ToString() : location; + + ICEBERG_ASSIGN_OR_RAISE( + auto table_metadata, + TableMetadata::Make(*schema, *spec, *order, base_location, properties)); + ICEBERG_ASSIGN_OR_RAISE( + auto table, StagedTable::Make(identifier, std::move(table_metadata), "", file_io_, + shared_from_this())); + return Transaction::Make(std::move(table), Transaction::Kind::kCreate, false); } Result InMemoryCatalog::TableExists(const TableIdentifier& identifier) const { @@ -495,7 +544,7 @@ Result> InMemoryCatalog::RegisterTable( std::unique_lock lock(mutex_); if (!root_namespace_->NamespaceExists(identifier.ns)) { - return NoSuchNamespace("table namespace does not exist."); + return NoSuchNamespace("Table namespace does not exist: {}", identifier.ns); } if (!root_namespace_->RegisterTable(identifier, metadata_file_location)) { return UnknownError("The registry failed."); diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 8327ca2e0..d5c3714d7 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -106,6 +106,7 @@ iceberg_sources = files( 'util/decimal.cc', 'util/gzip_internal.cc', 'util/murmurhash3_internal.cc', + 'util/property_util.cc', 'util/snapshot_util.cc', 'util/temporal_util.cc', 'util/timepoint.cc', diff --git a/src/iceberg/table_identifier.h b/src/iceberg/table_identifier.h index bef9b81dd..3eff54e4d 100644 --- a/src/iceberg/table_identifier.h +++ b/src/iceberg/table_identifier.h @@ -22,6 +22,8 @@ /// \file iceberg/table_identifier.h /// A TableIdentifier is a unique identifier for a table +#include +#include #include #include @@ -35,6 +37,15 @@ struct ICEBERG_EXPORT Namespace { std::vector levels; bool operator==(const Namespace& other) const { return levels == other.levels; } + + std::string ToString() const { + std::ostringstream oss; + for (size_t i = 0; i < levels.size(); ++i) { + if (i) oss << '.'; + oss << levels[i]; + } + return oss.str(); + } }; /// \brief Identifies a table in iceberg catalog. @@ -53,6 +64,27 @@ struct ICEBERG_EXPORT TableIdentifier { } return {}; } + + std::string ToString() const { return ns.ToString() + '.' + name; } }; } // namespace iceberg + +namespace std { + +template <> +struct formatter : std::formatter { + constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } + auto format(const iceberg::Namespace& ns, format_context& ctx) const { + return std::formatter::format(ns.ToString(), ctx); + } +}; + +template <> +struct formatter : std::formatter { + constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); } + auto format(const iceberg::TableIdentifier& id, format_context& ctx) const { + return std::formatter::format(id.ToString(), ctx); + } +}; +} // namespace std diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index ba5b8f328..19aea895a 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -20,6 +20,7 @@ #include "iceberg/table_metadata.h" #include +#include #include #include #include @@ -37,6 +38,7 @@ #include "iceberg/exception.h" #include "iceberg/file_io.h" #include "iceberg/json_internal.h" +#include "iceberg/metrics_config.h" #include "iceberg/partition_field.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" @@ -50,12 +52,73 @@ #include "iceberg/util/gzip_internal.h" #include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" +#include "iceberg/util/property_util.h" +#include "iceberg/util/type_util.h" #include "iceberg/util/uuid.h" namespace iceberg { namespace { const TimePointMs kInvalidLastUpdatedMs = TimePointMs::min(); constexpr int32_t kLastAdded = -1; constexpr std::string_view kMetadataFolderName = "metadata"; + +// TableMetadata private static methods +Result> FreshPartitionSpec( + int32_t spec_id, const PartitionSpec& spec, const Schema& base_schema, + const Schema& fresh_schema, std::function next_id) { + std::vector partition_fields; + for (auto& field : spec.fields()) { + ICEBERG_ASSIGN_OR_RAISE(auto source_name, + base_schema.FindColumnNameById(field.source_id())); + int32_t source_id; + if (!source_name.has_value()) { + // In the case of a source field not found, the column has been deleted. + // This only happens in V1 tables where the reference is still around as a void + // transform + source_id = field.source_id(); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto fresh_field, + fresh_schema.FindFieldByName(source_name.value())); + if (!fresh_field.has_value()) [[unlikely]] { + return InvalidSchema("Partition field {} does not exist in the schema", + source_name.value()); + } + source_id = fresh_field.value().get().field_id(); + } + partition_fields.emplace_back(source_id, next_id ? next_id() : field.field_id(), + std::string(field.name()), field.transform()); + } + return PartitionSpec::Make(fresh_schema, spec_id, std::move(partition_fields), false); +} + +Result> FreshSortOrder(int32_t order_id, const Schema& schema, + const SortOrder& order) { + if (order.is_unsorted()) { + return SortOrder::Unsorted(); + } + + std::vector fresh_fields; + for (const auto& field : order.fields()) { + ICEBERG_ASSIGN_OR_RAISE(auto source_name, + schema.FindColumnNameById(field.source_id())); + if (!source_name.has_value()) { + return InvalidSchema("Unable to find source field with ID {} in the old schema", + field.source_id()); + } + + ICEBERG_ASSIGN_OR_RAISE(auto fresh_field, + schema.FindFieldByName(source_name.value())); + if (!fresh_field.has_value()) { + return InvalidSchema("Unable to find field '{}' in the new schema", + source_name.value()); + } + + int32_t new_source_id = fresh_field.value().get().field_id(); + fresh_fields.emplace_back(new_source_id, field.transform(), field.direction(), + field.null_order()); + } + + return SortOrder::Make(order_id, std::move(fresh_fields)); +} } // namespace std::string ToString(const SnapshotLogEntry& entry) { @@ -68,6 +131,53 @@ std::string ToString(const MetadataLogEntry& entry) { entry.metadata_file); } +Result> TableMetadata::Make( + const iceberg::Schema& schema, const iceberg::PartitionSpec& spec, + const iceberg::SortOrder& sort_order, const std::string& location, + const std::unordered_map& properties, int format_version) { + for (const auto& [key, _] : properties) { + if (TableProperties::reserved_properties().contains(key)) { + return InvalidArgument( + "Table properties should not contain reserved properties, but got {}", key); + } + } + + // Reassign all column ids to ensure consistency + std::atomic last_column_id = 0; + auto next_id = [&last_column_id]() -> int32_t { return ++last_column_id; }; + ICEBERG_ASSIGN_OR_RAISE(auto fresh_schema, + AssignFreshIds(Schema::kInitialSchemaId, schema, next_id)); + + // Rebuild the partition spec using the new column ids + std::atomic last_partition_field_id = PartitionSpec::kInvalidPartitionFieldId; + auto next_partition_field_id = [&last_partition_field_id]() -> int32_t { + return ++last_partition_field_id; + }; + ICEBERG_ASSIGN_OR_RAISE(auto fresh_spec, + FreshPartitionSpec(PartitionSpec::kInitialSpecId, spec, schema, + *fresh_schema, next_partition_field_id)); + + // rebuild the sort order using the new column ids + int32_t fresh_order_id = + sort_order.is_unsorted() ? sort_order.order_id() : SortOrder::kInitialSortOrderId; + ICEBERG_ASSIGN_OR_RAISE(auto fresh_order, + FreshSortOrder(fresh_order_id, *fresh_schema, sort_order)) + + // Validata the metrics configuration. + ICEBERG_RETURN_UNEXPECTED( + MetricsConfig::VerifyReferencedColumns(properties, *fresh_schema)); + + PropertyUtil::ValidateCommitProperties(properties); + + return TableMetadataBuilder::BuildFromEmpty(format_version) + ->SetLocation(location) + .SetCurrentSchema(std::move(fresh_schema), last_column_id.load()) + .SetDefaultPartitionSpec(std::move(fresh_spec)) + .SetDefaultSortOrder(std::move(fresh_order)) + .SetProperties(properties) + .Build(); +} + Result> TableMetadata::Schema() const { return SchemaById(current_schema_id); } @@ -408,6 +518,10 @@ class TableMetadataBuilder::Impl { const TableMetadata* base() const { return base_; } const TableMetadata& metadata() const { return metadata_; } + void SetLocation(std::string_view location) { + metadata_.location = std::string(location); + } + void SetMetadataLocation(std::string_view metadata_location) { metadata_location_ = std::string(metadata_location); if (base_ != nullptr) { @@ -917,7 +1031,8 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveProperties( } TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view location) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + impl_->SetLocation(location); + return *this; } TableMetadataBuilder& TableMetadataBuilder::AddEncryptionKey( diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index daaada6e6..b84bdb4d1 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -124,6 +124,12 @@ struct ICEBERG_EXPORT TableMetadata { /// A `long` higher than all assigned row IDs int64_t next_row_id; + static Result> Make( + const iceberg::Schema& schema, const iceberg::PartitionSpec& spec, + const iceberg::SortOrder& sort_order, const std::string& location, + const std::unordered_map& properties, + int format_version = kDefaultTableFormatVersion); + /// \brief Get the current schema, return NotFoundError if not found Result> Schema() const; /// \brief Get the current schema by ID, return NotFoundError if not found diff --git a/src/iceberg/table_properties.cc b/src/iceberg/table_properties.cc index 96633bc24..db6adedcf 100644 --- a/src/iceberg/table_properties.cc +++ b/src/iceberg/table_properties.cc @@ -31,6 +31,13 @@ const std::unordered_set& TableProperties::reserved_properties() { return kReservedProperties; } +const std::unordered_set& TableProperties::commit_properties() { + static const std::unordered_set kCommitProperties = { + kCommitNumRetries.key(), kCommitMinRetryWaitMs.key(), kCommitMaxRetryWaitMs.key(), + kCommitTotalRetryTimeMs.key()}; + return kCommitProperties; +} + TableProperties TableProperties::default_properties() { return {}; } TableProperties TableProperties::FromMap( diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h index debe61da2..feb4a2001 100644 --- a/src/iceberg/table_properties.h +++ b/src/iceberg/table_properties.h @@ -286,6 +286,9 @@ class ICEBERG_EXPORT TableProperties : public ConfigBase { /// \return The set of reserved property keys static const std::unordered_set& reserved_properties(); + /// \brief Get the set of commit table property keys. + static const std::unordered_set& commit_properties(); + /// \brief Create a default TableProperties instance. /// /// \return A unique pointer to a TableProperties instance with default values diff --git a/src/iceberg/table_requirements.cc b/src/iceberg/table_requirements.cc index 6de6c59e6..67641b721 100644 --- a/src/iceberg/table_requirements.cc +++ b/src/iceberg/table_requirements.cc @@ -19,7 +19,9 @@ #include "iceberg/table_requirements.h" +#include #include +#include #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" @@ -134,4 +136,24 @@ Result>> TableRequirements::ForUpd return context.Build(); } +Result TableRequirements::IsCreate( + const std::vector>& requirements) { + bool is_create = std::ranges::any_of(requirements, [](const auto& req) { + return dynamic_cast(req.get()) != nullptr; + }); + + if (is_create) { + std::vector> invalid_requirements; + bool invalid = std::ranges::any_of(requirements, [](const auto& req) { + return dynamic_cast(req.get()) == nullptr; + }); + if (invalid) { + return InvalidArgument( + "Cannot have other requirements than AssertDoesNotExist in a table creation"); + } + } + + return is_create; +} + } // namespace iceberg diff --git a/src/iceberg/table_requirements.h b/src/iceberg/table_requirements.h index f79f0bead..6deefdf3a 100644 --- a/src/iceberg/table_requirements.h +++ b/src/iceberg/table_requirements.h @@ -144,6 +144,10 @@ class ICEBERG_EXPORT TableRequirements { static Result>> ForUpdateTable( const TableMetadata& base, const std::vector>& table_updates); + + /// \brief Check if the requirements are for table creation + static Result IsCreate( + const std::vector>& requirements); }; } // namespace iceberg diff --git a/src/iceberg/test/in_memory_catalog_test.cc b/src/iceberg/test/in_memory_catalog_test.cc index 194d6da51..d5d443c86 100644 --- a/src/iceberg/test/in_memory_catalog_test.cc +++ b/src/iceberg/test/in_memory_catalog_test.cc @@ -28,7 +28,9 @@ #include #include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/partition_spec.h" #include "iceberg/schema.h" +#include "iceberg/sort_order.h" #include "iceberg/table.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" @@ -106,6 +108,19 @@ TEST_F(InMemoryCatalogTest, TableExists) { EXPECT_THAT(result, HasValue(::testing::Eq(false))); } +TEST_F(InMemoryCatalogTest, CreateTable) { + TableIdentifier table_ident{.ns = {}, .name = "t1"}; + auto schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "x", int64())}, + /*schema_id=*/1); + auto spec = PartitionSpec::Unpartitioned(); + + auto table = catalog_->CreateTable(table_ident, schema, spec, SortOrder::Unsorted(), + GenerateTestTableLocation(table_ident.name), + {{"property1", "value1"}}); + EXPECT_THAT(table, IsOk()); +} + TEST_F(InMemoryCatalogTest, RegisterTable) { TableIdentifier tableIdent{.ns = {}, .name = "t1"}; diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build index 9f3277533..188981b73 100644 --- a/src/iceberg/util/meson.build +++ b/src/iceberg/util/meson.build @@ -31,6 +31,7 @@ install_headers( 'location_util.h', 'macros.h', 'partition_value_util.h', + 'property_util.h', 'string_util.h', 'temporal_util.h', 'timepoint.h', diff --git a/src/iceberg/util/property_util.cc b/src/iceberg/util/property_util.cc new file mode 100644 index 000000000..636083fdd --- /dev/null +++ b/src/iceberg/util/property_util.cc @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/property_util.h" + +#include + +#include "iceberg/table_properties.h" + +namespace iceberg { + +Status PropertyUtil::ValidateCommitProperties( + const std::unordered_map& properties) { + for (const auto& property : TableProperties::commit_properties()) { + if (auto it = properties.find(property); it != properties.end()) { + int32_t parsed; + auto [ptr, ec] = std::from_chars(it->second.data(), + it->second.data() + it->second.size(), parsed); + if (ec == std::errc::invalid_argument) { + return ValidationFailed("Table property {} must have integer value, but got {}", + property, it->second); + } else if (ec == std::errc::result_out_of_range) { + return ValidationFailed("Table property {} value out of range {}", property, + it->second); + } + if (parsed < 0) { + return ValidationFailed( + "Table property {} must have non negative integer value, but got {}", + property, parsed); + } + } + } + return {}; +} + +} // namespace iceberg diff --git a/src/iceberg/util/property_util.h b/src/iceberg/util/property_util.h new file mode 100644 index 000000000..4e3e9b125 --- /dev/null +++ b/src/iceberg/util/property_util.h @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" + +namespace iceberg { + +class ICEBERG_EXPORT PropertyUtil { + public: + static Status ValidateCommitProperties( + const std::unordered_map& properties); +}; + +} // namespace iceberg