From 96b068db8b81712fa8a04c6eeead5961659c9b20 Mon Sep 17 00:00:00 2001 From: morsapaes Date: Wed, 3 Dec 2025 09:54:57 +0100 Subject: [PATCH] Improve Kafka Table Engine documentation --- .../kafka/kafka-table-engine.md | 442 ++++++++++++------ 1 file changed, 309 insertions(+), 133 deletions(-) diff --git a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md index ef102450bd5..17e94902167 100644 --- a/docs/integrations/data-ingestion/kafka/kafka-table-engine.md +++ b/docs/integrations/data-ingestion/kafka/kafka-table-engine.md @@ -1,8 +1,8 @@ --- -sidebar_label: 'Kafka Table Engine' +sidebar_label: 'Kafka table engine' sidebar_position: 5 slug: /integrations/kafka/kafka-table-engine -description: 'Using the Kafka Table Engine' +description: 'Using the Kafka table engine to read data from and write data to Apache Kafka and other Kafka API-compatible brokers (e.g. Redpanda, Amazon MSK)' title: 'Using the Kafka table engine' --- @@ -11,73 +11,96 @@ import kafka_01 from '@site/static/images/integrations/data-ingestion/kafka/kafk import kafka_02 from '@site/static/images/integrations/data-ingestion/kafka/kafka_02.png'; import kafka_03 from '@site/static/images/integrations/data-ingestion/kafka/kafka_03.png'; import kafka_04 from '@site/static/images/integrations/data-ingestion/kafka/kafka_04.png'; +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; # Using the Kafka table engine -The Kafka table engine can be used to [**read** data from](#kafka-to-clickhouse) and [**write** data to](#clickhouse-to-kafka) Apache Kafka and other Kafka API-compatible brokers (e.g., Redpanda, Amazon MSK). +The Kafka table engine can be used to [**read** data from](#kafka-to-clickhouse) and [**write** data to](#clickhouse-to-kafka) Apache Kafka and other Kafka API-compatible brokers (e.g., Redpanda, Amazon MSK). This engine is bundled with open-source ClickHouse and is available across all deployment types. ### Kafka to ClickHouse {#kafka-to-clickhouse} -:::note -If you're on ClickHouse Cloud, we recommend using [ClickPipes](/integrations/clickpipes) instead. ClickPipes natively supports private network connections, scaling ingestion and cluster resources independently, and comprehensive monitoring for streaming Kafka data into ClickHouse. +:::tip +If you're on ClickHouse Cloud, we recommend using **[ClickPipes](/integrations/clickpipes)** instead. ClickPipes natively supports private network connections, scaling ingestion and cluster resources independently, and comprehensive monitoring for streaming Kafka data into ClickHouse. ::: -To use the Kafka table engine, you should be broadly familiar with [ClickHouse materialized views](../../../guides/developer/cascading-materialized-views.md). +You can use the Kafka table engine to ingest data from Kafka topics into ClickHouse. The engine is designed to continuously consume and stream messages to attached materialized views, which then insert data into target tables for persistent storage. For this reason, you should be broadly familiar with [materialized views](../../../guides/developer/cascading-materialized-views.md) and [Merge Tree family tables](../../../engines/table-engines/mergetree-family/index.md) when using the Kafka table engine to read data from Apache Kafka and other Kafka API-compatible brokers. 
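+
+At a glance, a read pipeline is made up of three objects: a Kafka engine table that consumes from the topic, a MergeTree-family target table that persists the rows, and a materialized view that moves each consumed batch into the target table. The sketch below is purely illustrative (the `events_queue`, `events`, and `events_mv` names, the `events` topic, and the settings are placeholders, not part of this guide); the full walkthrough follows in the quickstart.
+
+```sql
+-- Kafka engine table: consumes messages from the topic (illustrative settings)
+CREATE TABLE events_queue (message String)
+ENGINE = Kafka()
+SETTINGS kafka_broker_list = '<host>:<port>',
+         kafka_topic_list = 'events',
+         kafka_group_name = 'clickhouse',
+         kafka_format = 'JSONEachRow';
+
+-- Target table: persists the consumed rows
+CREATE TABLE events (message String)
+ENGINE = MergeTree
+ORDER BY tuple();
+
+-- Materialized view: inserts each consumed batch into the target table
+CREATE MATERIALIZED VIEW events_mv TO events AS
+SELECT * FROM events_queue;
+```
+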
-#### Overview {#overview} +Kafka table engine architecture diagram -Initially, we focus on the most common use case: using the Kafka table engine to insert data into ClickHouse from Kafka. +The engine ensures reliable processing through **at-least-once** semantics: consumer offsets are only committed to Kafka after all attached materialized views successfully process each batch of messages. If there is an error in any materialized view attached to the engine, the consumer offsets will not be committed, and the same messages will be retried until all materialized views succeed. This means that it is possible to get [duplicates in failure scenarios](#delivery-semantics-and-challenges-with-duplicates). -The Kafka table engine allows ClickHouse to read from a Kafka topic directly. Whilst useful for viewing messages on a topic, the engine by design only permits one-time retrieval, i.e. when a query is issued to the table, it consumes data from the queue and increases the consumer offset before returning results to the caller. Data cannot, in effect, be re-read without resetting these offsets. +#### Quickstart {#quickstart} -To persist this data from a read of the table engine, we need a means of capturing the data and inserting it into another table. Trigger-based materialized views natively provide this functionality. A materialized view initiates a read on the table engine, receiving batches of documents. The TO clause determines the destination of the data - typically a table of the [Merge Tree family](../../../engines/table-engines/mergetree-family/index.md). This process is visualized below: +To get started ingesting data from Kafka into ClickHouse, follow the steps below. If you already have an existing topic you'd like to consume data from, skip to [Step 3](#3-configure-data-ingestion). -Kafka table engine architecture diagram +##### 1. Prepare a sample dataset {#1-prepare-a-sample-dataset} -#### Steps {#steps} +To create a new topic with a sample dataset for testing, you can use [this Github dataset](https://datasets-documentation.s3.eu-west-3.amazonaws.com/kafka/github_all_columns.ndjson). To download the dataset, run: +```sh +curl -O https://datasets-documentation.s3.eu-west-3.amazonaws.com/kafka/github_all_columns.ndjson +``` -##### 1. Prepare {#1-prepare} +This dataset is a subset of the [GitHub archive dataset](https://ghe.clickhouse.tech/), modified to include only GitHub events for the [ClickHouse repository](https://github.com/ClickHouse/ClickHouse). Most queries [published with the dataset](https://ghe.clickhouse.tech/) can be used with this modified version to explore the data in ClickHouse, once ingested. -If you have data populated on a target topic, you can adapt the following for use in your dataset. Alternatively, a sample Github dataset is provided [here](https://datasets-documentation.s3.eu-west-3.amazonaws.com/kafka/github_all_columns.ndjson). This dataset is used in the examples below and uses a reduced schema and subset of the rows (specifically, we limit to Github events concerning the [ClickHouse repository](https://github.com/ClickHouse/ClickHouse)), compared to the full dataset available [here](https://ghe.clickhouse.tech/), for brevity. This is still sufficient for most of the queries [published with the dataset](https://ghe.clickhouse.tech/) to work. +##### 2. Create and populate the topic {#2-create-and-populate-the-topic} +[//]: # "TODO Consider providing instructions specific to all supported brokers or hosted services. 
Otherwise, using the CLI tools that ship with Kafka installations makes it easier to get started with no dependencies."
 
-##### 2. Configure ClickHouse {#2-configure-clickhouse}
+Next, create a new topic in your target broker. For example, if you're running Kafka locally, you can use the built-in [Kafka CLI tools](https://docs.confluent.io/kafka/operations-tools/kafka-tools.html):
 
-This step is required if you are connecting to a secure Kafka. These settings cannot be passed through the SQL DDL commands and must be configured in the ClickHouse config.xml. We assume you are connecting to a SASL secured instance. This is the simplest method when interacting with Confluent Cloud.
+```bash
+bin/kafka-topics.sh --create --bootstrap-server <host>:<port> --topic github --partitions 3
+```
+If you're using a hosted service like Confluent Cloud, you can use the [Cloud Console](https://docs.confluent.io/cloud/current/topics/overview.html) or a client like the [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html):
 
-```xml
-<clickhouse>
-   <kafka>
-      <sasl_username>username</sasl_username>
-      <sasl_password>password</sasl_password>
-      <security_protocol>sasl_ssl</security_protocol>
-      <sasl_mechanisms>PLAIN</sasl_mechanisms>
-   </kafka>
-</clickhouse>
+```bash
+confluent kafka topic create --if-not-exists github --partitions 3
 ```
 
-Either place the above snippet inside a new file under your conf.d/ directory or merge it into existing configuration files. For settings that can be configured, see [here](../../../engines/table-engines/integrations/kafka.md#configuration).
-
-We're also going to create a database called `KafkaEngine` to use in this tutorial:
+To load the sample dataset into the topic, you can then use a command-line tool like [kcat](https://github.com/edenhill/kcat). For example, if you're running Kafka locally with authentication disabled:
 
-```sql
-CREATE DATABASE KafkaEngine;
+```bash
+cat github_all_columns.ndjson |
+kcat -P \
+  -b <host>:<port> \
+  -t github
 ```
 
-Once you've created the database, you'll need to switch over to it:
+Or if you're using a hosted service like Confluent Cloud with SASL authentication:
 
-```sql
-USE KafkaEngine;
+```bash
+cat github_all_columns.ndjson |
+kcat -P \
+  -b <host>:<port> \
+  -t github \
+  -X security.protocol=sasl_ssl \
+  -X sasl.mechanisms=PLAIN \
+  -X sasl.username=<username> \
+  -X sasl.password=<password>
 ```
 
-##### 3. Create the destination table {#3-create-the-destination-table}
+The dataset contains 200,000 rows, so it should be available in the specified topic in a few seconds. If you want to work with a larger dataset, take a look at [the large datasets section](https://github.com/ClickHouse/kafka-samples/tree/main/producer#large-datasets) of the [ClickHouse/kafka-samples](https://github.com/ClickHouse/kafka-samples) GitHub repository.
+
+##### 3. Configure data ingestion {#3-configure-data-ingestion}
 
-Prepare your destination table. In the example below we use the reduced GitHub schema for purposes of brevity. Note that although we use a MergeTree table engine, this example could easily be adapted for any member of the [MergeTree family](../../../engines/table-engines/mergetree-family/index.md).
+[//]: # "TODO We should not teach users to select directly from the Kafka table, since this approach isn't reliable. When we rollout the v2 engine, we can reconsider, but now we should direct users to finish setting up and check ingestion progress in the system catalog table."
+
+Before ClickHouse can ingest data from a Kafka topic, you must first provide details on how to connect to and authenticate with your Kafka broker, as well as how to interpret the data.
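+
+If you're not sure what the messages on a topic look like, you can peek at one before configuring the engine, for example with kcat. This is a sketch only; swap in your broker address and add any authentication options your setup requires.
+
+```bash
+# Consume a single message from the github topic and print it
+kcat -C \
+  -b <host>:<port> \
+  -t github \
+  -c 1
+```
+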
+In this example, the Kafka broker uses simple authentication (SASL), the source data is `JSON`-encoded and no schema registry is used. For a complete overview of all the supported formats, features, and configuration options, see the [reference documentation](../../../engines/table-engines/integrations/kafka).
+
+<Tabs groupId="deployment">
+<TabItem value="cloud" label="ClickHouse Cloud" default>
+
+In ClickHouse Cloud, you can provide inline credentials in the Kafka table engine `CREATE TABLE` statement using the `SETTINGS` clause. See the [reference documentation](../../../engines/table-engines/integrations/kafka.md#creating-a-table) for supported setting configurations.
+
+:::info
+It is **not** possible to connect to brokers using TLS/SSL from ClickHouse Cloud, since there is no mechanism to upload and rotate certificates yet — only SASL is supported. If this is a requirement for your use case, we recommend using [ClickPipes](/integrations/clickpipes) or the [Kafka Connect Sink](/integrations/kafka/clickhouse-kafka-connect-sink) instead.
+:::
 
 ```sql
-CREATE TABLE github
+CREATE TABLE github_queue
 (
     file_time DateTime,
     event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
@@ -104,50 +127,109 @@ CREATE TABLE github
     merged_by LowCardinality(String),
     review_comments UInt32,
     member_login LowCardinality(String)
-) ENGINE = MergeTree ORDER BY (event_type, repo_name, created_at)
-```
+)
+ENGINE = Kafka()
+SETTINGS kafka_broker_list='<host>:<port>',
+         kafka_topic_list='github',
+         kafka_group_name='clickhouse',
+         kafka_format = 'JSONEachRow',
+         kafka_thread_per_consumer = 0,
+         kafka_num_consumers = 1,
+         -- Connecting to a Confluent Cloud broker using
+         -- simple username/password authentication
+         kafka_security_protocol='sasl_ssl',
+         kafka_sasl_mechanism = 'PLAIN',
+         kafka_sasl_username = '<username>',
+         kafka_sasl_password = '<password>';
+```
+
+</TabItem>
+<TabItem value="self-hosted" label="Self-hosted">
+
+In self-hosted ClickHouse, you can configure credentials using [configuration files](../../../operations/configuration-files.md), [named collections](../../../operations/named-collections.md#named-collections-for-accessing-kafka), or inline in the `CREATE TABLE` statement using the [`SETTINGS` clause](../../../engines/table-engines/integrations/kafka.md#creating-a-table).
+
+:::tip
+Inline credentials are a good fit for prototyping (e.g., to follow the steps in this guide). For production environments, or environments with a large number of tables reading from the same broker, we recommend using configuration files or named collections to manage credentials.
+:::
 
-##### 4. Create and populate the topic {#4-create-and-populate-the-topic}
+**Configuration files**
 
-Next, we're going to create a topic. There are several tools that we can use to do this. If we're running Kafka locally on our machine or inside a Docker container, [RPK](https://docs.redpanda.com/current/get-started/rpk-install/) works well. We can create a topic called `github` with 5 partitions by running the following command:
+The Kafka table engine supports extended configuration using ClickHouse config files. You can either place the Kafka-specific configuration in a new file under the `conf.d/` directory, or append it to existing configuration files.
-```bash
-rpk topic create -p 5 github --brokers <host>:<port>
+```xml
+<clickhouse>
+   <kafka>
+      <sasl_username>username</sasl_username>
+      <sasl_password>password</sasl_password>
+      <security_protocol>sasl_ssl</security_protocol>
+      <sasl_mechanisms>PLAIN</sasl_mechanisms>
+   </kafka>
+</clickhouse>
 ```
 
-If we're running Kafka on the Confluent Cloud, we might prefer to use the [Confluent CLI](https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/kcat.html#produce-records):
+See the [reference documentation](../../../engines/table-engines/integrations/kafka.md#configuration) for supported configuration keys.
 
-```bash
-confluent kafka topic create --if-not-exists github
-```
+**Named collections**
 
-Now we need to populate this topic with some data, which we'll do using [kcat](https://github.com/edenhill/kcat). We can run a command similar to the following if we're running Kafka locally with authentication disabled:
+You can use [named collections](../../../operations/named-collections.md#named-collections-for-accessing-kafka) to securely store and reuse credentials across multiple `CREATE TABLE` statements.
 
-```bash
-cat github_all_columns.ndjson |
-kcat -P \
-  -b <host>:<port> \
-  -t github
+```sql
+CREATE NAMED COLLECTION my_kafka_cluster AS
+   kafka_broker_list = '<host>:<port>',
+   kafka_security_protocol='sasl_ssl',
+   kafka_sasl_mechanism = 'PLAIN',
+   kafka_sasl_username = '<username>',
+   kafka_sasl_password = '<password>';
 ```
 
-Or the following if our Kafka cluster uses SASL to authenticate:
+You can then reference the named collection in the `ENGINE` clause of a Kafka table engine `CREATE TABLE` statement.
 
-```bash
-cat github_all_columns.ndjson |
-kcat -P \
-  -b <host>:<port> \
-  -t github \
-  -X security.protocol=sasl_ssl \
-  -X sasl.mechanisms=PLAIN \
-  -X sasl.username=<username> \
-  -X sasl.password=<password> \
+```sql
+CREATE TABLE github_queue
+(
+    file_time DateTime,
+    event_type Enum('CommitCommentEvent' = 1, 'CreateEvent' = 2, 'DeleteEvent' = 3, 'ForkEvent' = 4, 'GollumEvent' = 5, 'IssueCommentEvent' = 6, 'IssuesEvent' = 7, 'MemberEvent' = 8, 'PublicEvent' = 9, 'PullRequestEvent' = 10, 'PullRequestReviewCommentEvent' = 11, 'PushEvent' = 12, 'ReleaseEvent' = 13, 'SponsorshipEvent' = 14, 'WatchEvent' = 15, 'GistEvent' = 16, 'FollowEvent' = 17, 'DownloadEvent' = 18, 'PullRequestReviewEvent' = 19, 'ForkApplyEvent' = 20, 'Event' = 21, 'TeamAddEvent' = 22),
+    actor_login LowCardinality(String),
+    repo_name LowCardinality(String),
+    created_at DateTime,
+    updated_at DateTime,
+    action Enum('none' = 0, 'created' = 1, 'added' = 2, 'edited' = 3, 'deleted' = 4, 'opened' = 5, 'closed' = 6, 'reopened' = 7, 'assigned' = 8, 'unassigned' = 9, 'labeled' = 10, 'unlabeled' = 11, 'review_requested' = 12, 'review_request_removed' = 13, 'synchronize' = 14, 'started' = 15, 'published' = 16, 'update' = 17, 'create' = 18, 'fork' = 19, 'merged' = 20),
+    comment_id UInt64,
+    path String,
+    ref LowCardinality(String),
+    ref_type Enum('none' = 0, 'branch' = 1, 'tag' = 2, 'repository' = 3, 'unknown' = 4),
+    creator_user_login LowCardinality(String),
+    number UInt32,
+    title String,
+    labels Array(LowCardinality(String)),
+    state Enum('none' = 0, 'open' = 1, 'closed' = 2),
+    assignee LowCardinality(String),
+    assignees Array(LowCardinality(String)),
+    closed_at DateTime,
+    merged_at DateTime,
+    merge_commit_sha String,
+    requested_reviewers Array(LowCardinality(String)),
+    merged_by LowCardinality(String),
+    review_comments UInt32,
+    member_login LowCardinality(String)
+)
+ENGINE = Kafka(my_kafka_cluster)
+SETTINGS kafka_topic_list='github',
+         kafka_group_name='clickhouse',
+         kafka_format = 'JSONEachRow',
+         kafka_thread_per_consumer = 0,
+         kafka_num_consumers = 1;
 ```
 
-The dataset contains 200,000 rows, so it should be ingested in just a few seconds.
If you want to work with a larger dataset, take a look at [the large datasets section](https://github.com/ClickHouse/kafka-samples/tree/main/producer#large-datasets) of the [ClickHouse/kafka-samples](https://github.com/ClickHouse/kafka-samples) GitHub repository.
 
-##### 5. Create the Kafka table engine {#5-create-the-kafka-table-engine}
+See the [reference documentation](../../../engines/table-engines/integrations/kafka.md#configuration) for supported configuration keys and [this guide](./kafka-table-engine-named-collections.md) for a complete walkthrough of using named collections with the Kafka table engine.
 
-The below example creates a table engine with the same schema as the merge tree table. This isn't strictly required, as you can have an alias or ephemeral columns in the target table. The settings are important; however - note the use of `JSONEachRow` as the data type for consuming JSON from a Kafka topic. The values `github` and `clickhouse` represent the name of the topic and consumer group names, respectively. The topics can actually be a list of values.
+**Inline**
 
+You can provide inline credentials in the Kafka table engine `CREATE TABLE` statement using the `SETTINGS` clause. See the [reference documentation](../../../engines/table-engines/integrations/kafka.md#creating-a-table) for supported setting configurations.
+
+:::info
+It is **not** possible to configure certificates to connect to brokers using TLS/SSL using this method, since the required options are not exposed via SQL — only SASL is supported. If you need TLS/SSL client certificates, use one of the other methods instead.
+:::
 
 ```sql
 CREATE TABLE github_queue
@@ -178,16 +260,47 @@ CREATE TABLE github_queue
     review_comments UInt32,
     member_login LowCardinality(String)
 )
-   ENGINE = Kafka('kafka_host:9092', 'github', 'clickhouse',
-                'JSONEachRow') SETTINGS kafka_thread_per_consumer = 0, kafka_num_consumers = 1;
+ENGINE = Kafka()
+SETTINGS kafka_broker_list='<host>:<port>',
+         kafka_topic_list='github',
+         kafka_group_name='clickhouse',
+         kafka_format = 'JSONEachRow',
+         kafka_thread_per_consumer = 0,
+         kafka_num_consumers = 1,
+         -- Connecting to a Kafka broker using simple
+         -- username/password authentication
+         kafka_security_protocol='sasl_ssl',
+         kafka_sasl_mechanism = 'PLAIN',
+         kafka_sasl_username = '<username>',
+         kafka_sasl_password = '<password>';
+```
+
+</TabItem>
+</Tabs>
+
+It's important to note that creating a Kafka table engine table does **not** start data ingestion — it simply configures a consumer. After this step, you must create a target table and a materialized view to start data ingestion from the specified topic.
+
+:::tip
+The Kafka table engine is designed for one-time data retrieval. You should **never** select data from a Kafka table directly, but use a materialized view and query its associated target table instead.
+:::
+
+##### 4. Create a target table {#4-create-a-target-table}
+
+Once you define the schema of your Kafka table engine table, you must create a target table that will persist the data in ClickHouse. If the schema of your target table is the same as the schema you defined for the ingestion table (`github_queue`), you can use the [`CREATE TABLE AS` syntax](../../../sql-reference/statements/create/table#with-a-schema-similar-to-other-table-with-a-schema-similar-to-other-table) to copy that schema over.
+
+```sql
+CREATE TABLE github AS github_queue
+ENGINE = MergeTree()
+ORDER BY (event_type, repo_name, created_at);
 ```
+
+This table must use an engine of the [Merge Tree family](../../../engines/table-engines/mergetree-family/index.md). For simplicity, this example uses the `MergeTree()` engine, but you should evaluate the best fit for your use case.
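+
+For example, if your topic can deliver the same message more than once, a `ReplacingMergeTree` target table is one way to absorb duplicates during background merges. The sketch below is illustrative only (the `github_deduplicated` name is hypothetical and the sorting key is assumed to identify duplicates in your data); it is not used in the rest of this guide.
+
+```sql
+-- Alternative target table that collapses rows sharing the same sorting key
+-- during background merges (illustrative; not part of the walkthrough)
+CREATE TABLE github_deduplicated AS github_queue
+ENGINE = ReplacingMergeTree()
+ORDER BY (event_type, repo_name, created_at);
+```
+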
-We discuss engine settings and performance tuning below. At this point, a simple select on the table `github_queue` should read some rows. Note that this will move the consumer offsets forward, preventing these rows from being re-read without a [reset](#common-operations). Note the limit and required parameter `stream_like_engine_allow_direct_select.`
+##### 5. Create a materialized view {#5-create-a-materialized-view}
 
-##### 6. Create the materialized view {#6-create-the-materialized-view}
+To ingest data, the Kafka table engine must be attached to a materialized view. As new messages are detected in the upstream Kafka broker, the materialized view is triggered to insert data into persistent storage (i.e., the target table you created in the previous step).
 
-The materialized view will connect the two previously created tables, reading data from the Kafka table engine and inserting it into the target merge tree table. We can do a number of data transformations. We will do a simple read and insert. The use of * assumes column names are identical (case sensitive).
+In this example, the target table (`github`) has the same schema as the ingestion table (`github_queue`), so the materialized view will do a simple `SELECT *`.
 
 ```sql
 CREATE MATERIALIZED VIEW github_mv TO github AS
 SELECT *
 FROM github_queue;
 ```
 
-At the point of creation, the materialized view connects to the Kafka engine and commences reading: inserting rows into the target table. This process will continue indefinitely, with subsequent message inserts into Kafka being consumed. Feel free to re-run the insertion script to insert further messages to Kafka.
+Once created, the materialized view connects to the Kafka table engine and kickstarts data ingestion. This process continues indefinitely: reading new data from the upstream Kafka broker, triggering the materialized view, and inserting data into the target table.
 
-##### 7. Confirm rows have been inserted {#7-confirm-rows-have-been-inserted}
+##### 6. Confirm rows have been inserted {#6-confirm-rows-have-been-inserted}
 
-Confirm data exists in the target table:
+To confirm that all messages were processed and stored in ClickHouse, run a count against your target table.
 
 ```sql
 SELECT count() FROM github;
 ```
 
-You should see 200,000 rows:
-```response
+It's important to note that the Kafka table engine processes data in discrete batches (controlled by settings like `kafka_max_block_size` and `kafka_flush_interval_ms`), which means that you might briefly see an earlier count while a batch of rows is still being processed, but you will never see a partially processed batch. When all data has been processed, you should see 200,000 rows:
+
+```response
 ┌─count()─┐
 │  200000 │
 └─────────┘
 ```
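+
+Once the data is in the target table, you can explore it with ordinary queries. For example, the illustrative query below counts the ingested events by type (it is not part of the setup itself):
+
+```sql
+-- Count ingested GitHub events per event type
+SELECT event_type, count() AS events
+FROM github
+GROUP BY event_type
+ORDER BY events DESC;
+```
+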
+To monitor ingestion progress and debug errors with the Kafka consumer, you can query the [`system.kafka_consumers` system table](../../../operations/system-tables/kafka_consumers). If your deployment has multiple replicas (e.g., ClickHouse Cloud), you must use the [`clusterAllReplicas`](../../../sql-reference/table-functions/cluster.md) table function.
+
+```sql
+SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
+ORDER BY assignments.partition_id ASC;
+```
+
+#### Common operations {#common-operations-read}
+
+##### Troubleshooting {#troubleshooting}
+
+<Tabs groupId="deployment">
+<TabItem value="cloud" label="ClickHouse Cloud" default>
+
+**System tables**
+
+To troubleshoot errors with the Kafka consumer, you can query the [`system.kafka_consumers` system table](../../../operations/system-tables/kafka_consumers).
+
+```sql
+SELECT * FROM clusterAllReplicas('default',system.kafka_consumers)
+ORDER BY assignments.partition_id ASC;
+```
+
+**Log files**
+
+Logging for the Kafka table engine is reported in the ClickHouse server logs, which are not exposed to users in ClickHouse Cloud. If you can't track down the issue using the methods above, you should [contact the ClickHouse support team](https://clickhouse.com/support/program) for server-level log analysis.
+
+</TabItem>
+<TabItem value="self-hosted" label="Self-hosted">
+
+**System tables**
+
+To troubleshoot errors with the Kafka consumer, you can query the [`system.kafka_consumers` system table](../../../operations/system-tables/kafka_consumers).
+
+```sql
+SELECT * FROM system.kafka_consumers
+ORDER BY assignments.partition_id ASC;
+```
+
+If your deployment has multiple replicas, you must use the [`clusterAllReplicas`](../../../sql-reference/table-functions/cluster.md) table function.
+
+**Log files**
+
+Logging for the Kafka table engine is reported in the ClickHouse server logs. Errors such as authentication issues are not reported in responses to Kafka engine DDL, so for diagnosing issues we recommend using the main ClickHouse log file `clickhouse-server.err.log`. You can enable further trace logging for the underlying Kafka client library ([librdkafka](https://github.com/edenhill/librdkafka)) through configuration.

+```xml
+<kafka>
+   <debug>all</debug>
+</kafka>
+```
+
+</TabItem>
+</Tabs>
 
 ##### Stopping & restarting message consumption {#stopping--restarting-message-consumption}
 
@@ -228,17 +394,17 @@ This will not impact the offsets of the consumer group. To restart consumption,
 ATTACH TABLE github_queue;
 ```
 
-##### Adding Kafka Metadata {#adding-kafka-metadata}
+##### Using Kafka metadata {#using-kafka-metadata}
 
-It can be useful to keep track of the metadata from the original Kafka messages after it's been ingested into ClickHouse. For example, we may want to know how much of a specific topic or partition we have consumed. For this purpose, the Kafka table engine exposes several [virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns). These can be persisted as columns in our target table by modifying our schema and materialized view's select statement.
+In addition to the message value, the Kafka table engine also exposes Kafka metadata fields like the message key, headers, and others as [virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns). These virtual columns are prefixed with `_` and can be added as columns in your target table on creation.
 
-First, we perform the stop operation described above before adding columns to our target table.
+If you want to add metadata columns to an existing target table, you must first [detach](../../../sql-reference/statements/detach.md) the Kafka table to stop data ingestion.
 
 ```sql
 DETACH TABLE github_queue;
 ```
 
-Below we add information columns to identify the source topic and the partition from which the row originated.
+To add new columns to the target table for persisting Kafka metadata, use the [`ALTER TABLE...ADD COLUMN` statement](../../../sql-reference/statements/alter/column.md).
For example: ```sql ALTER TABLE github @@ -246,11 +412,7 @@ ALTER TABLE github ADD COLUMN partition UInt64; ``` -Next, we need to ensure virtual columns are mapped as required. -Virtual columns are prefixed with `_`. -A complete listing of virtual columns can be found [here](../../../engines/table-engines/integrations/kafka.md#virtual-columns). - -To update our table with the virtual columns, we'll need to drop the materialized view, re-attach the Kafka engine table, and re-create the materialized view. +Next, you must adjust the materialized view to start consuming these metadata columns. With the Kafka table detached, it's safe to drop the materialized view, re-attach the table, and re-create the materialized view to resume ingestion. ```sql DROP VIEW github_mv; @@ -262,11 +424,11 @@ ATTACH TABLE github_queue; ```sql CREATE MATERIALIZED VIEW github_mv TO github AS -SELECT *, _topic AS topic, _partition as partition +SELECT *, _topic as topic, _partition as partition FROM github_queue; ``` -Newly consumed rows should have the metadata. +When you select from the target table, you should see the additional columns populated. ```sql SELECT actor_login, event_type, created_at, topic, partition @@ -274,8 +436,6 @@ FROM github LIMIT 10; ``` -The result looks like: - | actor_login | event_type | created_at | topic | partition | | :--- | :--- | :--- | :--- | :--- | | IgorMinar | CommitCommentEvent | 2011-02-12 02:22:00 | github | 0 | @@ -289,20 +449,13 @@ The result looks like: | jpn | CommitCommentEvent | 2011-02-12 12:24:31 | github | 0 | | Oxonium | CommitCommentEvent | 2011-02-12 12:31:28 | github | 0 | +See the [reference documentation](../../../engines/table-engines/integrations/kafka.md#virtual-columns) for a complete list of supported Kafka metadata fields. -##### Modify Kafka Engine Settings {#modify-kafka-engine-settings} - -We recommend dropping the Kafka engine table and recreating it with the new settings. The materialized view does not need to be modified during this process - message consumption will resume once the Kafka engine table is recreated. - -##### Debugging Issues {#debugging-issues} +##### Modifying table settings {#modifying-table-settings} -Errors such as authentication issues are not reported in responses to Kafka engine DDL. For diagnosing issues, we recommend using the main ClickHouse log file clickhouse-server.err.log. Further trace logging for the underlying Kafka client library [librdkafka](https://github.com/edenhill/librdkafka) can be enabled through configuration. +To modify Kafka table settings, we recommend **dropping and recreating** the table with the new configuration. The materialized view does not need to be modified - message consumption will automatically resume from the last committed offset when the table is recreated. -```xml - - all - -``` +While you can use [`ALTER TABLE...MODIFY SETTING`](../../../sql-reference//statements/alter/setting.md) for simple settings like `kafka_max_block_size`, dropping and recreating is more reliable (and often required) for significant configuration changes such as broker lists, consumer groups, topics, or authentication settings. ##### Handling malformed messages {#handling-malformed-messages} @@ -314,11 +467,11 @@ Kafka is often used as a "dumping ground" for data. This leads to topics contain ##### Delivery Semantics and challenges with duplicates {#delivery-semantics-and-challenges-with-duplicates} -The Kafka table engine has at-least-once semantics. Duplicates are possible in several known rare circumstances. 
For example, messages could be read from Kafka and successfully inserted into ClickHouse. Before the new offset can be committed, the connection to Kafka is lost. A retry of the block in this situation is required. The block may be [de-duplicated ](/engines/table-engines/mergetree-family/replication)using a distributed table or ReplicatedMergeTree as the target table. While this reduces the chance of duplicate rows, it relies on identical blocks. Events such as a Kafka rebalancing may invalidate this assumption, causing duplicates in rare circumstances.
+The Kafka table engine has at-least-once semantics. Duplicates are possible in several known rare circumstances. For example, messages could be read from Kafka and successfully inserted into ClickHouse. Before the new offset can be committed, the connection to Kafka is lost. A retry of the block in this situation is required. The block may be [de-duplicated](/engines/table-engines/mergetree-family/replication) using a distributed table or ReplicatedMergeTree as the target table. While this reduces the chance of duplicate rows, it relies on identical blocks. Events such as a Kafka rebalancing may invalidate this assumption, causing duplicates in rare circumstances.
 
 ##### Quorum based Inserts {#quorum-based-inserts}
 
-You may need [quorum-based inserts](/operations/settings/settings#insert_quorum) for cases where higher delivery guarantees are required in ClickHouse. This can't be set on the materialized view or the target table. It can, however, be set for user profiles e.g.
+You may need [quorum-based inserts](/operations/settings/settings#insert_quorum) for cases where higher delivery guarantees are required in ClickHouse. This can't be set on the materialized view or the target table. It can, however, be set for user profiles. For example:
 
 ```xml
 <profiles>
   <default>
    <insert_quorum>2</insert_quorum>
  </default>
</profiles>
 ```
 
 ### ClickHouse to Kafka {#clickhouse-to-kafka}
 
-Although a rarer use case, ClickHouse data can also be persisted in Kafka. For example, we will insert rows manually into a Kafka table engine. This data will be read by the same Kafka engine, whose materialized view will place the data into a Merge Tree table. Finally, we demonstrate the application of materialized views in inserts to Kafka to read tables from existing source tables.
-
-#### Steps {#steps-1}
+:::note
+If you're on ClickHouse Cloud, it's important to note that private network connections are not supported. This means that your broker(s) must be configured for public access.
+:::
 
-Our initial objective is best illustrated:
+You can use the Kafka table engine to write data from ClickHouse to Kafka topics. The engine is designed to push messages to a topic when triggered by direct inserts or updates in attached materialized views. For this reason, you should be broadly familiar with [materialized views](../../../guides/developer/cascading-materialized-views.md) when using the Kafka table engine to write data to Apache Kafka and other Kafka API-compatible brokers.
 
 Kafka table engine with inserts diagram
 
-We assume you have the tables and views created under steps for [Kafka to ClickHouse](#kafka-to-clickhouse) and that the topic has been fully consumed.
+The engine ensures reliable delivery through **at-least-once** semantics: when data is inserted into a Kafka table, the operation only succeeds after the data is successfully sent to the Kafka topic.
If there is an error sending data to Kafka (e.g., network connectivity issues, Kafka broker unavailability), the engine will automatically handle retries. This means that it is possible to get duplicates in failure scenarios if, for example, data reaches Kafka but the acknowledgment is lost due to network connectivity issues, causing the operation to be retried.
+
+#### Quickstart {#quickstart-1}
 
-##### 1. Inserting rows directly {#1-inserting-rows-directly}
+To get started writing data from ClickHouse into Kafka, follow the steps below. If you already have an existing topic you'd like to produce data to, skip to [Step 2](#2-produce-data-to-the-target-topic).
 
-First, confirm the count of the target table.
+##### 1. Create a target topic {#1-create-a-target-topic}
 
-```sql
-SELECT count() FROM github;
-```
+Create a new topic in your target broker. For example, if you're running Kafka locally, you can use the built-in [Kafka CLI tools](https://docs.confluent.io/kafka/operations-tools/kafka-tools.html):
 
-You should have 200,000 rows:
-```response
-┌─count()─┐
-│  200000 │
-└─────────┘
+```bash
+bin/kafka-topics.sh --create --bootstrap-server <host>:<port> --topic github_out --partitions 3
 ```
 
-Now insert rows from the GitHub target table back into the Kafka table engine github_queue. Note how we utilize JSONEachRow format and LIMIT the select to 100.
+If you're using a hosted service like Confluent Cloud, you can use the [Cloud Console](https://docs.confluent.io/cloud/current/topics/overview.html) or a client like the [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html):
 
-```sql
-INSERT INTO github_queue SELECT * FROM github LIMIT 100 FORMAT JSONEachRow
+```bash
+confluent kafka topic create --if-not-exists github_out --partitions 3
 ```
 
-Recount the row in GitHub to confirm it has increased by 100. As shown in the above diagram, rows have been inserted into Kafka via the Kafka table engine before being re-read by the same engine and inserted into the GitHub target table by our materialized view!
+##### 2. Produce data to the target topic {#2-produce-data-to-the-target-topic}
 
-```sql
-SELECT count() FROM github;
-```
+You can produce data to the target topic either continuously, using materialized views, or on demand, using direct inserts.
 
-You should see 100 additional rows:
-```response
-┌─count()─┐
-│  200100 │
-└─────────┘
-```
+**Using materialized views**
 
-##### 2. Using materialized views {#2-using-materialized-views}
 
 We can utilize materialized views to push messages to a Kafka engine (and a topic) when documents are inserted into a table. When rows are inserted into the GitHub table, a materialized view is triggered, which causes the rows to be inserted back into a Kafka engine and into a new topic. Again this is best illustrated:
@@ -413,7 +553,7 @@ CREATE TABLE github_out_queue
     member_login LowCardinality(String)
 ) ENGINE = Kafka('host:port', 'github_out', 'clickhouse_out',
-                            'JSONEachRow') SETTINGS kafka_thread_per_consumer = 0, kafka_num_consumers = 1;
+                            'JSONEachRow') SETTINGS kafka_thread_per_consumer = 0, kafka_num_consumers = 1;
 ```
 
 Now create a new materialized view `github_out_mv` to point at the GitHub table, inserting rows to the above engine when it triggers. Additions to the GitHub table will, as a result, be pushed to our new Kafka topic.
@@ -430,6 +570,40 @@ FROM github
 FORMAT JsonEachRow;
 ```
 
+**Using direct inserts**
+
+First, confirm the count of the target table.
+
+```sql
+SELECT count() FROM github;
+```
+
+You should have 200,000 rows:
+```response
+┌─count()─┐
+│  200000 │
+└─────────┘
+```
+
+Now insert rows from the GitHub target table back into the Kafka table engine `github_queue`. Note how we utilize the `JSONEachRow` format and `LIMIT` the select to 100.
+
+```sql
+INSERT INTO github_queue SELECT * FROM github LIMIT 100 FORMAT JSONEachRow
+```
+
+Recount the rows in the GitHub table to confirm the count has increased by 100. As shown in the above diagram, rows have been inserted into Kafka via the Kafka table engine before being re-read by the same engine and inserted into the GitHub target table by our materialized view!
+
+```sql
+SELECT count() FROM github;
+```
+
+You should see 100 additional rows:
+```response
+┌─count()─┐
+│  200100 │
+└─────────┘
+```
+
 Should you insert into the original github topic, created as part of [Kafka to ClickHouse](#kafka-to-clickhouse), documents will magically appear in the "github_out" topic. Confirm this with native Kafka tooling. For example, below, we insert 100 rows onto the github topic using [kcat](https://github.com/edenhill/kcat) for a Confluent Cloud hosted topic:
 
 ```bash
@@ -459,6 +633,8 @@ wc -l
 
 Although an elaborate example, this illustrates the power of materialized views when used in conjunction with the Kafka engine.
 
+#### Common operations {#common-operations-write}
+
 ### Clusters and Performance {#clusters-and-performance}
 
 #### Working with ClickHouse Clusters {#working-with-clickhouse-clusters}