jenkins-cron/v2/clean.go
2025-05-09 17:21:06 +08:00

178 lines
3.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"
)
// RemoveStats accumulates the outcome of a removal run.
type RemoveStats struct {
	// Total is the number of removal results processed, Success the number
	// of removals that completed without error, and Failed the number that
	// ended in an error.
	Total, Success, Failed int64
	// Errors holds the errors recorded for failed removals.
	Errors []error
}
// RemoveOptions configures a Remove call. Remove fills in defaults for
// unset or invalid fields: Workers <= 0 becomes runtime.NumCPU(), a nil
// Logger becomes log.Printf, a negative Retries becomes 0, and a nil Stats
// is replaced by a fresh RemoveStats.
type RemoveOptions struct {
Workers int // number of concurrent workers
Logger func(format string, args ...any)
Retries int // number of retries after a failed removal
Stats *RemoveStats // removal statistics (optional)
Progress func(current int64) // removal progress callback (optional)
}
// removeResult is the outcome a worker reports for one removed path.
// Err is nil when the removal succeeded.
type removeResult struct {
Err error
}
// Remove deletes everything beneath path using a pool of concurrent workers
// and then removes path itself.
//
// Unset or invalid options are replaced with defaults: Workers <= 0 becomes
// runtime.NumCPU(), a nil Logger becomes log.Printf, a negative Retries
// becomes 0, and a nil Stats is replaced by a fresh RemoveStats.
//
// For every sub-path result, Stats.Total is incremented together with either
// Stats.Success or Stats.Failed, and opts.Progress (if set) is called with the
// running total. The final removal of path itself is not counted in Total or
// Success; if it fails, the failure is added to Stats.Failed and Stats.Errors.
//
// Remove returns nil when path does not exist. Otherwise it returns the first
// error observed (sub-path failure, root failure, or context cancellation), or
// nil when everything was removed.
func Remove(ctx context.Context, path string, opts RemoveOptions) error {
	// Fill in defaults for unset or invalid options.
	if opts.Workers <= 0 {
		opts.Workers = runtime.NumCPU()
	}
	if opts.Logger == nil {
		opts.Logger = log.Printf
	}
	if opts.Retries < 0 {
		opts.Retries = 0
	}
	if opts.Stats == nil {
		opts.Stats = &RemoveStats{}
	}

	// Lstat rather than Stat: a dangling symlink must count as existing so
	// that the link itself still gets removed below.
	if _, err := os.Lstat(path); os.IsNotExist(err) {
		opts.Logger("[clean] Path does not exist, skipping: %s", path)
		return nil
	}

	workChan := make(chan string, opts.Workers*2)
	resultChan := make(chan removeResult, opts.Workers*2)

	var wg sync.WaitGroup
	for i := 0; i < opts.Workers; i++ {
		wg.Add(1)
		go removeWorker(ctx, &wg, workChan, resultChan, opts)
	}

	// Producer: walk the tree and feed sub-paths to the workers. Closing
	// workChan lets the workers drain and exit.
	go func() {
		defer close(workChan)
		// The error is deliberately dropped: walkDir logs per-entry walk
		// errors itself and otherwise only fails on cancellation, which the
		// root-removal step below detects and reports.
		_ = walkDir(ctx, path, workChan, opts)
	}()

	// Close resultChan once every worker has exited so the loop below ends.
	go func() {
		wg.Wait()
		close(resultChan)
	}()

	// Only this goroutine touches Stats.Errors and firstErr, so no lock is
	// needed; counters stay atomic so Progress callbacks and other observers
	// can read them safely.
	var firstErr error
	for res := range resultChan {
		processed := atomic.AddInt64(&opts.Stats.Total, 1)
		if res.Err != nil {
			atomic.AddInt64(&opts.Stats.Failed, 1)
			opts.Stats.Errors = append(opts.Stats.Errors, res.Err)
			if firstErr == nil {
				firstErr = res.Err
			}
		} else {
			atomic.AddInt64(&opts.Stats.Success, 1)
		}
		if opts.Progress != nil {
			opts.Progress(processed)
		}
	}

	// Finally remove the root itself. A failure here is always recorded in
	// the stats, even when an earlier error already determines the return
	// value.
	if err := tryRemoveRoot(ctx, path, opts); err != nil {
		atomic.AddInt64(&opts.Stats.Failed, 1)
		opts.Stats.Errors = append(opts.Stats.Errors, err)
		if firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// walkDir walks the tree under root, collects every visited path except root
// itself, and then sends the collected paths to workChan in reverse walk
// order, so entries inside a directory are queued before the directory.
//
// Per-entry walk errors are logged through opts.Logger and skipped. The
// function returns ctx.Err() if the context is cancelled while sending, and
// otherwise whatever filepath.WalkDir returned.
func walkDir(ctx context.Context, root string, workChan chan<- string, opts RemoveOptions) error {
	var collected []string

	visit := func(subPath string, d os.DirEntry, err error) error {
		// Abort the walk as soon as the context is cancelled.
		if cancelErr := ctx.Err(); cancelErr != nil {
			return cancelErr
		}
		switch {
		case err != nil:
			// Log and keep walking; an unreadable entry is not fatal.
			opts.Logger("[clean] Walk error on %s: %v", subPath, err)
		case subPath != root:
			collected = append(collected, subPath)
		}
		return nil
	}
	walkErr := filepath.WalkDir(root, visit)

	// Hand out paths last-visited first (deepest before shallowest).
	for n := len(collected); n > 0; n-- {
		select {
		case workChan <- collected[n-1]:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return walkErr
}
// removeWorker receives paths from workChan, removes each one (with retries),
// logs the outcome through opts.Logger, and sends exactly one removeResult per
// received path to resultChan. It returns when workChan is closed or ctx is
// cancelled, and calls wg.Done on exit.
func removeWorker(ctx context.Context, wg *sync.WaitGroup, workChan <-chan string, resultChan chan<- removeResult, opts RemoveOptions) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case subPath, ok := <-workChan:
			if !ok {
				return
			}
			if err := removeWithRetry(ctx, subPath, opts.Retries); err != nil {
				opts.Logger("[clean] Failed to remove %s: %v", subPath, err)
				resultChan <- removeResult{Err: fmt.Errorf("remove %q: %w", subPath, err)}
				continue
			}
			opts.Logger("[clean] Removed: %s", subPath)
			resultChan <- removeResult{}
		}
	}
}

// removeWithRetry calls os.RemoveAll on path, retrying up to retries more
// times after a failure. It always makes at least one attempt, waits only
// between attempts (never after the last one), and stops waiting early when
// ctx is cancelled. It returns nil on success or the last removal error.
func removeWithRetry(ctx context.Context, path string, retries int) error {
	const retryDelay = 100 * time.Millisecond

	for attempt := 0; ; attempt++ {
		err := os.RemoveAll(path)
		if err == nil || attempt >= retries {
			return err
		}

		// Back off before the next attempt, but give up immediately on
		// cancellation and report the real removal error.
		timer := time.NewTimer(retryDelay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return err
		case <-timer.C:
		}
	}
}
// tryRemoveRoot removes path itself with os.Remove and logs the outcome
// through opts.Logger. It returns ctx.Err() without touching the filesystem
// when the context is already cancelled, a wrapped error when the removal
// fails, and nil on success.
func tryRemoveRoot(ctx context.Context, path string, opts RemoveOptions) error {
	// Skip the removal entirely once the context has been cancelled.
	if cancelErr := ctx.Err(); cancelErr != nil {
		return cancelErr
	}

	err := os.Remove(path)
	if err == nil {
		opts.Logger("[clean] Removed root directory: %s", path)
		return nil
	}

	opts.Logger("[clean] Failed to remove root %s: %v", path, err)
	return fmt.Errorf("remove root %q: %w", path, err)
}