Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cmd/cloudstack-csi-sc-syncer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,26 @@ spec:
args:
- "-cloudstackconfig=/etc/cloudstack-csi-driver/cloud-config"
- "-kubeconfig=-"
- "-nodeName=$(NODE_NAME)"
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: cloudstack-conf
mountPath: /etc/cloudstack-csi-driver
- name: cloud-init-dir
mountPath: /run/cloud-init/
restartPolicy: Never
volumes:
- name: cloudstack-conf
secret:
secretName: cloudstack-secret
- name: cloud-init-dir
hostPath:
path: /run/cloud-init/
type: Directory
E0F
```

Expand Down
6 changes: 5 additions & 1 deletion cmd/cloudstack-csi-sc-syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ var (
deleteUnused = flag.Bool("delete", false, "Delete")
volumeExpansion = flag.Bool("volumeExpansion", false, "VolumeExpansion")
showVersion = flag.Bool("version", false, "Show version")

nodeName = flag.String("nodeName", "", "Node name")
addAllowedTopology = flag.Bool("addAllowedTopology", false, "Add allowed topology to storageclass")

// Version is set by the build process.
version = ""
)
Expand All @@ -46,6 +48,8 @@ func main() {
NamePrefix: *namePrefix,
Delete: *deleteUnused,
VolumeExpansion: *volumeExpansion,
NodeName: *nodeName,
AddAllowedTopology: *addAllowedTopology,
})
if err != nil {
log.Fatalf("Error: %v", err)
Expand Down
58 changes: 58 additions & 0 deletions pkg/syncer/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
var (
volBindingMode = storagev1.VolumeBindingWaitForFirstConsumer
reclaimPolicy = corev1.PersistentVolumeReclaimDelete
zoneID string
)

func (s syncer) Run(ctx context.Context) error {
Expand Down Expand Up @@ -96,6 +97,22 @@ func (s syncer) Run(ctx context.Context) error {
return combinedErrors(errs)
}

func getAllowedTopologies() []corev1.TopologySelectorTerm {
if zoneID != "" {
return []corev1.TopologySelectorTerm{
{
MatchLabelExpressions: []corev1.TopologySelectorLabelRequirement{
{
Key: "topology." + driver.DriverName + "/zone",
Values: []string{zoneID},
},
},
},
}
}
return nil
}

func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffering) (string, error) {
offeringName := offering.Name
custom := offering.Iscustomized
Expand All @@ -113,6 +130,8 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer
}
log.Printf("Storage class name: %s", name)

zoneID = s.getZoneID(ctx)

sc, err := s.k8sClient.StorageV1().StorageClasses().Get(ctx, name, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
Expand All @@ -132,6 +151,12 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer
driver.DiskOfferingKey: offering.Id,
},
}
//Add AllowedTopologies if the addAllowedTopology flag is true
if s.addAllowedTopology {
if getAllowedTopologies() != nil {
newSc.AllowedTopologies = getAllowedTopologies()
}
}
_, err = s.k8sClient.StorageV1().StorageClasses().Create(ctx, newSc, metav1.CreateOptions{})

return name, err
Expand Down Expand Up @@ -178,6 +203,31 @@ func (s syncer) syncOffering(ctx context.Context, offering *cloudstack.DiskOffer
return name, nil
}

func getExistingZoneID(terms []corev1.TopologySelectorTerm) string {
prefix := "topology." + driver.DriverName + "/zone"
for _, term := range terms {
for _, exp := range term.MatchLabelExpressions {
if exp.Key == prefix {
if len(exp.Values) > 0 {
return exp.Values[0]
}
}
}
}
return ""
}

// get ZoneID of the node where syncer is running
func (s syncer) getZoneID(ctx context.Context) string {
vm, err := s.csConnector.GetNodeInfo(ctx, s.nodeName)
if err != nil {
log.Printf("GetNodeinfo failed: %s", err.Error())
} else {
return vm.ZoneID
}
return ""
}

func checkStorageClass(sc *storagev1.StorageClass, expectedOfferingID string, expectedVolumeExpansion bool) error {
errs := make([]error, 0)
diskOfferingID, ok := sc.Parameters[driver.DiskOfferingKey]
Expand All @@ -197,6 +247,14 @@ func checkStorageClass(sc *storagev1.StorageClass, expectedOfferingID string, ex
errs = append(errs, fmt.Errorf("wrong AllowVolumeExpansion for storage class %s", sc.Name))
}

if sc.AllowedTopologies == nil {
errs = append(errs, errors.New("allowedtopology flag is true but missing allowedtopologies"))
} else if sc.AllowedTopologies != nil {
if zoneID != getExistingZoneID(sc.AllowedTopologies) {
errs = append(errs, errors.New("allowedtopology flag is true but zoneID is not the same with desired zoneID: "+zoneID))
}
}

if len(errs) > 0 {
return combinedErrors(errs)
}
Expand Down
25 changes: 25 additions & 0 deletions pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Config struct {
NamePrefix string
Delete bool
VolumeExpansion bool
NodeName string
AddAllowedTopology bool
}

// Syncer has a function Run which synchronizes CloudStack
Expand All @@ -43,6 +45,9 @@ type syncer struct {
namePrefix string
delete bool
volumeExpansion bool
csConnector cloud.Cloud
nodeName string
addAllowedTopology bool
}

func createK8sClient(kubeconfig, agent string) (*kubernetes.Clientset, error) {
Expand Down Expand Up @@ -74,6 +79,17 @@ func createCloudStackClient(cloudstackconfig string) (*cloudstack.CloudStackClie
return client, nil
}

func createCSConnector(cloudstackconfig string) (cloud.Cloud, error) {
config, err := cloud.ReadConfig(cloudstackconfig)
if err != nil {
return nil, err
}

csConnector := cloud.New(config)

return csConnector, nil
}

func createLabelsSet(label string) labels.Set {
m := make(map[string]string)
if len(label) > 0 {
Expand All @@ -100,12 +116,21 @@ func New(config Config) (Syncer, error) {
return nil, fmt.Errorf("cannot create CloudStack client: %w", err)
}

csConnector, err := createCSConnector(config.CloudStackConfig)
if err != nil {
return nil, fmt.Errorf("cannot create CS connector interface: %w", err)
}


return syncer{
k8sClient: k8sClient,
csClient: csClient,
labelsSet: createLabelsSet(config.Label),
namePrefix: config.NamePrefix,
delete: config.Delete,
volumeExpansion: config.VolumeExpansion,
csConnector: csConnector,
nodeName: config.NodeName,
addAllowedTopology: config.AddAllowedTopology,
}, nil
}