Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions crates/sqllogictest/src/engine/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, Unbound
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
use iceberg_datafusion::IcebergCatalogProvider;
use indicatif::ProgressBar;
use toml::Table as TomlTable;

use crate::engine::{EngineRunner, run_slt_with_runner};
use crate::engine::{CatalogConfig, EngineRunner, run_slt_with_runner};
use crate::error::Result;

pub struct DataFusionEngine {
Expand Down Expand Up @@ -59,22 +58,27 @@ impl EngineRunner for DataFusionEngine {
}

impl DataFusionEngine {
pub async fn new(config: TomlTable) -> Result<Self> {
pub async fn new(catalog_config: Option<CatalogConfig>) -> Result<Self> {
let session_config = SessionConfig::new()
.with_target_partitions(4)
.with_information_schema(true);
let ctx = SessionContext::new_with_config(session_config);
ctx.register_catalog("default", Self::create_catalog(&config).await?);
ctx.register_catalog(
"default",
Self::create_catalog(catalog_config.as_ref()).await?,
);

Ok(Self {
test_data_path: PathBuf::from("testdata"),
session_context: ctx,
})
}

async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn CatalogProvider>> {
// TODO: support dynamic catalog configuration
// See: https://github.com/apache/iceberg-rust/issues/1780
async fn create_catalog(
_catalog_config: Option<&CatalogConfig>,
) -> anyhow::Result<Arc<dyn CatalogProvider>> {
// TODO: Use catalog_config to load different catalog types via iceberg-catalog-loader
// See: https://github.com/apache/iceberg-rust/issues/1780
let catalog = MemoryCatalogBuilder::default()
.load(
"memory",
Expand Down
96 changes: 73 additions & 23 deletions crates/sqllogictest/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,45 @@

mod datafusion;

use std::collections::HashMap;
use std::path::Path;

use anyhow::anyhow;
use serde::Deserialize;
use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file};
use toml::Table as TomlTable;

use crate::engine::datafusion::DataFusionEngine;
use crate::error::{Error, Result};

const TYPE_DATAFUSION: &str = "datafusion";
/// Configuration for the catalog used by an engine
#[derive(Debug, Clone, Deserialize)]
pub struct CatalogConfig {
/// Catalog type: "memory", "rest", "glue", "hms", "s3tables", "sql"
#[serde(rename = "type")]
pub catalog_type: String,
/// Catalog properties passed to the catalog loader
#[serde(default)]
pub props: HashMap<String, String>,
}

/// Engine configuration as a tagged enum
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum EngineConfig {
Datafusion {
#[serde(default)]
catalog: Option<CatalogConfig>,
},
}

#[async_trait::async_trait]
pub trait EngineRunner: Send {
async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
}

pub async fn load_engine_runner(
engine_type: &str,
cfg: TomlTable,
) -> Result<Box<dyn EngineRunner>> {
match engine_type {
TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg).await?)),
_ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()),
pub async fn load_engine_runner(config: EngineConfig) -> Result<Box<dyn EngineRunner>> {
match config {
EngineConfig::Datafusion { catalog } => Ok(Box::new(DataFusionEngine::new(catalog).await?)),
}
}

Expand All @@ -65,29 +81,63 @@ where

#[cfg(test)]
mod tests {
use crate::engine::{TYPE_DATAFUSION, load_engine_runner};
use crate::engine::{CatalogConfig, EngineConfig, load_engine_runner};

#[tokio::test]
async fn test_engine_invalid_type() {
#[test]
fn test_deserialize_engine_config() {
let input = r#"type = "datafusion""#;

let config: EngineConfig = toml::from_str(input).unwrap();
assert!(matches!(config, EngineConfig::Datafusion { catalog: None }));
}

#[test]
fn test_deserialize_engine_config_with_catalog() {
let input = r#"
type = "datafusion"

[catalog]
type = "rest"

[catalog.props]
uri = "http://localhost:8181"
"#;

let config: EngineConfig = toml::from_str(input).unwrap();
match config {
EngineConfig::Datafusion { catalog: Some(cat) } => {
assert_eq!(cat.catalog_type, "rest");
assert_eq!(
cat.props.get("uri"),
Some(&"http://localhost:8181".to_string())
);
}
_ => panic!("Expected Datafusion with catalog"),
}
}

#[test]
fn test_deserialize_catalog_config() {
let input = r#"
[engines]
random = { type = "random_engine", url = "http://localhost:8181" }
type = "memory"

[props]
warehouse = "file:///tmp/warehouse"
"#;
let tbl = toml::from_str(input).unwrap();
let result = load_engine_runner("random_engine", tbl).await;

assert!(result.is_err());
let config: CatalogConfig = toml::from_str(input).unwrap();
assert_eq!(config.catalog_type, "memory");
assert_eq!(
config.props.get("warehouse"),
Some(&"file:///tmp/warehouse".to_string())
);
}

#[tokio::test]
async fn test_load_datafusion() {
let input = r#"
[engines]
df = { type = "datafusion" }
"#;
let tbl = toml::from_str(input).unwrap();
let result = load_engine_runner(TYPE_DATAFUSION, tbl).await;
let config = EngineConfig::Datafusion { catalog: None };

let result = load_engine_runner(config).await;
assert!(result.is_ok());
}
}
Loading
Loading