Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion doc/pg_clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,44 @@ are not supported and will raise an error.
* `quantile(double)` => [quantile](https://clickhouse.com/docs/sql-reference/aggregate-functions/reference/quantile)
* `quantileExact(double)` => [quantileExact](https://clickhouse.com/docs/sql-reference/aggregate-functions/reference/quantileexact)

### Session Settings

Set the `pg_clickhouse.session_settings` runtime parameter to configure
[ClickHouse settings] to be set on subsequent queries. Example:

```sql
SET pg_clickhouse.session_settings = 'join_use_nulls=1, final=1';
```

The default is `join_use_nulls=1`. Set it to an empty string to fall back on
the ClickHouse server's settings.

```sql
SET pg_clickhouse.session_settings = '';
```

The syntax is a comma-delimited list of key/value pairs, with each key
separated from its value by an equal sign. Keys must correspond to
[ClickHouse settings]. Escape spaces, commas, and backslashes in values with
a backslash:

```sql
SET pg_clickhouse.session_settings = 'join_algorithm = grace_hash\,hash';
```

Or use single-quoted values to avoid escaping spaces and commas; consider
using [dollar quoting] so that the single quotes need not be doubled:

```sql
SET pg_clickhouse.session_settings = $$join_algorithm = 'grace_hash,hash'$$;
```

pg_clickhouse does not validate the settings, but passes them on to ClickHouse
for every query. It thus supports all settings for each ClickHouse version.

Note that pg_clickhouse must be loaded before setting
`pg_clickhouse.session_settings`; either use [library preloading] or simply
use one of the objects in the extension to ensure it loads.

## Authors

* [Ildus Kurbangaliev](https://github.com/ildus)
Expand All @@ -173,4 +211,10 @@ are not supported and will raise an error.
"PostgreSQL Docs: Writing a Foreign Data Wrapper"
[ClickHouse]: https://clickhouse.com/clickhouse
[ordered-set aggregate functions]: https://www.postgresql.org/docs/current/functions-aggregate.html#FUNCTIONS-ORDEREDSET-TABLE
[Parametric aggregate functions]: https://clickhouse.com/docs/sql-reference/aggregate-functions/parametric-functions
[Parametric aggregate functions]: https://clickhouse.com/docs/sql-reference/aggregate-functions/parametric-functions
[ClickHouse settings]: https://clickhouse.com/docs/operations/settings/settings
"ClickHouse Docs: Session Settings"
[dollar quoting]: https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-DOLLAR-QUOTING
"PostgreSQL Docs: Dollar-Quoted String Constants"
[library preloading]: https://www.postgresql.org/docs/18/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-PRELOAD
"PostgreSQL Docs: Shared Library Preloading"
44 changes: 37 additions & 7 deletions src/binary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,30 @@ static void set_resp_error(ch_binary_response_t * resp, const char * str)
strcpy(resp->error, str);
}

/*
 * Builds the clickhouse-cpp QuerySettings map for a query.
 *
 * Each element of query->settings is read as a DefElem: its defname becomes
 * the setting name and the string held by its arg becomes the setting value.
 * If the same name occurs more than once, the last occurrence wins.
 */
static QuerySettings ch_binary_settings(const ch_query *query)
{
    QuerySettings settings;
    ListCell *cell;

    /*
     * foreach wants a non-const List, hence the cast below. foreach_ptr would
     * be tidier:
     *
     *     foreach_ptr(DefElem, setting, query->settings)
     *
     * but it only exists in Postgres 17 and later.
     */
    foreach (cell, (List *) query->settings)
    {
        DefElem *elem = (DefElem *) lfirst(cell);

        /*
         * The second initializer (1) is the field's flags value.
         * NOTE(review): presumably the "important" flag -- confirm against
         * clickhouse-cpp's QuerySettingsField.
         */
        QuerySettingsField field{strVal(elem->arg), 1};

        settings.insert_or_assign(elem->defname, field);
    }

    return settings;
}

static void set_state_error(ch_binary_read_state_t * state, const char * str)
{
assert(state->error == NULL);
Expand All @@ -176,7 +200,7 @@ static void set_state_error(ch_binary_read_state_t * state, const char * str)
}

ch_binary_response_t * ch_binary_simple_query(
ch_binary_connection_t * conn, const char * query, bool (*check_cancel)(void))
ch_binary_connection_t * conn, const ch_query * query, bool (*check_cancel)(void))
{
Client * client = (Client *)conn->client;
ch_binary_response_t * resp;
Expand All @@ -187,10 +211,9 @@ ch_binary_response_t * ch_binary_simple_query(
resp = new ch_binary_response_t();
values = new std::vector<std::vector<clickhouse::ColumnRef>>();
client->Select(
clickhouse::Query(query).SetQuerySettings(QuerySettings{
/* Enable SQL compatibility. */
{"join_use_nulls", QuerySettingsField{ "1", 1 }},
}).OnDataCancelable([&resp, &values, &check_cancel](const Block & block) {
clickhouse::Query(query->sql).SetQuerySettings(
ch_binary_settings(query)
).OnDataCancelable([&resp, &values, &check_cancel](const Block & block) {
if (check_cancel && check_cancel())
{
set_resp_error(resp, "query was canceled");
Expand Down Expand Up @@ -315,14 +338,21 @@ void ch_binary_insert_state_free(void * c)
}
}

void ch_binary_prepare_insert(void * conn, char * query, ch_binary_insert_state * state)
void ch_binary_prepare_insert(void * conn, const ch_query * query, ch_binary_insert_state * state)
{
/* Start the INSERT. */
Block * block;
Client * client = (Client *)((ch_binary_connection_t *)conn)->client;
try
{
block = new Block(client->BeginInsert(std::string(query) + " VALUES"));
block = new Block(client->BeginInsert(std::string(query->sql) + " VALUES"));
/* XXX https://github.com/ClickHouse/clickhouse-cpp/pull/453/
block = new Block(client->BeginInsert(
clickhouse::Query(std::string(query->sql)+ " VALUES").SetQuerySettings(
ch_binary_settings(query)
)
));
*/
}
catch (const std::exception & e)
{
Expand Down
12 changes: 7 additions & 5 deletions src/fdw.c.in
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,12 @@ static void merge_fdw_options(CHFdwRelationInfo * fpinfo,
Datum
clickhouse_raw_query(PG_FUNCTION_ARGS)
{
char *connstring = text_to_cstring(PG_GETARG_TEXT_P(1)),
*query = text_to_cstring(PG_GETARG_TEXT_P(0));
char *connstring = text_to_cstring(PG_GETARG_TEXT_P(1));
ch_query query = new_query(text_to_cstring(PG_GETARG_TEXT_P(0)));

ch_connection_details *details = connstring_parse(connstring);
ch_connection conn = chfdw_http_connect(details);
ch_cursor *cursor = conn.methods->simple_query(conn.conn, query);
ch_cursor *cursor = conn.methods->simple_query(conn.conn, &query);
text *res = chfdw_http_fetch_raw_data(cursor);

MemoryContextDelete(cursor->memcxt);
Expand Down Expand Up @@ -973,14 +973,15 @@ clickhouseIterateForeignScan(ForeignScanState * node)
struct timeval time1,
time2;
TupleDesc tupdesc;
ch_query query = new_query(fsstate->query);

/* make query if needed */
if (fsstate->ch_cursor == NULL)
{
MemoryContext old = MemoryContextSwitchTo(fsstate->batch_cxt);

fsstate->ch_cursor = fsstate->conn.methods->simple_query(fsstate->conn.conn,
fsstate->query);
&query);

time_used += fsstate->ch_cursor->request_time;
MemoryContextSwitchTo(old);
Expand Down Expand Up @@ -1379,6 +1380,7 @@ create_foreign_modify(EState * estate,
UserMapping *user;
MemoryContext old_mcxt;
Relation rel = rri->ri_RelationDesc;
ch_query q = new_query(query);

/* Begin constructing CHFdwModifyState. */
fmstate = (CHFdwModifyState *) palloc0(sizeof(CHFdwModifyState));
Expand All @@ -1403,7 +1405,7 @@ create_foreign_modify(EState * estate,

old_mcxt = MemoryContextSwitchTo(PortalContext);
fmstate->state = fmstate->conn.methods->prepare_insert(fmstate->conn.conn,
rri, target_attrs, query, table_name);
rri, target_attrs, &q, table_name);
MemoryContextSwitchTo(old_mcxt);

/* Create context for per-query temp workspace. */
Expand Down
37 changes: 23 additions & 14 deletions src/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ ch_http_connection(ch_connection_details * details)
goto cleanup;

conn->base_url = connstring;
conn->base_url_len = strlen(conn->base_url);

return conn;

Expand All @@ -152,12 +151,16 @@ set_query_id(ch_http_response_t * resp)
}

ch_http_response_t *
ch_http_simple_query(ch_http_connection_t * conn, const char *query)
ch_http_simple_query(ch_http_connection_t * conn, const ch_query * query)
{
char *url;
CURLcode errcode;
static char errbuffer[CURL_ERROR_SIZE];
struct curl_slist *headers = NULL;
CURLU *cu = curl_url();
ListCell *lc;
DefElem *setting;
char *buf = NULL;

ch_http_response_t *resp = calloc(sizeof(ch_http_response_t), 1);

Expand All @@ -168,12 +171,22 @@ ch_http_simple_query(ch_http_connection_t * conn, const char *query)

assert(conn && conn->curl);

/* Enable SQL compatibility. */
const char *params = "join_use_nulls=1";
/* Construct the base URL with the query ID. */
curl_url_set(cu, CURLUPART_URL, conn->base_url, 0);
buf = psprintf("query_id=%s", resp->query_id);
curl_url_set(cu, CURLUPART_QUERY, buf, CURLU_APPENDQUERY | CURLU_URLENCODE);
pfree(buf);

/* construct url: query_id + ?query_id= + params */
url = malloc(conn->base_url_len + 37 + 12 + strlen(params));
sprintf(url, "%s?query_id=%s&%s", conn->base_url, resp->query_id, params);
/* Append each of the settings as a query param. */
foreach(lc, (List *) query->settings)
{
setting = (DefElem *) lfirst(lc);
buf = psprintf("%s=%s", setting->defname, strVal(setting->arg));
curl_url_set(cu, CURLUPART_QUERY, buf, CURLU_APPENDQUERY | CURLU_URLENCODE);
pfree(buf);
}
curl_url_get(cu, CURLUPART_URL, &url, 0);
curl_url_cleanup(cu);

/* constant */
errbuffer[0] = '\0';
Expand All @@ -186,7 +199,7 @@ ch_http_simple_query(ch_http_connection_t * conn, const char *query)

/* variable */
curl_easy_setopt(conn->curl, CURLOPT_WRITEDATA, resp);
curl_easy_setopt(conn->curl, CURLOPT_POSTFIELDS, query);
curl_easy_setopt(conn->curl, CURLOPT_POSTFIELDS, query->sql);
curl_easy_setopt(conn->curl, CURLOPT_VERBOSE, curl_verbose);
if (curl_progressfunc)
{
Expand All @@ -198,17 +211,13 @@ ch_http_simple_query(ch_http_connection_t * conn, const char *query)
curl_easy_setopt(conn->curl, CURLOPT_NOPROGRESS, 1L);
if (conn->dbname)
{
char *buf = palloc(strlen(conn->dbname) + strlen(DATABASE_HEADER) + 3);

sprintf(buf, "%s: %s", DATABASE_HEADER, conn->dbname);
headers = curl_slist_append(headers, buf);
headers = curl_slist_append(headers, psprintf("%s: %s", DATABASE_HEADER, conn->dbname));
curl_easy_setopt(conn->curl, CURLOPT_HTTPHEADER, headers);
pfree(buf);
}

curl_error_happened = false;
errcode = curl_easy_perform(conn->curl);
free(url);
curl_free(url);
if (headers)
curl_slist_free_all(headers);

Expand Down
6 changes: 3 additions & 3 deletions src/include/binary.hh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#ifndef CLICKHOUSE_BINARY_H
#define CLICKHOUSE_BINARY_H

#include "connect.h"
#include "engine.h"

#ifdef __cplusplus
extern "C"
Expand Down Expand Up @@ -71,7 +71,7 @@ extern "C"
extern ch_binary_connection_t * ch_binary_connect(ch_connection_details * details, char **error);
extern void ch_binary_close(ch_binary_connection_t * conn);
extern ch_binary_response_t * ch_binary_simple_query(ch_binary_connection_t * conn,
const char *query, bool (*check_cancel) (void));
const ch_query * query, bool (*check_cancel) (void));
extern void ch_binary_response_free(ch_binary_response_t * resp);

/* reading */
Expand All @@ -83,7 +83,7 @@ extern "C"
void ch_binary_free_convert_state(void *);

/* insertion */
void ch_binary_prepare_insert(void *conn, char *query,
void ch_binary_prepare_insert(void *conn, const ch_query * query,
ch_binary_insert_state * state);
void ch_binary_insert_columns(ch_binary_insert_state * state);
void ch_binary_column_append_data(ch_binary_insert_state * state, size_t colidx);
Expand Down
15 changes: 0 additions & 15 deletions src/include/connect.h

This file was deleted.

29 changes: 29 additions & 0 deletions src/include/engine.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#ifndef CLICKHOUSE_ENGINE_H
#define CLICKHOUSE_ENGINE_H

#include "nodes/pathnodes.h"

/*
 * ch_connection_details defines the details for connecting to ClickHouse.
 *
 * A pointer to this struct is what the connect functions take
 * (ch_binary_connect, ch_http_connection).
 *
 * NOTE(review): the per-field notes below are inferred from the field names;
 * the code that fills in and consumes the fields is not part of this header.
 */
typedef struct
{
/* Server host to connect to. */
char *host;
/* Server port to connect to. */
int port;
/* Credentials used for the connection. */
char *username;
char *password;
/* Database to use on the server. */
char *dbname;
} ch_connection_details;

/*
 * ch_query describes an SQL query to execute on ClickHouse, together with the
 * settings to send along with it.
 *
 * The field order matters: the new_query() macro initializes this struct
 * positionally.
 */
typedef struct
{
/* Text of the SQL statement. */
const char *sql;
/*
 * List of DefElem nodes, one per setting. The engines read defname as the
 * setting name and fetch the value from arg with strVal().
 */
const List *settings;
} ch_query;

/*
 * new_query expands to a brace initializer for a ch_query: the given SQL text
 * paired with the result of chfdw_parse_options(ch_session_settings, true).
 *
 * Being a brace initializer, it can only be used to initialize a declaration,
 * e.g. `ch_query q = new_query(sql);`. This header declares neither
 * chfdw_parse_options nor ch_session_settings, so both must already be in
 * scope wherever the macro is expanded.
 */
#define new_query(sql) {sql, chfdw_parse_options(ch_session_settings, true)}

#endif /* CLICKHOUSE_ENGINE_H */
10 changes: 4 additions & 6 deletions src/include/fdw.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "optimizer/optimizer.h"
#include "nodes/pathnodes.h"
#include "access/heapam.h"
#include "connect.h"
#include "engine.h"

#if PG_VERSION_NUM < 150000
#define FirstUnpinnedObjectId FirstBootstrapObjectId
Expand All @@ -48,20 +48,18 @@ typedef struct ch_cursor

typedef void (*disconnect_method) (void *conn);
typedef void (*check_conn_method) (const char *password, UserMapping * user);
typedef ch_cursor * (*simple_query_method) (void *conn, const char *query);
typedef void (*simple_insert_method) (void *conn, const char *query);
typedef void (*cursor_free_method) (ch_cursor * cursor);
typedef ch_cursor * (*simple_query_method) (void *conn, const ch_query *query);
typedef void (*simple_insert_method) (void *conn, const ch_query *query);
typedef void **(*cursor_fetch_row_method) (ch_cursor * cursor, List * attrs,
TupleDesc tupdesc, Datum * values, bool *nulls);
typedef void *(*prepare_insert_method) (void *conn, ResultRelInfo *, List *,
char *, char *);
const ch_query *, char *);
typedef void (*insert_tuple_method) (void *state, TupleTableSlot * slot);

typedef struct
{
disconnect_method disconnect;
simple_query_method simple_query;
cursor_free_method cursor_free;
cursor_fetch_row_method fetch_row;
prepare_insert_method prepare_insert;
insert_tuple_method insert_tuple;
Expand Down
4 changes: 2 additions & 2 deletions src/include/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "postgres.h"
#include "nodes/pg_list.h"
#include "lib/stringinfo.h"
#include "connect.h"
#include "engine.h"

typedef struct ch_http_connection_t ch_http_connection_t;
typedef struct ch_http_response_t
Expand Down Expand Up @@ -47,7 +47,7 @@ void ch_http_init(int verbose, uint32_t query_id_prefix);
void ch_http_set_progress_func(void *progressfunc);
ch_http_connection_t *ch_http_connection(ch_connection_details * details);
void ch_http_close(ch_http_connection_t * conn);
ch_http_response_t *ch_http_simple_query(ch_http_connection_t * conn, const char *query);
ch_http_response_t *ch_http_simple_query(ch_http_connection_t * conn, const ch_query *query);
char *ch_http_last_error(void);

/* read */
Expand Down
Loading
Loading