From 7ec79918023d31c726e83b0f434ac2766c114612 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Wed, 17 Dec 2025 17:58:19 +0100 Subject: [PATCH 1/7] allow kafka controllers running with zookeeper metadata manager --- deploy/helm/kafka-operator/crds/crds.yaml | 9 +++- rust/operator-binary/src/config/command.rs | 2 +- rust/operator-binary/src/crd/mod.rs | 52 ++++++++++++------- rust/operator-binary/src/kafka_controller.rs | 41 ++++++--------- .../operator-binary/src/resource/configmap.rs | 2 +- 5 files changed, 59 insertions(+), 47 deletions(-) diff --git a/deploy/helm/kafka-operator/crds/crds.yaml b/deploy/helm/kafka-operator/crds/crds.yaml index 323b1c1d..ff3e5feb 100644 --- a/deploy/helm/kafka-operator/crds/crds.yaml +++ b/deploy/helm/kafka-operator/crds/crds.yaml @@ -731,6 +731,7 @@ spec: authentication: [] authorization: opa: null + metadataManager: ZooKeeper tls: internalSecretClass: tls serverSecretClass: tls @@ -793,6 +794,12 @@ spec: - configMapName type: object type: object + metadataManager: + default: ZooKeeper + enum: + - ZooKeeper + - KRaft + type: string tls: default: internalSecretClass: tls @@ -836,7 +843,7 @@ spec: Provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) here. When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. - This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper suppport was dropped. + This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper support was dropped. Please use the 'controller' role instead. 
nullable: true type: string diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index b2f31e8a..0230318c 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -52,7 +52,7 @@ fn broker_start_command( controller_descriptors: Vec, product_version: &str, ) -> String { - if kafka.is_controller_configured() { + if kafka.is_kraft_mode() { formatdoc! {" POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET)) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index e8ea4852..be2d5f17 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -23,6 +23,7 @@ use stackable_operator::{ utils::cluster_info::KubernetesClusterInfo, versioned::versioned, }; +use strum::{Display, EnumIter, EnumString}; use crate::{ config::node_id_hasher::node_id_hash32_offset, @@ -158,9 +159,12 @@ pub mod versioned { /// Provide the name of the ZooKeeper [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery) /// here. When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/) /// to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. - /// This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper suppport was dropped. + /// This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper support was dropped. /// Please use the 'controller' role instead. 
pub zookeeper_config_map_name: Option, + + #[serde(default = "default_metadata_manager")] + pub metadata_manager: MetadataManager, } } @@ -172,6 +176,7 @@ impl Default for v1alpha1::KafkaClusterConfig { tls: tls::default_kafka_tls(), vector_aggregator_config_map_name: None, zookeeper_config_map_name: None, + metadata_manager: default_metadata_manager(), } } } @@ -186,25 +191,8 @@ impl HasStatusCondition for v1alpha1::KafkaCluster { } impl v1alpha1::KafkaCluster { - /// Supporting Kraft alongside Zookeeper requires a couple of CRD checks - /// - If Kafka 4 and higher is used, no zookeeper config map ref has to be provided - /// - Configuring the controller role means no zookeeper config map ref has to be provided - pub fn check_kraft_vs_zookeeper(&self, product_version: &str) -> Result<(), Error> { - if product_version.starts_with("4.") && self.spec.controllers.is_none() { - return Err(Error::Kafka4RequiresKraft); - } - - if self.spec.controllers.is_some() - && self.spec.cluster_config.zookeeper_config_map_name.is_some() - { - return Err(Error::KraftAndZookeeperConfigured); - } - - Ok(()) - } - - pub fn is_controller_configured(&self) -> bool { - self.spec.controllers.is_some() + pub fn is_kraft_mode(&self) -> bool { + self.spec.cluster_config.metadata_manager == MetadataManager::KRaft } // The cluster-id for Kafka @@ -407,6 +395,30 @@ pub struct KafkaClusterStatus { pub conditions: Vec, } +#[derive( + Clone, + Debug, + Deserialize, + Display, + EnumIter, + Eq, + Hash, + JsonSchema, + PartialEq, + Serialize, + EnumString, +)] +pub enum MetadataManager { + #[strum(serialize = "zookeeper")] + ZooKeeper, + #[strum(serialize = "kraft")] + KRaft, +} + +fn default_metadata_manager() -> MetadataManager { + MetadataManager::ZooKeeper +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index aebe52c3..cc5999f1 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ 
b/rust/operator-binary/src/kafka_controller.rs @@ -277,11 +277,6 @@ pub async fn reconcile_kafka( .resolve(DOCKER_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION) .context(ResolveProductImageSnafu)?; - // check Kraft vs ZooKeeper and fail if misconfigured - kafka - .check_kraft_vs_zookeeper(&resolved_product_image.product_version) - .context(MisconfiguredKafkaClusterSnafu)?; - let mut cluster_resources = ClusterResources::new( APP_NAME, OPERATOR_NAME, @@ -562,25 +557,23 @@ fn validated_product_config( ), ); - if kafka.is_controller_configured() { - roles.insert( - KafkaRole::Controller.to_string(), - ( - vec![ - PropertyNameKind::File(CONTROLLER_PROPERTIES_FILE.to_string()), - PropertyNameKind::File(JVM_SECURITY_PROPERTIES_FILE.to_string()), - PropertyNameKind::Env, - ], - kafka - .controller_role() - .cloned() - .context(MissingKafkaRoleSnafu { - role: KafkaRole::Controller, - })? - .erase(), - ), - ); - } + roles.insert( + KafkaRole::Controller.to_string(), + ( + vec![ + PropertyNameKind::File(CONTROLLER_PROPERTIES_FILE.to_string()), + PropertyNameKind::File(JVM_SECURITY_PROPERTIES_FILE.to_string()), + PropertyNameKind::Env, + ], + kafka + .controller_role() + .cloned() + .context(MissingKafkaRoleSnafu { + role: KafkaRole::Controller, + })? 
+ .erase(), + ), + ); let role_config = transform_all_roles_to_config(kafka, roles).context(GenerateProductConfigSnafu)?; diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index ded83c59..434422bf 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -94,7 +94,7 @@ pub fn build_rolegroup_config_map( let kafka_config_file_name = merged_config.config_file_name(); let mut kafka_config = server_properties_file( - kafka.is_controller_configured(), + kafka.is_kraft_mode(), &rolegroup.role, pod_descriptors, listener_config, From c87b2adec23f1b7c6fea58f050af58df03340611 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 18 Dec 2025 17:04:03 +0100 Subject: [PATCH 2/7] add zookeeper migration properties to controllers --- rust/operator-binary/src/resource/configmap.rs | 5 +++++ rust/operator-binary/src/resource/statefulset.rs | 15 +++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index 434422bf..f0698f57 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -261,6 +261,11 @@ fn server_properties_file( result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); } + result.extend([( + "zookeeper.connect".to_string(), + "${env:ZOOKEEPER}".to_string(), + )]); + Ok(result) } KafkaRole::Broker => { diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 89154dad..062ccaca 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -634,6 +634,21 @@ pub fn build_controller_rolegroup_statefulset( ..EnvVar::default() }); + if let Some(zookeeper_config_map_name) = 
&kafka.spec.cluster_config.zookeeper_config_map_name { + env.push(EnvVar { + name: "ZOOKEEPER".to_string(), + value_from: Some(EnvVarSource { + config_map_key_ref: Some(ConfigMapKeySelector { + name: zookeeper_config_map_name.to_string(), + key: "ZOOKEEPER".to_string(), + ..ConfigMapKeySelector::default() + }), + ..EnvVarSource::default() + }), + ..EnvVar::default() + }) + }; + cb_kafka .image_from_product_image(resolved_product_image) .command(vec![ From 7ab7872dcc4da885cdb890df2d332ac73873f5e4 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 18 Dec 2025 17:04:58 +0100 Subject: [PATCH 3/7] validate controller role only if controllers are defined --- rust/operator-binary/src/kafka_controller.rs | 37 +++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index cc5999f1..f5b871c2 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -557,23 +557,26 @@ fn validated_product_config( ), ); - roles.insert( - KafkaRole::Controller.to_string(), - ( - vec![ - PropertyNameKind::File(CONTROLLER_PROPERTIES_FILE.to_string()), - PropertyNameKind::File(JVM_SECURITY_PROPERTIES_FILE.to_string()), - PropertyNameKind::Env, - ], - kafka - .controller_role() - .cloned() - .context(MissingKafkaRoleSnafu { - role: KafkaRole::Controller, - })? - .erase(), - ), - ); + // TODO: need this if because controller_role() raises an error + if kafka.spec.controllers.is_some() { + roles.insert( + KafkaRole::Controller.to_string(), + ( + vec![ + PropertyNameKind::File(CONTROLLER_PROPERTIES_FILE.to_string()), + PropertyNameKind::File(JVM_SECURITY_PROPERTIES_FILE.to_string()), + PropertyNameKind::Env, + ], + kafka + .controller_role() + .cloned() + .context(MissingKafkaRoleSnafu { + role: KafkaRole::Controller, + })? 
+ .erase(), + ), + ); + } let role_config = transform_all_roles_to_config(kafka, roles).context(GenerateProductConfigSnafu)?; From 928a045eb2659aee532209e9c300253e2b843d77 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Thu, 18 Dec 2025 17:37:00 +0100 Subject: [PATCH 4/7] move cluster id to env var --- rust/operator-binary/src/config/command.rs | 9 +++------ rust/operator-binary/src/crd/mod.rs | 17 +++++++++++++++-- rust/operator-binary/src/crd/role/broker.rs | 8 ++++++-- rust/operator-binary/src/crd/role/controller.rs | 8 ++++++-- .../operator-binary/src/resource/statefulset.rs | 5 +---- 5 files changed, 31 insertions(+), 16 deletions(-) diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index 0230318c..3418a9cf 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -19,7 +19,6 @@ use crate::{ /// Returns the commands to start the main Kafka container pub fn broker_kafka_container_commands( kafka: &v1alpha1::KafkaCluster, - cluster_id: &str, controller_descriptors: Vec, kafka_security: &KafkaTlsSecurity, product_version: &str, @@ -42,13 +41,12 @@ pub fn broker_kafka_container_commands( true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {STACKABLE_KERBEROS_KRB5_PATH})"), false => "".to_string(), }, - broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version), + broker_start_command = broker_start_command(kafka, controller_descriptors, product_version), } } fn broker_start_command( kafka: &v1alpha1::KafkaCluster, - cluster_id: &str, controller_descriptors: Vec, product_version: &str, ) -> String { @@ -63,7 +61,7 @@ fn broker_start_command( cp {config_dir}/jaas.properties /tmp/jaas.properties config-utils template /tmp/jaas.properties - bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted 
{initial_controller_command} + bin/kafka-storage.sh format --cluster-id \"$KAFKA_CLUSTER_ID\" --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} bin/kafka-server-start.sh /tmp/{properties_file} & ", config_dir = STACKABLE_CONFIG_DIR, @@ -128,7 +126,6 @@ wait_for_termination() "#; pub fn controller_kafka_container_command( - cluster_id: &str, controller_descriptors: Vec, product_version: &str, ) -> String { @@ -145,7 +142,7 @@ pub fn controller_kafka_container_command( config-utils template /tmp/{properties_file} - bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} + bin/kafka-storage.sh format --cluster-id \"$KAFKA_CLUSTER_ID\" --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} bin/kafka-server-start.sh /tmp/{properties_file} & wait_for_termination $! diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index be2d5f17..f5e3d7f0 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -195,9 +195,22 @@ impl v1alpha1::KafkaCluster { self.spec.cluster_config.metadata_manager == MetadataManager::KRaft } - // The cluster-id for Kafka + /// The Kafka cluster id when running in Kraft mode. + /// + /// In ZooKeeper mode the cluster id is a UUID generated by Kafka itself and users typically + /// do not need to deal with it. + /// + /// When in Kraft mode, the cluster id is passed on as the environment variable `KAFKA_CLUSTER_ID`. + /// + /// When migrating to Kraft mode, users *must* set this variable via `envOverrides` to the value + /// found in the `cluster/id` ZooKeeper node or in the `meta.properties` file. + /// + /// For freshly installed clusters, users do not need to deal with the cluster id. 
pub fn cluster_id(&self) -> Option<&str> { - self.metadata.name.as_deref() + match self.spec.cluster_config.metadata_manager { + MetadataManager::KRaft => self.metadata.name.as_deref(), + _ => None, + } } /// The name of the load-balanced Kubernetes Service providing the bootstrap address. Kafka clients will use this diff --git a/rust/operator-binary/src/crd/role/broker.rs b/rust/operator-binary/src/crd/role/broker.rs index 00d614b9..70ac85d0 100644 --- a/rust/operator-binary/src/crd/role/broker.rs +++ b/rust/operator-binary/src/crd/role/broker.rs @@ -107,11 +107,15 @@ impl Configuration for BrokerConfigFragment { fn compute_env( &self, - _resource: &Self::Configurable, + resource: &Self::Configurable, _role_name: &str, ) -> Result>, stackable_operator::product_config_utils::Error> { - Ok(BTreeMap::new()) + let mut result = BTreeMap::new(); + if let Some(cluster_id) = resource.cluster_id() { + result.insert("KAFKA_CLUSTER_ID".to_string(), Some(cluster_id.to_string())); + } + Ok(result) } fn compute_cli( diff --git a/rust/operator-binary/src/crd/role/controller.rs b/rust/operator-binary/src/crd/role/controller.rs index 5b9513a5..bf1468b6 100644 --- a/rust/operator-binary/src/crd/role/controller.rs +++ b/rust/operator-binary/src/crd/role/controller.rs @@ -97,11 +97,15 @@ impl Configuration for ControllerConfigFragment { fn compute_env( &self, - _resource: &Self::Configurable, + resource: &Self::Configurable, _role_name: &str, ) -> Result>, stackable_operator::product_config_utils::Error> { - Ok(BTreeMap::new()) + let mut result = BTreeMap::new(); + if let Some(cluster_id) = resource.cluster_id() { + result.insert("KAFKA_CLUSTER_ID".to_string(), Some(cluster_id.to_string())); + } + Ok(result) } fn compute_cli( diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 062ccaca..a7aec39c 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ 
-283,8 +283,6 @@ pub fn build_broker_rolegroup_statefulset( ..EnvVar::default() }); - let cluster_id = kafka.cluster_id().context(ClusterIdMissingSnafu)?; - cb_kafka .image_from_product_image(resolved_product_image) .command(vec![ @@ -296,7 +294,6 @@ pub fn build_broker_rolegroup_statefulset( ]) .args(vec![broker_kafka_container_commands( kafka, - cluster_id, // we need controller pods kafka .pod_descriptors( @@ -634,6 +631,7 @@ pub fn build_controller_rolegroup_statefulset( ..EnvVar::default() }); + // Controllers need the ZooKeeper connection string for migration if let Some(zookeeper_config_map_name) = &kafka.spec.cluster_config.zookeeper_config_map_name { env.push(EnvVar { name: "ZOOKEEPER".to_string(), @@ -659,7 +657,6 @@ pub fn build_controller_rolegroup_statefulset( "-c".to_string(), ]) .args(vec![controller_kafka_container_command( - kafka.cluster_id().context(ClusterIdMissingSnafu)?, kafka .pod_descriptors(Some(kafka_role), cluster_info, kafka_security.client_port()) .context(BuildPodDescriptorsSnafu)?, From a2d30770373d5157cb5d0cfe45c20bdfad569ec9 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 19 Dec 2025 15:05:47 +0100 Subject: [PATCH 5/7] disable automatic broker.id generation --- rust/operator-binary/src/config/command.rs | 3 ++ rust/operator-binary/src/crd/role/mod.rs | 5 ++++ .../operator-binary/src/resource/configmap.rs | 30 +++++++++---------- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index 3418a9cf..bfe150b1 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -70,6 +70,9 @@ fn broker_start_command( } } else { formatdoc! 
{" + POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET)) + + cp {config_dir}/{properties_file} /tmp/{properties_file} config-utils template /tmp/{properties_file} diff --git a/rust/operator-binary/src/crd/role/mod.rs b/rust/operator-binary/src/crd/role/mod.rs index 47210ea4..16f72083 100644 --- a/rust/operator-binary/src/crd/role/mod.rs +++ b/rust/operator-binary/src/crd/role/mod.rs @@ -33,6 +33,11 @@ use crate::{ /// Env var pub const KAFKA_NODE_ID_OFFSET: &str = "NODE_ID_OFFSET"; +/// Past versions of the operator didn't set this explicitly and allowed Kafka to generate random ids. +/// To support Kraft migration, this must be carried over to `KAFKA_NODE_ID` so the operator needs +/// to know its value for each broker Pod. +pub const KAFKA_BROKER_ID: &str = "broker.id"; + // See: https://kafka.apache.org/documentation/#brokerconfigs /// The node ID associated with the roles this process is playing when process.roles is non-empty. /// This is required configuration when running in KRaft mode. 
diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index f0698f57..11c8ae73 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -19,9 +19,10 @@ use crate::{ STACKABLE_LISTENER_BROKER_DIR, listener::{KafkaListenerConfig, KafkaListenerName, node_address_cmd}, role::{ - AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, - KAFKA_CONTROLLER_QUORUM_VOTERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, - KAFKA_LOG_DIRS, KAFKA_NODE_ID, KAFKA_PROCESS_ROLES, KafkaRole, + AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_BROKER_ID, + KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, KAFKA_CONTROLLER_QUORUM_VOTERS, + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, KAFKA_LOG_DIRS, KAFKA_NODE_ID, + KAFKA_PROCESS_ROLES, KafkaRole, }, security::KafkaTlsSecurity, v1alpha1, @@ -99,7 +100,6 @@ pub fn build_rolegroup_config_map( pod_descriptors, listener_config, opa_connect_string, - resolved_product_image.product_version.starts_with("3.7"), // needs_quorum_voters )?; match merged_config { @@ -213,7 +213,6 @@ fn server_properties_file( pod_descriptors: &[KafkaPodDescriptor], listener_config: &KafkaListenerConfig, opa_connect_string: Option<&str>, - needs_quorum_voters: bool, ) -> Result, Error> { let kraft_controllers = kraft_controllers(pod_descriptors); @@ -254,12 +253,10 @@ fn server_properties_file( .unwrap_or("".to_string())), ]); - if needs_quorum_voters { - let kraft_voters = - kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; + let kraft_voters = + kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; - result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); - } + result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); result.extend([( "zookeeper.connect".to_string(), @@ -283,6 +280,11 @@ fn server_properties_file( 
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.to_string(), listener_config.listener_security_protocol_map(), ), + ( + "broker.id.generation.enable".to_string(), + "false".to_string(), + ), + (KAFKA_BROKER_ID.to_string(), "${env:REPLICA_ID}".to_string()), ]); if kraft_mode { @@ -305,12 +307,10 @@ fn server_properties_file( ), ]); - if needs_quorum_voters { - let kraft_voters = - kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; + let kraft_voters = + kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; - result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); - } + result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); } else { // Running with ZooKeeper enabled result.extend([( From 88f319d9aaff40acfbb38ba662cb0fb270a3c015 Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 19 Dec 2025 18:18:48 +0100 Subject: [PATCH 6/7] disable default tls --- deploy/helm/kafka-operator/crds/crds.yaml | 9 +-------- rust/operator-binary/src/crd/listener.rs | 13 +++++-------- rust/operator-binary/src/crd/mod.rs | 10 ++++------ rust/operator-binary/src/crd/security.rs | 15 +++++---------- rust/operator-binary/src/crd/tls.rs | 15 +++++---------- 5 files changed, 20 insertions(+), 42 deletions(-) diff --git a/deploy/helm/kafka-operator/crds/crds.yaml b/deploy/helm/kafka-operator/crds/crds.yaml index ff3e5feb..6bab087b 100644 --- a/deploy/helm/kafka-operator/crds/crds.yaml +++ b/deploy/helm/kafka-operator/crds/crds.yaml @@ -732,9 +732,6 @@ spec: authorization: opa: null metadataManager: ZooKeeper - tls: - internalSecretClass: tls - serverSecretClass: tls zookeeperConfigMapName: null description: |- Kafka settings that affect all roles and role groups. @@ -801,14 +798,10 @@ spec: - KRaft type: string tls: - default: - internalSecretClass: tls - serverSecretClass: tls description: TLS encryption settings for Kafka (server, internal). 
nullable: true properties: internalSecretClass: - default: tls description: |- The [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass.html) to use for internal broker communication. Use mutual verification between brokers (mandatory). @@ -817,9 +810,9 @@ spec: - Which ca.crt to use when validating the other brokers Defaults to `tls` + nullable: true type: string serverSecretClass: - default: tls description: |- The [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass.html) to use for client connections. This setting controls: diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 97b15b85..abc257b5 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -401,7 +401,7 @@ mod tests { ), }, }]), - "internalTls".to_string(), + Some("internalTls".to_string()), Some("tls".to_string()), ); let cluster_info = default_cluster_info(); @@ -460,7 +460,7 @@ mod tests { let kafka_security = KafkaTlsSecurity::new( ResolvedAuthenticationClasses::new(vec![]), - "tls".to_string(), + Some("tls".to_string()), Some("tls".to_string()), ); let config = @@ -514,11 +514,8 @@ mod tests { ) ); - let kafka_security = KafkaTlsSecurity::new( - ResolvedAuthenticationClasses::new(vec![]), - "".to_string(), - None, - ); + let kafka_security = + KafkaTlsSecurity::new(ResolvedAuthenticationClasses::new(vec![]), None, None); let config = get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info) @@ -603,7 +600,7 @@ mod tests { ), }, }]), - "tls".to_string(), + Some("tls".to_string()), Some("tls".to_string()), ); let cluster_info = default_cluster_info(); diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index f5e3d7f0..b015a3d6 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -445,15 +445,13 @@ mod tests { .and_then(|tls| 
tls.server_secret_class.clone()) } - fn get_internal_secret_class(kafka: &v1alpha1::KafkaCluster) -> String { + fn get_internal_secret_class(kafka: &v1alpha1::KafkaCluster) -> Option { kafka .spec .cluster_config .tls .as_ref() - .unwrap() - .internal_secret_class - .clone() + .and_then(|tls| tls.internal_secret_class.clone()) } #[test] @@ -542,7 +540,7 @@ mod tests { assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default()); assert_eq!( get_internal_secret_class(&kafka), - "simple-kafka-internal-tls".to_string() + Some("simple-kafka-internal-tls".to_string()) ); } @@ -585,7 +583,7 @@ mod tests { assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default()); assert_eq!( get_internal_secret_class(&kafka), - "simple-kafka-internal-tls".to_string() + Some("simple-kafka-internal-tls".to_string()) ); let input = r#" diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index b729386a..c20f3e7f 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -29,7 +29,7 @@ use crate::crd::{ authentication::{self, ResolvedAuthenticationClasses}, listener::{self, KafkaListenerName, node_address_cmd_env, node_port_cmd_env}, role::KafkaRole, - tls, v1alpha1, + v1alpha1, }; #[derive(Snafu, Debug)] @@ -57,7 +57,7 @@ pub enum Error { /// Helper struct combining TLS settings for server and internal with the resolved AuthenticationClasses pub struct KafkaTlsSecurity { resolved_authentication_classes: ResolvedAuthenticationClasses, - internal_secret_class: String, + internal_secret_class: Option, server_secret_class: Option, } @@ -92,7 +92,7 @@ impl KafkaTlsSecurity { #[cfg(test)] pub fn new( resolved_authentication_classes: ResolvedAuthenticationClasses, - internal_secret_class: String, + internal_secret_class: Option, server_secret_class: Option, ) -> Self { Self { @@ -120,8 +120,7 @@ impl KafkaTlsSecurity { .cluster_config .tls .as_ref() - .map(|tls| 
tls.internal_secret_class.clone()) - .unwrap_or_else(tls::internal_tls_default), + .and_then(|tls| tls.internal_secret_class.clone()), server_secret_class: kafka .spec .cluster_config @@ -155,11 +154,7 @@ impl KafkaTlsSecurity { /// Retrieve the mandatory internal `SecretClass`. pub fn tls_internal_secret_class(&self) -> Option<&str> { - if !self.internal_secret_class.is_empty() { - Some(self.internal_secret_class.as_str()) - } else { - None - } + self.internal_secret_class.as_deref() } pub fn has_kerberos_enabled(&self) -> bool { diff --git a/rust/operator-binary/src/crd/tls.rs b/rust/operator-binary/src/crd/tls.rs index 94843601..08ebe3d3 100644 --- a/rust/operator-binary/src/crd/tls.rs +++ b/rust/operator-binary/src/crd/tls.rs @@ -1,8 +1,6 @@ use serde::{Deserialize, Serialize}; use stackable_operator::schemars::{self, JsonSchema}; -const TLS_DEFAULT_SECRET_CLASS: &str = "tls"; - #[derive(Clone, Deserialize, Debug, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct KafkaTls { @@ -14,7 +12,7 @@ pub struct KafkaTls { /// /// Defaults to `tls` #[serde(default = "internal_tls_default")] - pub internal_secret_class: String, + pub internal_secret_class: Option, /// The [SecretClass](DOCS_BASE_URL_PLACEHOLDER/secret-operator/secretclass.html) to use for /// client connections. This setting controls: /// - If TLS encryption is used at all @@ -31,18 +29,15 @@ pub struct KafkaTls { /// Default TLS settings. /// Internal and server communication default to `tls` secret class. 
pub fn default_kafka_tls() -> Option { - Some(KafkaTls { - internal_secret_class: internal_tls_default(), - server_secret_class: server_tls_default(), - }) + None } /// Helper methods to provide defaults in the CRDs and tests -pub fn internal_tls_default() -> String { - TLS_DEFAULT_SECRET_CLASS.into() +pub fn internal_tls_default() -> Option { + None } /// Helper methods to provide defaults in the CRDs and tests pub fn server_tls_default() -> Option { - Some(TLS_DEFAULT_SECRET_CLASS.into()) + None } From 7250bd96f34dd1d02166ebb6c05a236a3e9172bd Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Fri, 19 Dec 2025 18:19:19 +0100 Subject: [PATCH 7/7] add test manifests --- examples/kraft-migration/01-setup.yaml | 87 +++++++++++++++++++ .../kraft-migration/02-start-controllers.yaml | 33 +++++++ .../kraft-migration/03-migrate-metadata.yaml | 47 ++++++++++ 3 files changed, 167 insertions(+) create mode 100644 examples/kraft-migration/01-setup.yaml create mode 100644 examples/kraft-migration/02-start-controllers.yaml create mode 100644 examples/kraft-migration/03-migrate-metadata.yaml diff --git a/examples/kraft-migration/01-setup.yaml b/examples/kraft-migration/01-setup.yaml new file mode 100644 index 00000000..c290270e --- /dev/null +++ b/examples/kraft-migration/01-setup.yaml @@ -0,0 +1,87 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + stackable.tech/vendor: Stackable + name: kraft-migration +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperCluster +metadata: + name: simple-zk + namespace: kraft-migration +spec: + image: + productVersion: 3.8.3 + pullPolicy: IfNotPresent + servers: + roleGroups: + default: + replicas: 3 +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperZnode +metadata: + name: simple-kafka-znode + namespace: kraft-migration +spec: + clusterRef: + name: simple-zk +# --- +# apiVersion: secrets.stackable.tech/v1alpha1 +# kind: SecretClass +# metadata: +# 
name: kafka-internal-tls +# spec: +# backend: +# autoTls: +# ca: +# secret: +# name: secret-provisioner-kafka-internal-tls-ca +# namespace: kraft-migration +# autoGenerate: true +# --- +# apiVersion: authentication.stackable.tech/v1alpha1 +# kind: AuthenticationClass +# metadata: +# name: kafka-client-auth-tls +# spec: +# provider: +# tls: +# clientCertSecretClass: kafka-client-auth-secret +# --- +# apiVersion: secrets.stackable.tech/v1alpha1 +# kind: SecretClass +# metadata: +# name: kafka-client-auth-secret +# spec: +# backend: +# autoTls: +# ca: +# secret: +# name: secret-provisioner-tls-kafka-client-ca +# namespace: kraft-migration +# autoGenerate: true +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: ZooKeeper + # authentication: + # - authenticationClass: kafka-client-auth-tls + # tls: + # internalSecretClass: kafka-internal-tls + # serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokers: + roleGroups: + default: + replicas: 3 diff --git a/examples/kraft-migration/02-start-controllers.yaml b/examples/kraft-migration/02-start-controllers.yaml new file mode 100644 index 00000000..6c32ff96 --- /dev/null +++ b/examples/kraft-migration/02-start-controllers.yaml @@ -0,0 +1,33 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: ZooKeeper + # authentication: + # - authenticationClass: kafka-client-auth-tls + # tls: + # internalSecretClass: kafka-internal-tls + # serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "lyeJYZ7TQ_SfT4HcU8W3iw" + roleGroups: + default: + replicas: 3 + controllers: + roleGroups: + default: + 
replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "lyeJYZ7TQ_SfT4HcU8W3iw" + configOverrides: + controller.properties: + zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper. diff --git a/examples/kraft-migration/03-migrate-metadata.yaml b/examples/kraft-migration/03-migrate-metadata.yaml new file mode 100644 index 00000000..e971a6e2 --- /dev/null +++ b/examples/kraft-migration/03-migrate-metadata.yaml @@ -0,0 +1,47 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: ZooKeeper + # authentication: + # - authenticationClass: kafka-client-auth-tls + # tls: + # internalSecretClass: kafka-internal-tls + # serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "lyeJYZ7TQ_SfT4HcU8W3iw" + roleGroups: + default: + replicas: 3 + configOverrides: + broker.properties: + inter.broker.protocol.version: "3.9" # - Latest value known to Kafka 3.9.1 + zookeeper.metadata.migration.enable: "true" # - Enable migration mode so the broker can participate in metadata migration. + controller.listener.names: "CONTROLLER" + controller.quorum.voters: "2110489703@simple-kafka-controller-default-0.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9092,2110489704@simple-kafka-controller-default-1.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9092,2110489705@simple-kafka-controller-default-2.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9092" + + # listener.security.protocol.map: CONTROLLER:SSL,... - Already defined by the operator + # zookeeper.connect= (should already be present) - The ZooKeeper connection string. This property should already be configured. 
+ # controller.quorum.voters= (same as controllers) - Specify the same controller quorum voters string as configured in phase 2. + # controller.listener.names=CONTROLLER - Define the listener name for the controller. + # Add CONTROLLER to listener.security.protocol.map (for example, ...CONTROLLER:PLAINTEXT) - Add the CONTROLLER listener to the security protocol map with the appropriate security protocol. + # confluent.cluster.link.metadata.topic.enable=true - This property is used by Cluster Linking during the migration. + + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "lyeJYZ7TQ_SfT4HcU8W3iw" + configOverrides: + controller.properties: + zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper.