Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 147 additions & 1 deletion crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ pub struct FastAppendAction {
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
// Optional tag name to create atomically with the snapshot.
tag_name: Option<String>,
// Optional retention for the tag in milliseconds.
tag_retention: Option<i64>,
}

impl FastAppendAction {
Expand All @@ -47,6 +51,8 @@ impl FastAppendAction {
key_metadata: None,
snapshot_properties: HashMap::default(),
added_data_files: vec![],
tag_name: None,
tag_retention: None,
}
}

Expand Down Expand Up @@ -79,17 +85,42 @@ impl FastAppendAction {
self.snapshot_properties = snapshot_properties;
self
}

/// Set a tag name to be created atomically with the snapshot.
/// The tag will use the table's default retention policy (history.expire.max-ref-age-ms).
///
/// Use `with_tag_retention()` to override the retention policy.
pub fn with_tag(mut self, tag_name: impl Into<String>) -> Self {
self.tag_name = Some(tag_name.into());
self
}

/// Set the retention period for the tag in milliseconds.
/// This overrides the table's default retention policy.
/// Use i64::MAX to make the tag never expire.
///
/// Can be called before or after `with_tag()`. Only takes effect if a tag name is set.
pub fn with_tag_retention(mut self, max_ref_age_ms: i64) -> Self {
self.tag_retention = Some(max_ref_age_ms);
self
}
}

#[async_trait]
impl TransactionAction for FastAppendAction {
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
let tag_ref = self
.tag_name
.as_ref()
.map(|name| (name.clone(), self.tag_retention));

let snapshot_producer = SnapshotProducer::new(
table,
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
self.key_metadata.clone(),
self.snapshot_properties.clone(),
self.added_data_files.clone(),
tag_ref,
);

// validate added files
Expand Down Expand Up @@ -150,7 +181,8 @@ mod tests {
use std::sync::Arc;

use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, SnapshotRetention,
Struct,
};
use crate::transaction::tests::make_v2_minimal_table;
use crate::transaction::{Transaction, TransactionAction};
Expand Down Expand Up @@ -333,4 +365,118 @@ mod tests {
);
assert_eq!(data_file, *manifest.entries()[0].data_file());
}

