diff --git a/deploy/helm/kafka-operator/crds/crds.yaml b/deploy/helm/kafka-operator/crds/crds.yaml index 323b1c1d..6bab087b 100644 --- a/deploy/helm/kafka-operator/crds/crds.yaml +++ b/deploy/helm/kafka-operator/crds/crds.yaml @@ -731,9 +731,7 @@ spec: authentication: [] authorization: opa: null - tls: - internalSecretClass: tls - serverSecretClass: tls + metadataManager: ZooKeeper zookeeperConfigMapName: null description: |- Kafka settings that affect all roles and role groups. @@ -793,15 +791,17 @@ spec: - configMapName type: object type: object + metadataManager: + default: ZooKeeper + enum: + - ZooKeeper + - KRaft + type: string tls: - default: - internalSecretClass: tls - serverSecretClass: tls description: TLS encryption settings for Kafka (server, internal). nullable: true properties: internalSecretClass: - default: tls description: |- The [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass.html) to use for internal broker communication. Use mutual verification between brokers (mandatory). @@ -810,9 +810,9 @@ spec: - Which ca.crt to use when validating the other brokers Defaults to `tls` + nullable: true type: string serverSecretClass: - default: tls description: |- The [SecretClass](https://docs.stackable.tech/home/nightly/secret-operator/secretclass.html) to use for client connections. This setting controls: @@ -836,7 +836,7 @@ spec: Provide the name of the ZooKeeper [discovery ConfigMap](https://docs.stackable.tech/home/nightly/concepts/service_discovery) here. When using the [Stackable operator for Apache ZooKeeper](https://docs.stackable.tech/home/nightly/zookeeper/) to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. - This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper suppport was dropped. + This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper support was dropped. Please use the 'controller' role instead. 
nullable: true type: string diff --git a/examples/kraft-migration/01-setup.yaml b/examples/kraft-migration/01-setup.yaml new file mode 100644 index 00000000..c290270e --- /dev/null +++ b/examples/kraft-migration/01-setup.yaml @@ -0,0 +1,87 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + labels: + stackable.tech/vendor: Stackable + name: kraft-migration +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperCluster +metadata: + name: simple-zk + namespace: kraft-migration +spec: + image: + productVersion: 3.8.3 + pullPolicy: IfNotPresent + servers: + roleGroups: + default: + replicas: 3 +--- +apiVersion: zookeeper.stackable.tech/v1alpha1 +kind: ZookeeperZnode +metadata: + name: simple-kafka-znode + namespace: kraft-migration +spec: + clusterRef: + name: simple-zk +# --- +# apiVersion: secrets.stackable.tech/v1alpha1 +# kind: SecretClass +# metadata: +# name: kafka-internal-tls +# spec: +# backend: +# autoTls: +# ca: +# secret: +# name: secret-provisioner-kafka-internal-tls-ca +# namespace: kraft-migration +# autoGenerate: true +# --- +# apiVersion: authentication.stackable.tech/v1alpha1 +# kind: AuthenticationClass +# metadata: +# name: kafka-client-auth-tls +# spec: +# provider: +# tls: +# clientCertSecretClass: kafka-client-auth-secret +# --- +# apiVersion: secrets.stackable.tech/v1alpha1 +# kind: SecretClass +# metadata: +# name: kafka-client-auth-secret +# spec: +# backend: +# autoTls: +# ca: +# secret: +# name: secret-provisioner-tls-kafka-client-ca +# namespace: kraft-migration +# autoGenerate: true +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: ZooKeeper + # authentication: + # - authenticationClass: kafka-client-auth-tls + # tls: + # internalSecretClass: kafka-internal-tls + # serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokers: + 
roleGroups: + default: + replicas: 3 diff --git a/examples/kraft-migration/02-start-controllers.yaml b/examples/kraft-migration/02-start-controllers.yaml new file mode 100644 index 00000000..6c32ff96 --- /dev/null +++ b/examples/kraft-migration/02-start-controllers.yaml @@ -0,0 +1,33 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: ZooKeeper + # authentication: + # - authenticationClass: kafka-client-auth-tls + # tls: + # internalSecretClass: kafka-internal-tls + # serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "lyeJYZ7TQ_SfT4HcU8W3iw" + roleGroups: + default: + replicas: 3 + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "lyeJYZ7TQ_SfT4HcU8W3iw" + configOverrides: + controller.properties: + zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper. 
diff --git a/examples/kraft-migration/03-migrate-metadata.yaml b/examples/kraft-migration/03-migrate-metadata.yaml new file mode 100644 index 00000000..e971a6e2 --- /dev/null +++ b/examples/kraft-migration/03-migrate-metadata.yaml @@ -0,0 +1,47 @@ +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: simple-kafka + namespace: kraft-migration +spec: + image: + productVersion: 3.9.1 + pullPolicy: IfNotPresent + clusterConfig: + metadataManager: ZooKeeper + # authentication: + # - authenticationClass: kafka-client-auth-tls + # tls: + # internalSecretClass: kafka-internal-tls + # serverSecretClass: tls + zookeeperConfigMapName: simple-kafka-znode + brokers: + envOverrides: + KAFKA_CLUSTER_ID: "lyeJYZ7TQ_SfT4HcU8W3iw" + roleGroups: + default: + replicas: 3 + configOverrides: + broker.properties: + inter.broker.protocol.version: "3.9" # - Latest value known to Kafka 3.9.1 + zookeeper.metadata.migration.enable: "true" # - Enable migration mode so the broker can participate in metadata migration. + controller.listener.names: "CONTROLLER" + controller.quorum.voters: "2110489703@simple-kafka-controller-default-0.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9092,2110489704@simple-kafka-controller-default-1.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9092,2110489705@simple-kafka-controller-default-2.simple-kafka-controller-default-headless.kraft-migration.svc.cluster.local:9092" + + # listener.security.protocol.map: CONTROLLER:SSL,... - Already defined by the operator + # zookeeper.connect= (should already be present) - The ZooKeeper connection string. This property should already be configured. + # controller.quorum.voters= (same as controllers) - Specify the same controller quorum voters string as configured in phase 2. + # controller.listener.names=CONTROLLER - Define the listener name for the controller. 
+ # Add CONTROLLER to listener.security.protocol.map (for example, ...CONTROLLER:PLAINTEXT) - Add the CONTROLLER listener to the security protocol map with the appropriate security protocol. + # confluent.cluster.link.metadata.topic.enable=true - This property is used by Cluster Linking during the migration. + + controllers: + roleGroups: + default: + replicas: 3 + envOverrides: + KAFKA_CLUSTER_ID: "lyeJYZ7TQ_SfT4HcU8W3iw" + configOverrides: + controller.properties: + zookeeper.metadata.migration.enable: "true" # Enable migration mode so the controller can read metadata from ZooKeeper. diff --git a/rust/operator-binary/src/config/command.rs b/rust/operator-binary/src/config/command.rs index b2f31e8a..bfe150b1 100644 --- a/rust/operator-binary/src/config/command.rs +++ b/rust/operator-binary/src/config/command.rs @@ -19,7 +19,6 @@ use crate::{ /// Returns the commands to start the main Kafka container pub fn broker_kafka_container_commands( kafka: &v1alpha1::KafkaCluster, - cluster_id: &str, controller_descriptors: Vec, kafka_security: &KafkaTlsSecurity, product_version: &str, @@ -42,17 +41,16 @@ pub fn broker_kafka_container_commands( true => format!("export KERBEROS_REALM=$(grep -oP 'default_realm = \\K.*' {STACKABLE_KERBEROS_KRB5_PATH})"), false => "".to_string(), }, - broker_start_command = broker_start_command(kafka, cluster_id, controller_descriptors, product_version), + broker_start_command = broker_start_command(kafka, controller_descriptors, product_version), } } fn broker_start_command( kafka: &v1alpha1::KafkaCluster, - cluster_id: &str, controller_descriptors: Vec, product_version: &str, ) -> String { - if kafka.is_controller_configured() { + if kafka.is_kraft_mode() { formatdoc! 
{" POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET)) @@ -63,7 +61,7 @@ fn broker_start_command( cp {config_dir}/jaas.properties /tmp/jaas.properties config-utils template /tmp/jaas.properties - bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} + bin/kafka-storage.sh format --cluster-id \"$KAFKA_CLUSTER_ID\" --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} bin/kafka-server-start.sh /tmp/{properties_file} & ", config_dir = STACKABLE_CONFIG_DIR, @@ -72,6 +70,9 @@ fn broker_start_command( } } else { formatdoc! {" + POD_INDEX=$(echo \"$POD_NAME\" | grep -oE '[0-9]+$') + export REPLICA_ID=$((POD_INDEX+NODE_ID_OFFSET)) + cp {config_dir}/{properties_file} /tmp/{properties_file} config-utils template /tmp/{properties_file} @@ -128,7 +129,6 @@ wait_for_termination() "#; pub fn controller_kafka_container_command( - cluster_id: &str, controller_descriptors: Vec, product_version: &str, ) -> String { @@ -145,7 +145,7 @@ pub fn controller_kafka_container_command( config-utils template /tmp/{properties_file} - bin/kafka-storage.sh format --cluster-id {cluster_id} --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} + bin/kafka-storage.sh format --cluster-id \"$KAFKA_CLUSTER_ID\" --config /tmp/{properties_file} --ignore-formatted {initial_controller_command} bin/kafka-server-start.sh /tmp/{properties_file} & wait_for_termination $! 
diff --git a/rust/operator-binary/src/crd/listener.rs b/rust/operator-binary/src/crd/listener.rs index 97b15b85..abc257b5 100644 --- a/rust/operator-binary/src/crd/listener.rs +++ b/rust/operator-binary/src/crd/listener.rs @@ -401,7 +401,7 @@ mod tests { ), }, }]), - "internalTls".to_string(), + Some("internalTls".to_string()), Some("tls".to_string()), ); let cluster_info = default_cluster_info(); @@ -460,7 +460,7 @@ mod tests { let kafka_security = KafkaTlsSecurity::new( ResolvedAuthenticationClasses::new(vec![]), - "tls".to_string(), + Some("tls".to_string()), Some("tls".to_string()), ); let config = @@ -514,11 +514,8 @@ mod tests { ) ); - let kafka_security = KafkaTlsSecurity::new( - ResolvedAuthenticationClasses::new(vec![]), - "".to_string(), - None, - ); + let kafka_security = + KafkaTlsSecurity::new(ResolvedAuthenticationClasses::new(vec![]), None, None); let config = get_kafka_listener_config(&kafka, &kafka_security, &rolegroup_ref, &cluster_info) @@ -603,7 +600,7 @@ mod tests { ), }, }]), - "tls".to_string(), + Some("tls".to_string()), Some("tls".to_string()), ); let cluster_info = default_cluster_info(); diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index e8ea4852..b015a3d6 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -23,6 +23,7 @@ use stackable_operator::{ utils::cluster_info::KubernetesClusterInfo, versioned::versioned, }; +use strum::{Display, EnumIter, EnumString}; use crate::{ config::node_id_hasher::node_id_hash32_offset, @@ -158,9 +159,12 @@ pub mod versioned { /// Provide the name of the ZooKeeper [discovery ConfigMap](DOCS_BASE_URL_PLACEHOLDER/concepts/service_discovery) /// here. When using the [Stackable operator for Apache ZooKeeper](DOCS_BASE_URL_PLACEHOLDER/zookeeper/) /// to deploy a ZooKeeper cluster, this will simply be the name of your ZookeeperCluster resource. - /// This can only be used up to Kafka version 3.9.x. 
Since Kafka 4.0.0, ZooKeeper suppport was dropped. + /// This can only be used up to Kafka version 3.9.x. Since Kafka 4.0.0, ZooKeeper support was dropped. /// Please use the 'controller' role instead. pub zookeeper_config_map_name: Option, + + #[serde(default = "default_metadata_manager")] + pub metadata_manager: MetadataManager, } } @@ -172,6 +176,7 @@ impl Default for v1alpha1::KafkaClusterConfig { tls: tls::default_kafka_tls(), vector_aggregator_config_map_name: None, zookeeper_config_map_name: None, + metadata_manager: default_metadata_manager(), } } } @@ -186,30 +191,26 @@ impl HasStatusCondition for v1alpha1::KafkaCluster { } } impl v1alpha1::KafkaCluster { - /// Supporting Kraft alongside Zookeeper requires a couple of CRD checks - /// - If Kafka 4 and higher is used, no zookeeper config map ref has to be provided - /// - Configuring the controller role means no zookeeper config map ref has to be provided - pub fn check_kraft_vs_zookeeper(&self, product_version: &str) -> Result<(), Error> { - if product_version.starts_with("4.") && self.spec.controllers.is_none() { - return Err(Error::Kafka4RequiresKraft); - } - - if self.spec.controllers.is_some() - && self.spec.cluster_config.zookeeper_config_map_name.is_some() - { - return Err(Error::KraftAndZookeeperConfigured); - } - - Ok(()) - } - - pub fn is_controller_configured(&self) -> bool { - self.spec.controllers.is_some() + pub fn is_kraft_mode(&self) -> bool { + self.spec.cluster_config.metadata_manager == MetadataManager::KRaft } - // The cluster-id for Kafka + /// The Kafka cluster id when running in Kraft mode. + /// + /// In ZooKeeper mode the cluster id is a UUID generated by Kafka itself and users typically + /// do not need to deal with it. + /// + /// When in Kraft mode, the cluster id is passed on as the environment variable `KAFKA_CLUSTER_ID`.
+ /// + /// When migrating to Kraft mode, users *must* set this variable via `envOverrides` to the value + /// found in the `cluster/id` ZooKeeper node or in the `meta.properties` file. + /// + /// For freshly installed clusters, users do not need to deal with the cluster id. pub fn cluster_id(&self) -> Option<&str> { - self.metadata.name.as_deref() + match self.spec.cluster_config.metadata_manager { + MetadataManager::KRaft => self.metadata.name.as_deref(), + _ => None, + } } /// The name of the load-balanced Kubernetes Service providing the bootstrap address. Kafka clients will use this @@ -407,6 +408,30 @@ pub struct KafkaClusterStatus { pub conditions: Vec, } +#[derive( + Clone, + Debug, + Deserialize, + Display, + EnumIter, + Eq, + Hash, + JsonSchema, + PartialEq, + Serialize, + EnumString, +)] +pub enum MetadataManager { + #[strum(serialize = "zookeeper")] + ZooKeeper, + #[strum(serialize = "kraft")] + KRaft, +} + +fn default_metadata_manager() -> MetadataManager { + MetadataManager::ZooKeeper +} + #[cfg(test)] mod tests { use super::*; @@ -420,15 +445,13 @@ mod tests { .and_then(|tls| tls.server_secret_class.clone()) } - fn get_internal_secret_class(kafka: &v1alpha1::KafkaCluster) -> String { + fn get_internal_secret_class(kafka: &v1alpha1::KafkaCluster) -> Option { kafka .spec .cluster_config .tls .as_ref() - .unwrap() - .internal_secret_class - .clone() + .and_then(|tls| tls.internal_secret_class.clone()) } #[test] @@ -517,7 +540,7 @@ mod tests { assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default()); assert_eq!( get_internal_secret_class(&kafka), - "simple-kafka-internal-tls".to_string() + Some("simple-kafka-internal-tls".to_string()) ); } @@ -560,7 +583,7 @@ mod tests { assert_eq!(get_server_secret_class(&kafka), tls::server_tls_default()); assert_eq!( get_internal_secret_class(&kafka), - "simple-kafka-internal-tls".to_string() + Some("simple-kafka-internal-tls".to_string()) ); let input = r#" diff --git 
a/rust/operator-binary/src/crd/role/broker.rs b/rust/operator-binary/src/crd/role/broker.rs index 00d614b9..70ac85d0 100644 --- a/rust/operator-binary/src/crd/role/broker.rs +++ b/rust/operator-binary/src/crd/role/broker.rs @@ -107,11 +107,15 @@ impl Configuration for BrokerConfigFragment { fn compute_env( &self, - _resource: &Self::Configurable, + resource: &Self::Configurable, _role_name: &str, ) -> Result>, stackable_operator::product_config_utils::Error> { - Ok(BTreeMap::new()) + let mut result = BTreeMap::new(); + if let Some(cluster_id) = resource.cluster_id() { + result.insert("KAFKA_CLUSTER_ID".to_string(), Some(cluster_id.to_string())); + } + Ok(result) } fn compute_cli( diff --git a/rust/operator-binary/src/crd/role/controller.rs b/rust/operator-binary/src/crd/role/controller.rs index 5b9513a5..bf1468b6 100644 --- a/rust/operator-binary/src/crd/role/controller.rs +++ b/rust/operator-binary/src/crd/role/controller.rs @@ -97,11 +97,15 @@ impl Configuration for ControllerConfigFragment { fn compute_env( &self, - _resource: &Self::Configurable, + resource: &Self::Configurable, _role_name: &str, ) -> Result>, stackable_operator::product_config_utils::Error> { - Ok(BTreeMap::new()) + let mut result = BTreeMap::new(); + if let Some(cluster_id) = resource.cluster_id() { + result.insert("KAFKA_CLUSTER_ID".to_string(), Some(cluster_id.to_string())); + } + Ok(result) } fn compute_cli( diff --git a/rust/operator-binary/src/crd/role/mod.rs b/rust/operator-binary/src/crd/role/mod.rs index 47210ea4..16f72083 100644 --- a/rust/operator-binary/src/crd/role/mod.rs +++ b/rust/operator-binary/src/crd/role/mod.rs @@ -33,6 +33,11 @@ use crate::{ /// Env var pub const KAFKA_NODE_ID_OFFSET: &str = "NODE_ID_OFFSET"; +/// Past versions of the operator didn't set this explicitly and allowed Kafka to generate random ids. +/// To support Kraft migration, this must be carried over to `KAFKA_NODE_ID` so the operator needs +/// to know its value for each broker Pod. 
+pub const KAFKA_BROKER_ID: &str = "broker.id"; + // See: https://kafka.apache.org/documentation/#brokerconfigs /// The node ID associated with the roles this process is playing when process.roles is non-empty. /// This is required configuration when running in KRaft mode. diff --git a/rust/operator-binary/src/crd/security.rs b/rust/operator-binary/src/crd/security.rs index b729386a..c20f3e7f 100644 --- a/rust/operator-binary/src/crd/security.rs +++ b/rust/operator-binary/src/crd/security.rs @@ -29,7 +29,7 @@ use crate::crd::{ authentication::{self, ResolvedAuthenticationClasses}, listener::{self, KafkaListenerName, node_address_cmd_env, node_port_cmd_env}, role::KafkaRole, - tls, v1alpha1, + v1alpha1, }; #[derive(Snafu, Debug)] @@ -57,7 +57,7 @@ pub enum Error { /// Helper struct combining TLS settings for server and internal with the resolved AuthenticationClasses pub struct KafkaTlsSecurity { resolved_authentication_classes: ResolvedAuthenticationClasses, - internal_secret_class: String, + internal_secret_class: Option, server_secret_class: Option, } @@ -92,7 +92,7 @@ impl KafkaTlsSecurity { #[cfg(test)] pub fn new( resolved_authentication_classes: ResolvedAuthenticationClasses, - internal_secret_class: String, + internal_secret_class: Option, server_secret_class: Option, ) -> Self { Self { @@ -120,8 +120,7 @@ impl KafkaTlsSecurity { .cluster_config .tls .as_ref() - .map(|tls| tls.internal_secret_class.clone()) - .unwrap_or_else(tls::internal_tls_default), + .and_then(|tls| tls.internal_secret_class.clone()), server_secret_class: kafka .spec .cluster_config @@ -155,11 +154,7 @@ impl KafkaTlsSecurity { /// Retrieve the mandatory internal `SecretClass`. 
pub fn tls_internal_secret_class(&self) -> Option<&str> { - if !self.internal_secret_class.is_empty() { - Some(self.internal_secret_class.as_str()) - } else { - None - } + self.internal_secret_class.as_deref() } pub fn has_kerberos_enabled(&self) -> bool { diff --git a/rust/operator-binary/src/crd/tls.rs b/rust/operator-binary/src/crd/tls.rs index 94843601..08ebe3d3 100644 --- a/rust/operator-binary/src/crd/tls.rs +++ b/rust/operator-binary/src/crd/tls.rs @@ -1,8 +1,6 @@ use serde::{Deserialize, Serialize}; use stackable_operator::schemars::{self, JsonSchema}; -const TLS_DEFAULT_SECRET_CLASS: &str = "tls"; - #[derive(Clone, Deserialize, Debug, Eq, JsonSchema, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct KafkaTls { @@ -14,7 +12,7 @@ pub struct KafkaTls { /// /// Defaults to `tls` #[serde(default = "internal_tls_default")] - pub internal_secret_class: String, + pub internal_secret_class: Option, /// The [SecretClass](DOCS_BASE_URL_PLACEHOLDER/secret-operator/secretclass.html) to use for /// client connections. This setting controls: /// - If TLS encryption is used at all @@ -31,18 +29,15 @@ pub struct KafkaTls { /// Default TLS settings. /// Internal and server communication default to `tls` secret class. 
pub fn default_kafka_tls() -> Option { - Some(KafkaTls { - internal_secret_class: internal_tls_default(), - server_secret_class: server_tls_default(), - }) + None } /// Helper methods to provide defaults in the CRDs and tests -pub fn internal_tls_default() -> String { - TLS_DEFAULT_SECRET_CLASS.into() +pub fn internal_tls_default() -> Option { + None } /// Helper methods to provide defaults in the CRDs and tests pub fn server_tls_default() -> Option { - Some(TLS_DEFAULT_SECRET_CLASS.into()) + None } diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index aebe52c3..f5b871c2 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -277,11 +277,6 @@ pub async fn reconcile_kafka( .resolve(DOCKER_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION) .context(ResolveProductImageSnafu)?; - // check Kraft vs ZooKeeper and fail if misconfigured - kafka - .check_kraft_vs_zookeeper(&resolved_product_image.product_version) - .context(MisconfiguredKafkaClusterSnafu)?; - let mut cluster_resources = ClusterResources::new( APP_NAME, OPERATOR_NAME, @@ -562,7 +557,8 @@ fn validated_product_config( ), ); - if kafka.is_controller_configured() { + // TODO: need this if because controller_role() raises an error + if kafka.spec.controllers.is_some() { roles.insert( KafkaRole::Controller.to_string(), ( diff --git a/rust/operator-binary/src/resource/configmap.rs b/rust/operator-binary/src/resource/configmap.rs index ded83c59..11c8ae73 100644 --- a/rust/operator-binary/src/resource/configmap.rs +++ b/rust/operator-binary/src/resource/configmap.rs @@ -19,9 +19,10 @@ use crate::{ STACKABLE_LISTENER_BROKER_DIR, listener::{KafkaListenerConfig, KafkaListenerName, node_address_cmd}, role::{ - AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, - KAFKA_CONTROLLER_QUORUM_VOTERS, KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, - KAFKA_LOG_DIRS, KAFKA_NODE_ID, 
KAFKA_PROCESS_ROLES, KafkaRole, + AnyConfig, KAFKA_ADVERTISED_LISTENERS, KAFKA_BROKER_ID, + KAFKA_CONTROLLER_QUORUM_BOOTSTRAP_SERVERS, KAFKA_CONTROLLER_QUORUM_VOTERS, + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, KAFKA_LISTENERS, KAFKA_LOG_DIRS, KAFKA_NODE_ID, + KAFKA_PROCESS_ROLES, KafkaRole, }, security::KafkaTlsSecurity, v1alpha1, @@ -94,12 +95,11 @@ pub fn build_rolegroup_config_map( let kafka_config_file_name = merged_config.config_file_name(); let mut kafka_config = server_properties_file( - kafka.is_controller_configured(), + kafka.is_kraft_mode(), &rolegroup.role, pod_descriptors, listener_config, opa_connect_string, - resolved_product_image.product_version.starts_with("3.7"), // needs_quorum_voters )?; match merged_config { @@ -213,7 +213,6 @@ fn server_properties_file( pod_descriptors: &[KafkaPodDescriptor], listener_config: &KafkaListenerConfig, opa_connect_string: Option<&str>, - needs_quorum_voters: bool, ) -> Result, Error> { let kraft_controllers = kraft_controllers(pod_descriptors); @@ -254,12 +253,15 @@ fn server_properties_file( .unwrap_or("".to_string())), ]); - if needs_quorum_voters { - let kraft_voters = - kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; + let kraft_voters = + kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; - result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); - } + result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); + + result.extend([( + "zookeeper.connect".to_string(), + "${env:ZOOKEEPER}".to_string(), + )]); Ok(result) } @@ -278,6 +280,11 @@ fn server_properties_file( KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.to_string(), listener_config.listener_security_protocol_map(), ), + ( + "broker.id.generation.enable".to_string(), + "false".to_string(), + ), + (KAFKA_BROKER_ID.to_string(), "${env:REPLICA_ID}".to_string()), ]); if kraft_mode { @@ -300,12 +307,10 @@ fn server_properties_file( ), ]); - if needs_quorum_voters { - let kraft_voters = - 
kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; + let kraft_voters = + kraft_voters(pod_descriptors).context(NoKraftControllersFoundSnafu)?; - result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); - } + result.extend([(KAFKA_CONTROLLER_QUORUM_VOTERS.to_string(), kraft_voters)]); } else { // Running with ZooKeeper enabled result.extend([( diff --git a/rust/operator-binary/src/resource/statefulset.rs b/rust/operator-binary/src/resource/statefulset.rs index 89154dad..a7aec39c 100644 --- a/rust/operator-binary/src/resource/statefulset.rs +++ b/rust/operator-binary/src/resource/statefulset.rs @@ -283,8 +283,6 @@ pub fn build_broker_rolegroup_statefulset( ..EnvVar::default() }); - let cluster_id = kafka.cluster_id().context(ClusterIdMissingSnafu)?; - cb_kafka .image_from_product_image(resolved_product_image) .command(vec![ @@ -296,7 +294,6 @@ pub fn build_broker_rolegroup_statefulset( ]) .args(vec![broker_kafka_container_commands( kafka, - cluster_id, // we need controller pods kafka .pod_descriptors( @@ -634,6 +631,22 @@ pub fn build_controller_rolegroup_statefulset( ..EnvVar::default() }); + // Controllers need the ZooKeeper connection string for migration + if let Some(zookeeper_config_map_name) = &kafka.spec.cluster_config.zookeeper_config_map_name { + env.push(EnvVar { + name: "ZOOKEEPER".to_string(), + value_from: Some(EnvVarSource { + config_map_key_ref: Some(ConfigMapKeySelector { + name: zookeeper_config_map_name.to_string(), + key: "ZOOKEEPER".to_string(), + ..ConfigMapKeySelector::default() + }), + ..EnvVarSource::default() + }), + ..EnvVar::default() + }) + }; + cb_kafka .image_from_product_image(resolved_product_image) .command(vec![ @@ -644,7 +657,6 @@ pub fn build_controller_rolegroup_statefulset( "-c".to_string(), ]) .args(vec![controller_kafka_container_command( - kafka.cluster_id().context(ClusterIdMissingSnafu)?, kafka .pod_descriptors(Some(kafka_role), cluster_info, kafka_security.client_port()) 
.context(BuildPodDescriptorsSnafu)?,