Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 117 additions & 60 deletions pkg/purge_repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,22 +323,23 @@ func (opt *purgeOptions) executePurgeWorkflow(setupOpt restic.SetupOptions, cuto
return fmt.Errorf("failed to create restic wrapper: %v", err)
}

repoList, err := opt.findRepositoriesToPurge(rw, cutoffTime)
// Get repository base URL for display purposes
repoBase, err := opt.getResticRepoFromEnv(rw)
if err != nil {
return err
return fmt.Errorf("failed to get restic repository base: %w", err)
}

fmt.Println("\n🔎 Searching for repositories. This may take a while depending on the number of repositories...")
repoList, err := opt.findRepositoriesToPurge(rw, repoBase, cutoffTime)
if err != nil {
displayRepositoryErrors(err)
}

if len(repoList) == 0 {
opt.displayNoRepositoriesMessage()
return nil
}

// Get repository base URL for display purposes
repoBase, err := opt.getResticRepoFromEnv(rw)
if err != nil {
return fmt.Errorf("failed to get restic repository base: %w", err)
}

opt.displayRepositoriesTable(repoList, repoBase)
if opt.dryRun {
displayDryRunMessage(len(repoList))
Expand All @@ -352,29 +353,21 @@ func (opt *purgeOptions) executePurgeWorkflow(setupOpt restic.SetupOptions, cuto
return opt.deleteRepositories(rw, repoList)
}

func (opt *purgeOptions) findRepositoriesToPurge(rw *restic.ResticWrapper, cutoffTime time.Time) ([]repositoryInfo, error) {
func (opt *purgeOptions) findRepositoriesToPurge(rw *restic.ResticWrapper, repoBase string, cutoffTime time.Time) ([]repositoryInfo, error) {
var repos []repositoryInfo
subDirs, err := opt.listSubdirectories("")
if err != nil {
return nil, fmt.Errorf("cannot list sub-dirs: %w", err)
}

repoBase, err := opt.getResticRepoFromEnv(rw)
if err != nil {
return nil, err
}

script := opt.generateRepoListScript(repoBase, rw, subDirs)
out, err := runResticScriptViaDocker(script)
if err != nil {
return nil, fmt.Errorf("Error running repo check script: %v\nOutput:\n%s", err, out)
}

err = extractRepoListFromOutput(out, subDirs, cutoffTime, &repos)
if err != nil {
return nil, err
}
return repos, nil
err = extractRepoListFromOutput(out, repoBase, subDirs, cutoffTime, &repos)
return repos, err
}

func (opt *purgeOptions) listSubdirectories(path string) ([]string, error) {
Expand Down Expand Up @@ -434,61 +427,125 @@ func runResticScriptViaDocker(script string) (string, error) {
return string(out), err
}

func extractRepoListFromOutput(out string, subDirs []string, cutoffTime time.Time, repos *[]repositoryInfo) error {
type snapshot struct {
Time string `json:"time"`
}
dirIndex := 0
var errs []error
var snapshots []snapshot
func extractRepoListFromOutput(out string, repoBase string, subDirs []string, cutoffTime time.Time, repos *[]repositoryInfo) error {
var (
dirIndex int
errs []error
)

lines := strings.Split(out, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
// Skip error messages and separators
if strings.HasPrefix(line, "Failed to access repository") ||
strings.Contains(line, "Fatal: repository does not exist") {
// If we hit an error, we should still increment dirIndex to stay in sync
if dirIndex < len(subDirs) {
dirIndex++
}
if line == "" {
continue
}

// Parse JSON array
if strings.HasPrefix(line, "[") {
if err := json.Unmarshal([]byte(line), &snapshots); err != nil {
errs = append(errs, fmt.Errorf("failed to parse JSON for %s: %v", line, err))
if dirIndex < len(subDirs) {
dirIndex++
}
switch {
case strings.HasPrefix(line, "["):
if err := processSnapshotLine(line, subDirs, &dirIndex, cutoffTime, repos, &errs); err != nil {
continue
}

if len(snapshots) > 0 {
snapshotTime, err := time.Parse(time.RFC3339Nano, snapshots[0].Time)
if err != nil {
errs = append(errs, fmt.Errorf("failed to parse time for %s: %v", line, err))
if dirIndex < len(subDirs) {
dirIndex++
}
continue
}
if dirIndex < len(subDirs) && snapshotTime.Before(cutoffTime) {
*repos = append(*repos, repositoryInfo{
Path: subDirs[dirIndex],
LastModified: snapshotTime,
})
}
case strings.HasPrefix(line, "{"):
processErrorJSONLine(line, repoBase, subDirs, dirIndex, &errs)
case strings.HasPrefix(line, "Failed to access repository") ||
strings.Contains(line, "Fatal: repository does not exist"):
// Handle plain text error lines
if dirIndex < len(subDirs) {
dirIndex++
}
dirIndex++
}
}

return kerr.NewAggregate(errs)
}

func processSnapshotLine(line string, subDirs []string, dirIndex *int, cutoffTime time.Time, repos *[]repositoryInfo, errs *[]error) error {
type snapshot struct {
Time string `json:"time"`
}

increaseDirIndexAndAppendErr := func(dirIndex *int, err error) {
if *dirIndex < len(subDirs) {
*errs = append(*errs, err)
*dirIndex++
}
}

var snapshots []snapshot
if err := json.Unmarshal([]byte(line), &snapshots); err != nil {
increaseDirIndexAndAppendErr(dirIndex, fmt.Errorf("failed to parse JSON for %s: %v", subDirs[*dirIndex], err))
return err
}

if len(snapshots) > 0 {
snapshotTime, err := time.Parse(time.RFC3339Nano, snapshots[0].Time)
if err != nil {
increaseDirIndexAndAppendErr(dirIndex, fmt.Errorf("failed to parse time for %s: %v", subDirs[*dirIndex], err))
return err
}
if *dirIndex < len(subDirs) && snapshotTime.Before(cutoffTime) {
*repos = append(*repos, repositoryInfo{
Path: subDirs[*dirIndex],
LastModified: snapshotTime,
})
}
}
*dirIndex++
return nil
}

func processErrorJSONLine(line string, repoBase string, subDirs []string, dirIndex int, errs *[]error) {
errMsg := struct {
MessageType string `json:"message_type"`
Code int `json:"code"`
Message string `json:"message"`
}{}
if err := json.Unmarshal([]byte(line), &errMsg); err == nil && errMsg.Message != "" {
// Skip "repository does not exist" (no repo to purge)
if dirIndex < len(subDirs) && !strings.Contains(strings.ToLower(errMsg.Message), "repository does not exist") {
repoURL := strings.TrimRight(repoBase+"/"+subDirs[dirIndex], "/")
*errs = append(*errs, fmt.Errorf("%s: %s", repoURL, errMsg.Message))
}
}
}

func displayRepositoryErrors(err error) {
if err == nil {
return
}
fmt.Println("\n⚠️ Some repositories could not be processed:")

w := tabwriter.NewWriter(os.Stdout, TableMinWidth, TableTabWidth, TablePadding, TablePadChar, 0)
defer func() {
_ = w.Flush() // Handle error silently for display purposes
}()

// Header
_, _ = fmt.Fprintf(w, "REPOSITORY\tERROR\n")
_, _ = fmt.Fprintf(w, "----------\t-----\n")

printErr := func(e error) {
parts := strings.SplitN(e.Error(), ": ", 2)
if len(parts) == 2 {
_, _ = fmt.Fprintf(w, "%s\t%s\n", parts[0], parts[1])
} else {
_, _ = fmt.Fprintf(w, "N/A\t%s\n", e.Error())
}
}

// kerr.NewAggregate returns something that implements Errors()
if agg, ok := err.(interface{ Errors() []error }); ok {
for _, e := range agg.Errors() {
printErr(e)
}
} else {
// fallback in case it's not an aggregate
printErr(err)
}
fmt.Println()
}

func (opt *purgeOptions) displayNoRepositoriesMessage() {
fmt.Println("✅ No repositories found matching the criteria.")
fmt.Println("\n✅ No repositories found matching the criteria.")
fmt.Printf(" - Age filter: older than %s\n", opt.olderThan)
}

Expand Down Expand Up @@ -574,7 +631,7 @@ func (opt *purgeOptions) deleteRepositories(rw *restic.ResticWrapper, repos []re
}

// Execute restic purge operations
fmt.Println("Starting repository deletion process...")
fmt.Println("\n🔥 Starting repository deletion. This process can be lengthy, please do not interrupt.")
script := opt.generateRepoPurgeScript(rw, repoBase, repos)
out, err := runResticScriptViaDocker(script)
if err != nil {
Expand Down
Loading