Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 20 additions & 1 deletion crates/executor/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::utils::{Config, MemPoolType};
use catalog::catalog_list::EmbucketCatalogList;
use catalog_metastore::{InMemoryMetastore, Metastore, TableIdent as MetastoreTableIdent};
#[cfg(feature = "state-store")]
use state_store::{DynamoDbStateStore, StateStore};
use state_store::{DynamoDbStateStore, StateStore, models::Query};
use tokio::sync::RwLock;
use tokio::time::Duration;
use tracing::Instrument;
Expand Down Expand Up @@ -490,6 +490,25 @@ impl ExecutionService for CoreExecutionService {
) -> Result<QueryId> {
let user_session = self.get_session(session_id).await?;

#[cfg(feature = "state-store")]
{
let query_record = Query::new(
query,
query_context.query_id,
session_id,
query_context.request_id,
);
self.state_store
.put_query(query_record)
.await
.context(ex_error::StateStoreSnafu)?;
let _ = self
.state_store
.get_query(&query_context.query_id.to_string())
.await
.context(ex_error::StateStoreSnafu)?;
}

if self.queries.count() >= self.config.max_concurrency_level {
return ex_error::ConcurrencyLimitSnafu.fail();
}
Expand Down
1 change: 1 addition & 0 deletions crates/state-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ serde = { workspace = true }
serde_json = {workspace = true}
snafu = { workspace = true }
chrono = { workspace = true }
uuid = { workspace = true }

[lints]
workspace = true
4 changes: 3 additions & 1 deletion crates/state-store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ docker run -p 8000:8000 -e AWS_REGION=us-east-2 -e AWS_ACCESS_KEY_ID=local -e AW
### Create table

The state-store uses a single DynamoDB table for sessions, views, and queries. The
query-specific GSIs (`query_id`, `request_id`) are populated only on query records.
query-specific GSIs (`query_id`, `request_id`, `session_id`) are populated only on query records.

```bash
aws dynamodb create-table \
Expand All @@ -29,10 +29,12 @@ aws dynamodb create-table \
AttributeName=SK,AttributeType=S \
AttributeName=query_id,AttributeType=S \
AttributeName=request_id,AttributeType=S \
AttributeName=session_id,AttributeType=S \
--key-schema AttributeName=PK,KeyType=HASH AttributeName=SK,KeyType=RANGE \
--global-secondary-indexes \
"IndexName=GSI_QUERY_ID_INDEX,KeySchema=[{AttributeName=query_id,KeyType=HASH}],Projection={ProjectionType=ALL},ProvisionedThroughput={ReadCapacityUnits=5,WriteCapacityUnits=5}" \
"IndexName=GSI_REQUEST_ID_INDEX,KeySchema=[{AttributeName=request_id,KeyType=HASH}],Projection={ProjectionType=ALL},ProvisionedThroughput={ReadCapacityUnits=5,WriteCapacityUnits=5}" \
"IndexName=GSI_SESSION_ID_INDEX,KeySchema=[{AttributeName=session_id,KeyType=HASH}],Projection={ProjectionType=ALL},ProvisionedThroughput={ReadCapacityUnits=5,WriteCapacityUnits=5}" \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
--endpoint-url http://localhost:8000 \
--region us-east-2
Expand Down
37 changes: 29 additions & 8 deletions crates/state-store/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::fmt::Display;
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Entities {
Expand All @@ -21,8 +23,9 @@ impl Display for Entities {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum QueryStatus {
#[default]
Created,
Queued,
Running,
Expand Down Expand Up @@ -120,12 +123,13 @@ pub struct Variable {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub updated_at: Option<u64>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct Query {
pub query_id: String,
pub request_id: Option<String>,
pub query_id: Uuid,
pub request_id: Option<Uuid>,
pub query_status: QueryStatus,
pub query_text: String,
pub session_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub database_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
Expand All @@ -136,10 +140,10 @@ pub struct Query {
pub schema_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub query_type: Option<String>,
pub session_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub authn_event_id: Option<String>,
pub user_name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub user_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub role_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
Expand All @@ -160,9 +164,9 @@ pub struct Query {
pub error_code: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error_message: Option<String>,
pub start_time: String,
pub start_time: DateTime<Utc>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub end_time: Option<String>,
pub end_time: Option<DateTime<Utc>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub total_elapsed_time: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -290,6 +294,23 @@ pub struct Query {
}

impl Query {
#[must_use]
pub fn new(
query_str: &str,
query_id: Uuid,
session_id: &str,
request_id: Option<Uuid>,
) -> Self {
Self {
query_id,
query_text: query_str.to_string(),
session_id: session_id.to_string(),
request_id,
start_time: Utc::now(),
..Self::default()
}
}

#[must_use]
pub fn entity(&self) -> String {
Entities::Query.to_string()
Expand Down
59 changes: 45 additions & 14 deletions crates/state-store/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::error::{
};
use crate::models::{Query, SessionRecord};
use aws_sdk_dynamodb::{Client, types::AttributeValue};
use chrono::{DateTime, FixedOffset};
use chrono::{DateTime, Utc};
use serde::de::DeserializeOwned;
use snafu::ResultExt;

Expand All @@ -21,6 +21,7 @@ const REQUEST_ID: &str = "request_id";
const SESSION_ID: &str = "session_id";
const QUERY_ID_INDEX: &str = "GSI_QUERY_ID_INDEX";
const REQUEST_ID_INDEX: &str = "GSI_REQUEST_ID_INDEX";
const SESSION_ID_INDEX: &str = "GSI_SESSION_ID_INDEX";

#[async_trait::async_trait]
pub trait StateStore: Send + Sync {
Expand All @@ -32,6 +33,7 @@ pub trait StateStore: Send + Sync {
async fn put_query(&self, query: Query) -> Result<()>;
async fn get_query(&self, query_id: &str) -> Result<Query>;
async fn get_query_by_request_id(&self, request_id: &str) -> Result<Query>;
async fn get_queries_by_session_id(&self, session_id: &str) -> Result<Vec<Query>>;
async fn delete_query(&self, query_id: &str) -> Result<()>;
async fn update_query(&self, query: Query) -> Result<()>;
}
Expand Down Expand Up @@ -73,19 +75,14 @@ impl DynamoDbStateStore {
format!("SESSION#{key}")
}

fn query_pk(start_time: &DateTime<FixedOffset>) -> String {
fn query_pk(start_time: &DateTime<Utc>) -> String {
format!("QUERY#{}", start_time.format("%Y-%m-%d"))
}

fn query_sk(start_time: &DateTime<FixedOffset>) -> String {
fn query_sk(start_time: &DateTime<Utc>) -> String {
start_time.timestamp_millis().to_string()
}

fn parse_start_time(start_time: &str) -> Result<DateTime<FixedOffset>> {
DateTime::parse_from_rfc3339(start_time).map_err(|_| Error::InvalidTime {
value: start_time.to_string(),
})
}
async fn query_item_by_query_id(
&self,
query_id: &str,
Expand Down Expand Up @@ -127,6 +124,27 @@ impl DynamoDbStateStore {

items.into_iter().next().ok_or(Error::NotFound)
}

async fn query_items_by_session_id(
&self,
session_id: &str,
) -> Result<Vec<HashMap<String, AttributeValue>>> {
let items = self
.client
.query()
.table_name(&self.table_name)
.index_name(SESSION_ID_INDEX)
.key_condition_expression("#session_id = :session_id")
.expression_attribute_names("#session_id", SESSION_ID)
.expression_attribute_values(":session_id", AttributeValue::S(session_id.to_string()))
.send()
.await
.context(DynamoDbQueryOutputSnafu)?
.items
.unwrap_or_default();

Ok(items)
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -201,9 +219,8 @@ impl StateStore for DynamoDbStateStore {

async fn put_query(&self, query: Query) -> Result<()> {
let mut item = HashMap::new();
let parsed_start_time = Self::parse_start_time(&query.start_time)?;
let pk = Self::query_pk(&parsed_start_time);
let sk = Self::query_sk(&parsed_start_time);
let pk = Self::query_pk(&query.start_time);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while we have pk as start_time here? I assumed pk should be shorten to date for pk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the function query_pk

let sk = Self::query_sk(&query.start_time);
item.insert(PK.to_string(), AttributeValue::S(pk));
item.insert(SK.to_string(), AttributeValue::S(sk));
item.insert(ENTITY.to_string(), AttributeValue::S(query.entity()));
Expand All @@ -213,16 +230,16 @@ impl StateStore for DynamoDbStateStore {
);
item.insert(
QUERY_ID.to_string(),
AttributeValue::S(query.query_id.clone()),
AttributeValue::S(query.query_id.to_string()),
);
item.insert(
SESSION_ID.to_string(),
AttributeValue::S(query.session_id.clone()),
AttributeValue::S(query.session_id.to_string()),
);
if let Some(request_id) = &query.request_id {
item.insert(
REQUEST_ID.to_string(),
AttributeValue::S(request_id.clone()),
AttributeValue::S(request_id.to_string()),
);
}

Expand All @@ -247,6 +264,11 @@ impl StateStore for DynamoDbStateStore {
deserialize_data(item)
}

async fn get_queries_by_session_id(&self, session_id: &str) -> Result<Vec<Query>> {
let items = self.query_items_by_session_id(session_id).await?;
deserialize_items(items)
}

async fn delete_query(&self, query_id: &str) -> Result<()> {
let item = self.query_item_by_query_id(query_id).await?;
let pk = required_string_attr(&item, PK)?;
Expand Down Expand Up @@ -276,6 +298,15 @@ fn deserialize_data<T: DeserializeOwned>(mut item: HashMap<String, AttributeValu
serde_json::from_str(&data).context(FailedToParseJsonSnafu)
}

fn deserialize_items<T: DeserializeOwned>(
items: Vec<HashMap<String, AttributeValue>>,
) -> Result<Vec<T>> {
items
.into_iter()
.map(deserialize_data)
.collect::<Result<Vec<_>>>()
}

fn required_string_attr(item: &HashMap<String, AttributeValue>, key: &str) -> Result<String> {
item.get(key)
.and_then(|attr| attr.as_s().ok())
Expand Down