From 6b85bf000177b5dac8822d78f305d3040cddf77d Mon Sep 17 00:00:00 2001
From: Ivan Reshetnikov
Date: Tue, 9 Dec 2025 13:43:13 +0100
Subject: [PATCH 1/4] feat: serde-based analytics exporter

---
 Cargo.toml                                  |  11 +-
 crates/analytics/Cargo.toml                 |  11 +-
 crates/analytics/src/lib.rs                 |   8 +-
 crates/analytics/src/serializers.rs         | 108 +-------------
 crates/analytics/src/serializers/parquet.rs |  44 ++++++
 .../src/serializers/parquet/native.rs       | 132 +++++++++++++++++
 .../src/serializers/parquet/serde.rs        | 133 +++++++++++++++++
 crates/analytics/tests/integration.rs       | 135 +++++++++++++++---
 8 files changed, 450 insertions(+), 132 deletions(-)
 create mode 100644 crates/analytics/src/serializers/parquet.rs
 create mode 100644 crates/analytics/src/serializers/parquet/native.rs
 create mode 100644 crates/analytics/src/serializers/parquet/serde.rs

diff --git a/Cargo.toml b/Cargo.toml
index f5aefd4..c559f83 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,7 @@ members = [
 ]
 
 [features]
-default = []
+default = ["full"]
 full = [
     "alloc",
     "analytics",
@@ -71,3 +71,12 @@ required-features = ["alloc", "metrics"]
 [[example]]
 name = "geoblock"
 required-features = ["geoblock"]
+
+[patch.crates-io]
+arrow = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
+arrow-array = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
+arrow-buffer = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
+arrow-data = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
+arrow-schema = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
+parquet = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
+parquet_derive = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
diff --git a/crates/analytics/Cargo.toml b/crates/analytics/Cargo.toml
index 2bb16ad..000b94b 100644
--- a/crates/analytics/Cargo.toml
+++ b/crates/analytics/Cargo.toml
@@ -17,6 +17,11 @@ tap = "1.0"
 
 chrono = { version = "0.4" }
 aws-sdk-s3.workspace = true
-bytes = "1.5"
-parquet = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32", default-features = false, features = ["flate2-rust_backened"] }
-parquet_derive = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
+bytes = "1.11"
+
+# Parquet serialization
+arrow = "57.1"
+parquet = { version = "57.1", default-features = false, features = ["arrow", "flate2-rust_backened"] }
+parquet_derive = { version = "57.1" }
+serde_arrow = { version = "0.13", features = ["arrow-57"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs
index 1e49f86..d327d00 100644
--- a/crates/analytics/src/lib.rs
+++ b/crates/analytics/src/lib.rs
@@ -9,7 +9,7 @@ use {
 pub use {
     collectors::{BatchCollector, CollectionError, CollectorConfig},
     exporters::{AwsConfig, AwsError, AwsExporter, NoopExporter},
-    serializers::{NoopBatchFactory, ParquetBatchFactory, ParquetConfig, ParquetError},
+    serializers::{parquet, NoopBatch, NoopBatchFactory},
 };
 
 mod collectors;
@@ -211,5 +211,9 @@ pub fn noop_collector<T>() -> BatchCollector<T>
 where
     T: AnalyticsEvent,
 {
-    BatchCollector::new(Default::default(), NoopBatchFactory, NoopExporter)
+    BatchCollector::new(
+        Default::default(),
+        serializers::NoopBatchFactory,
+        NoopExporter,
+    )
 }
diff --git a/crates/analytics/src/serializers.rs b/crates/analytics/src/serializers.rs
index 7e48070..3f7c7d4 100644
--- a/crates/analytics/src/serializers.rs
+++ b/crates/analytics/src/serializers.rs
@@ -1,14 +1,10 @@
-pub use parquet::errors::ParquetError;
 use {
-    crate::{AnalyticsEvent, Batch, BatchFactory},
-    parquet::{
-        basic::Compression,
-        file::{properties::WriterProperties, writer::SerializedFileWriter},
-        record::RecordWriter,
-    },
-    std::{convert::Infallible, sync::Arc},
+    crate::{Batch, BatchFactory},
+    std::convert::Infallible,
 };
 
+pub mod parquet;
+
 pub struct NoopBatchFactory;
 
 impl<T> BatchFactory<T> for NoopBatchFactory {
@@ -41,99 +37,3 @@ impl<T> Batch<T> for NoopBatch {
         Ok(Vec::new())
     }
 }
-
-#[derive(Debug, Clone)]
-pub struct ParquetConfig {
-    /// The maximum number of records the batch can hold. Pushing more records
-    /// will trigger export.
-    pub batch_capacity: usize,
-
-    /// The data buffer initially allocated for serialization. Specifying a low
-    /// value would cause memory reallocation potentially affecting performance.
-    pub alloc_buffer_size: usize,
-}
-
-impl Default for ParquetConfig {
-    fn default() -> Self {
-        Self {
-            batch_capacity: 1024 * 128,
-            alloc_buffer_size: 1024 * 1024 * 130,
-        }
-    }
-}
-
-pub struct ParquetBatchFactory {
-    config: ParquetConfig,
-}
-
-impl ParquetBatchFactory {
-    pub fn new(config: ParquetConfig) -> Self {
-        Self { config }
-    }
-}
-
-impl<T> BatchFactory<T> for ParquetBatchFactory
-where
-    T: AnalyticsEvent,
-    for<'a> &'a [T]: RecordWriter<T>,
-{
-    type Batch = ParquetBatch<T>;
-    type Error = ParquetError;
-
-    fn create(&self) -> Result<Self::Batch, Self::Error> {
-        let props = WriterProperties::builder()
-            .set_compression(Compression::GZIP(Default::default()))
-            .build();
-        let props = Arc::new(props);
-        let schema = (&[] as &[T]).schema()?;
-
-        Ok(ParquetBatch {
-            capacity: self.config.batch_capacity,
-            data: Vec::with_capacity(self.config.batch_capacity),
-            writer: SerializedFileWriter::new(
-                Vec::with_capacity(self.config.alloc_buffer_size),
-                schema,
-                props,
-            )?,
-        })
-    }
-}
-
-pub struct ParquetBatch<T> {
-    capacity: usize,
-    data: Vec<T>,
-    writer: SerializedFileWriter<Vec<u8>>,
-}
-
-impl<T> Batch<T> for ParquetBatch<T>
-where
-    T: AnalyticsEvent,
-    for<'a> &'a [T]: RecordWriter<T>,
-{
-    type Error = ParquetError;
-
-    fn push(&mut self, data: T) -> Result<(), Self::Error> {
-        self.data.push(data);
-        Ok(())
-    }
-
-    fn is_full(&self) -> bool {
-        self.data.len() >= self.capacity
-    }
-
-    fn is_empty(&self) -> bool {
-        self.data.is_empty()
-    }
-
-    fn serialize(mut self) -> Result<Vec<u8>, Self::Error> {
-        let mut row_group_writer = self.writer.next_row_group()?;
-
-        self.data
-            .as_slice()
-            .write_to_row_group(&mut row_group_writer)?;
-
-        row_group_writer.close()?;
-
-        self.writer.into_inner()
-    }
-}
diff --git a/crates/analytics/src/serializers/parquet.rs b/crates/analytics/src/serializers/parquet.rs
new file mode 100644
index 0000000..d6a4c6c
--- /dev/null
+++ b/crates/analytics/src/serializers/parquet.rs
@@ -0,0 +1,44 @@
+use {
+    ::serde::{de::DeserializeOwned, Serialize},
+    arrow::datatypes::FieldRef,
+    parquet::{
+        arrow::ArrowSchemaConverter,
+        record::RecordWriter,
+        schema::{parser::parse_message_type, types::TypePtr},
+    },
+    serde_arrow::schema::{SchemaLike as _, TracingOptions},
+    std::sync::Arc,
+};
+
+pub mod native;
+pub mod serde;
+
+/// Returns `parquet` schema from parsing the string.
+pub fn schema_from_str(schema: &str) -> anyhow::Result<TypePtr> {
+    Ok(Arc::new(parse_message_type(schema)?))
+}
+
+/// Returns `parquet` schema based on the [`RecordWriter`] implementation.
+pub fn schema_from_native_writer<T>() -> anyhow::Result<TypePtr>
+where
+    for<'a> &'a [T]: RecordWriter<T>,
+{
+    Ok((&[] as &[T]).schema()?)
+}
+
+/// Returns `parquet` schema generated from [`serde_arrow`].
+pub fn schema_from_serde_arrow<T>(
+    name: &str,
+    tracing_options: TracingOptions,
+) -> anyhow::Result<TypePtr>
+where
+    T: Serialize + DeserializeOwned,
+{
+    let fields = Vec::<FieldRef>::from_type::<T>(tracing_options)?;
+    let arrow_schema = serde_arrow::to_record_batch::<Vec<T>>(&fields, &Vec::new())?.schema();
+    let parquet_schema = ArrowSchemaConverter::new()
+        .schema_root(name)
+        .convert(&arrow_schema)?;
+
+    Ok(parquet_schema.root_schema_ptr())
+}
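The three helpers in the new parquet.rs are meant to agree: for a record type whose derived `RecordWriter` and `serde` implementations describe the same columns, each helper should yield an equal `TypePtr` (the `parquet_schema` integration test later in this patch asserts exactly that). As a usage sketch that is not part of the patch — `Record` mirrors the test fixture, and the error handling is illustrative:

    use analytics::parquet;
    use parquet_derive::ParquetRecordWriter;
    use serde::{Deserialize, Serialize};
    use serde_arrow::schema::TracingOptions;

    #[derive(ParquetRecordWriter, Serialize, Deserialize)]
    struct Record {
        a: u32,
        b: String,
        c: bool,
    }

    fn schemas_agree() -> anyhow::Result<bool> {
        // Textual schema, as accepted by `schema_from_str`.
        let from_str = parquet::schema_from_str(
            "message rust_schema {
                REQUIRED INT32 a (INTEGER(32, false));
                REQUIRED BINARY b (STRING);
                REQUIRED BOOLEAN c;
            }",
        )?;

        // Schema derived from the `parquet_derive`-generated `RecordWriter` impl.
        let from_writer = parquet::schema_from_native_writer::<Record>()?;

        // Schema traced from the `serde` impls; plain (non-large) UTF-8 keeps the
        // string column identical to the `parquet_derive` output.
        let from_serde = parquet::schema_from_serde_arrow::<Record>(
            "rust_schema",
            TracingOptions::new().strings_as_large_utf8(false),
        )?;

        Ok(from_str == from_writer && from_writer == from_serde)
    }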
diff --git a/crates/analytics/src/serializers/parquet/native.rs b/crates/analytics/src/serializers/parquet/native.rs
new file mode 100644
index 0000000..5ae4b8d
--- /dev/null
+++ b/crates/analytics/src/serializers/parquet/native.rs
@@ -0,0 +1,132 @@
+pub use parquet::{self, errors::ParquetError as Error};
+use {
+    crate::AnalyticsEvent,
+    parquet::{
+        basic::Compression,
+        file::{properties::WriterProperties, writer::SerializedFileWriter},
+        record::RecordWriter,
+        schema::types::TypePtr,
+    },
+    std::{marker::PhantomData, sync::Arc},
+};
+
+#[derive(Debug, Clone)]
+pub struct Config {
+    /// The maximum number of records the batch can hold. Pushing more records
+    /// will trigger export.
+    ///
+    /// Default value: `1024 * 128`.
+    pub batch_capacity: usize,
+
+    /// The data buffer initially allocated for serialization. Specifying a low
+    /// value would cause memory reallocation potentially affecting performance.
+    ///
+    /// Default value: `1024 * 1024 * 130`.
+    pub alloc_buffer_size: usize,
+
+    /// Native [`parquet`] [`WriterProperties`]. Configures parquet export
+    /// parameters.
+    ///
+    /// Default value: `WriterProperties::default()` with enabled GZIP
+    /// compression.
+    pub writer_properties: WriterProperties,
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            batch_capacity: 1024 * 128,
+            alloc_buffer_size: 1024 * 1024 * 130,
+            writer_properties: WriterProperties::builder()
+                .set_compression(Compression::GZIP(Default::default()))
+                .build(),
+        }
+    }
+}
+
+pub struct BatchFactory<T> {
+    batch_capacity: usize,
+    alloc_buffer_size: usize,
+    writer_properties: Arc<WriterProperties>,
+    schema: TypePtr,
+    _marker: PhantomData<T>,
+}
+
+impl<T> BatchFactory<T>
+where
+    T: AnalyticsEvent,
+    for<'a> &'a [T]: RecordWriter<T>,
+{
+    pub fn new(config: Config) -> Result<Self, Error> {
+        Ok(Self {
+            batch_capacity: config.batch_capacity,
+            alloc_buffer_size: config.alloc_buffer_size,
+            writer_properties: Arc::new(config.writer_properties),
+            schema: (&[] as &[T]).schema()?,
+            _marker: PhantomData,
+        })
+    }
+}
+
+impl<T> crate::BatchFactory<T> for BatchFactory<T>
+where
+    T: AnalyticsEvent,
+    for<'a> &'a [T]: RecordWriter<T>,
+{
+    type Batch = Batch<T>;
+    type Error = Error;
+
+    fn create(&self) -> Result<Self::Batch, Self::Error> {
+        let writer = SerializedFileWriter::new(
+            Vec::with_capacity(self.alloc_buffer_size),
+            self.schema.clone(),
+            self.writer_properties.clone(),
+        )?;
+
+        Ok(Batch {
+            capacity: self.batch_capacity,
+            data: Vec::with_capacity(self.batch_capacity),
+            writer,
+        })
+    }
+}
+
+pub struct Batch<T> {
+    capacity: usize,
+    data: Vec<T>,
+    writer: SerializedFileWriter<Vec<u8>>,
+}
+
+impl<T> crate::Batch<T> for Batch<T>
+where
+    T: AnalyticsEvent,
+    for<'a> &'a [T]: RecordWriter<T>,
+{
+    type Error = Error;
+
+    fn push(&mut self, data: T) -> Result<(), Self::Error> {
+        self.data.push(data);
+        Ok(())
+    }
+
+    fn is_full(&self) -> bool {
+        self.data.len() >= self.capacity
+    }
+
+    fn is_empty(&self) -> bool {
+        self.data.is_empty()
+    }
+
+    fn serialize(mut self) -> Result<Vec<u8>, Self::Error> {
+        let mut row_group_writer = self.writer.next_row_group()?;
+
+        self.data
+            .as_slice()
+            .write_to_row_group(&mut row_group_writer)?;
+
+        row_group_writer.close()?;
+
+        self.writer.flush()?;
+        self.writer.into_inner()
+    }
+}
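As a usage sketch for the factory above (not part of the patch): wiring `parquet::native::BatchFactory` into a `BatchCollector`, the same way the integration tests below do. `NoopExporter` stands in for a real exporter such as `AwsExporter`, the capacity and interval values are illustrative, and `Record` is assumed to satisfy the crate's `AnalyticsEvent` bound just as the test fixture does; the integration tests run the collector under Tokio, so the sketch does too.

    use analytics::{parquet, BatchCollector, CollectorConfig, NoopExporter};
    use parquet_derive::ParquetRecordWriter;
    use std::time::Duration;

    #[derive(ParquetRecordWriter)]
    struct Record {
        a: u32,
        b: String,
        c: bool,
    }

    #[tokio::main]
    async fn main() {
        // Construction is fallible because the parquet schema is derived up front.
        let factory = parquet::native::BatchFactory::new(parquet::native::Config {
            batch_capacity: 1024,
            alloc_buffer_size: 1024 * 1024,
            ..Default::default()
        })
        .expect("schema derivation failed");

        let collector = BatchCollector::new(
            CollectorConfig {
                export_interval: Duration::from_secs(60),
                ..Default::default()
            },
            factory,
            NoopExporter,
        );

        collector
            .collect(Record {
                a: 1,
                b: "foo".into(),
                c: true,
            })
            .unwrap();
    }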
diff --git a/crates/analytics/src/serializers/parquet/serde.rs b/crates/analytics/src/serializers/parquet/serde.rs
new file mode 100644
index 0000000..04b2bd0
--- /dev/null
+++ b/crates/analytics/src/serializers/parquet/serde.rs
@@ -0,0 +1,133 @@
+pub use parquet::{self, errors::ParquetError as Error};
+use {
+    crate::AnalyticsEvent,
+    arrow::datatypes::FieldRef,
+    parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties},
+    serde::{de::DeserializeOwned, Serialize},
+    serde_arrow::schema::{SchemaLike, TracingOptions},
+    std::{io, marker::PhantomData, sync::Arc},
+};
+
+#[derive(Debug, Clone)]
+pub struct Config {
+    /// The maximum number of records the batch can hold. Pushing more records
+    /// will trigger export.
+    ///
+    /// Default value: `1024 * 128`.
+    pub batch_capacity: usize,
+
+    /// The data buffer initially allocated for serialization. Specifying a low
+    /// value would cause memory reallocation potentially affecting performance.
+    ///
+    /// Default value: `1024 * 1024 * 130`.
+    pub alloc_buffer_size: usize,
+
+    /// Native [`parquet`] [`WriterProperties`]. Configures parquet export
+    /// parameters.
+    ///
+    /// Default value: `WriterProperties::default()` with enabled GZIP
+    /// compression.
+    pub writer_properties: WriterProperties,
+
+    /// Native [`serde_arrow`] [`TracingOptions`]. Configures parquet schema
+    /// generation based on the implemented `serde` traits.
+    ///
+    /// Default value: `TracingOptions::default()`.
+    pub tracing_options: TracingOptions,
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            batch_capacity: 1024 * 128,
+            alloc_buffer_size: 1024 * 1024 * 130,
+            writer_properties: WriterProperties::builder()
+                .set_compression(Compression::GZIP(Default::default()))
+                .build(),
+            tracing_options: Default::default(),
+        }
+    }
+}
+
+pub struct BatchFactory<T> {
+    batch_capacity: usize,
+    alloc_buffer_size: usize,
+    writer_properties: WriterProperties,
+    fields: Arc<Vec<FieldRef>>,
+    _marker: PhantomData<T>,
+}
+
+impl<T> BatchFactory<T>
+where
+    T: AnalyticsEvent + Serialize + DeserializeOwned,
+{
+    pub fn new(config: Config) -> Result<Self, Error> {
+        let fields =
+            Vec::<FieldRef>::from_type::<T>(config.tracing_options).map_err(io::Error::other)?;
+
+        Ok(Self {
+            batch_capacity: config.batch_capacity,
+            alloc_buffer_size: config.alloc_buffer_size,
+            writer_properties: config.writer_properties,
+            fields: Arc::new(fields),
+            _marker: PhantomData,
+        })
+    }
+}
+
+impl<T> crate::BatchFactory<T> for BatchFactory<T>
+where
+    T: AnalyticsEvent + Serialize + DeserializeOwned,
+{
+    type Batch = Batch<T>;
+    type Error = Error;
+
+    fn create(&self) -> Result<Self::Batch, Self::Error> {
+        Ok(Batch {
+            capacity: self.batch_capacity,
+            data: Vec::with_capacity(self.batch_capacity),
+            buffer: Vec::with_capacity(self.alloc_buffer_size),
+            fields: self.fields.clone(),
+            writer_properties: self.writer_properties.clone(),
+        })
+    }
+}
+
+pub struct Batch<T> {
+    capacity: usize,
+    data: Vec<T>,
+    buffer: Vec<u8>,
+    fields: Arc<Vec<FieldRef>>,
+    writer_properties: WriterProperties,
+}
+
+impl<T> crate::Batch<T> for Batch<T>
+where
+    T: AnalyticsEvent + Serialize + DeserializeOwned,
+{
+    type Error = Error;
+
+    fn push(&mut self, data: T) -> Result<(), Self::Error> {
+        self.data.push(data);
+        Ok(())
+    }
+
+    fn is_full(&self) -> bool {
+        self.data.len() >= self.capacity
+    }
+
+    fn is_empty(&self) -> bool {
+        self.data.is_empty()
+    }
+
+    fn serialize(self) -> Result<Vec<u8>, Self::Error> {
+        let batch =
+            serde_arrow::to_record_batch(&self.fields, &self.data).map_err(io::Error::other)?;
+
+        let mut writer =
+            ArrowWriter::try_new(self.buffer, batch.schema(), Some(self.writer_properties))?;
+
+        writer.write(&batch)?;
+        writer.into_inner()
+    }
+}
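The serde-based batch above can also be driven directly through the `Batch` and `BatchFactory` traits, without a collector, to turn a set of events into a parquet-encoded buffer. A sketch that is not part of the patch — `Event` is illustrative and assumed to satisfy the crate's `AnalyticsEvent` bound; note that no `parquet_derive` is needed on this path:

    use analytics::{parquet, Batch as _, BatchFactory as _};
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct Event {
        a: u32,
        b: String,
        c: bool,
    }

    fn to_parquet(events: Vec<Event>) -> Result<Vec<u8>, parquet::serde::Error> {
        // The arrow fields are traced from the `serde` impls when the factory is built.
        let factory = parquet::serde::BatchFactory::<Event>::new(Default::default())?;
        let mut batch = factory.create()?;

        for event in events {
            batch.push(event)?;
        }

        // The buffered rows become an Arrow `RecordBatch` and are written through
        // `ArrowWriter` into the in-memory buffer.
        batch.serialize()
    }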
diff --git a/crates/analytics/tests/integration.rs b/crates/analytics/tests/integration.rs
index 5e0834b..25d360a 100644
--- a/crates/analytics/tests/integration.rs
+++ b/crates/analytics/tests/integration.rs
@@ -1,18 +1,26 @@
 use {
+    ::parquet::{
+        file::reader::{FileReader as _, SerializedFileReader},
+        record::RecordReader as _,
+    },
     analytics::{
+        parquet,
         AnalyticsExt,
+        Batch,
         BatchCollector,
+        BatchFactory,
         BatchObserver,
         CollectionObserver,
         Collector,
         CollectorConfig,
         ExportObserver,
         Exporter,
-        ParquetBatchFactory,
-        ParquetConfig,
     },
     async_trait::async_trait,
-    parquet_derive::ParquetRecordWriter,
+    bytes::Bytes,
+    parquet_derive::{ParquetRecordReader, ParquetRecordWriter},
+    serde::{Deserialize, Serialize},
+    serde_arrow::schema::TracingOptions,
     std::{
         sync::{
            atomic::{AtomicUsize, Ordering},
@@ -20,7 +28,7 @@ use {
         },
         time::Duration,
     },
-    tokio::sync::{mpsc, mpsc::error::TrySendError},
+    tokio::sync::mpsc::{self, error::TrySendError},
 };
 
 #[derive(Clone)]
@@ -40,10 +48,12 @@ impl Exporter for MockExporter {
     }
 }
 
-#[derive(ParquetRecordWriter)]
-struct DataA {
+#[derive(
+    Debug, Clone, ParquetRecordWriter, ParquetRecordReader, Serialize, Deserialize, PartialEq, Eq,
+)]
+struct Record {
     a: u32,
-    b: &'static str,
+    b: String,
     c: bool,
 }
@@ -56,17 +66,19 @@ async fn export_by_timeout() {
             export_interval: Duration::from_millis(200),
             ..Default::default()
         },
-        ParquetBatchFactory::new(ParquetConfig {
+        parquet::native::BatchFactory::new(parquet::native::Config {
             batch_capacity: 128,
             alloc_buffer_size: 8192,
-        }),
+            ..Default::default()
+        })
+        .unwrap(),
         MockExporter(tx),
     );
 
     collector
-        .collect(DataA {
+        .collect(Record {
             a: 1,
-            b: "foo",
+            b: "foo".into(),
             c: true,
         })
         .unwrap();
@@ -91,25 +103,27 @@ async fn export_by_num_rows() {
             export_interval: Duration::from_millis(200),
             ..Default::default()
         },
-        ParquetBatchFactory::new(ParquetConfig {
+        parquet::native::BatchFactory::new(parquet::native::Config {
             batch_capacity: 2,
             alloc_buffer_size: 8192,
-        }),
+            ..Default::default()
+        })
+        .unwrap(),
         MockExporter(tx),
     );
 
     collector
-        .collect(DataA {
+        .collect(Record {
             a: 1,
-            b: "foo",
+            b: "foo".into(),
             c: true,
         })
         .unwrap();
 
     collector
-        .collect(DataA {
+        .collect(Record {
             a: 2,
-            b: "bar",
+            b: "bar".into(),
             c: false,
         })
         .unwrap();
@@ -163,27 +177,29 @@ async fn observability() {
             export_interval: Duration::from_millis(200),
             ..Default::default()
         },
-        ParquetBatchFactory::new(ParquetConfig {
+        parquet::native::BatchFactory::new(parquet::native::Config {
             batch_capacity: 2,
             alloc_buffer_size: 8192,
+            ..Default::default()
         })
+        .unwrap()
         .with_observer(observer.clone()),
         MockExporter(tx).with_observer(observer.clone()),
     )
     .with_observer(observer.clone());
 
     collector
-        .collect(DataA {
+        .collect(Record {
             a: 1,
-            b: "foo",
+            b: "foo".into(),
             c: true,
         })
         .unwrap();
 
     collector
-        .collect(DataA {
+        .collect(Record {
             a: 2,
-            b: "bar",
+            b: "bar".into(),
             c: false,
         })
         .unwrap();
@@ -200,3 +216,78 @@ async fn observability() {
     assert_eq!(observer.batch_serialization.load(Ordering::SeqCst), 1);
     assert_eq!(observer.collection.load(Ordering::SeqCst), 2);
 }
+
+#[test]
+fn parquet_schema() {
+    let schema_str = "
+        message rust_schema {
+            REQUIRED INT32 a (INTEGER(32, false));
+            REQUIRED BINARY b (STRING);
+            REQUIRED BOOLEAN c;
+        }
+    ";
+
+    let from_str = parquet::schema_from_str(schema_str).unwrap();
+    let from_native_writer = parquet::schema_from_native_writer::<Record>().unwrap();
+    let from_serde_arrow = parquet::schema_from_serde_arrow::<Record>(
+        "rust_schema",
+        TracingOptions::new().strings_as_large_utf8(false),
+    )
+    .unwrap();
+
+    assert_eq!(from_str, from_native_writer);
+    assert_eq!(from_str, from_serde_arrow);
+}
+
+#[test]
+fn parquet_native_serialization() {
+    verify_parquet_serialization(parquet::native::BatchFactory::new(Default::default()).unwrap());
+}
+
+#[test]
+fn parquet_serde_serialization() {
+    verify_parquet_serialization(parquet::serde::BatchFactory::new(Default::default()).unwrap());
+}
+
+fn verify_parquet_serialization(factory: impl BatchFactory<Record>) {
+    let expected_data = generate_records();
+    let mut batch = factory.create().unwrap();
+
+    for data in &expected_data {
+        batch.push(data.clone()).unwrap();
+    }
+
+    let actual_data = read_records(batch.serialize().unwrap().into(), expected_data.len());
+    assert_eq!(actual_data, expected_data);
+}
+
+fn generate_records() -> Vec<Record> {
+    vec![
+        Record {
+            a: 1,
+            b: "foo".into(),
+            c: true,
+        },
+        Record {
+            a: 2,
+            b: "bar".into(),
+            c: false,
+        },
+    ]
+}
+
+fn read_records(serialized: Bytes, num_records: usize) -> Vec<Record> {
+    let mut samples = Vec::new();
+    let reader = SerializedFileReader::new(serialized).unwrap();
+    let mut row_group = reader.get_row_group(0).unwrap();
+    samples
+        .read_from_row_group(&mut *row_group, num_records)
+        .unwrap();
+    samples
+}
+
+// Ensure `parquet` used allows writing `Arc<str>` values.
+#[derive(ParquetRecordWriter)]
+struct _RecordWithArcStr {
+    a: Arc<str>,
+}

From d726f773c63fa4c959a7a1c91b63f983f554a29a Mon Sep 17 00:00:00 2001
From: Ivan Reshetnikov
Date: Tue, 9 Dec 2025 13:49:06 +0100
Subject: [PATCH 2/4] fix: clean up

---
 crates/analytics/src/lib.rs | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs
index d327d00..96476e0 100644
--- a/crates/analytics/src/lib.rs
+++ b/crates/analytics/src/lib.rs
@@ -211,9 +211,5 @@ pub fn noop_collector<T>() -> BatchCollector<T>
 where
     T: AnalyticsEvent,
 {
-    BatchCollector::new(
-        Default::default(),
-        serializers::NoopBatchFactory,
-        NoopExporter,
-    )
+    BatchCollector::new(Default::default(), NoopBatchFactory, NoopExporter)
 }
From cd43bfc569e9d572f6a9de4816c4704d99ac4b2a Mon Sep 17 00:00:00 2001
From: Ivan Reshetnikov
Date: Wed, 10 Dec 2025 08:52:41 +0100
Subject: [PATCH 3/4] fix: clean up

---
 Cargo.toml                                  | 15 ++------
 crates/analytics/Cargo.toml                 | 35 +++++++++++++-----
 crates/analytics/src/lib.rs                 |  2 +-
 crates/analytics/src/serializers.rs         |  1 +
 crates/analytics/src/serializers/parquet.rs | 36 ++-----------------
 .../src/serializers/parquet/native.rs       |  8 +++++
 .../src/serializers/parquet/serde.rs        | 27 ++++++++++++--
 crates/analytics/tests/integration.rs       | 10 ++----
 8 files changed, 68 insertions(+), 66 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index c559f83..d0fe27f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,9 +7,7 @@ authors = ["WalletConnect Team"]
 license = "MIT"
 
 [workspace]
-members = [
-    "crates/*",
-]
+members = ["crates/*"]
 
 [features]
 default = ["full"]
@@ -23,7 +21,7 @@ full = [
     "geoip",
     "metrics",
     "rate_limit",
-    "websocket"
+    "websocket",
 ]
 alloc = ["dep:alloc"]
 analytics = ["dep:analytics"]
@@ -71,12 +69,3 @@ required-features = ["alloc", "metrics"]
 [[example]]
 name = "geoblock"
 required-features = ["geoblock"]
-
-[patch.crates-io]
-arrow = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
-arrow-array = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
-arrow-buffer = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
-arrow-data = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
-arrow-schema = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
-parquet = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
-parquet_derive = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
diff --git a/crates/analytics/Cargo.toml b/crates/analytics/Cargo.toml
index 000b94b..e0f9fad 100644
--- a/crates/analytics/Cargo.toml
+++ b/crates/analytics/Cargo.toml
@@ -3,25 +3,42 @@ name = "analytics"
 version = "0.1.0"
 edition = "2021"
 
+[features]
+default = []
+full = ["parquet-native", "parquet-serde"]
+parquet-native = ["parquet"]
+parquet-serde = ["parquet", "dep:serde", "dep:serde_arrow"]
+parquet = ["dep:parquet", "dep:arrow"]
+
 [dependencies]
 future = { path = "../future" }
-
 async-trait = "0.1"
-tokio = { version = "1", default-features = false, features = ["rt", "rt-multi-thread", "sync", "time", "macros"] }
+tokio = { version = "1", default-features = false, features = [
+    "rt",
+    "rt-multi-thread",
+    "sync",
+    "time",
+    "macros",
+] }
 tracing = "0.1"
 
+# Parquet serialization
+parquet = { optional = true, version = "57.1", default-features = false, features = [
+    "arrow",
+    "flate2-rust_backened",
+] }
+arrow = { optional = true, version = "57.1" }
+serde_arrow = { optional = true, version = "0.13", features = ["arrow-57"] }
+serde = { optional = true, version = "1.0", features = ["derive", "rc"] }
+
 # Misc
 thiserror = "1.0"
 anyhow = "1"
 tap = "1.0"
-
 chrono = { version = "0.4" }
 aws-sdk-s3.workspace = true
 bytes = "1.11"
 
-# Parquet serialization
-arrow = "57.1"
-parquet = { version = "57.1", default-features = false, features = ["arrow", "flate2-rust_backened"] }
-parquet_derive = { version = "57.1" }
-serde_arrow = { version = "0.13", features = ["arrow-57"] }
-serde = { version = "1.0", features = ["derive", "rc"] }
+[dev-dependencies]
+analytics = { path = ".", features = ["full"] }
+parquet_derive = "57.1"
diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs
index 96476e0..7da792a 100644
--- a/crates/analytics/src/lib.rs
+++ b/crates/analytics/src/lib.rs
@@ -9,7 +9,7 @@ use {
 pub use {
     collectors::{BatchCollector, CollectionError, CollectorConfig},
     exporters::{AwsConfig, AwsError, AwsExporter, NoopExporter},
-    serializers::{parquet, NoopBatch, NoopBatchFactory},
+    serializers::*,
 };
 
 mod collectors;
diff --git a/crates/analytics/src/serializers.rs b/crates/analytics/src/serializers.rs
index 3f7c7d4..5d475ea 100644
--- a/crates/analytics/src/serializers.rs
+++ b/crates/analytics/src/serializers.rs
@@ -3,6 +3,7 @@ use {
     std::convert::Infallible,
 };
 
+#[cfg(feature = "parquet")]
 pub mod parquet;
 
 pub struct NoopBatchFactory;
"parquet-native")] pub mod native; +#[cfg(feature = "parquet-serde")] pub mod serde; /// Returns `parquet` schema from parsing the string. pub fn schema_from_str(schema: &str) -> anyhow::Result { Ok(Arc::new(parse_message_type(schema)?)) } - -/// Returns `parquet` schema based on the [`RecordWriter`] implementation. -pub fn schema_from_native_writer() -> anyhow::Result -where - for<'a> &'a [T]: RecordWriter, -{ - Ok((&[] as &[T]).schema()?) -} - -/// Returns `parquet` schema generated from [`serde_arrow`]. -pub fn schema_from_serde_arrow( - name: &str, - tracing_options: TracingOptions, -) -> anyhow::Result -where - T: Serialize + DeserializeOwned, -{ - let fields = Vec::::from_type::(tracing_options)?; - let arrow_schema = serde_arrow::to_record_batch::>(&fields, &Vec::new())?.schema(); - let parquet_schema = ArrowSchemaConverter::new() - .schema_root(name) - .convert(&arrow_schema)?; - - Ok(parquet_schema.root_schema_ptr()) -} diff --git a/crates/analytics/src/serializers/parquet/native.rs b/crates/analytics/src/serializers/parquet/native.rs index 5ae4b8d..56e176e 100644 --- a/crates/analytics/src/serializers/parquet/native.rs +++ b/crates/analytics/src/serializers/parquet/native.rs @@ -10,6 +10,14 @@ use { std::{marker::PhantomData, sync::Arc}, }; +/// Returns `parquet` schema based on the [`RecordWriter`] implementation. +pub fn schema() -> Result +where + for<'a> &'a [T]: RecordWriter, +{ + (&[] as &[T]).schema() +} + #[derive(Debug, Clone)] pub struct Config { /// The maximum number of records the batch can hold. Pushing more records diff --git a/crates/analytics/src/serializers/parquet/serde.rs b/crates/analytics/src/serializers/parquet/serde.rs index 04b2bd0..676a8df 100644 --- a/crates/analytics/src/serializers/parquet/serde.rs +++ b/crates/analytics/src/serializers/parquet/serde.rs @@ -1,12 +1,35 @@ -pub use parquet::{self, errors::ParquetError as Error}; use { crate::AnalyticsEvent, arrow::datatypes::FieldRef, - parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties}, + parquet::{ + arrow::{ArrowSchemaConverter, ArrowWriter}, + basic::Compression, + file::properties::WriterProperties, + schema::types::TypePtr, + }, serde::{de::DeserializeOwned, Serialize}, serde_arrow::schema::{SchemaLike, TracingOptions}, std::{io, marker::PhantomData, sync::Arc}, }; +pub use { + parquet::{self, errors::ParquetError as Error}, + serde_arrow::Error as SerdeArrowError, +}; + +/// Returns `parquet` schema generated with [`serde_arrow`]. 
+pub fn schema<T>(name: &str, tracing_options: TracingOptions) -> Result<TypePtr, SerdeArrowError>
+where
+    T: Serialize + DeserializeOwned,
+{
+    let fields = Vec::<FieldRef>::from_type::<T>(tracing_options)?;
+    let arrow_schema = serde_arrow::to_record_batch::<Vec<T>>(&fields, &Vec::new())?.schema();
+    let parquet_schema = ArrowSchemaConverter::new()
+        .schema_root(name)
+        .convert(&arrow_schema)
+        .map_err(|err| SerdeArrowError::custom(err.to_string()))?;
+
+    Ok(parquet_schema.root_schema_ptr())
+}
 
 #[derive(Debug, Clone)]
 pub struct Config {
diff --git a/crates/analytics/tests/integration.rs b/crates/analytics/tests/integration.rs
index 25d360a..0a01c61 100644
--- a/crates/analytics/tests/integration.rs
+++ b/crates/analytics/tests/integration.rs
@@ -228,8 +228,8 @@ fn parquet_schema() {
     ";
 
     let from_str = parquet::schema_from_str(schema_str).unwrap();
-    let from_native_writer = parquet::schema_from_native_writer::<Record>().unwrap();
-    let from_serde_arrow = parquet::schema_from_serde_arrow::<Record>(
+    let from_native_writer = parquet::native::schema::<Record>().unwrap();
+    let from_serde_arrow = parquet::serde::schema::<Record>(
         "rust_schema",
         TracingOptions::new().strings_as_large_utf8(false),
     )
@@ -285,9 +285,3 @@ fn read_records(serialized: Bytes, num_records: usize) -> Vec<Record> {
         .unwrap();
     samples
 }
-
-// Ensure `parquet` used allows writing `Arc<str>` values.
-#[derive(ParquetRecordWriter)]
-struct _RecordWithArcStr {
-    a: Arc<str>,
-}

From a6b0247b9362e1a23f1c8ed522859a12b8c6d5bb Mon Sep 17 00:00:00 2001
From: Ivan Reshetnikov
Date: Wed, 10 Dec 2025 09:03:16 +0100
Subject: [PATCH 4/4] fix: clean up

---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index d0fe27f..c956b65 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,7 @@ license = "MIT"
 members = ["crates/*"]
 
 [features]
-default = ["full"]
+default = []
 full = [
     "alloc",
     "analytics",
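After the whole series, both exporters sit behind cargo features of the `analytics` crate (`parquet-native`, `parquet-serde`, or `full`; the default feature set is empty), and the schema helpers live on the respective modules as `parquet::native::schema` and `parquet::serde::schema`. A closing sketch of the post-series serde path wired into a collector, not part of any patch: feature selection happens in the consumer's manifest, and `Event`, the config values, and `NoopExporter` are illustrative stand-ins.

    use analytics::{parquet, BatchCollector, CollectorConfig, NoopExporter};
    use serde::{Deserialize, Serialize};
    use serde_arrow::schema::TracingOptions;

    #[derive(Serialize, Deserialize)]
    struct Event {
        a: u32,
        b: String,
        c: bool,
    }

    #[tokio::main]
    async fn main() {
        let factory = parquet::serde::BatchFactory::<Event>::new(parquet::serde::Config {
            // Schema tracing can be tuned here, e.g. to keep string columns identical
            // to what `parquet_derive` produces on the native path.
            tracing_options: TracingOptions::new().strings_as_large_utf8(false),
            ..Default::default()
        })
        .expect("schema tracing failed");

        let collector = BatchCollector::new(CollectorConfig::default(), factory, NoopExporter);
        collector
            .collect(Event {
                a: 1,
                b: "foo".into(),
                c: true,
            })
            .unwrap();
    }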