/// A fast append configured with `with_tag` must emit, in order: the new snapshot,
/// the main-branch reference, and a tag reference without an explicit retention.
#[tokio::test]
async fn test_fast_append_with_tag() {
    let table = make_v2_minimal_table();
    let transaction = Transaction::new(&table);

    let partition = Struct::from_iter([Some(Literal::long(300))]);
    let tagged_file = DataFileBuilder::default()
        .content(DataContentType::Data)
        .file_path("test/tagged.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .file_size_in_bytes(100)
        .record_count(1)
        .partition_spec_id(table.metadata().default_partition_spec_id())
        .partition(partition)
        .build()
        .unwrap();

    let append = transaction
        .fast_append()
        .add_data_files(vec![tagged_file])
        .with_tag("v1.0.0");

    let mut committed = Arc::new(append).commit(&table).await.unwrap();
    let updates = committed.take_updates();

    // Expected sequence: AddSnapshot, SetSnapshotRef (main), SetSnapshotRef (tag).
    assert_eq!(updates.len(), 3);

    // Update #1 carries the freshly produced snapshot.
    let snapshot_id = match &updates[0] {
        TableUpdate::AddSnapshot { snapshot } => snapshot.snapshot_id(),
        _ => panic!("Expected AddSnapshot as first update"),
    };

    // Update #2 moves the main branch onto that snapshot.
    match &updates[1] {
        TableUpdate::SetSnapshotRef {
            ref_name,
            reference,
        } => {
            assert_eq!(ref_name, MAIN_BRANCH);
            assert_eq!(reference.snapshot_id, snapshot_id);
            assert!(reference.is_branch());
        }
        _ => panic!("Expected SetSnapshotRef for main branch as second update"),
    }

    // Update #3 creates the tag, pointing at the same snapshot.
    match &updates[2] {
        TableUpdate::SetSnapshotRef {
            ref_name,
            reference,
        } => {
            assert_eq!(ref_name, "v1.0.0");
            assert_eq!(reference.snapshot_id, snapshot_id);
            // The reference must be a tag rather than a branch.
            assert!(!reference.is_branch());
            // No retention was requested, so the table defaults apply (None).
            match &reference.retention {
                SnapshotRetention::Tag { max_ref_age_ms } => {
                    assert_eq!(max_ref_age_ms, &None);
                }
                _ => panic!("Expected Tag retention"),
            }
        }
        _ => panic!("Expected SetSnapshotRef for tag as third update"),
    }
}

/// A fast append configured with both `with_tag` and `with_tag_retention` must emit
/// a tag reference that carries the requested retention value.
#[tokio::test]
async fn test_fast_append_with_tag_retention() {
    let table = make_v2_minimal_table();
    let transaction = Transaction::new(&table);

    let partition = Struct::from_iter([Some(Literal::long(300))]);
    let tagged_file = DataFileBuilder::default()
        .content(DataContentType::Data)
        .file_path("test/tagged_never_expire.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .file_size_in_bytes(100)
        .record_count(1)
        .partition_spec_id(table.metadata().default_partition_spec_id())
        .partition(partition)
        .build()
        .unwrap();

    // i64::MAX is the documented way to request a tag that never expires.
    let append = transaction
        .fast_append()
        .add_data_files(vec![tagged_file])
        .with_tag("v2.0.0")
        .with_tag_retention(i64::MAX);

    let mut committed = Arc::new(append).commit(&table).await.unwrap();
    let updates = committed.take_updates();

    // Expected sequence: AddSnapshot, SetSnapshotRef (main), SetSnapshotRef (tag).
    assert_eq!(updates.len(), 3);

    // Only the third update, the tag reference, is inspected here.
    match &updates[2] {
        TableUpdate::SetSnapshotRef {
            ref_name,
            reference,
        } => {
            assert_eq!(ref_name, "v2.0.0");
            assert!(!reference.is_branch());
            // The custom retention must be passed through untouched.
            match &reference.retention {
                SnapshotRetention::Tag { max_ref_age_ms } => {
                    assert_eq!(max_ref_age_ms, &Some(i64::MAX));
                }
                _ => panic!("Expected Tag retention"),
            }
        }
        _ => panic!("Expected SetSnapshotRef for tag as third update"),
    }
}
}
15 changes: 14 additions & 1 deletion crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ pub(crate) struct SnapshotProducer<'a> {
// It starts from 0 and increments for each new manifest file.
// Note: This counter is limited to the range of (0..u64::MAX).
manifest_counter: RangeFrom<u64>,
// Optional tag (name, retention) to create atomically with the snapshot.
tag_ref: Option<(String, Option<i64>)>,
}

impl<'a> SnapshotProducer<'a> {
Expand All @@ -127,6 +129,7 @@ impl<'a> SnapshotProducer<'a> {
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
tag_ref: Option<(String, Option<i64>)>,
) -> Self {
Self {
table,
Expand All @@ -136,6 +139,7 @@ impl<'a> SnapshotProducer<'a> {
snapshot_properties,
added_data_files,
manifest_counter: (0..),
tag_ref,
}
}

Expand Down Expand Up @@ -485,7 +489,7 @@ impl<'a> SnapshotProducer<'a> {
new_snapshot.build()
};

let updates = vec![
let mut updates = vec![
TableUpdate::AddSnapshot {
snapshot: new_snapshot,
},
Expand All @@ -498,6 +502,15 @@ impl<'a> SnapshotProducer<'a> {
},
];

if let Some((tag_name, max_ref_age_ms)) = self.tag_ref {
updates.push(TableUpdate::SetSnapshotRef {
ref_name: tag_name,
reference: SnapshotReference::new(self.snapshot_id, SnapshotRetention::Tag {
max_ref_age_ms,
}),
});
}

let requirements = vec![
TableRequirement::UuidMatch {
uuid: self.table.metadata().uuid(),
Expand Down
Loading