diff --git a/Cargo.lock b/Cargo.lock index 76484079..8de1cf8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7254,6 +7254,7 @@ dependencies = [ "serde", "serde_json", "snafu", + "uuid", ] [[package]] diff --git a/crates/executor/src/service.rs b/crates/executor/src/service.rs index f1d01ad6..820bbf3c 100644 --- a/crates/executor/src/service.rs +++ b/crates/executor/src/service.rs @@ -34,7 +34,7 @@ use crate::utils::{Config, MemPoolType}; use catalog::catalog_list::EmbucketCatalogList; use catalog_metastore::{InMemoryMetastore, Metastore, TableIdent as MetastoreTableIdent}; #[cfg(feature = "state-store")] -use state_store::{DynamoDbStateStore, StateStore}; +use state_store::{DynamoDbStateStore, StateStore, models::Query}; use tokio::sync::RwLock; use tokio::time::Duration; use tracing::Instrument; @@ -490,6 +490,25 @@ impl ExecutionService for CoreExecutionService { ) -> Result { let user_session = self.get_session(session_id).await?; + #[cfg(feature = "state-store")] + { + let query_record = Query::new( + query, + query_context.query_id, + session_id, + query_context.request_id, + ); + self.state_store + .put_query(query_record) + .await + .context(ex_error::StateStoreSnafu)?; + let _ = self + .state_store + .get_query(&query_context.query_id.to_string()) + .await + .context(ex_error::StateStoreSnafu)?; + } + if self.queries.count() >= self.config.max_concurrency_level { return ex_error::ConcurrencyLimitSnafu.fail(); } diff --git a/crates/state-store/Cargo.toml b/crates/state-store/Cargo.toml index 9d8dfbaf..83c6e6b6 100644 --- a/crates/state-store/Cargo.toml +++ b/crates/state-store/Cargo.toml @@ -16,6 +16,7 @@ serde = { workspace = true } serde_json = {workspace = true} snafu = { workspace = true } chrono = { workspace = true } +uuid = { workspace = true } [lints] workspace = true diff --git a/crates/state-store/README.md b/crates/state-store/README.md index 240fa0d8..1148a69a 100644 --- a/crates/state-store/README.md +++ b/crates/state-store/README.md @@ -19,7 +19,7 @@ docker run -p 8000:8000 -e AWS_REGION=us-east-2 -e AWS_ACCESS_KEY_ID=local -e AW ### Create table The state-store uses a single DynamoDB table for sessions, views, and queries. The -query-specific GSIs (`query_id`, `request_id`) are populated only on query records. +query-specific GSIs (`query_id`, `request_id`, `session_id`) are populated only on query records. ```bash aws dynamodb create-table \ @@ -29,10 +29,12 @@ aws dynamodb create-table \ AttributeName=SK,AttributeType=S \ AttributeName=query_id,AttributeType=S \ AttributeName=request_id,AttributeType=S \ + AttributeName=session_id,AttributeType=S \ --key-schema AttributeName=PK,KeyType=HASH AttributeName=SK,KeyType=RANGE \ --global-secondary-indexes \ "IndexName=GSI_QUERY_ID_INDEX,KeySchema=[{AttributeName=query_id,KeyType=HASH}],Projection={ProjectionType=ALL},ProvisionedThroughput={ReadCapacityUnits=5,WriteCapacityUnits=5}" \ "IndexName=GSI_REQUEST_ID_INDEX,KeySchema=[{AttributeName=request_id,KeyType=HASH}],Projection={ProjectionType=ALL},ProvisionedThroughput={ReadCapacityUnits=5,WriteCapacityUnits=5}" \ + "IndexName=GSI_SESSION_ID_INDEX,KeySchema=[{AttributeName=session_id,KeyType=HASH}],Projection={ProjectionType=ALL},ProvisionedThroughput={ReadCapacityUnits=5,WriteCapacityUnits=5}" \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \ --endpoint-url http://localhost:8000 \ --region us-east-2 diff --git a/crates/state-store/src/models.rs b/crates/state-store/src/models.rs index 0d52ac03..168eea87 100644 --- a/crates/state-store/src/models.rs +++ b/crates/state-store/src/models.rs @@ -1,8 +1,10 @@ +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; use std::fmt::Display; use std::time::{SystemTime, UNIX_EPOCH}; +use uuid::Uuid; #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum Entities { @@ -21,8 +23,9 @@ impl Display for Entities { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] pub enum QueryStatus { + #[default] Created, Queued, Running, @@ -120,12 +123,13 @@ pub struct Variable { #[serde(default, skip_serializing_if = "Option::is_none")] pub updated_at: Option, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, Default)] pub struct Query { - pub query_id: String, - pub request_id: Option, + pub query_id: Uuid, + pub request_id: Option, pub query_status: QueryStatus, pub query_text: String, + pub session_id: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub database_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -136,10 +140,10 @@ pub struct Query { pub schema_name: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub query_type: Option, - pub session_id: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub authn_event_id: Option, - pub user_name: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub user_name: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub role_name: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -160,9 +164,9 @@ pub struct Query { pub error_code: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub error_message: Option, - pub start_time: String, + pub start_time: DateTime, #[serde(default, skip_serializing_if = "Option::is_none")] - pub end_time: Option, + pub end_time: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub total_elapsed_time: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -290,6 +294,23 @@ pub struct Query { } impl Query { + #[must_use] + pub fn new( + query_str: &str, + query_id: Uuid, + session_id: &str, + request_id: Option, + ) -> Self { + Self { + query_id, + query_text: query_str.to_string(), + session_id: session_id.to_string(), + request_id, + start_time: Utc::now(), + ..Self::default() + } + } + #[must_use] pub fn entity(&self) -> String { Entities::Query.to_string() diff --git a/crates/state-store/src/state_store.rs b/crates/state-store/src/state_store.rs index 77a2b4d1..acd19daf 100644 --- a/crates/state-store/src/state_store.rs +++ b/crates/state-store/src/state_store.rs @@ -8,7 +8,7 @@ use crate::error::{ }; use crate::models::{Query, SessionRecord}; use aws_sdk_dynamodb::{Client, types::AttributeValue}; -use chrono::{DateTime, FixedOffset}; +use chrono::{DateTime, Utc}; use serde::de::DeserializeOwned; use snafu::ResultExt; @@ -21,6 +21,7 @@ const REQUEST_ID: &str = "request_id"; const SESSION_ID: &str = "session_id"; const QUERY_ID_INDEX: &str = "GSI_QUERY_ID_INDEX"; const REQUEST_ID_INDEX: &str = "GSI_REQUEST_ID_INDEX"; +const SESSION_ID_INDEX: &str = "GSI_SESSION_ID_INDEX"; #[async_trait::async_trait] pub trait StateStore: Send + Sync { @@ -32,6 +33,7 @@ pub trait StateStore: Send + Sync { async fn put_query(&self, query: Query) -> Result<()>; async fn get_query(&self, query_id: &str) -> Result; async fn get_query_by_request_id(&self, request_id: &str) -> Result; + async fn get_queries_by_session_id(&self, session_id: &str) -> Result>; async fn delete_query(&self, query_id: &str) -> Result<()>; async fn update_query(&self, query: Query) -> Result<()>; } @@ -73,19 +75,14 @@ impl DynamoDbStateStore { format!("SESSION#{key}") } - fn query_pk(start_time: &DateTime) -> String { + fn query_pk(start_time: &DateTime) -> String { format!("QUERY#{}", start_time.format("%Y-%m-%d")) } - fn query_sk(start_time: &DateTime) -> String { + fn query_sk(start_time: &DateTime) -> String { start_time.timestamp_millis().to_string() } - fn parse_start_time(start_time: &str) -> Result> { - DateTime::parse_from_rfc3339(start_time).map_err(|_| Error::InvalidTime { - value: start_time.to_string(), - }) - } async fn query_item_by_query_id( &self, query_id: &str, @@ -127,6 +124,27 @@ impl DynamoDbStateStore { items.into_iter().next().ok_or(Error::NotFound) } + + async fn query_items_by_session_id( + &self, + session_id: &str, + ) -> Result>> { + let items = self + .client + .query() + .table_name(&self.table_name) + .index_name(SESSION_ID_INDEX) + .key_condition_expression("#session_id = :session_id") + .expression_attribute_names("#session_id", SESSION_ID) + .expression_attribute_values(":session_id", AttributeValue::S(session_id.to_string())) + .send() + .await + .context(DynamoDbQueryOutputSnafu)? + .items + .unwrap_or_default(); + + Ok(items) + } } #[async_trait::async_trait] @@ -201,9 +219,8 @@ impl StateStore for DynamoDbStateStore { async fn put_query(&self, query: Query) -> Result<()> { let mut item = HashMap::new(); - let parsed_start_time = Self::parse_start_time(&query.start_time)?; - let pk = Self::query_pk(&parsed_start_time); - let sk = Self::query_sk(&parsed_start_time); + let pk = Self::query_pk(&query.start_time); + let sk = Self::query_sk(&query.start_time); item.insert(PK.to_string(), AttributeValue::S(pk)); item.insert(SK.to_string(), AttributeValue::S(sk)); item.insert(ENTITY.to_string(), AttributeValue::S(query.entity())); @@ -213,16 +230,16 @@ impl StateStore for DynamoDbStateStore { ); item.insert( QUERY_ID.to_string(), - AttributeValue::S(query.query_id.clone()), + AttributeValue::S(query.query_id.to_string()), ); item.insert( SESSION_ID.to_string(), - AttributeValue::S(query.session_id.clone()), + AttributeValue::S(query.session_id.to_string()), ); if let Some(request_id) = &query.request_id { item.insert( REQUEST_ID.to_string(), - AttributeValue::S(request_id.clone()), + AttributeValue::S(request_id.to_string()), ); } @@ -247,6 +264,11 @@ impl StateStore for DynamoDbStateStore { deserialize_data(item) } + async fn get_queries_by_session_id(&self, session_id: &str) -> Result> { + let items = self.query_items_by_session_id(session_id).await?; + deserialize_items(items) + } + async fn delete_query(&self, query_id: &str) -> Result<()> { let item = self.query_item_by_query_id(query_id).await?; let pk = required_string_attr(&item, PK)?; @@ -276,6 +298,15 @@ fn deserialize_data(mut item: HashMap( + items: Vec>, +) -> Result> { + items + .into_iter() + .map(deserialize_data) + .collect::>>() +} + fn required_string_attr(item: &HashMap, key: &str) -> Result { item.get(key) .and_then(|attr| attr.as_s().ok())