diff --git a/doc/pg_clickhouse.md b/doc/pg_clickhouse.md index cd8ab7a..61967cf 100644 --- a/doc/pg_clickhouse.md +++ b/doc/pg_clickhouse.md @@ -156,6 +156,44 @@ are not supported and will raise an error. * `quantile(double)` => [quantile](https://clickhouse.com/docs/sql-reference/aggregate-functions/reference/quantile) * `quantileExact(double)` => [quantileExact](https://clickhouse.com/docs/sql-reference/aggregate-functions/reference/quantileexact) +### Session Settings + +Set the `pg_clickhouse.session_settings` runtime parameter to configure +[ClickHouse settings] to be set on subsequent queries. Example: + +```sql +SET pg_clickhouse.session_settings = 'join_use_nulls=1, final=1'; +``` + +The default is `join_use_nulls=1`. Set it to an empty string to fall back on +the ClickHouse server's settings. + +```sql +SET pg_clickhouse.session_settings = ''; +``` + +The syntax is a comma-delimited list of key/value pairs separated by an equal +sign. Keys must correspond to [ClickHouse settings]. Escape spaces, commas, +and backslashes in values with a backslash: + +```sql +SET pg_clickhouse.session_settings = 'join_algorithm = grace_hash\,hash'; +``` + +Or use single quoted values to avoid escaping spaces and commas; consider +using [dollar quoting] to avoid the need to double-quote: + +```sql +SET pg_clickhouse.session_settings = $$join_algorithm = 'grace_hash,hash'$$; +``` + +pg_clickhouse does not validate the settings, but passes them on to ClickHouse +for every query. It thus supports all settings for each ClickHouse version. + +Note that pg_clickhouse must be loaded before setting +`pg_clickhouse.session_settings`; either use [library preloading] or simply +use one of the objects in the extension to ensure it loads. + ## Authors * [Ildus Kurbangaliev](https://github.com/ildus) @@ -173,4 +211,10 @@ are not supported and will raise an error. "PostgreSQL Docs: Writing a Foreign Data Wrapper" [ClickHouse]: https://clickhouse.com/clickhouse [ordered-set aggregate functions]: https://www.postgresql.org/docs/current/functions-aggregate.html#FUNCTIONS-ORDEREDSET-TABLE - [Parametric aggregate functions]: https://clickhouse.com/docs/sql-reference/aggregate-functions/parametric-functions \ No newline at end of file + [Parametric aggregate functions]: https://clickhouse.com/docs/sql-reference/aggregate-functions/parametric-functions + [ClickHouse settings]: https://clickhouse.com/docs/operations/settings/settings + "ClickHouse Docs: Session Settings" + [dollar quoting]: https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-DOLLAR-QUOTING + "PostgreSQL Docs: Dollar-Quoted String Constants" + [library preloading]: https://www.postgresql.org/docs/18/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-PRELOAD + "PostgreSQL Docs: Shared Library Preloading diff --git a/src/binary.cpp b/src/binary.cpp index 02a6e89..f392da6 100644 --- a/src/binary.cpp +++ b/src/binary.cpp @@ -168,6 +168,30 @@ static void set_resp_error(ch_binary_response_t * resp, const char * str) strcpy(resp->error, str); } +/* + * Converts query->settings to QuerySettings. + */ +static QuerySettings ch_binary_settings(const ch_query *query) +{ + ListCell *lc; + auto res = QuerySettings{}; + foreach (lc, (List *) query->settings) + { + /* + * foreach reads a non-const, so we have to cast. Would be nice to use + * foreach_ptr: + * + * foreach_ptr(DefElem, setting, query->settings) + * + * But it's only available in Postgres 17 and later. + */ + DefElem *setting = (DefElem *) lfirst(lc); + res.insert_or_assign(setting->defname, QuerySettingsField{strVal(setting->arg), 1}); + } + + return res; +} + static void set_state_error(ch_binary_read_state_t * state, const char * str) { assert(state->error == NULL); @@ -176,7 +200,7 @@ static void set_state_error(ch_binary_read_state_t * state, const char * str) } ch_binary_response_t * ch_binary_simple_query( - ch_binary_connection_t * conn, const char * query, bool (*check_cancel)(void)) + ch_binary_connection_t * conn, const ch_query * query, bool (*check_cancel)(void)) { Client * client = (Client *)conn->client; ch_binary_response_t * resp; @@ -187,10 +211,9 @@ ch_binary_response_t * ch_binary_simple_query( resp = new ch_binary_response_t(); values = new std::vector>(); client->Select( - clickhouse::Query(query).SetQuerySettings(QuerySettings{ - /* Enable SQL compatibility. */ - {"join_use_nulls", QuerySettingsField{ "1", 1 }}, - }).OnDataCancelable([&resp, &values, &check_cancel](const Block & block) { + clickhouse::Query(query->sql).SetQuerySettings( + ch_binary_settings(query) + ).OnDataCancelable([&resp, &values, &check_cancel](const Block & block) { if (check_cancel && check_cancel()) { set_resp_error(resp, "query was canceled"); @@ -315,14 +338,21 @@ void ch_binary_insert_state_free(void * c) } } -void ch_binary_prepare_insert(void * conn, char * query, ch_binary_insert_state * state) +void ch_binary_prepare_insert(void * conn, const ch_query * query, ch_binary_insert_state * state) { /* Start the INSERT. */ Block * block; Client * client = (Client *)((ch_binary_connection_t *)conn)->client; try { - block = new Block(client->BeginInsert(std::string(query) + " VALUES")); + block = new Block(client->BeginInsert(std::string(query->sql) + " VALUES")); + /* XXX https://github.com/ClickHouse/clickhouse-cpp/pull/453/ + block = new Block(client->BeginInsert( + clickhouse::Query(std::string(query->sql)+ " VALUES").SetQuerySettings( + ch_binary_settings(query) + ) + )); + */ } catch (const std::exception & e) { diff --git a/src/fdw.c.in b/src/fdw.c.in index 338902d..382f477 100644 --- a/src/fdw.c.in +++ b/src/fdw.c.in @@ -284,12 +284,12 @@ static void merge_fdw_options(CHFdwRelationInfo * fpinfo, Datum clickhouse_raw_query(PG_FUNCTION_ARGS) { - char *connstring = text_to_cstring(PG_GETARG_TEXT_P(1)), - *query = text_to_cstring(PG_GETARG_TEXT_P(0)); + char *connstring = text_to_cstring(PG_GETARG_TEXT_P(1)); + ch_query query = new_query(text_to_cstring(PG_GETARG_TEXT_P(0))); ch_connection_details *details = connstring_parse(connstring); ch_connection conn = chfdw_http_connect(details); - ch_cursor *cursor = conn.methods->simple_query(conn.conn, query); + ch_cursor *cursor = conn.methods->simple_query(conn.conn, &query); text *res = chfdw_http_fetch_raw_data(cursor); MemoryContextDelete(cursor->memcxt); @@ -973,6 +973,7 @@ clickhouseIterateForeignScan(ForeignScanState * node) struct timeval time1, time2; TupleDesc tupdesc; + ch_query query = new_query(fsstate->query); /* make query if needed */ if (fsstate->ch_cursor == NULL) @@ -980,7 +981,7 @@ clickhouseIterateForeignScan(ForeignScanState * node) MemoryContext old = MemoryContextSwitchTo(fsstate->batch_cxt); fsstate->ch_cursor = fsstate->conn.methods->simple_query(fsstate->conn.conn, - fsstate->query); + &query); time_used += fsstate->ch_cursor->request_time; MemoryContextSwitchTo(old); @@ -1379,6 +1380,7 @@ create_foreign_modify(EState * estate, UserMapping *user; MemoryContext old_mcxt; Relation rel = rri->ri_RelationDesc; + ch_query q = new_query(query); /* Begin constructing CHFdwModifyState. */ fmstate = (CHFdwModifyState *) palloc0(sizeof(CHFdwModifyState)); @@ -1403,7 +1405,7 @@ create_foreign_modify(EState * estate, old_mcxt = MemoryContextSwitchTo(PortalContext); fmstate->state = fmstate->conn.methods->prepare_insert(fmstate->conn.conn, - rri, target_attrs, query, table_name); + rri, target_attrs, &q, table_name); MemoryContextSwitchTo(old_mcxt); /* Create context for per-query temp workspace. */ diff --git a/src/http.c b/src/http.c index 5bd8425..c67097f 100644 --- a/src/http.c +++ b/src/http.c @@ -126,7 +126,6 @@ ch_http_connection(ch_connection_details * details) goto cleanup; conn->base_url = connstring; - conn->base_url_len = strlen(conn->base_url); return conn; @@ -152,12 +151,16 @@ set_query_id(ch_http_response_t * resp) } ch_http_response_t * -ch_http_simple_query(ch_http_connection_t * conn, const char *query) +ch_http_simple_query(ch_http_connection_t * conn, const ch_query * query) { char *url; CURLcode errcode; static char errbuffer[CURL_ERROR_SIZE]; struct curl_slist *headers = NULL; + CURLU *cu = curl_url(); + ListCell *lc; + DefElem *setting; + char *buf = NULL; ch_http_response_t *resp = calloc(sizeof(ch_http_response_t), 1); @@ -168,12 +171,22 @@ ch_http_simple_query(ch_http_connection_t * conn, const char *query) assert(conn && conn->curl); - /* Enable SQL compatibility. */ - const char *params = "join_use_nulls=1"; + /* Construct the base URL with the query ID. */ + curl_url_set(cu, CURLUPART_URL, conn->base_url, 0); + buf = psprintf("query_id=%s", resp->query_id); + curl_url_set(cu, CURLUPART_QUERY, buf, CURLU_APPENDQUERY | CURLU_URLENCODE); + pfree(buf); - /* construct url: query_id + ?query_id= + params */ - url = malloc(conn->base_url_len + 37 + 12 + strlen(params)); - sprintf(url, "%s?query_id=%s&%s", conn->base_url, resp->query_id, params); + /* Append each of the settings as a query param. */ + foreach(lc, (List *) query->settings) + { + setting = (DefElem *) lfirst(lc); + buf = psprintf("%s=%s", setting->defname, strVal(setting->arg)); + curl_url_set(cu, CURLUPART_QUERY, buf, CURLU_APPENDQUERY | CURLU_URLENCODE); + pfree(buf); + } + curl_url_get(cu, CURLUPART_URL, &url, 0); + curl_url_cleanup(cu); /* constant */ errbuffer[0] = '\0'; @@ -186,7 +199,7 @@ ch_http_simple_query(ch_http_connection_t * conn, const char *query) /* variable */ curl_easy_setopt(conn->curl, CURLOPT_WRITEDATA, resp); - curl_easy_setopt(conn->curl, CURLOPT_POSTFIELDS, query); + curl_easy_setopt(conn->curl, CURLOPT_POSTFIELDS, query->sql); curl_easy_setopt(conn->curl, CURLOPT_VERBOSE, curl_verbose); if (curl_progressfunc) { @@ -198,17 +211,13 @@ ch_http_simple_query(ch_http_connection_t * conn, const char *query) curl_easy_setopt(conn->curl, CURLOPT_NOPROGRESS, 1L); if (conn->dbname) { - char *buf = palloc(strlen(conn->dbname) + strlen(DATABASE_HEADER) + 3); - - sprintf(buf, "%s: %s", DATABASE_HEADER, conn->dbname); - headers = curl_slist_append(headers, buf); + headers = curl_slist_append(headers, psprintf("%s: %s", DATABASE_HEADER, conn->dbname)); curl_easy_setopt(conn->curl, CURLOPT_HTTPHEADER, headers); - pfree(buf); } curl_error_happened = false; errcode = curl_easy_perform(conn->curl); - free(url); + curl_free(url); if (headers) curl_slist_free_all(headers); diff --git a/src/include/binary.hh b/src/include/binary.hh index d8c85f8..ff9f66c 100644 --- a/src/include/binary.hh +++ b/src/include/binary.hh @@ -1,7 +1,7 @@ #ifndef CLICKHOUSE_BINARY_H #define CLICKHOUSE_BINARY_H -#include "connect.h" +#include "engine.h" #ifdef __cplusplus extern "C" @@ -71,7 +71,7 @@ extern "C" extern ch_binary_connection_t * ch_binary_connect(ch_connection_details * details, char **error); extern void ch_binary_close(ch_binary_connection_t * conn); extern ch_binary_response_t * ch_binary_simple_query(ch_binary_connection_t * conn, - const char *query, bool (*check_cancel) (void)); + const ch_query * query, bool (*check_cancel) (void)); extern void ch_binary_response_free(ch_binary_response_t * resp); /* reading */ @@ -83,7 +83,7 @@ extern "C" void ch_binary_free_convert_state(void *); /* insertion */ - void ch_binary_prepare_insert(void *conn, char *query, + void ch_binary_prepare_insert(void *conn, const ch_query * query, ch_binary_insert_state * state); void ch_binary_insert_columns(ch_binary_insert_state * state); void ch_binary_column_append_data(ch_binary_insert_state * state, size_t colidx); diff --git a/src/include/connect.h b/src/include/connect.h deleted file mode 100644 index 6a2f73a..0000000 --- a/src/include/connect.h +++ /dev/null @@ -1,15 +0,0 @@ -#ifndef CLICKHOUSE_CONNECT_H -#define CLICKHOUSE_CONNECT_H - -#include "nodes/pathnodes.h" - -typedef struct -{ - char *host; - int port; - char *username; - char *password; - char *dbname; -} ch_connection_details; - -#endif /* CLICKHOUSE_CONNECT_H */ diff --git a/src/include/engine.h b/src/include/engine.h new file mode 100644 index 0000000..2d312fb --- /dev/null +++ b/src/include/engine.h @@ -0,0 +1,29 @@ +#ifndef CLICKHOUSE_ENGINE_H +#define CLICKHOUSE_ENGINE_H + +#include "nodes/pathnodes.h" + +/* + * ch_connection_details defines the details for connecting to ClickHouse. + */ +typedef struct +{ + char *host; + int port; + char *username; + char *password; + char *dbname; +} ch_connection_details; + +/* + * ch_query an SQL query to execute on ClickHouse. + */ +typedef struct +{ + const char *sql; + const List *settings; +} ch_query; + +#define new_query(sql) {sql, chfdw_parse_options(ch_session_settings, true)} + +#endif /* CLICKHOUSE_ENGINE_H */ diff --git a/src/include/fdw.h b/src/include/fdw.h index 78971e2..00d2ed5 100644 --- a/src/include/fdw.h +++ b/src/include/fdw.h @@ -24,7 +24,7 @@ #include "optimizer/optimizer.h" #include "nodes/pathnodes.h" #include "access/heapam.h" -#include "connect.h" +#include "engine.h" #if PG_VERSION_NUM < 150000 #define FirstUnpinnedObjectId FirstBootstrapObjectId @@ -48,20 +48,18 @@ typedef struct ch_cursor typedef void (*disconnect_method) (void *conn); typedef void (*check_conn_method) (const char *password, UserMapping * user); -typedef ch_cursor * (*simple_query_method) (void *conn, const char *query); -typedef void (*simple_insert_method) (void *conn, const char *query); -typedef void (*cursor_free_method) (ch_cursor * cursor); +typedef ch_cursor * (*simple_query_method) (void *conn, const ch_query *query); +typedef void (*simple_insert_method) (void *conn, const ch_query *query); typedef void **(*cursor_fetch_row_method) (ch_cursor * cursor, List * attrs, TupleDesc tupdesc, Datum * values, bool *nulls); typedef void *(*prepare_insert_method) (void *conn, ResultRelInfo *, List *, - char *, char *); + const ch_query *, char *); typedef void (*insert_tuple_method) (void *state, TupleTableSlot * slot); typedef struct { disconnect_method disconnect; simple_query_method simple_query; - cursor_free_method cursor_free; cursor_fetch_row_method fetch_row; prepare_insert_method prepare_insert; insert_tuple_method insert_tuple; diff --git a/src/include/http.h b/src/include/http.h index d6184b7..56fe288 100644 --- a/src/include/http.h +++ b/src/include/http.h @@ -4,7 +4,7 @@ #include "postgres.h" #include "nodes/pg_list.h" #include "lib/stringinfo.h" -#include "connect.h" +#include "engine.h" typedef struct ch_http_connection_t ch_http_connection_t; typedef struct ch_http_response_t @@ -47,7 +47,7 @@ void ch_http_init(int verbose, uint32_t query_id_prefix); void ch_http_set_progress_func(void *progressfunc); ch_http_connection_t *ch_http_connection(ch_connection_details * details); void ch_http_close(ch_http_connection_t * conn); -ch_http_response_t *ch_http_simple_query(ch_http_connection_t * conn, const char *query); +ch_http_response_t *ch_http_simple_query(ch_http_connection_t * conn, const ch_query *query); char *ch_http_last_error(void); /* read */ diff --git a/src/include/internal.h b/src/include/internal.h index f53dfb4..e377776 100644 --- a/src/include/internal.h +++ b/src/include/internal.h @@ -8,7 +8,6 @@ typedef struct ch_http_connection_t CURL *curl; char *dbname; char *base_url; - size_t base_url_len; } ch_http_connection_t; typedef struct ch_binary_connection_t diff --git a/src/pglink.c b/src/pglink.c index 40c5a9a..cccae99 100644 --- a/src/pglink.c +++ b/src/pglink.c @@ -27,11 +27,11 @@ static bool initialized = false; static void http_disconnect(void *conn); -static ch_cursor * http_simple_query(void *conn, const char *query); -static void http_simple_insert(void *conn, const char *query); +static ch_cursor * http_simple_query(void *conn, const ch_query * query); +static void http_simple_insert(void *conn, const ch_query * query); static void http_cursor_free(void *); static void **http_fetch_row(ch_cursor *, List *, TupleDesc, Datum *, bool *); -static void *http_prepare_insert(void *, ResultRelInfo *, List *, char *, char *); +static void *http_prepare_insert(void *, ResultRelInfo *, List *, const ch_query *, char *); static void http_insert_tuple(void *, TupleTableSlot *); static libclickhouse_methods http_methods = @@ -44,7 +44,7 @@ static libclickhouse_methods http_methods = }; static void binary_disconnect(void *conn); -static ch_cursor * binary_simple_query(void *conn, const char *query); +static ch_cursor * binary_simple_query(void *conn, const ch_query * query); static void binary_cursor_free(void *cursor); /* static void binary_simple_insert(void *conn, const char *query); */ @@ -52,7 +52,7 @@ static void **binary_fetch_row(ch_cursor * cursor, List * attrs, TupleDesc tupde Datum * values, bool *nulls); static void binary_insert_tuple(void *, TupleTableSlot * slot); static void *binary_prepare_insert(void *, ResultRelInfo *, List *, - char *query, char *table_name); + const ch_query * query, char *table_name); static size_t escape_string(char *to, const char *from, size_t length); @@ -146,17 +146,16 @@ static void kill_query(void *conn, const char *query_id) { ch_http_response_t *resp; - char *query = psprintf("kill query where query_id='%s'", query_id); + ch_query query = new_query(psprintf("kill query where query_id='%s'", query_id)); ch_http_set_progress_func(NULL); - resp = ch_http_simple_query(conn, query); + resp = ch_http_simple_query(conn, &query); if (resp != NULL) ch_http_response_free(resp); - pfree(query); } static ch_cursor * -http_simple_query(void *conn, const char *query) +http_simple_query(void *conn, const ch_query * query) { int attempts = 0; MemoryContext tempcxt, @@ -204,7 +203,7 @@ http_simple_query(void *conn, const char *query) ereport(ERROR, ( errcode(ERRCODE_SQL_ROUTINE_EXCEPTION), errmsg("pg_clickhouse: %s", format_error(error)), - status < 404 ? 0 : errdetail_internal("Remote Query: %.64000s", query), + status < 404 ? 0 : errdetail_internal("Remote Query: %.64000s", query->sql), errcontext("HTTP status code: %li", status) )); } @@ -220,7 +219,7 @@ http_simple_query(void *conn, const char *query) cursor = palloc0(sizeof(ch_cursor)); cursor->query_response = resp; cursor->read_state = palloc0(sizeof(ch_http_read_state)); - cursor->query = pstrdup(query); + cursor->query = pstrdup(query->sql); cursor->request_time = resp->pretransfer_time * 1000; cursor->total_time = resp->total_time * 1000; ch_http_read_state_init(cursor->read_state, resp->data, resp->datasize); @@ -235,7 +234,7 @@ http_simple_query(void *conn, const char *query) } static void -http_simple_insert(void *conn, const char *query) +http_simple_insert(void *conn, const ch_query * query) { ch_http_response_t *resp = ch_http_simple_query(conn, query); @@ -261,7 +260,7 @@ http_simple_insert(void *conn, const char *query) ereport(ERROR, ( errcode(ERRCODE_SQL_ROUTINE_EXCEPTION), errmsg("pg_clickhouse: %s", format_error(error)), - status < 404 ? 0 : errdetail_internal("Remote Query: %.64000s", query), + status < 404 ? 0 : errdetail_internal("Remote Query: %.64000s", query->sql), errcontext("HTTP status code: %li", status) )); } @@ -304,7 +303,7 @@ http_fetch_row(ch_cursor * cursor, List * attrs, TupleDesc tupdesc, Datum * v, b else if (state->val[0] != '\0') values[i] = pstrdup(state->val); else - values[i] = NULL; + values[i] = ""; } if (attcount > 0 && rc != CH_EOL && rc != CH_EOF) @@ -465,12 +464,12 @@ extend_insert_query(ch_http_insert_state * state, TupleTableSlot * slot) static void * http_prepare_insert(void *conn, ResultRelInfo * rri, List * target_attrs, - char *query, char *table_name) + const ch_query * query, char *table_name) { ch_http_insert_state *state = palloc0(sizeof(ch_http_insert_state)); initStringInfo(&state->sql); - state->sql_begin = psprintf("%s FORMAT TSV\n", query); + state->sql_begin = psprintf("%s FORMAT TSV\n", query->sql); state->target_attrs = target_attrs; state->p_nums = list_length(state->target_attrs); state->conn = conn; @@ -488,7 +487,9 @@ http_insert_tuple(void *istate, TupleTableSlot * slot) if ((slot == NULL && state->sql.len > 0) || state->sql.len > (MaxAllocSize / 2 /* 512MB */ )) { - http_simple_insert(state->conn, state->sql.data); + ch_query query = new_query(state->sql.data); + + http_simple_insert(state->conn, &query); resetStringInfo(&state->sql); } } @@ -528,7 +529,7 @@ binary_disconnect(void *conn) } static ch_cursor * -binary_simple_query(void *conn, const char *query) +binary_simple_query(void *conn, const ch_query * query) { MemoryContext tempcxt, oldcxt; @@ -545,7 +546,7 @@ binary_simple_query(void *conn, const char *query) ereport(ERROR, ( errcode(ERRCODE_SQL_ROUTINE_EXCEPTION), errmsg("pg_clickhouse: %s", error), - errdetail_internal("Remote Query: %.64000s", query) + errdetail_internal("Remote Query: %.64000s", query->sql) )); } @@ -556,7 +557,7 @@ binary_simple_query(void *conn, const char *query) cursor = palloc0(sizeof(ch_cursor)); cursor->query_response = resp; state = (ch_binary_read_state_t *) palloc0(sizeof(ch_binary_read_state_t)); - cursor->query = pstrdup(query); + cursor->query = pstrdup(query->sql); cursor->read_state = state; cursor->columns_count = resp->columns_count; ch_binary_read_state_init(cursor->read_state, resp); @@ -699,7 +700,7 @@ binary_cursor_free(void *c) static void * binary_prepare_insert(void *conn, ResultRelInfo * rri, List * target_attrs, - char *query, char *table_name) + const ch_query * query, char *table_name) { ch_binary_insert_state *state = NULL; MemoryContext tempcxt, @@ -895,14 +896,17 @@ chfdw_construct_create_tables(ImportForeignSchemaStmt * stmt, ForeignServer * se UserMapping *user = GetUserMapping(userid, server->serverid); ch_connection conn = chfdw_get_connection(user); ch_cursor *cursor; - char *query; + ch_query query = new_query(NULL); List *result = NIL, *datts = NIL; char **row_values; + char *sql; - query = psprintf("SELECT name, engine, engine_full " - "FROM system.tables WHERE database='%s' and name not like '.inner%%'", stmt->remote_schema); - cursor = conn.methods->simple_query(conn.conn, query); + sql = psprintf("SELECT name, engine, engine_full " + "FROM system.tables WHERE database='%s' and name not like '.inner%%'", stmt->remote_schema); + query.sql = sql; + cursor = conn.methods->simple_query(conn.conn, &query); + pfree(sql); datts = list_make2_int(1, 2); @@ -942,9 +946,11 @@ chfdw_construct_create_tables(ImportForeignSchemaStmt * stmt, ForeignServer * se initStringInfo(&buf); appendStringInfo(&buf, "CREATE FOREIGN TABLE IF NOT EXISTS \"%s\".\"%s\" (\n", stmt->local_schema, table_name); - query = psprintf("select name, type from system.columns where database='%s' and table='%s'", - stmt->remote_schema, table_name); - table_def = conn.methods->simple_query(conn.conn, query); + sql = psprintf("select name, type from system.columns where database='%s' and table='%s'", + stmt->remote_schema, table_name); + query.sql = sql; + table_def = conn.methods->simple_query(conn.conn, &query); + pfree(sql); while ((dvalues = (char **) conn.methods->fetch_row(table_def, datts, NULL, NULL, NULL)) != NULL) diff --git a/test/expected/gucs.out b/test/expected/gucs.out index 846fcd2..2da2f00 100644 --- a/test/expected/gucs.out +++ b/test/expected/gucs.out @@ -1,4 +1,9 @@ \unset ECHO + ch_noop_bigint +---------------- + +(1 row) + NOTICE: OK `` NOTICE: OK `join_use_nulls=1` NOTICE: OK `join_use_nulls=1, xyz=true` @@ -9,4 +14,98 @@ NOTICE: ERR 42601 - pg_clickhouse: missing "=" after "join_use_nulls" in option NOTICE: ERR 42601 - pg_clickhouse: missing "=" after "join_use_nulls" in options string NOTICE: ERR 42601 - pg_clickhouse: unterminated quoted string in options string NOTICE: ERR 42601 - pg_clickhouse: missing comma after "join_use_nulls" value in options string -NOTICE: drop cascades to foreign table remote_settings + name | value +----------------+------- + join_use_nulls | 1 +(1 row) + + name | value +----------------+------- + join_use_nulls | 1 +(1 row) + + pg_clickhouse.session_settings +---------------------------------------------- + + + connect_timeout = 2, + + count_distinct_implementation = uniq, + + join_algorithm = 'prefer_partial_merge',+ + join_use_nulls = 0, + + join_use_nulls = 1, + + log_queries_min_type = QUERY_FINISH, + + max_block_size = 32768, + + max_execution_time = 45, + + max_result_rows = 1024, + + metrics_perf_events_list = 'this,that', + + network_compression_method = ZSTD, + + poll_interval = 5, + + totals_mode = after_having_auto + + +(1 row) + + name | value +-------------------------------+---------------------- + connect_timeout | 2 + count_distinct_implementation | uniq + join_algorithm | prefer_partial_merge + join_use_nulls | 1 + log_queries_min_type | QUERY_FINISH + max_block_size | 32768 + max_execution_time | 45 + max_result_rows | 1024 + metrics_perf_events_list | this,that + network_compression_method | ZSTD + poll_interval | 5 + totals_mode | after_having_auto +(12 rows) + + name | value +-------------------------------+---------------------- + connect_timeout | 2 + count_distinct_implementation | uniq + join_algorithm | prefer_partial_merge + join_use_nulls | 1 + log_queries_min_type | QUERY_FINISH + max_block_size | 32768 + max_execution_time | 45 + max_result_rows | 1024 + metrics_perf_events_list | this,that + network_compression_method | ZSTD + poll_interval | 5 + totals_mode | after_having_auto +(12 rows) + + name | ?column? +-------------------------------+---------- + connect_timeout | t + count_distinct_implementation | t + join_algorithm | t + join_use_nulls | t + log_queries_min_type | t + max_block_size | t + max_execution_time | t + max_result_rows | t + metrics_perf_events_list | t + network_compression_method | t + poll_interval | t + totals_mode | t +(12 rows) + + name | ?column? +-------------------------------+---------- + connect_timeout | t + count_distinct_implementation | t + join_algorithm | t + join_use_nulls | t + log_queries_min_type | t + max_block_size | t + max_execution_time | t + max_result_rows | t + metrics_perf_events_list | t + network_compression_method | t + poll_interval | t + totals_mode | t +(12 rows) + +NOTICE: drop cascades to foreign table bin_remote_settings +NOTICE: drop cascades to foreign table http_remote_settings diff --git a/test/expected/http.out b/test/expected/http.out index f975fb9..ee89b27 100644 --- a/test/expected/http.out +++ b/test/expected/http.out @@ -132,6 +132,13 @@ SELECT c3, (c3 = 'lf\ntab\t\b\f\r') AS true FROM ft3 WHERE c1 = 2; lf\ntab\t\b\f\r | t (1 row) +INSERT INTO ft3 VALUES (3, ''); +SELECT c3, (c3 = '') AS true FROM ft3 WHERE c1 = 3; + c3 | true +----+------ + | t +(1 row) + INSERT INTO ft4 SELECT id, id + 1, diff --git a/test/expected/http_1.out b/test/expected/http_1.out index 32898fd..e1b2921 100644 --- a/test/expected/http_1.out +++ b/test/expected/http_1.out @@ -132,6 +132,13 @@ SELECT c3, (c3 = 'lf\ntab\t\b\f\r') AS true FROM ft3 WHERE c1 = 2; lf\ntab\t\b\f\r | t (1 row) +INSERT INTO ft3 VALUES (3, ''); +SELECT c3, (c3 = '') AS true FROM ft3 WHERE c1 = 3; + c3 | true +----+------ + | t +(1 row) + INSERT INTO ft4 SELECT id, id + 1, diff --git a/test/expected/http_2.out b/test/expected/http_2.out index d4fd921..adc9ed5 100644 --- a/test/expected/http_2.out +++ b/test/expected/http_2.out @@ -132,6 +132,13 @@ SELECT c3, (c3 = 'lf\ntab\t\b\f\r') AS true FROM ft3 WHERE c1 = 2; lf\ntab\t\b\f\r | t (1 row) +INSERT INTO ft3 VALUES (3, ''); +SELECT c3, (c3 = '') AS true FROM ft3 WHERE c1 = 3; + c3 | true +----+------ + | t +(1 row) + INSERT INTO ft4 SELECT id, id + 1, diff --git a/test/expected/http_3.out b/test/expected/http_3.out index 959ad12..cd89213 100644 --- a/test/expected/http_3.out +++ b/test/expected/http_3.out @@ -132,6 +132,13 @@ SELECT c3, (c3 = 'lf\ntab\t\b\f\r') AS true FROM ft3 WHERE c1 = 2; lf\ntab\t\b\f\r | t (1 row) +INSERT INTO ft3 VALUES (3, ''); +SELECT c3, (c3 = '') AS true FROM ft3 WHERE c1 = 3; + c3 | true +----+------ + | t +(1 row) + INSERT INTO ft4 SELECT id, id + 1, diff --git a/test/expected/http_4.out b/test/expected/http_4.out index 8ec4bdb..f95b8f3 100644 --- a/test/expected/http_4.out +++ b/test/expected/http_4.out @@ -132,6 +132,13 @@ SELECT c3, (c3 = 'lf\ntab\t\b\f\r') AS true FROM ft3 WHERE c1 = 2; lf\ntab\t\b\f\r | t (1 row) +INSERT INTO ft3 VALUES (3, ''); +SELECT c3, (c3 = '') AS true FROM ft3 WHERE c1 = 3; + c3 | true +----+------ + | t +(1 row) + INSERT INTO ft4 SELECT id, id + 1, diff --git a/test/expected/http_5.out b/test/expected/http_5.out index 0527ee7..5a29ed0 100644 --- a/test/expected/http_5.out +++ b/test/expected/http_5.out @@ -132,6 +132,13 @@ SELECT c3, (c3 = 'lf\ntab\t\b\f\r') AS true FROM ft3 WHERE c1 = 2; lf\ntab\t\b\f\r | t (1 row) +INSERT INTO ft3 VALUES (3, ''); +SELECT c3, (c3 = '') AS true FROM ft3 WHERE c1 = 3; + c3 | true +----+------ + | t +(1 row) + INSERT INTO ft4 SELECT id, id + 1, diff --git a/test/sql/gucs.sql b/test/sql/gucs.sql index d574f22..2b629d8 100644 --- a/test/sql/gucs.sql +++ b/test/sql/gucs.sql @@ -1,9 +1,8 @@ \unset ECHO SET client_min_messages = notice; -CREATE SERVER guc_loopback FOREIGN DATA WRAPPER clickhouse_fdw - OPTIONS(dbname 'system', driver 'binary'); -CREATE USER MAPPING FOR CURRENT_USER SERVER guc_loopback; +-- Load pg_clickhouse by calling one of its functions. +SELECT ch_noop_bigint(''); -- Test parsing. DO $do$ @@ -34,13 +33,106 @@ BEGIN END; $do$ LANGUAGE plpgsql; -CREATE FOREIGN TABLE remote_settings ( +-- Create servers for each engine. +CREATE SERVER guc_bin_svr FOREIGN DATA WRAPPER clickhouse_fdw + OPTIONS(dbname 'system', driver 'binary'); +CREATE USER MAPPING FOR CURRENT_USER SERVER guc_bin_svr; + +CREATE SERVER guc_http_svr FOREIGN DATA WRAPPER clickhouse_fdw + OPTIONS(dbname 'system', driver 'http'); +CREATE USER MAPPING FOR CURRENT_USER SERVER guc_http_svr; + +-- Create foreign tables for each engine. +CREATE FOREIGN TABLE bin_remote_settings ( + name text, + value text +) + SERVER guc_bin_svr + OPTIONS (table_name 'settings'); + +CREATE FOREIGN TABLE http_remote_settings ( name text, value text ) - SERVER guc_loopback + SERVER guc_http_svr OPTIONS (table_name 'settings'); +-- Reset to defaults. +RESET pg_clickhouse.session_settings; +SELECT name, value FROM bin_remote_settings WHERE name = 'join_use_nulls'; +SELECT name, value FROM http_remote_settings WHERE name = 'join_use_nulls'; + +-- List of seeings changed below. +select '{ + connect_timeout, + count_distinct_implementation, + join_algorithm, + join_use_nulls, + log_queries_min_type, + max_block_size, + max_execution_time, + max_result_rows, + metrics_perf_events_list, + network_compression_method, + poll_interval, + totals_mode +}' set_list \gset + +-- Unset and get defaults. +SET pg_clickhouse.session_settings TO ''; + +CREATE TEMPORARY TABLE default_settings AS +SELECT name, value + FROM bin_remote_settings + WHERE name = ANY(:'set_list'); + +-- Customize all of the above settings. +SET pg_clickhouse.session_settings TO $$ + connect_timeout = 2, + count_distinct_implementation = uniq, + join_algorithm = 'prefer_partial_merge', + join_use_nulls = 0, + join_use_nulls = 1, + log_queries_min_type = QUERY_FINISH, + max_block_size = 32768, + max_execution_time = 45, + max_result_rows = 1024, + metrics_perf_events_list = 'this,that', + network_compression_method = ZSTD, + poll_interval = 5, + totals_mode = after_having_auto +$$; + +SHOW pg_clickhouse.session_settings; + +-- Check the remote settings for both engines. +SELECT name, value + FROM bin_remote_settings + WHERE name = ANY(:'set_list') + ORDER BY name; + +SELECT name, value + FROM http_remote_settings +WHERE name = ANY(:'set_list') +ORDER BY name; + +-- Unset back to defaults. +SET pg_clickhouse.session_settings TO ''; + +SELECT remote.name, remote.value IS NOT DISTINCT FROM def.value + FROM bin_remote_settings remote + JOIN default_settings def ON remote.name = def.name +WHERE remote.name = ANY(:'set_list') +ORDER BY remote.name; + +SELECT remote.name, remote.value IS NOT DISTINCT FROM def.value + FROM http_remote_settings remote + JOIN default_settings def ON remote.name = def.name +WHERE remote.name = ANY(:'set_list') +ORDER BY remote.name; + -- Clean up. -DROP USER MAPPING FOR CURRENT_USER SERVER guc_loopback; -DROP SERVER guc_loopback CASCADE; +DROP USER MAPPING FOR CURRENT_USER SERVER guc_bin_svr; +DROP SERVER guc_bin_svr CASCADE; +DROP USER MAPPING FOR CURRENT_USER SERVER guc_http_svr; +DROP SERVER guc_http_svr CASCADE; diff --git a/test/sql/http.sql b/test/sql/http.sql index eb82516..1a15647 100644 --- a/test/sql/http.sql +++ b/test/sql/http.sql @@ -99,6 +99,8 @@ INSERT INTO ft3 VALUES (1, E'lf\ntab\t\b\f\r'); SELECT c3, (c3 = E'lf\ntab\t\b\f\r') AS true FROM ft3 WHERE c1 = 1; INSERT INTO ft3 VALUES (2, 'lf\ntab\t\b\f\r'); SELECT c3, (c3 = 'lf\ntab\t\b\f\r') AS true FROM ft3 WHERE c1 = 2; +INSERT INTO ft3 VALUES (3, ''); +SELECT c3, (c3 = '') AS true FROM ft3 WHERE c1 = 3; INSERT INTO ft4 SELECT id,