diff --git a/cmd/cloudstack-csi-sc-syncer/README.md b/cmd/cloudstack-csi-sc-syncer/README.md index 8dab035..f6b768a 100644 --- a/cmd/cloudstack-csi-sc-syncer/README.md +++ b/cmd/cloudstack-csi-sc-syncer/README.md @@ -89,14 +89,26 @@ spec: args: - "-cloudstackconfig=/etc/cloudstack-csi-driver/cloud-config" - "-kubeconfig=-" + - "-nodeName=$(NODE_NAME)" + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName volumeMounts: - name: cloudstack-conf mountPath: /etc/cloudstack-csi-driver + - name: cloud-init-dir + mountPath: /run/cloud-init/ restartPolicy: Never volumes: - name: cloudstack-conf secret: secretName: cloudstack-secret + - name: cloud-init-dir + hostPath: + path: /run/cloud-init/ + type: Directory E0F ``` diff --git a/cmd/cloudstack-csi-sc-syncer/main.go b/cmd/cloudstack-csi-sc-syncer/main.go index 4cf3884..90b8506 100644 --- a/cmd/cloudstack-csi-sc-syncer/main.go +++ b/cmd/cloudstack-csi-sc-syncer/main.go @@ -23,7 +23,9 @@ var ( deleteUnused = flag.Bool("delete", false, "Delete") volumeExpansion = flag.Bool("volumeExpansion", false, "VolumeExpansion") showVersion = flag.Bool("version", false, "Show version") - + nodeName = flag.String("nodeName", "", "Node name") + addAllowedTopology = flag.Bool("addAllowedTopology", false, "Add allowed topology to storageclass") + // Version is set by the build process. version = "" ) @@ -46,6 +48,8 @@ func main() { NamePrefix: *namePrefix, Delete: *deleteUnused, VolumeExpansion: *volumeExpansion, + NodeName: *nodeName, + AddAllowedTopology: *addAllowedTopology, }) if err != nil { log.Fatalf("Error: %v", err) diff --git a/pkg/syncer/run.go b/pkg/syncer/run.go index b81affb..fc81e6a 100644 --- a/pkg/syncer/run.go +++ b/pkg/syncer/run.go @@ -19,6 +19,7 @@ import ( var ( volBindingMode = storagev1.VolumeBindingWaitForFirstConsumer reclaimPolicy = corev1.PersistentVolumeReclaimDelete + zoneID string ) func (s syncer) Run(ctx context.Context) error { @@ -96,6 +97,22 @@ func (s syncer) Run(ctx context.Context) error { return combinedErrors(errs) } +func getAllowedTopologies() []corev1.TopologySelectorTerm { + if zoneID != "" { + return []corev1.TopologySelectorTerm{ + { + MatchLabelExpressions: []corev1.TopologySelectorLabelRequirement{ + { + Key: "topology." + driver.DriverName + "/zone", + Values: []string{zoneID}, + }, + }, + }, + } + } + return nil +} + func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffering) (string, error) { offeringName := offering.Name custom := offering.Iscustomized @@ -113,6 +130,8 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer } log.Printf("Storage class name: %s", name) + zoneID = s.getZoneID(ctx) + sc, err := s.k8sClient.StorageV1().StorageClasses().Get(ctx, name, metav1.GetOptions{}) if err != nil { if k8serrors.IsNotFound(err) { @@ -132,6 +151,12 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer driver.DiskOfferingKey: offering.Id, }, } + //Add AllowedTopologies if the addAllowedTopology flag is true + if s.addAllowedTopology { + if getAllowedTopologies() != nil { + newSc.AllowedTopologies = getAllowedTopologies() + } + } _, err = s.k8sClient.StorageV1().StorageClasses().Create(ctx, newSc, metav1.CreateOptions{}) return name, err @@ -178,6 +203,31 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer return name, nil } +func getExistingZoneID(terms []corev1.TopologySelectorTerm) string { + prefix := "topology." + driver.DriverName + "/zone" + for _, term := range terms { + for _, exp := range term.MatchLabelExpressions { + if exp.Key == prefix { + if len(exp.Values) > 0 { + return exp.Values[0] + } + } + } + } + return "" +} + +// get ZoneID of the node where syncer is running +func (s syncer) getZoneID(ctx context.Context) string { + vm, err := s.csConnector.GetNodeInfo(ctx, s.nodeName) + if err != nil { + log.Printf("GetNodeinfo failed: %s", err.Error()) + } else { + return vm.ZoneID + } + return "" +} + func checkStorageClass(sc *storagev1.StorageClass, expectedOfferingID string, expectedVolumeExpansion bool) error { errs := make([]error, 0) diskOfferingID, ok := sc.Parameters[driver.DiskOfferingKey] @@ -197,6 +247,14 @@ func checkStorageClass(sc *storagev1.StorageClass, expectedOfferingID string, ex errs = append(errs, fmt.Errorf("wrong AllowVolumeExpansion for storage class %s", sc.Name)) } + if sc.AllowedTopologies == nil { + errs = append(errs, errors.New("allowedtopology flag is true but missing allowedtopologies")) + } else if sc.AllowedTopologies != nil { + if zoneID != getExistingZoneID(sc.AllowedTopologies) { + errs = append(errs, errors.New("allowedtopology flag is true but zoneID is not the same with desired zoneID: "+zoneID)) + } + } + if len(errs) > 0 { return combinedErrors(errs) } diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index 9a4fea2..0fc393a 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -27,6 +27,8 @@ type Config struct { NamePrefix string Delete bool VolumeExpansion bool + NodeName string + AddAllowedTopology bool } // Syncer has a function Run which synchronizes CloudStack @@ -43,6 +45,9 @@ type syncer struct { namePrefix string delete bool volumeExpansion bool + csConnector cloud.Cloud + nodeName string + addAllowedTopology bool } func createK8sClient(kubeconfig, agent string) (*kubernetes.Clientset, error) { @@ -74,6 +79,17 @@ func createCloudStackClient(cloudstackconfig string) (*cloudstack.CloudStackClie return client, nil } +func createCSConnector(cloudstackconfig string) (cloud.Cloud, error) { + config, err := cloud.ReadConfig(cloudstackconfig) + if err != nil { + return nil, err + } + + csConnector := cloud.New(config) + + return csConnector, nil +} + func createLabelsSet(label string) labels.Set { m := make(map[string]string) if len(label) > 0 { @@ -100,6 +116,12 @@ func New(config Config) (Syncer, error) { return nil, fmt.Errorf("cannot create CloudStack client: %w", err) } + csConnector, err := createCSConnector(config.CloudStackConfig) + if err != nil { + return nil, fmt.Errorf("cannot create CS connector interface: %w", err) + } + + return syncer{ k8sClient: k8sClient, csClient: csClient, @@ -107,5 +129,8 @@ func New(config Config) (Syncer, error) { namePrefix: config.NamePrefix, delete: config.Delete, volumeExpansion: config.VolumeExpansion, + csConnector: csConnector, + nodeName: config.NodeName, + addAllowedTopology: config.AddAllowedTopology, }, nil }