diff --git a/Cargo.toml b/Cargo.toml
index f5aefd4..c956b65 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,9 +7,7 @@ authors = ["WalletConnect Team"]
 license = "MIT"
 
 [workspace]
-members = [
-    "crates/*",
-]
+members = ["crates/*"]
 
 [features]
 default = []
@@ -23,7 +21,7 @@ full = [
     "geoip",
     "metrics",
     "rate_limit",
-    "websocket"
+    "websocket",
 ]
 alloc = ["dep:alloc"]
 analytics = ["dep:analytics"]
diff --git a/crates/analytics/Cargo.toml b/crates/analytics/Cargo.toml
index 2bb16ad..e0f9fad 100644
--- a/crates/analytics/Cargo.toml
+++ b/crates/analytics/Cargo.toml
@@ -3,20 +3,42 @@ name = "analytics"
 version = "0.1.0"
 edition = "2021"
 
+[features]
+default = []
+full = ["parquet-native", "parquet-serde"]
+parquet-native = ["parquet"]
+parquet-serde = ["parquet", "dep:serde", "dep:serde_arrow"]
+parquet = ["dep:parquet", "dep:arrow"]
+
 [dependencies]
 future = { path = "../future" }
-
 async-trait = "0.1"
-tokio = { version = "1", default-features = false, features = ["rt", "rt-multi-thread", "sync", "time", "macros"] }
+tokio = { version = "1", default-features = false, features = [
+    "rt",
+    "rt-multi-thread",
+    "sync",
+    "time",
+    "macros",
+] }
 tracing = "0.1"
 
+# Parquet serialization
+parquet = { optional = true, version = "57.1", default-features = false, features = [
+    "arrow",
+    "flate2-rust_backened",
+] }
+arrow = { optional = true, version = "57.1" }
+serde_arrow = { optional = true, version = "0.13", features = ["arrow-57"] }
+serde = { optional = true, version = "1.0", features = ["derive", "rc"] }
+
 # Misc
 thiserror = "1.0"
 anyhow = "1"
 tap = "1.0"
-
 chrono = { version = "0.4" }
 aws-sdk-s3.workspace = true
-bytes = "1.5"
-parquet = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32", default-features = false, features = ["flate2-rust_backened"] }
-parquet_derive = { git = "https://github.com/WalletConnect/arrow-rs.git", rev = "e862f32" }
+bytes = "1.11"
+
+[dev-dependencies]
+analytics = { path = ".", features = ["full"] }
+parquet_derive = "57.1"
diff --git a/crates/analytics/src/lib.rs b/crates/analytics/src/lib.rs
index 1e49f86..7da792a 100644
--- a/crates/analytics/src/lib.rs
+++ b/crates/analytics/src/lib.rs
@@ -9,7 +9,7 @@ use {
 pub use {
     collectors::{BatchCollector, CollectionError, CollectorConfig},
     exporters::{AwsConfig, AwsError, AwsExporter, NoopExporter},
-    serializers::{NoopBatchFactory, ParquetBatchFactory, ParquetConfig, ParquetError},
+    serializers::*,
 };
 
 mod collectors;
diff --git a/crates/analytics/src/serializers.rs b/crates/analytics/src/serializers.rs
index 7e48070..5d475ea 100644
--- a/crates/analytics/src/serializers.rs
+++ b/crates/analytics/src/serializers.rs
@@ -1,14 +1,11 @@
-pub use parquet::errors::ParquetError;
 use {
-    crate::{AnalyticsEvent, Batch, BatchFactory},
-    parquet::{
-        basic::Compression,
-        file::{properties::WriterProperties, writer::SerializedFileWriter},
-        record::RecordWriter,
-    },
-    std::{convert::Infallible, sync::Arc},
+    crate::{Batch, BatchFactory},
+    std::convert::Infallible,
 };
 
+#[cfg(feature = "parquet")]
+pub mod parquet;
+
 pub struct NoopBatchFactory;
 
 impl<T> BatchFactory<T> for NoopBatchFactory {
@@ -41,99 +38,3 @@ impl<T> Batch<T> for NoopBatch {
         Ok(Vec::new())
     }
 }
-
-#[derive(Debug, Clone)]
-pub struct ParquetConfig {
-    /// The maximum number of records the batch can hold. Pushing more records
-    /// will trigger export.
-    pub batch_capacity: usize,
-
-    /// The data buffer initially allocated for serialization. Specifying a low
-    /// value would cause memory reallocation potentially affecting performance.
-    pub alloc_buffer_size: usize,
-}
-
-impl Default for ParquetConfig {
-    fn default() -> Self {
-        Self {
-            batch_capacity: 1024 * 128,
-            alloc_buffer_size: 1024 * 1024 * 130,
-        }
-    }
-}
-
-pub struct ParquetBatchFactory {
-    config: ParquetConfig,
-}
-
-impl ParquetBatchFactory {
-    pub fn new(config: ParquetConfig) -> Self {
-        Self { config }
-    }
-}
-
-impl<T> BatchFactory<T> for ParquetBatchFactory
-where
-    T: AnalyticsEvent,
-    for<'a> &'a [T]: RecordWriter<T>,
-{
-    type Batch = ParquetBatch<T>;
-    type Error = ParquetError;
-
-    fn create(&self) -> Result<Self::Batch, Self::Error> {
-        let props = WriterProperties::builder()
-            .set_compression(Compression::GZIP(Default::default()))
-            .build();
-        let props = Arc::new(props);
-        let schema = (&[] as &[T]).schema()?;
-
-        Ok(ParquetBatch {
-            capacity: self.config.batch_capacity,
-            data: Vec::with_capacity(self.config.batch_capacity),
-            writer: SerializedFileWriter::new(
-                Vec::with_capacity(self.config.alloc_buffer_size),
-                schema,
-                props,
-            )?,
-        })
-    }
-}
-
-pub struct ParquetBatch<T> {
-    capacity: usize,
-    data: Vec<T>,
-    writer: SerializedFileWriter<Vec<u8>>,
-}
-
-impl<T> Batch<T> for ParquetBatch<T>
-where
-    T: AnalyticsEvent,
-    for<'a> &'a [T]: RecordWriter<T>,
-{
-    type Error = ParquetError;
-
-    fn push(&mut self, data: T) -> Result<(), Self::Error> {
-        self.data.push(data);
-        Ok(())
-    }
-
-    fn is_full(&self) -> bool {
-        self.data.len() >= self.capacity
-    }
-
-    fn is_empty(&self) -> bool {
-        self.data.is_empty()
-    }
-
-    fn serialize(mut self) -> Result<Vec<u8>, Self::Error> {
-        let mut row_group_writer = self.writer.next_row_group()?;
-
-        self.data
-            .as_slice()
-            .write_to_row_group(&mut row_group_writer)?;
-
-        row_group_writer.close()?;
-
-        self.writer.into_inner()
-    }
-}
diff --git a/crates/analytics/src/serializers/parquet.rs b/crates/analytics/src/serializers/parquet.rs
new file mode 100644
index 0000000..d5a6403
--- /dev/null
+++ b/crates/analytics/src/serializers/parquet.rs
@@ -0,0 +1,14 @@
+use {
+    parquet::schema::{parser::parse_message_type, types::TypePtr},
+    std::sync::Arc,
+};
+
+#[cfg(feature = "parquet-native")]
+pub mod native;
+#[cfg(feature = "parquet-serde")]
+pub mod serde;
+
+/// Returns `parquet` schema from parsing the string.
+pub fn schema_from_str(schema: &str) -> anyhow::Result<TypePtr> {
+    Ok(Arc::new(parse_message_type(schema)?))
+}
diff --git a/crates/analytics/src/serializers/parquet/native.rs b/crates/analytics/src/serializers/parquet/native.rs
new file mode 100644
index 0000000..56e176e
--- /dev/null
+++ b/crates/analytics/src/serializers/parquet/native.rs
@@ -0,0 +1,140 @@
+pub use parquet::{self, errors::ParquetError as Error};
+use {
+    crate::AnalyticsEvent,
+    parquet::{
+        basic::Compression,
+        file::{properties::WriterProperties, writer::SerializedFileWriter},
+        record::RecordWriter,
+        schema::types::TypePtr,
+    },
+    std::{marker::PhantomData, sync::Arc},
+};
+
+/// Returns `parquet` schema based on the [`RecordWriter`] implementation.
+pub fn schema<T>() -> Result<TypePtr, Error>
+where
+    for<'a> &'a [T]: RecordWriter<T>,
+{
+    (&[] as &[T]).schema()
+}
+
+#[derive(Debug, Clone)]
+pub struct Config {
+    /// The maximum number of records the batch can hold. Pushing more records
+    /// will trigger export.
+    ///
+    /// Default value: `1024 * 128`.
+    pub batch_capacity: usize,
+
+    /// The data buffer initially allocated for serialization. Specifying a low
+    /// value would cause memory reallocation potentially affecting performance.
+    ///
+    /// Default value: `1024 * 1024 * 130`.
+    pub alloc_buffer_size: usize,
+
+    /// Native [`parquet`] [`WriterProperties`]. Configures parquet export
+    /// parameters.
+    ///
+    /// Default value: `WriterProperties::default()` with enabled GZIP
+    /// compression.
+    pub writer_properties: WriterProperties,
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            batch_capacity: 1024 * 128,
+            alloc_buffer_size: 1024 * 1024 * 130,
+            writer_properties: WriterProperties::builder()
+                .set_compression(Compression::GZIP(Default::default()))
+                .build(),
+        }
+    }
+}
+
+pub struct BatchFactory<T> {
+    batch_capacity: usize,
+    alloc_buffer_size: usize,
+    writer_properties: Arc<WriterProperties>,
+    schema: TypePtr,
+    _marker: PhantomData<T>,
+}
+
+impl<T> BatchFactory<T>
+where
+    T: AnalyticsEvent,
+    for<'a> &'a [T]: RecordWriter<T>,
+{
+    pub fn new(config: Config) -> Result<Self, Error> {
+        Ok(Self {
+            batch_capacity: config.batch_capacity,
+            alloc_buffer_size: config.alloc_buffer_size,
+            writer_properties: Arc::new(config.writer_properties),
+            schema: (&[] as &[T]).schema()?,
+            _marker: PhantomData,
+        })
+    }
+}
+
+impl<T> crate::BatchFactory<T> for BatchFactory<T>
+where
+    T: AnalyticsEvent,
+    for<'a> &'a [T]: RecordWriter<T>,
+{
+    type Batch = Batch<T>;
+    type Error = Error;
+
+    fn create(&self) -> Result<Self::Batch, Self::Error> {
+        let writer = SerializedFileWriter::new(
+            Vec::with_capacity(self.alloc_buffer_size),
+            self.schema.clone(),
+            self.writer_properties.clone(),
+        )?;
+
+        Ok(Batch {
+            capacity: self.batch_capacity,
+            data: Vec::with_capacity(self.batch_capacity),
+            writer,
+        })
+    }
+}
+
+pub struct Batch<T> {
+    capacity: usize,
+    data: Vec<T>,
+    writer: SerializedFileWriter<Vec<u8>>,
+}
+
+impl<T> crate::Batch<T> for Batch<T>
+where
+    T: AnalyticsEvent,
+    for<'a> &'a [T]: RecordWriter<T>,
+{
+    type Error = Error;
+
+    fn push(&mut self, data: T) -> Result<(), Self::Error> {
+        self.data.push(data);
+        Ok(())
+    }
+
+    fn is_full(&self) -> bool {
+        self.data.len() >= self.capacity
+    }
+
+    fn is_empty(&self) -> bool {
+        self.data.is_empty()
+    }
+
+    fn serialize(mut self) -> Result<Vec<u8>, Self::Error> {
+        let mut row_group_writer = self.writer.next_row_group()?;
+
+        self.data
+            .as_slice()
+            .write_to_row_group(&mut row_group_writer)?;
+
+        row_group_writer.close()?;
+
+        self.writer.flush()?;
+        self.writer.into_inner()
+    }
+}
diff --git a/crates/analytics/src/serializers/parquet/serde.rs b/crates/analytics/src/serializers/parquet/serde.rs
new file mode 100644
index 0000000..676a8df
--- /dev/null
+++ b/crates/analytics/src/serializers/parquet/serde.rs
@@ -0,0 +1,156 @@
+use {
+    crate::AnalyticsEvent,
+    arrow::datatypes::FieldRef,
+    parquet::{
+        arrow::{ArrowSchemaConverter, ArrowWriter},
+        basic::Compression,
+        file::properties::WriterProperties,
+        schema::types::TypePtr,
+    },
+    serde::{de::DeserializeOwned, Serialize},
+    serde_arrow::schema::{SchemaLike, TracingOptions},
+    std::{io, marker::PhantomData, sync::Arc},
+};
+pub use {
+    parquet::{self, errors::ParquetError as Error},
+    serde_arrow::Error as SerdeArrowError,
+};
+
+/// Returns `parquet` schema generated with [`serde_arrow`].
+pub fn schema<T>(name: &str, tracing_options: TracingOptions) -> Result<TypePtr, SerdeArrowError>
+where
+    T: Serialize + DeserializeOwned,
+{
+    let fields = Vec::<FieldRef>::from_type::<T>(tracing_options)?;
+    let arrow_schema = serde_arrow::to_record_batch::<Vec<T>>(&fields, &Vec::new())?.schema();
+    let parquet_schema = ArrowSchemaConverter::new()
+        .schema_root(name)
+        .convert(&arrow_schema)
+        .map_err(|err| SerdeArrowError::custom(err.to_string()))?;
+
+    Ok(parquet_schema.root_schema_ptr())
+}
+
+#[derive(Debug, Clone)]
+pub struct Config {
+    /// The maximum number of records the batch can hold. Pushing more records
+    /// will trigger export.
+    ///
+    /// Default value: `1024 * 128`.
+    pub batch_capacity: usize,
+
+    /// The data buffer initially allocated for serialization. Specifying a low
+    /// value would cause memory reallocation potentially affecting performance.
+    ///
+    /// Default value: `1024 * 1024 * 130`.
+    pub alloc_buffer_size: usize,
+
+    /// Native [`parquet`] [`WriterProperties`]. Configures parquet export
+    /// parameters.
+    ///
+    /// Default value: `WriterProperties::default()` with enabled GZIP
+    /// compression.
+    pub writer_properties: WriterProperties,
+
+    /// Native [`serde_arrow`] [`TracingOptions`]. Configures parquet schema
+    /// generation based on the implemented `serde` traits.
+    ///
+    /// Default value: `TracingOptions::default()`.
+    pub tracing_options: TracingOptions,
+}
+
+impl Default for Config {
+    fn default() -> Self {
+        Self {
+            batch_capacity: 1024 * 128,
+            alloc_buffer_size: 1024 * 1024 * 130,
+            writer_properties: WriterProperties::builder()
+                .set_compression(Compression::GZIP(Default::default()))
+                .build(),
+            tracing_options: Default::default(),
+        }
+    }
+}
+
+pub struct BatchFactory<T> {
+    batch_capacity: usize,
+    alloc_buffer_size: usize,
+    writer_properties: WriterProperties,
+    fields: Arc<Vec<FieldRef>>,
+    _marker: PhantomData<T>,
+}
+
+impl<T> BatchFactory<T>
+where
+    T: AnalyticsEvent + Serialize + DeserializeOwned,
+{
+    pub fn new(config: Config) -> Result<Self, Error> {
+        let fields =
+            Vec::<FieldRef>::from_type::<T>(config.tracing_options).map_err(io::Error::other)?;
+
+        Ok(Self {
+            batch_capacity: config.batch_capacity,
+            alloc_buffer_size: config.alloc_buffer_size,
+            writer_properties: config.writer_properties,
+            fields: Arc::new(fields),
+            _marker: PhantomData,
+        })
+    }
+}
+
+impl<T> crate::BatchFactory<T> for BatchFactory<T>
+where
+    T: AnalyticsEvent + Serialize + DeserializeOwned,
+{
+    type Batch = Batch<T>;
+    type Error = Error;
+
+    fn create(&self) -> Result<Self::Batch, Self::Error> {
+        Ok(Batch {
+            capacity: self.batch_capacity,
+            data: Vec::with_capacity(self.batch_capacity),
+            buffer: Vec::with_capacity(self.alloc_buffer_size),
+            fields: self.fields.clone(),
+            writer_properties: self.writer_properties.clone(),
+        })
+    }
+}
+
+pub struct Batch<T> {
+    capacity: usize,
+    data: Vec<T>,
+    buffer: Vec<u8>,
+    fields: Arc<Vec<FieldRef>>,
+    writer_properties: WriterProperties,
+}
+
+impl<T> crate::Batch<T> for Batch<T>
+where
+    T: AnalyticsEvent + Serialize + DeserializeOwned,
+{
+    type Error = Error;
+
+    fn push(&mut self, data: T) -> Result<(), Self::Error> {
+        self.data.push(data);
+        Ok(())
+    }
+
+    fn is_full(&self) -> bool {
+        self.data.len() >= self.capacity
+    }
+
+    fn is_empty(&self) -> bool {
+        self.data.is_empty()
+    }
+
+    fn serialize(self) -> Result<Vec<u8>, Self::Error> {
+        let batch =
+            serde_arrow::to_record_batch(&self.fields, &self.data).map_err(io::Error::other)?;
+
+        let mut writer =
+            ArrowWriter::try_new(self.buffer, batch.schema(), Some(self.writer_properties))?;
+
+        writer.write(&batch)?;
+        writer.into_inner()
+    }
+}
diff --git a/crates/analytics/tests/integration.rs b/crates/analytics/tests/integration.rs
index 5e0834b..0a01c61 100644
--- a/crates/analytics/tests/integration.rs
+++ b/crates/analytics/tests/integration.rs
@@ -1,18 +1,26 @@
 use {
+    ::parquet::{
+        file::reader::{FileReader as _, SerializedFileReader},
+        record::RecordReader as _,
+    },
     analytics::{
+        parquet,
         AnalyticsExt,
+        Batch,
         BatchCollector,
+        BatchFactory,
         BatchObserver,
         CollectionObserver,
         Collector,
         CollectorConfig,
         ExportObserver,
         Exporter,
-        ParquetBatchFactory,
-        ParquetConfig,
     },
     async_trait::async_trait,
-    parquet_derive::ParquetRecordWriter,
+    bytes::Bytes,
+    parquet_derive::{ParquetRecordReader, ParquetRecordWriter},
+    serde::{Deserialize, Serialize},
+    serde_arrow::schema::TracingOptions,
     std::{
         sync::{
             atomic::{AtomicUsize, Ordering},
@@ -20,7 +28,7 @@ use {
         },
         time::Duration,
     },
-    tokio::sync::{mpsc, mpsc::error::TrySendError},
+    tokio::sync::mpsc::{self, error::TrySendError},
 };
 
 #[derive(Clone)]
@@ -40,10 +48,12 @@ impl Exporter for MockExporter {
     }
 }
 
-#[derive(ParquetRecordWriter)]
-struct DataA {
+#[derive(
+    Debug, Clone, ParquetRecordWriter, ParquetRecordReader, Serialize, Deserialize, PartialEq, Eq,
+)]
+struct Record {
     a: u32,
-    b: &'static str,
+    b: String,
     c: bool,
 }
 
@@ -56,17 +66,19 @@ async fn export_by_timeout() {
             export_interval: Duration::from_millis(200),
             ..Default::default()
         },
-        ParquetBatchFactory::new(ParquetConfig {
+        parquet::native::BatchFactory::new(parquet::native::Config {
             batch_capacity: 128,
             alloc_buffer_size: 8192,
-        }),
+            ..Default::default()
+        })
+        .unwrap(),
         MockExporter(tx),
     );
 
     collector
-        .collect(DataA {
+        .collect(Record {
             a: 1,
-            b: "foo",
+            b: "foo".into(),
             c: true,
         })
         .unwrap();
@@ -91,25 +103,27 @@ async fn export_by_num_rows() {
             export_interval: Duration::from_millis(200),
             ..Default::default()
         },
-        ParquetBatchFactory::new(ParquetConfig {
+        parquet::native::BatchFactory::new(parquet::native::Config {
            batch_capacity: 2,
             alloc_buffer_size: 8192,
-        }),
+            ..Default::default()
+        })
+        .unwrap(),
         MockExporter(tx),
     );
 
     collector
-        .collect(DataA {
+        .collect(Record {
             a: 1,
-            b: "foo",
+            b: "foo".into(),
             c: true,
         })
         .unwrap();
 
     collector
-        .collect(DataA {
+        .collect(Record {
             a: 2,
-            b: "bar",
+            b: "bar".into(),
             c: false,
         })
         .unwrap();
@@ -163,27 +177,29 @@ async fn observability() {
             export_interval: Duration::from_millis(200),
             ..Default::default()
         },
-        ParquetBatchFactory::new(ParquetConfig {
+        parquet::native::BatchFactory::new(parquet::native::Config {
             batch_capacity: 2,
             alloc_buffer_size: 8192,
+            ..Default::default()
         })
+        .unwrap()
        .with_observer(observer.clone()),
         MockExporter(tx).with_observer(observer.clone()),
     )
     .with_observer(observer.clone());
 
     collector
-        .collect(DataA {
+        .collect(Record {
             a: 1,
-            b: "foo",
+            b: "foo".into(),
             c: true,
         })
         .unwrap();
 
     collector
-        .collect(DataA {
+        .collect(Record {
             a: 2,
-            b: "bar",
+            b: "bar".into(),
             c: false,
         })
         .unwrap();
@@ -200,3 +216,72 @@ async fn observability() {
     assert_eq!(observer.batch_serialization.load(Ordering::SeqCst), 1);
     assert_eq!(observer.collection.load(Ordering::SeqCst), 2);
 }
+
+#[test]
+fn parquet_schema() {
+    let schema_str = "
+        message rust_schema {
+            REQUIRED INT32 a (INTEGER(32, false));
+            REQUIRED BINARY b (STRING);
+            REQUIRED BOOLEAN c;
+        }
+    ";
+
+    let from_str = parquet::schema_from_str(schema_str).unwrap();
+    let from_native_writer = parquet::native::schema::<Record>().unwrap();
+    let from_serde_arrow = parquet::serde::schema::<Record>(
+        "rust_schema",
+        TracingOptions::new().strings_as_large_utf8(false),
+    )
+    .unwrap();
+
+    assert_eq!(from_str, from_native_writer);
+    assert_eq!(from_str, from_serde_arrow);
+}
+
+#[test]
+fn parquet_native_serialization() {
+    verify_parquet_serialization(parquet::native::BatchFactory::new(Default::default()).unwrap());
+}
+
+#[test]
+fn parquet_serde_serialization() {
+    verify_parquet_serialization(parquet::serde::BatchFactory::new(Default::default()).unwrap());
+}
+
+fn verify_parquet_serialization(factory: impl BatchFactory<Record>) {
+    let expected_data = generate_records();
+    let mut batch = factory.create().unwrap();
+
+    for data in &expected_data {
+        batch.push(data.clone()).unwrap();
+    }
+
+    let actual_data = read_records(batch.serialize().unwrap().into(), expected_data.len());
+    assert_eq!(actual_data, expected_data);
+}
+
+fn generate_records() -> Vec<Record> {
+    vec![
+        Record {
+            a: 1,
+            b: "foo".into(),
+            c: true,
+        },
+        Record {
+            a: 2,
+            b: "bar".into(),
+            c: false,
+        },
+    ]
+}
+
+fn read_records(serialized: Bytes, num_records: usize) -> Vec<Record> {
+    let mut samples = Vec::new();
+    let reader = SerializedFileReader::new(serialized).unwrap();
+    let mut row_group = reader.get_row_group(0).unwrap();
+    samples
+        .read_from_row_group(&mut *row_group, num_records)
+        .unwrap();
+    samples
+}
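
For reference, a minimal sketch of how a downstream crate might drive the new feature-gated `parquet-serde` path directly, mirroring the `verify_parquet_serialization` test above. The `Event` struct, its fields, and the `export_once` function are illustrative assumptions, not part of this change; only the `parquet::serde::{BatchFactory, Config, Error}` types and the `Batch`/`BatchFactory` traits come from the patch.

```rust
use {
    analytics::{parquet, Batch as _, BatchFactory as _},
    serde::{Deserialize, Serialize},
};

// Hypothetical event type: with the `parquet-serde` feature any
// `Serialize + DeserializeOwned` struct works, no `parquet_derive` needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Event {
    timestamp: u64,
    project_id: String,
    success: bool,
}

fn export_once(events: Vec<Event>) -> Result<Vec<u8>, parquet::serde::Error> {
    // The schema is traced from the serde implementation; the default config
    // keeps the GZIP-compressed writer properties.
    let factory = parquet::serde::BatchFactory::new(parquet::serde::Config {
        batch_capacity: 1024,
        ..Default::default()
    })?;

    let mut batch = factory.create()?;
    for event in events {
        batch.push(event)?;
    }

    // Returns a finished parquet file as bytes, ready to hand to an exporter.
    batch.serialize()
}
```

The `parquet-native` path is analogous: derive `ParquetRecordWriter` on the event type and swap in `parquet::native::BatchFactory` / `parquet::native::Config`.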