From c12c6557805261036d7fe28daa67c293dca1dcf1 Mon Sep 17 00:00:00 2001 From: binary-manu <47382117+binary-manu@users.noreply.github.com> Date: Sun, 17 Sep 2023 20:45:18 +0200 Subject: [PATCH 1/2] Major rewrite - Bump go to 1.20 - Add go.mod - Parallel download of files being cached - Preserve and send original modification times - Don't cache databases and DB signatures - A bunch of fixes --- .drone.yml | 37 --- .github/FUNDING.yml | 12 - .github/workflows/go.yml | 26 -- .gitignore | 1 + .tool-versions | 1 + README.md | 27 +-- cmd/internal/Cache.go | 22 ++ cmd/internal/PerfectWriter.go | 28 +++ cmd/internal/WORMSeekCloser.go | 57 +++++ cmd/pkgproxy.go | 427 +++++++++++++++++++++++++++++++++ go.mod | 3 + pkgproxy.go | 295 ----------------------- pkgproxy_test.go | 33 --- 13 files changed, 551 insertions(+), 418 deletions(-) delete mode 100644 .drone.yml delete mode 100644 .github/FUNDING.yml delete mode 100644 .github/workflows/go.yml create mode 100644 .tool-versions create mode 100644 cmd/internal/Cache.go create mode 100644 cmd/internal/PerfectWriter.go create mode 100644 cmd/internal/WORMSeekCloser.go create mode 100644 cmd/pkgproxy.go create mode 100644 go.mod delete mode 100644 pkgproxy.go delete mode 100644 pkgproxy_test.go diff --git a/.drone.yml b/.drone.yml deleted file mode 100644 index 8709ecd..0000000 --- a/.drone.yml +++ /dev/null @@ -1,37 +0,0 @@ ---- -kind: pipeline -name: go-1-11 - -steps: -- name: test - image: golang:1.11 - commands: - - go vet - - go test -v -cover - -- name: build - image: golang:1.11 - commands: - - go build - ---- -kind: pipeline -name: go-1-12 - -steps: -- name: test - image: golang:1.12 - commands: - - go vet - - go test -v -cover - -- name: build - image: golang:1.12 - commands: - - go build - ---- -kind: signature -hmac: 4244e6c73b31fde642184872bc211e364aabb088a5f2e2136f8701593e07153f - -... 
diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml deleted file mode 100644 index 198aa37..0000000 --- a/.github/FUNDING.yml +++ /dev/null @@ -1,12 +0,0 @@ -# These are supported funding model platforms - -github: # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2] -patreon: # Replace with a single Patreon username -open_collective: # Replace with a single Open Collective username -ko_fi: # Replace with a single Ko-fi username -tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel -community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry -liberapay: buckket # Replace with a single Liberapay username -issuehunt: # Replace with a single IssueHunt username -otechie: # Replace with a single Otechie username -custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2'] diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml deleted file mode 100644 index 5f71c21..0000000 --- a/.github/workflows/go.yml +++ /dev/null @@ -1,26 +0,0 @@ -name: Go -on: [push] -jobs: - - build: - name: Build - runs-on: ubuntu-latest - strategy: - matrix: - go: [ '1.11', '1.12', '1.13' ] - steps: - - - name: Set up Go ${{ matrix.go }} - uses: actions/setup-go@v1 - with: - go-version: ${{ matrix.go }} - id: go - - - name: Check out code into the Go module directory - uses: actions/checkout@v1 - - - name: Get dependencies - run: go get -v -t -d ./... - - - name: Build - run: go build -v . 
diff --git a/.gitignore b/.gitignore index 485dee6..4045fd6 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .idea +.vscode/ diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000..6d1d0fb --- /dev/null +++ b/.tool-versions @@ -0,0 +1 @@ +golang 1.20.7 diff --git a/README.md b/README.md index 1fe58ab..2678e7f 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,10 @@ -[![Build Status](https://drone.buckket.org/api/badges/buckket/pkgproxy/status.svg)](https://drone.buckket.org/buckket/pkgproxy) - **pkgproxy** is a caching proxy server specifically designed for caching Arch GNU/Linux packages for pacman. +_This is a major rewrite of https://github.com/buckket/pkgproxy in order to iron out some bugs and implement +concurrent downloading of the same uncached file. It can be used as a drop-in replacement of the original +`pkgproxy`, with the exception that it does not cache databases (which is transparent anyway)._ + + Updating multiple Arch systems in your home network can be a slow process if you have to download every pkg file for every machine over and over again. One could setup a local Arch Linux mirror, but it takes a considerable amount of disk space (~60GB). Instead why not just cache packages you really downloaded on one machine since it’s highly likely that @@ -10,13 +13,7 @@ and saves a copy to disk so that future requests of the same file can be served ## Installation -### From source - - go get -u git.buckket.org/buckket/pkgproxy - -### Packet manager - -- Arch Linux: [pkgproxy](https://aur.archlinux.org/packages/pkgproxy/)AUR + go install github.com/binary-manu/pkgproxy/cmd/pkgproxy@latest ## Usage @@ -24,7 +21,7 @@ Update your clients mirror list (`/etc/pacman.d/mirrorlist`) to point to `pkgpro Server = http://${HOST_WITH_PKGPROXY_RUNNING}:8080/$repo/os/$arch -Run `pkgproxy` manually or use a systemd service file (example provided): +Run `pkgproxy` manually or use a systemd service file. 
``` Usage: @@ -43,12 +40,12 @@ Usage: Show version information ``` -## Limitations +## Things to know -- Multiple incoming requests of the same file are handled sequentially, which may cause pacman to timeout, - especially if a large file is being downloaded. -- All cached files are deleted when `pkgproxy` exits. No files will be deleted by `pkgproxy` as long as - it is running. If you want to limit disk usage create a systemd timer which deletes files older than x days. +- Database files are not cached. +- Packages are cached, and can be downloaded concurrently: there is no blocking while a package is being + stored into the cache. +- All cached files are deleted when `pkgproxy` exits, unless `-keep-cache` is used. ## License diff --git a/cmd/internal/Cache.go b/cmd/internal/Cache.go new file mode 100644 index 0000000..87c2b6c --- /dev/null +++ b/cmd/internal/Cache.go @@ -0,0 +1,22 @@ +package internal + +import "sync" + +// Cache is a map protected by a Mutex, which can only be accessed +// via the method LockedDo. +type Cache[K comparable, V any] struct { + cache map[K]V + mutex sync.Mutex +} + +// LockedDo executes a function with the cache mutex held, so that +// f is the only user at the moment. The mutex is released as soon as +// f returns. +func (c *Cache[K, V]) LockedDo(f func(cache map[K]V) error) error { + c.mutex.Lock() + defer c.mutex.Unlock() + if c.cache == nil { + c.cache = make(map[K]V) + } + return f(c.cache) +} diff --git a/cmd/internal/PerfectWriter.go b/cmd/internal/PerfectWriter.go new file mode 100644 index 0000000..79ef3a0 --- /dev/null +++ b/cmd/internal/PerfectWriter.go @@ -0,0 +1,28 @@ +package internal + +import "io" + +// PerfectWriter never fails a write... if one fails, it lies and returns +// no error, while refusing to write further data. This is used with +// io.MultiWriter so that errors from some Writers can be ignored. +// The first write error is made available via Error(). 
+type PerfectWriter struct { + writer io.Writer + err error +} + +// NewPerfectWriter wraps a writer into a PerfectWriter and returns it +func NewPerfectWriter(w io.Writer) *PerfectWriter { + return &PerfectWriter{w, nil} +} + +func (w *PerfectWriter) Error() error { + return w.err +} + +func (w *PerfectWriter) Write(data []byte) (int, error) { + if w.err == nil { + _, w.err = w.writer.Write(data) + } + return len(data), nil +} diff --git a/cmd/internal/WORMSeekCloser.go b/cmd/internal/WORMSeekCloser.go new file mode 100644 index 0000000..35d3192 --- /dev/null +++ b/cmd/internal/WORMSeekCloser.go @@ -0,0 +1,57 @@ +package internal + +import ( + "io" + "sync" +) + +// WORMSeekCloser models something which can be read in parallel without +// using the file pointer (hence the ReaderAt), but can be written by one +// writer at a time (Writer). It can also be closed and seeked. +type WORMSeekCloser interface { + io.ReaderAt + io.Writer + io.Closer + io.Seeker +} + +// ConcurrentWORMSeekCloser is safe for parallel use. It allows ReadAt +// calls to run in parallel, while other methods are serialized. +type ConcurrentWORMSeekCloser struct { + worm WORMSeekCloser + mutex sync.RWMutex +} + +// NewConcurrentWORMSeekCloser wraps a WORMSeekCloser and returns an object safe +// for concurrent use. 
+func NewConcurrentWORMSeekCloser(inferior WORMSeekCloser) *ConcurrentWORMSeekCloser { + return &ConcurrentWORMSeekCloser{worm: inferior} +} + +// ReadAt which is concurrency-safe +func (worm *ConcurrentWORMSeekCloser) ReadAt(p []byte, off int64) (n int, err error) { + worm.mutex.RLock() + defer worm.mutex.RUnlock() + return worm.worm.ReadAt(p, off) +} + +// Seek which is concurrency-safe +func (worm *ConcurrentWORMSeekCloser) Seek(offset int64, whence int) (int64, error) { + worm.mutex.Lock() + defer worm.mutex.Unlock() + return worm.worm.Seek(offset, whence) +} + +// Write which is concurrency-safe +func (worm *ConcurrentWORMSeekCloser) Write(p []byte) (n int, err error) { + worm.mutex.Lock() + defer worm.mutex.Unlock() + return worm.worm.Write(p) +} + +// Close which is concurrency-safe +func (worm *ConcurrentWORMSeekCloser) Close() error { + worm.mutex.Lock() + defer worm.mutex.Unlock() + return worm.worm.Close() +} diff --git a/cmd/pkgproxy.go b/cmd/pkgproxy.go new file mode 100644 index 0000000..788130c --- /dev/null +++ b/cmd/pkgproxy.go @@ -0,0 +1,427 @@ +/* +pkgproxy is a caching proxy server specifically designed for caching Arch GNU/Linux packages for pacman. 
+ +Usage: + + pkgproxy [options] + + Options: + -cache string + Cache base path (default: $XDG_CACHE_HOME) + -keep-cache bool + Keep the cache between restarts + -port string + Listen on addr (default ":8080") + -upstream string + Upstream URL (default "https://mirrors.kernel.org/archlinux/$repo/os/$arch") + -version bool + Show version information +*/ +package main + +import ( + "errors" + "flag" + "fmt" + "io" + "log" + "net/http" + "os" + "path" + "strings" + "time" + + util "github.com/binary-manu/pkgproxy/cmd/internal" +) + +/******************************************************************************** + * Type definitions + ********************************************************************************/ + +type fileStatus int + +// A CacheEntry is a reference-counted set of items shared among file downloads. +// File is used both for writing downloaded data (by one single goroutine) while +// other clients that want to also download the same file will read from File, +// while periodically checking if the size has changed due to appended data. As +// soon as the file has been fully downloaded, Complete is closed to signal +// other goroutines to quit. Started signals that the downloading goroutine has +// initialized file-specific fields and that reading can start. 
+type cacheEntry struct { + RefCount uint + Started chan struct{} + Complete chan struct{} + File util.WORMSeekCloser + + // The following fields should be accessed only after + // Started has been closed + HTTPInfo http.Header +} + +type request struct { + Repo string + OS string + Arch string + File string + CacheEntry *cacheEntry + FileStatus fileStatus +} + +type settings struct { + CacheDir string + UpstreamServer string +} + +type fileHandler = func(w http.ResponseWriter, r *http.Request, req *request) + +type fileHandlerMap = map[fileStatus]fileHandler + +/******************************************************************************** + * Constants + ********************************************************************************/ + +var version = "HEAD" + +const ( + _ fileStatus = iota + // File has already been cached and can be served locally + fileStatusCached + // Another goroutine is already downloading this file + fileStatusInDownload + // The file is not in cache and no one is downloading it + fileStatusMissing + // The file will not be cached, but always redownloaded + fileStatusNoCaching +) + +/******************************************************************************** + * Globals + ********************************************************************************/ + +var gSettings settings +var headersToForward = []string{"Content-Length", "Last-Modified", "ETag", "Content-Type"} +var sharedState util.Cache[string, *cacheEntry] +var fileHandlers = fileHandlerMap{ + fileStatusCached: fileHandlerCached, + fileStatusMissing: fileHandlerMissingOrUncacheable, + fileStatusNoCaching: fileHandlerMissingOrUncacheable, + fileStatusInDownload: fileHandlerInDownload, +} + +/******************************************************************************** + * Methods: Request + ********************************************************************************/ + +func newRequest(requestURL string) (request, error) { + urlSplit := strings.Split(requestURL, 
"/")[1:] + if len(urlSplit) < 4 || len(urlSplit[3]) < 3 { + return request{}, errors.New("invalid URL") + } + return request{ + Repo: urlSplit[0], OS: urlSplit[1], Arch: urlSplit[2], File: urlSplit[3], + }, nil +} + +func (req *request) GetUpstreamURL() string { + upstreamURL := strings.Replace(gSettings.UpstreamServer, "$repo", req.Repo, 1) + upstreamURL = strings.Replace(upstreamURL, "$arch", req.Arch, 1) + return upstreamURL + "/" + req.File +} + +func (req *request) GetCachePathName() string { + return path.Join(gSettings.CacheDir, req.File) +} + +func (req *request) GetCacheTempPathName() string { + return path.Join(gSettings.CacheDir, "."+req.File) +} + +/******************************************************************************** + * Methods: CacheEntry + ********************************************************************************/ + +// Create a new CacheEntry with a reference count of 1, an open Complete channel +// and File pointing to a (temporary) cache file where data should be written. 
+func newCacheEntryFromRequest(req *request) (*cacheEntry, error) { + file, err := os.OpenFile(req.GetCacheTempPathName(), os.O_CREATE|os.O_RDWR|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + return &cacheEntry{ + RefCount: 1, + Complete: make(chan struct{}), + Started: make(chan struct{}), + File: util.NewConcurrentWORMSeekCloser(file), + }, nil +} + +/******************************************************************************** + * Functions + ********************************************************************************/ + +func setupCacheDir() error { + err := os.MkdirAll(gSettings.CacheDir, 0777) + if err != nil && !errors.Is(err, os.ErrExist) { + return err + } + return nil +} + +func destroyCacheDir() error { + return os.RemoveAll(gSettings.CacheDir) +} + +func renameTempFile(req *request, timestamp string) error { + ts, tserr := time.Parse("Mon, 02 Jan 2006 15:04:05 GMT", timestamp) + err := os.Rename(req.GetCacheTempPathName(), req.GetCachePathName()) + if tserr == nil { + os.Chtimes(req.GetCachePathName(), time.Now(), ts) + } + return err +} + +func fileHandlerCached(w http.ResponseWriter, r *http.Request, req *request) { + log.Printf("(%s) Serving cached file", req.File) + cachedData, err := os.Open(req.GetCachePathName()) + if err != nil { + log.Printf("(%s) Failed to open cached file, sending %q, error %s", req.File, http.StatusText(http.StatusInternalServerError), err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + defer cachedData.Close() + + fileTime := time.Now() + if fileStat, err := cachedData.Stat(); err == nil { + fileTime = fileStat.ModTime() + } + http.ServeContent(w, r, req.File, fileTime, cachedData) +} + +func fileHandlerMissingOrUncacheable(w http.ResponseWriter, _ *http.Request, req *request) { + if req.FileStatus != fileStatusNoCaching { + log.Printf("(%s) Forwarding and saving to cache", req.File) + } else { + log.Printf("(%s) Forwarding uncacheable 
file", req.File) + } + + resp, err := http.Get(req.GetUpstreamURL()) + if err != nil { + log.Printf("(%s) Failed to query host, sending %q, error: %s", req.File, http.StatusText(http.StatusInternalServerError), err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } else if resp.StatusCode != http.StatusOK { + defer resp.Body.Close() + log.Printf("(%s) Host responded with %d (%s)", req.File, resp.StatusCode, http.StatusText(resp.StatusCode)) + http.Error(w, http.StatusText(resp.StatusCode), resp.StatusCode) + return + } + defer resp.Body.Close() + + for _, h := range headersToForward { + if v := resp.Header.Get(h); v != "" { + w.Header().Set(h, v) + } + } + + if req.FileStatus == fileStatusNoCaching { + _, err = io.Copy(w, resp.Body) + if err == nil { + log.Printf("(%s) Uncacheable file served successfully", req.File) + } else { + log.Printf("(%s) Uncacheable file not served: %s", req.File, err) + } + return + } + + req.CacheEntry.HTTPInfo = w.Header().Clone() + close(req.CacheEntry.Started) + + // Failed writes to the client are ignored so that caching can continue + // then reported at the end + pw := util.NewPerfectWriter(w) + _, err = io.Copy(io.MultiWriter(req.CacheEntry.File, pw), resp.Body) + if err == nil { + err = renameTempFile(req, resp.Header.Get("Last-Modified")) + } + + switch { + case err == nil && pw.Error() == nil: + log.Printf("(%s) File cached and served successfully", req.File) + case err == nil && pw.Error() != nil: + log.Printf("(%s) File cached successfully, but serving to the client failed: %s", req.File, pw.Error()) + default: + log.Printf("(%s) File caching failed: %s", req.File, err) + } +} + +func fileHandlerInDownload(w http.ResponseWriter, _ *http.Request, req *request) { + log.Printf("(%s) Forwarding file in download", req.File) + checkSizeTick := time.NewTicker(time.Second) + var fileSize int64 + done := false + var err error + + <-req.CacheEntry.Started + + for h, v := range 
req.CacheEntry.HTTPInfo { + w.Header()[h] = v + } + + for !done { + select { + case <-req.CacheEntry.Complete: + done = true + case <-checkSizeTick.C: + } + + var newSize int64 + newSize, err := req.CacheEntry.File.Seek(0, io.SeekCurrent) + if err != nil { + break + } + if newSize <= fileSize { + continue + } + _, err = io.Copy(w, io.NewSectionReader(req.CacheEntry.File, fileSize, newSize-fileSize)) + if err != nil { + break + } + fileSize = newSize + } + + if err != nil { + log.Printf("(%s) Error while serving file in download: %s", req.File, err) + } else { + log.Printf("(%s) File in download served", req.File) + } +} + +func handleRequest(w http.ResponseWriter, r *http.Request, req *request) { + var entry *cacheEntry + var fileStatus fileStatus + + err := sharedState.LockedDo(func(cache map[string]*cacheEntry) error { + _, err := os.Stat(req.GetCachePathName()) + switch { + case strings.HasSuffix(req.File, ".db") || strings.HasSuffix(req.File, ".db.sig"): + fileStatus = fileStatusNoCaching + case errors.Is(err, os.ErrNotExist): + // File does not exists, maybe it is being downloaded + var isInDownload bool + if entry, isInDownload = cache[req.File]; isInDownload { + // File currently in download, request should be handled by tailing the file + // until the download is complete + entry.RefCount++ + fileStatus = fileStatusInDownload + } else { + // File not in download, the request must be handled by downloading the file + entry, err = newCacheEntryFromRequest(req) + if err != nil { + return fmt.Errorf("unable to create cache file: %w", err) + } + cache[req.File] = entry + fileStatus = fileStatusMissing + } + case err == nil: + // File already downloaded, serve it + fileStatus = fileStatusCached + default: + return fmt.Errorf("unable to stat cached file: %w", err) + } + return nil + }) + if err != nil { + log.Printf("(%s) Cache error: %s", req.File, err) + return + } + + cacheCleaner := func(cache map[string]*cacheEntry) error { + if fileStatus == 
fileStatusMissing { + close(entry.Complete) + } + entry.RefCount-- + if entry.RefCount == 0 { + entry.File.Close() + os.Remove(req.GetCacheTempPathName()) + delete(cache, req.File) + } + return nil + } + if entry != nil { + defer sharedState.LockedDo(cacheCleaner) + } + + req.CacheEntry = entry + req.FileStatus = fileStatus + + fileHandlers[fileStatus](w, r, req) +} + +func handler(w http.ResponseWriter, r *http.Request) { + log.Printf("Request for URL: %s\n", r.URL) + + if r.Method != "GET" { + log.Printf("We don't do %q, sending %q", r.Method, http.StatusText(http.StatusNotImplemented)) + http.Error(w, http.StatusText(http.StatusNotImplemented), http.StatusNotImplemented) + return + } + + req, err := newRequest(r.URL.String()) + if err != nil { + log.Printf("URL invalid, sending %q", http.StatusText(http.StatusBadRequest)) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + + handleRequest(w, r, &req) +} + +func main() { + flCachePath := flag.String("cache", "", "Cache base path") + flAddr := flag.String("port", ":8080", "Listen on addr") + flUpstream := flag.String("upstream", "https://mirrors.kernel.org/archlinux/$repo/os/$arch", "Upstream URL") + flShowVersion := flag.Bool("version", false, "Show version information") + flKeepCache := flag.Bool("keep-cache", false, "Keep the cache between restarts") + flag.Parse() + + if *flShowVersion { + fmt.Printf("pkgproxy %s\n", version) + return + } + + if len(*flCachePath) > 0 { + gSettings.CacheDir = *flCachePath + } else { + var err error + gSettings.CacheDir, err = os.UserCacheDir() + if err != nil { + log.Fatalf("Unable to determine user cache directory: %s", err) + } + } + gSettings.CacheDir = path.Join(gSettings.CacheDir, "pkgproxy") + gSettings.UpstreamServer = *flUpstream + + var err error + if *flKeepCache { + err = setupCacheDir() + } else { + err = destroyCacheDir() + if err == nil { + err = setupCacheDir() + defer destroyCacheDir() + } + } + if err != nil { + 
log.Fatalf("Unable to setup cache directory %s: %s", gSettings.CacheDir, err) + } + + log.Printf( + "pkgproxy %s listening on %s, forwarding to %s and storing to %s", + version, + *flAddr, + gSettings.UpstreamServer, + gSettings.CacheDir, + ) + http.HandleFunc("/", handler) + log.Fatal(http.ListenAndServe(*flAddr, nil)) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..309a6fc --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/binary-manu/pkgproxy + +go 1.20 diff --git a/pkgproxy.go b/pkgproxy.go deleted file mode 100644 index fd27519..0000000 --- a/pkgproxy.go +++ /dev/null @@ -1,295 +0,0 @@ -/* -pkgproxy is a caching proxy server specifically designed for caching Arch GNU/Linux packages for pacman. - -Usage: - pkgproxy [options] - - Options: - -cache string - Cache base path (default: $XDG_CACHE_HOME) - -keep-cache bool - Keep the cache between restarts - -port string - Listen on addr (default ":8080") - -upstream string - Upstream URL (default "https://mirrors.kernel.org/archlinux/$repo/os/$arch") - -version bool - Show version information -*/ -package main - -import ( - "errors" - "flag" - "fmt" - "io" - "log" - "net/http" - "net/url" - "os" - "path" - "strings" - "sync" - "time" -) - -const version = "1.0.1" - -var CacheMap = make(map[string]string) -var MutexMap = make(map[string]*sync.Mutex) - -type Request struct { - Repo string - OS string - Arch string - File string -} - -type Settings struct { - CacheDir string - UpstreamServer string -} - -var GSettings Settings - -func setupCacheDir() { - err := os.Mkdir(GSettings.CacheDir, 0700) - if err != nil && !os.IsExist(err) { - panic(err) - } -} - -func destroyCacheDir() { - err := os.RemoveAll(GSettings.CacheDir) - if err != nil { - panic(err) - } -} - -func renameTempFile(filename *string) error { - return os.Rename(path.Join(GSettings.CacheDir, "."+*filename), path.Join(GSettings.CacheDir, *filename)) -} - -func removeTempFile(filename *string) error { - return 
os.Remove(path.Join(GSettings.CacheDir, "."+*filename)) -} - -func buildUpstreamURL(req *Request) string { - upstreamURL := strings.Replace(GSettings.UpstreamServer, "$repo", req.Repo, 1) - upstreamURL = strings.Replace(upstreamURL, "$arch", req.Arch, 1) - return upstreamURL + "/" + req.File -} - -func splitReqURL(requestURL string) (Request, error) { - URLSplit := strings.Split(requestURL, "/")[1:] - if len(URLSplit) < 4 || len(URLSplit[3]) < 3 { - return Request{}, errors.New("invalid URL") - } - return Request{URLSplit[0], URLSplit[1], URLSplit[2], URLSplit[3]}, nil -} - -func buildCacheKey(reqURL *string, resp *http.Response) string { - u, err := url.Parse(*reqURL) - if err != nil { - return "" - } - cacheKey := fmt.Sprintf("%s::", u.Hostname()) - if len(resp.Header.Get("ETag")) > 0 { - cacheKey += strings.Trim(resp.Header.Get("ETag"), "\"") - } else if len(resp.Header.Get("Last-Modified")) > 0 { - cacheKey += resp.Header.Get("Last-Modified") - } else { - cacheKey = "" - } - return cacheKey -} - -func handleRequest(w http.ResponseWriter, r *http.Request, req *Request) { - var isCached, isDB bool - var fileError, respError bool - var resp *http.Response - var file *os.File - var err error - var cacheKey string - - reqURL := buildUpstreamURL(req) - - _, ok := MutexMap[req.File] - if !ok { - MutexMap[req.File] = &sync.Mutex{} - } - MutexMap[req.File].Lock() - defer delete(MutexMap, req.File) - - if strings.HasSuffix(req.File, ".db") { - isDB = true - resp, err = http.Head(reqURL) - if err != nil { - log.Printf("(%s)[Upstream] Failed to query host, sending %q", req.File, http.StatusText(http.StatusInternalServerError)) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } else if resp.StatusCode != http.StatusOK { - defer resp.Body.Close() - log.Printf("(%s)[Upstream] Host responded with %d (%s)", req.File, resp.StatusCode, http.StatusText(resp.StatusCode)) - http.Error(w, http.StatusText(resp.StatusCode), 
resp.StatusCode) - return - } - defer resp.Body.Close() - cacheKey = buildCacheKey(&reqURL, resp) - } - - if !isDB || (isDB && CacheMap[req.Repo] == cacheKey) { - file, err = os.Open(path.Join(GSettings.CacheDir, req.File)) - if err != nil { - file, err = os.Create(path.Join(GSettings.CacheDir, "."+req.File)) - if err != nil { - } else { - defer file.Close() - } - } else { - defer file.Close() - isCached = true - } - } else { - log.Printf("(%s)[Local] Cached version is outdated, requesting new file", req.File) - file, err = os.Create(path.Join(GSettings.CacheDir, "."+req.File)) - if err != nil { - } else { - defer file.Close() - } - } - - if isCached { - log.Printf("(%s)[Meta] Serving cached version", req.File) - w.Header().Set("Content-Type", "application/octet-stream") - lastmod := time.Time{} - if isDB { - w.Header().Set("Content-Length", resp.Header.Get("Content-Length")) - w.Header().Set("Last-Modified", resp.Header.Get("Last-Modified")) - w.Header().Set("ETag", resp.Header.Get("ETag")) - lastmod, _ = time.Parse(http.TimeFormat, resp.Header.Get("Last-Modified")) - } - http.ServeContent(w, r, req.File, lastmod, file) - } else { - log.Printf("(%s)[Meta] Forwarding and saving to cache", req.File) - resp, err := http.Get(reqURL) - if err != nil { - file.Close() - removeTempFile(&req.File) - log.Printf("(%s)[Upstream] Failed to query host, sending %q", req.File, http.StatusText(http.StatusInternalServerError)) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } else if resp.StatusCode != http.StatusOK { - defer resp.Body.Close() - file.Close() - removeTempFile(&req.File) - log.Printf("(%s)[Upstream] Host responded with %d (%s)", req.File, resp.StatusCode, http.StatusText(resp.StatusCode)) - http.Error(w, http.StatusText(resp.StatusCode), resp.StatusCode) - return - } - defer resp.Body.Close() - w.Header().Set("Content-Length", resp.Header.Get("Content-Length")) - w.Header().Set("Content-Type", 
"application/octet-stream") - w.Header().Set("Last-Modified", resp.Header.Get("Last-Modified")) - w.Header().Set("ETag", resp.Header.Get("ETag")) - buf := make([]byte, 4096) - for { - n, err := resp.Body.Read(buf) - if err != nil && err != io.EOF { - panic(err) - } - if n == 0 || (fileError && respError) { - break - } - if !fileError { - if _, err := file.Write(buf[:n]); err != nil { - log.Printf("(%s)[Local] %s", req.File, err) - fileError = true - } - } - if !respError { - if _, err := w.Write(buf[:n]); err != nil { - log.Printf("(%s)[Forward] %s", req.File, err) - respError = true - } - } - } - - if !fileError { - err = renameTempFile(&req.File) - if err != nil { - log.Printf("(%s)[Local] Could not rename temp file", req.File) - } else { - log.Printf("(%s)[Local] Successfully cached", req.File) - } - if isDB { - CacheMap[req.Repo] = cacheKey - } - } else { - file.Close() - removeTempFile(&req.File) - log.Printf("(%s)[Local] Could not cache", req.File) - } - if !respError { - log.Printf("(%s)[Forward] Successfully forwarded", req.File) - } else { - log.Printf("(%s)[Forward] Error while forwarding", req.File) - } - } -} - -func handler(w http.ResponseWriter, r *http.Request) { - log.Printf("[Incoming] Request for URL: %s\n", r.URL) - - if r.Method != "GET" { - log.Printf("[Incoming] We don't do %q, sending %q", r.Method, http.StatusText(http.StatusNotImplemented)) - http.Error(w, http.StatusText(http.StatusNotImplemented), http.StatusNotImplemented) - return - } - - req, err := splitReqURL(r.URL.String()) - if err != nil { - log.Printf("[Incoming] URL invalid, sending %q", http.StatusText(http.StatusBadRequest)) - http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) - return - } - - handleRequest(w, r, &req) -} - -func main() { - flCachePath := flag.String("cache", "", "Cache base path") - flAddr := flag.String("port", ":8080", "Listen on addr") - flUpstream := flag.String("upstream", "https://mirrors.kernel.org/archlinux/$repo/os/$arch", 
"Upstream URL") - flShowVersion := flag.Bool("version", false, "Show version information") - flKeepCache := flag.Bool("keep-cache", false, "Keep the cache between restarts") - flag.Parse() - - if *flShowVersion { - fmt.Printf("pkgproxy %s\n", version) - return - } - - if len(*flCachePath) > 0 { - GSettings.CacheDir = *flCachePath - } else { - var err error - GSettings.CacheDir, err = os.UserCacheDir() - if err != nil { - panic(err) - } - } - GSettings.CacheDir = path.Join(GSettings.CacheDir, "pkgproxy") - GSettings.UpstreamServer = *flUpstream - - if *flKeepCache { - setupCacheDir() - } else { - destroyCacheDir() - setupCacheDir() - defer destroyCacheDir() - } - - http.HandleFunc("/", handler) - log.Fatal(http.ListenAndServe(*flAddr, nil)) -} diff --git a/pkgproxy_test.go b/pkgproxy_test.go deleted file mode 100644 index 6064684..0000000 --- a/pkgproxy_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package main - -import "testing" - -func TestBuildUpstreamURL(t *testing.T) { - GSettings.UpstreamServer = "https://example.org/pub/archlinux/$repo/os/$arch" - - req := Request{"extra", "os", "x86_64", "extra.db"} - url := buildUpstreamURL(&req) - if url != "https://example.org/pub/archlinux/extra/os/x86_64/extra.db" { - t.Error("URL does not match") - } - - req = Request{} - url = buildUpstreamURL(&req) - if url != "https://example.org/pub/archlinux//os//" { - t.Error("URL does not match") - } -} - -func TestSplitReqURL(t *testing.T) { - url, err := splitReqURL("/extra/os/x86_64/abiword-3.0.2-9-x86_64.pkg.tar.xz") - if err != nil { - t.Error("Parsing URL failed") - } else if url.Repo != "extra" || url.OS != "os" || url.Arch != "x86_64" || url.File != "abiword-3.0.2-9-x86_64.pkg.tar.xz" { - t.Error("Parsed URL does not match expected result") - } - - url, err = splitReqURL("") - if err == nil { - t.Error("Parsing URL should have failed") - } -} From f393503d27b47f4e3c851f0f9da5dfe9996111db Mon Sep 17 00:00:00 2001 From: binary-manu 
<47382117+binary-manu@users.noreply.github.com> Date: Mon, 25 Sep 2023 21:14:00 +0200 Subject: [PATCH 2/2] Handle deadlock between download/send goroutines If the download goroutine exits before closing the Started channel and other goroutines are waiting on that channel, they will deadlock. --- cmd/pkgproxy.go | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/cmd/pkgproxy.go b/cmd/pkgproxy.go index 788130c..12889ea 100644 --- a/cmd/pkgproxy.go +++ b/cmd/pkgproxy.go @@ -55,7 +55,8 @@ type cacheEntry struct { // The following fields should be accessed only after // Started has been closed - HTTPInfo http.Header + HTTPStatus int + HTTPHeader http.Header } type request struct { @@ -148,10 +149,11 @@ func newCacheEntryFromRequest(req *request) (*cacheEntry, error) { return nil, err } return &cacheEntry{ - RefCount: 1, - Complete: make(chan struct{}), - Started: make(chan struct{}), - File: util.NewConcurrentWORMSeekCloser(file), + RefCount: 1, + Complete: make(chan struct{}), + Started: make(chan struct{}), + File: util.NewConcurrentWORMSeekCloser(file), + HTTPStatus: http.StatusInternalServerError, }, nil } @@ -208,11 +210,17 @@ func fileHandlerMissingOrUncacheable(w http.ResponseWriter, _ *http.Request, req if err != nil { log.Printf("(%s) Failed to query host, sending %q, error: %s", req.File, http.StatusText(http.StatusInternalServerError), err) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + if req.CacheEntry != nil { + req.CacheEntry.HTTPStatus = http.StatusInternalServerError + } return } else if resp.StatusCode != http.StatusOK { defer resp.Body.Close() log.Printf("(%s) Host responded with %d (%s)", req.File, resp.StatusCode, http.StatusText(resp.StatusCode)) http.Error(w, http.StatusText(resp.StatusCode), resp.StatusCode) + if req.CacheEntry != nil { + req.CacheEntry.HTTPStatus = resp.StatusCode + } return } defer resp.Body.Close() @@ -233,7 +241,8 @@ func 
fileHandlerMissingOrUncacheable(w http.ResponseWriter, _ *http.Request, req return } - req.CacheEntry.HTTPInfo = w.Header().Clone() + req.CacheEntry.HTTPStatus = http.StatusOK + req.CacheEntry.HTTPHeader = w.Header().Clone() close(req.CacheEntry.Started) // Failed writes to the client are ignored so that caching can continue @@ -263,9 +272,16 @@ func fileHandlerInDownload(w http.ResponseWriter, _ *http.Request, req *request) <-req.CacheEntry.Started - for h, v := range req.CacheEntry.HTTPInfo { + for h, v := range req.CacheEntry.HTTPHeader { w.Header()[h] = v } + w.WriteHeader(req.CacheEntry.HTTPStatus) + if req.CacheEntry.HTTPStatus != http.StatusOK { + // Download goroutine failed + log.Printf("(%s) Error while serving file in download: download goroutine failed with status %d", + req.File, req.CacheEntry.HTTPStatus) + return + } for !done { select { @@ -338,6 +354,11 @@ func handleRequest(w http.ResponseWriter, r *http.Request, req *request) { cacheCleaner := func(cache map[string]*cacheEntry) error { if fileStatus == fileStatusMissing { close(entry.Complete) + select { + case <-entry.Started: + default: + close(entry.Started) + } } entry.RefCount-- if entry.RefCount == 0 {