diff --git a/.gitignore b/.gitignore index 08547ed..d73f513 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,9 @@ bin/clip bin/e2e +bin/harness tmp/* testfs/* *.gob *.clip *.rclip -*.pprof \ No newline at end of file +*.pprofbin/clip-harness-stdgzip diff --git a/Makefile b/Makefile index 2b1e13c..97d4a34 100644 --- a/Makefile +++ b/Makefile @@ -12,4 +12,13 @@ stop: cd hack; okteto down --file okteto.yml e2e: - go build -o ./bin/e2e ./e2e/main.go \ No newline at end of file + go build -o ./bin/e2e ./e2e/main.go + +harness: + go build -o ./bin/harness ./cmd/harness + +verify: harness + ./bin/harness verify + +bench: harness + ./bin/harness bench \ No newline at end of file diff --git a/cmd/harness/bench.go b/cmd/harness/bench.go new file mode 100644 index 0000000..bd921ec --- /dev/null +++ b/cmd/harness/bench.go @@ -0,0 +1,95 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "path/filepath" + + "github.com/beam-cloud/clip/pkg/clip" + "github.com/beam-cloud/clip/pkg/storage" +) + +// runBench benchmarks indexing of a real image across three modes: cold +// sequential, cold parallel, and warm (layer index cache), verifying that all +// modes produce identical output. +func runBench(args []string) error { + fs := flag.NewFlagSet("bench", flag.ExitOnError) + image := fs.String("image", "python:3.11-slim", "image reference to index") + concurrency := fs.Int("concurrency", 4, "layer index concurrency") + cacheDir := fs.String("cache-dir", "", "layer index cache dir (default: fresh temp dir)") + verbose := fs.Bool("v", false, "verbose clip logging") + if err := fs.Parse(args); err != nil { + return err + } + if !*verbose { + clip.SetLogLevel("warn") + } + + ctx := context.Background() + + workDir, err := os.MkdirTemp("", "clip-harness-bench-*") + if err != nil { + return err + } + defer os.RemoveAll(workDir) + + cacheRoot := *cacheDir + if cacheRoot == "" { + cacheRoot = filepath.Join(workDir, "layer-cache") + } + diskCache, err := storage.NewDiskLayerIndexCache(cacheRoot) + if err != nil { + return err + } + + fmt.Printf("==> benchmarking index of %s (concurrency=%d)\n", *image, *concurrency) + + fmt.Println("==> cold sequential run (no layer index cache, concurrency=1)") + coldSequential, err := runIndex(ctx, "bench-cold-seq", *image, workDir, nil, 1) + if err != nil { + return err + } + fmt.Printf(" %v (%s decompressed, %s)\n", coldSequential.duration, humanBytes(coldSequential.uncompressedBytes()), coldSequential.throughput()) + + fmt.Println("==> cold parallel run (no layer index cache)") + coldParallel, err := runIndex(ctx, "bench-cold", *image, workDir, nil, *concurrency) + if err != nil { + return err + } + fmt.Printf(" %v (%s)\n", coldParallel.duration, coldParallel.throughput()) + + fmt.Println("==> populate run (writes layer index cache)") + populate, err := runIndex(ctx, "bench-populate", *image, workDir, &instrumentedLayerCache{inner: diskCache}, *concurrency) + if err != nil { + return err + } + fmt.Printf(" %v (hits=%d puts=%d)\n", populate.duration, populate.cacheHits, populate.cachePuts) + + fmt.Println("==> warm run (layer index cache hits, no layer pulls)") + warm, err := runIndex(ctx, "bench-warm", *image, workDir, &instrumentedLayerCache{inner: diskCache}, *concurrency) + if err != nil { + return err + } + fmt.Printf(" %v (hits=%d puts=%d)\n", warm.duration, warm.cacheHits, warm.cachePuts) + + if err := compareRuns(coldSequential, coldParallel); err != nil { + return fmt.Errorf("parallel cold run output differs from sequential cold run: %w", err) + } + if err := compareRuns(coldParallel, warm); err != nil { + return fmt.Errorf("warm run output differs from cold run: %w", err) + } + + fmt.Println() + fmt.Println("results:") + fmt.Printf(" image decompressed size: %s in %d layers\n", humanBytes(coldSequential.uncompressedBytes()), coldSequential.layerCount()) + fmt.Printf(" cold sequential: %12v (%s)\n", coldSequential.duration, coldSequential.throughput()) + fmt.Printf(" cold parallel: %12v (%s, %.1fx vs sequential)\n", + coldParallel.duration, coldParallel.throughput(), float64(coldSequential.duration)/float64(coldParallel.duration)) + fmt.Printf(" populate: %12v\n", populate.duration) + fmt.Printf(" warm: %12v (%.1fx vs cold sequential, %d/%d layers from cache)\n", + warm.duration, float64(coldSequential.duration)/float64(warm.duration), warm.cacheHits, warm.cacheHits+warm.cachePuts) + fmt.Println(" outputs verified identical across all runs") + return nil +} diff --git a/cmd/harness/image.go b/cmd/harness/image.go new file mode 100644 index 0000000..dae8621 --- /dev/null +++ b/cmd/harness/image.go @@ -0,0 +1,250 @@ +package main + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "fmt" + "io" + "log" + "math/rand" + "net" + "net/http" + "path" + "strings" + "time" + + "github.com/google/go-containerregistry/pkg/name" + "github.com/google/go-containerregistry/pkg/registry" + v1 "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/empty" + "github.com/google/go-containerregistry/pkg/v1/mutate" + "github.com/google/go-containerregistry/pkg/v1/remote" + "github.com/google/go-containerregistry/pkg/v1/tarball" +) + +// referenceTime keeps every tar header in the reference image deterministic. +var referenceTime = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + +// layerEntry describes one tar entry in a reference image layer. +type layerEntry struct { + name string + typeflag byte + content []byte + linkname string + mode int64 +} + +// referenceLayers defines an image that exercises every node type the indexer +// handles: regular files, directories, symlinks, hard links, whiteouts, +// opaque whiteouts, and a payload large enough to span multiple gzip +// checkpoints. +func referenceLayers() [][]layerEntry { + // Deterministic pseudo-random payload (~5 MiB) to force checkpoints + rng := rand.New(rand.NewSource(42)) + payload := make([]byte, 5*1024*1024) + rng.Read(payload) + + return [][]layerEntry{ + // Layer 1: base filesystem + { + {name: "bin/", typeflag: tar.TypeDir, mode: 0755}, + {name: "bin/tool", typeflag: tar.TypeReg, content: []byte("#!/bin/sh\necho tool\n"), mode: 0755}, + {name: "bin/tool-alias", typeflag: tar.TypeLink, linkname: "bin/tool", mode: 0755}, + {name: "etc/", typeflag: tar.TypeDir, mode: 0755}, + {name: "etc/old.conf", typeflag: tar.TypeReg, content: []byte("remove-me"), mode: 0644}, + {name: "etc/keep.conf", typeflag: tar.TypeReg, content: []byte("keep-me"), mode: 0644}, + {name: "opt/", typeflag: tar.TypeDir, mode: 0755}, + {name: "opt/stale.txt", typeflag: tar.TypeReg, content: []byte("stale"), mode: 0644}, + {name: "opt/sub/", typeflag: tar.TypeDir, mode: 0755}, + {name: "opt/sub/deep.txt", typeflag: tar.TypeReg, content: []byte("deep"), mode: 0644}, + {name: "usr/", typeflag: tar.TypeDir, mode: 0755}, + {name: "usr/lib/", typeflag: tar.TypeDir, mode: 0755}, + {name: "usr/lib/big.bin", typeflag: tar.TypeReg, content: payload, mode: 0644}, + }, + // Layer 2: whiteout, opaque whiteout, and file replacement + { + {name: "etc/.wh.old.conf", typeflag: tar.TypeReg, mode: 0644}, + {name: "etc/keep.conf", typeflag: tar.TypeReg, content: []byte("replaced"), mode: 0600}, + {name: "opt/.wh..wh..opq", typeflag: tar.TypeReg, mode: 0644}, + {name: "opt/fresh.txt", typeflag: tar.TypeReg, content: []byte("fresh"), mode: 0644}, + }, + // Layer 3: symlinks and additions + { + {name: "srv/", typeflag: tar.TypeDir, mode: 0755}, + {name: "srv/app.txt", typeflag: tar.TypeReg, content: []byte("app"), mode: 0644}, + {name: "srv/link-to-app", typeflag: tar.TypeSymlink, linkname: "app.txt", mode: 0777}, + {name: "bin/tool2", typeflag: tar.TypeReg, content: []byte("#!/bin/sh\necho tool2\n"), mode: 0755}, + }, + } +} + +// buildLayerBlob produces a deterministic gzipped tarball for the entries. +func buildLayerBlob(entries []layerEntry) ([]byte, error) { + var tarBuf bytes.Buffer + tw := tar.NewWriter(&tarBuf) + for _, entry := range entries { + hdr := &tar.Header{ + Name: entry.name, + Typeflag: entry.typeflag, + Mode: entry.mode, + Linkname: entry.linkname, + ModTime: referenceTime, + Format: tar.FormatPAX, + } + if entry.typeflag == tar.TypeReg { + hdr.Size = int64(len(entry.content)) + } + if err := tw.WriteHeader(hdr); err != nil { + return nil, err + } + if entry.typeflag == tar.TypeReg && len(entry.content) > 0 { + if _, err := tw.Write(entry.content); err != nil { + return nil, err + } + } + } + if err := tw.Close(); err != nil { + return nil, err + } + + var gzBuf bytes.Buffer + gzw := gzip.NewWriter(&gzBuf) + if _, err := io.Copy(gzw, &tarBuf); err != nil { + return nil, err + } + if err := gzw.Close(); err != nil { + return nil, err + } + return gzBuf.Bytes(), nil +} + +// buildReferenceImage composes the reference layers into an OCI image with a +// fixed created time so indexing output is fully reproducible. +func buildReferenceImage() (v1.Image, error) { + img := empty.Image + for i, entries := range referenceLayers() { + blob, err := buildLayerBlob(entries) + if err != nil { + return nil, fmt.Errorf("failed to build layer %d: %w", i, err) + } + layer, err := tarball.LayerFromOpener(func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(blob)), nil + }) + if err != nil { + return nil, fmt.Errorf("failed to create layer %d: %w", i, err) + } + img, err = mutate.AppendLayers(img, layer) + if err != nil { + return nil, fmt.Errorf("failed to append layer %d: %w", i, err) + } + } + return mutate.CreatedAt(img, v1.Time{Time: referenceTime}) +} + +// localRegistry serves an in-memory OCI registry on a loopback address. +type localRegistry struct { + server *http.Server + listener net.Listener +} + +func startLocalRegistry() (*localRegistry, error) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("failed to start local registry listener: %w", err) + } + + server := &http.Server{ + Handler: registry.New(registry.Logger(log.New(io.Discard, "", 0))), + } + go server.Serve(listener) + + return &localRegistry{server: server, listener: listener}, nil +} + +func (r *localRegistry) host() string { + return r.listener.Addr().String() +} + +func (r *localRegistry) close() { + r.server.Close() +} + +// pushReferenceImage builds the reference image and pushes it to the local +// registry, returning its reference. +func pushReferenceImage(reg *localRegistry) (string, error) { + img, err := buildReferenceImage() + if err != nil { + return "", err + } + + imageRef := fmt.Sprintf("%s/harness/reference:latest", reg.host()) + ref, err := name.ParseReference(imageRef) + if err != nil { + return "", err + } + if err := remote.Write(ref, img); err != nil { + return "", fmt.Errorf("failed to push reference image: %w", err) + } + return imageRef, nil +} + +// expectedNode is an independently computed expectation for one path in the +// merged image filesystem. +type expectedNode struct { + kind string // "dir", "file", "symlink" + size int64 + target string +} + +// expectedTree applies OCI overlay semantics to the reference layer +// definitions using a simple map-based implementation, fully independent of +// the indexer code under test. +func expectedTree() map[string]expectedNode { + tree := map[string]expectedNode{ + "/": {kind: "dir"}, + } + + deletePrefix := func(prefix string) { + for p := range tree { + if strings.HasPrefix(p, prefix) { + delete(tree, p) + } + } + } + + for _, entries := range referenceLayers() { + for _, entry := range entries { + clean := path.Clean("/" + strings.TrimPrefix(entry.name, "./")) + base := path.Base(clean) + dir := path.Dir(clean) + + if base == ".wh..wh..opq" { + deletePrefix(dir + "/") + continue + } + if strings.HasPrefix(base, ".wh.") { + victim := path.Join(dir, strings.TrimPrefix(base, ".wh.")) + delete(tree, victim) + deletePrefix(victim + "/") + continue + } + + switch entry.typeflag { + case tar.TypeDir: + tree[clean] = expectedNode{kind: "dir"} + case tar.TypeReg: + tree[clean] = expectedNode{kind: "file", size: int64(len(entry.content))} + case tar.TypeSymlink: + tree[clean] = expectedNode{kind: "symlink", target: entry.linkname, size: int64(len(entry.linkname))} + case tar.TypeLink: + target := path.Clean("/" + strings.TrimPrefix(entry.linkname, "./")) + if targetNode, ok := tree[target]; ok { + tree[clean] = expectedNode{kind: "file", size: targetNode.size} + } + } + } + } + + return tree +} diff --git a/cmd/harness/index.go b/cmd/harness/index.go new file mode 100644 index 0000000..90eb375 --- /dev/null +++ b/cmd/harness/index.go @@ -0,0 +1,227 @@ +package main + +import ( + "context" + "fmt" + "os" + "path/filepath" + "reflect" + "sync/atomic" + "time" + + "github.com/beam-cloud/clip/pkg/clip" + "github.com/beam-cloud/clip/pkg/common" + "github.com/beam-cloud/clip/pkg/storage" +) + +// instrumentedLayerCache wraps a LayerIndexCache and counts hits, misses, and +// puts so runs can assert on cache behavior. +type instrumentedLayerCache struct { + inner storage.LayerIndexCache + hits atomic.Int64 + misses atomic.Int64 + puts atomic.Int64 +} + +func (c *instrumentedLayerCache) GetLayerIndex(ctx context.Context, key string) ([]byte, error) { + data, err := c.inner.GetLayerIndex(ctx, key) + if err == nil && data != nil { + c.hits.Add(1) + } else { + c.misses.Add(1) + } + return data, err +} + +func (c *instrumentedLayerCache) PutLayerIndex(ctx context.Context, key string, data []byte) error { + c.puts.Add(1) + return c.inner.PutLayerIndex(ctx, key, data) +} + +// indexRun captures everything needed to compare and report on one indexing +// pass over an image. +type indexRun struct { + label string + duration time.Duration + indexBytes []byte + metadata *common.ClipArchiveMetadata + cacheHits int64 + cachePuts int64 +} + +// runIndex indexes an image with clip.CreateFromOCIImage and extracts the +// encoded index region plus decoded metadata from the resulting archive. +func runIndex(ctx context.Context, label, imageRef, outputDir string, layerCache storage.LayerIndexCache, concurrency int) (*indexRun, error) { + outputPath := filepath.Join(outputDir, label+".clip") + + started := time.Now() + err := clip.CreateFromOCIImage(ctx, clip.CreateFromOCIImageOptions{ + ImageRef: imageRef, + OutputPath: outputPath, + LayerIndexCache: layerCache, + IndexConcurrency: concurrency, + }) + if err != nil { + return nil, fmt.Errorf("indexing failed (%s): %w", label, err) + } + duration := time.Since(started) + + archiver := clip.NewClipArchiver() + metadata, err := archiver.ExtractMetadata(outputPath) + if err != nil { + return nil, fmt.Errorf("failed to extract metadata (%s): %w", label, err) + } + + archiveBytes, err := os.ReadFile(outputPath) + if err != nil { + return nil, err + } + header := metadata.Header + if header.IndexPos+header.IndexLength > int64(len(archiveBytes)) { + return nil, fmt.Errorf("invalid index region in %s", outputPath) + } + + run := &indexRun{ + label: label, + duration: duration, + indexBytes: archiveBytes[header.IndexPos : header.IndexPos+header.IndexLength], + metadata: metadata, + } + if instrumented, ok := layerCache.(*instrumentedLayerCache); ok { + run.cacheHits = instrumented.hits.Load() + run.cachePuts = instrumented.puts.Load() + } + return run, nil +} + +// uncompressedBytes sums the decompressed sizes of all layers, derived from +// each layer's final gzip checkpoint (recorded at end-of-stream). +func (r *indexRun) uncompressedBytes() int64 { + info, ok := r.metadata.StorageInfo.(common.OCIStorageInfo) + if !ok { + return 0 + } + var total int64 + for _, idx := range info.GzipIdxByLayer { + if idx != nil && len(idx.Checkpoints) > 0 { + total += idx.Checkpoints[len(idx.Checkpoints)-1].UOff + } + } + return total +} + +func (r *indexRun) layerCount() int { + info, ok := r.metadata.StorageInfo.(common.OCIStorageInfo) + if !ok { + return 0 + } + return len(info.Layers) +} + +func (r *indexRun) throughput() string { + secs := r.duration.Seconds() + if secs <= 0 { + return "n/a" + } + return fmt.Sprintf("%.0f MiB/s", float64(r.uncompressedBytes())/(1<<20)/secs) +} + +// compareRuns asserts two runs produced identical results. The index region +// must be byte-identical; storage info is compared by deep equality because +// gob encodes its maps in nondeterministic key order. +// +// Gzip checkpoint compressed offsets (COff) are normalized before comparison: +// they are measured through the gzip reader's input buffering, so their exact +// values depend on network read chunking and differ between independent cold +// runs against real registries. They are advisory seek hints (only consulted +// by the optional UseCheckpoints read path); the uncompressed offsets (UOff), +// which define checkpoint placement, are fully deterministic and compared +// exactly. +func compareRuns(a, b *indexRun) error { + if !reflect.DeepEqual(a.indexBytes, b.indexBytes) { + return fmt.Errorf("index bytes differ between %s (%d bytes) and %s (%d bytes)", + a.label, len(a.indexBytes), b.label, len(b.indexBytes)) + } + + infoA := normalizeStorageInfo(a.metadata.StorageInfo) + infoB := normalizeStorageInfo(b.metadata.StorageInfo) + if !reflect.DeepEqual(infoA, infoB) { + return fmt.Errorf("storage info differs between %s and %s: %s", a.label, b.label, describeStorageInfoDiff(infoA, infoB)) + } + return nil +} + +// normalizeStorageInfo zeroes the buffering-dependent compressed offsets in +// gzip checkpoints so comparisons cover only deterministic fields. +func normalizeStorageInfo(info interface{}) interface{} { + ociInfo, ok := info.(common.OCIStorageInfo) + if !ok { + return info + } + + normalized := ociInfo + normalized.GzipIdxByLayer = make(map[string]*common.GzipIndex, len(ociInfo.GzipIdxByLayer)) + for digest, idx := range ociInfo.GzipIdxByLayer { + if idx == nil { + normalized.GzipIdxByLayer[digest] = nil + continue + } + checkpoints := make([]common.GzipCheckpoint, len(idx.Checkpoints)) + for i, cp := range idx.Checkpoints { + checkpoints[i] = common.GzipCheckpoint{UOff: cp.UOff} + } + normalized.GzipIdxByLayer[digest] = &common.GzipIndex{ + LayerDigest: idx.LayerDigest, + Checkpoints: checkpoints, + } + } + return normalized +} + +// describeStorageInfoDiff returns a human-readable summary of which storage +// info fields differ between two runs. +func describeStorageInfoDiff(a, b interface{}) string { + infoA, okA := a.(common.OCIStorageInfo) + infoB, okB := b.(common.OCIStorageInfo) + if !okA || !okB { + return fmt.Sprintf("type mismatch: %T vs %T", a, b) + } + + var diffs []string + if infoA.RegistryURL != infoB.RegistryURL { + diffs = append(diffs, fmt.Sprintf("RegistryURL: %q vs %q", infoA.RegistryURL, infoB.RegistryURL)) + } + if infoA.Repository != infoB.Repository { + diffs = append(diffs, fmt.Sprintf("Repository: %q vs %q", infoA.Repository, infoB.Repository)) + } + if infoA.Reference != infoB.Reference { + diffs = append(diffs, fmt.Sprintf("Reference: %q vs %q", infoA.Reference, infoB.Reference)) + } + if !reflect.DeepEqual(infoA.Layers, infoB.Layers) { + diffs = append(diffs, fmt.Sprintf("Layers: %v vs %v", infoA.Layers, infoB.Layers)) + } + if !reflect.DeepEqual(infoA.DecompressedHashByLayer, infoB.DecompressedHashByLayer) { + diffs = append(diffs, fmt.Sprintf("DecompressedHashByLayer: %v vs %v", infoA.DecompressedHashByLayer, infoB.DecompressedHashByLayer)) + } + if !reflect.DeepEqual(infoA.GzipIdxByLayer, infoB.GzipIdxByLayer) { + diffs = append(diffs, "GzipIdxByLayer differs") + } + if !reflect.DeepEqual(infoA.ImageMetadata, infoB.ImageMetadata) { + diffs = append(diffs, "ImageMetadata differs") + } + if len(diffs) == 0 { + return "(no field-level diff found)" + } + return fmt.Sprintf("%v", diffs) +} + +func humanBytes(n int64) string { + switch { + case n >= 1<<30: + return fmt.Sprintf("%.2f GiB", float64(n)/(1<<30)) + case n >= 1<<20: + return fmt.Sprintf("%.2f MiB", float64(n)/(1<<20)) + default: + return fmt.Sprintf("%d B", n) + } +} diff --git a/cmd/harness/main.go b/cmd/harness/main.go new file mode 100644 index 0000000..1580076 --- /dev/null +++ b/cmd/harness/main.go @@ -0,0 +1,53 @@ +// harness is a local validation and benchmarking tool for the CLIP OCI +// indexer. +// +// Usage: +// +// harness verify run determinism and correctness checks +// harness bench [-image ref] benchmark cold vs warm indexing of an image +// +// verify builds a reference image in-process, serves it from an in-memory +// registry, and asserts that sequential, parallel, and layer-index-cached +// indexing all produce identical output that matches independently computed +// overlay semantics. +// +// bench indexes a real image cold (sequential and parallel) and warm (layer +// index cache) and reports durations, throughput, and cache activity. +package main + +import ( + "fmt" + "os" +) + +func main() { + if len(os.Args) < 2 { + usage() + os.Exit(2) + } + + var err error + switch os.Args[1] { + case "verify": + err = runVerify(os.Args[2:]) + case "bench": + err = runBench(os.Args[2:]) + default: + usage() + os.Exit(2) + } + + if err != nil { + fmt.Fprintf(os.Stderr, "FAIL: %v\n", err) + os.Exit(1) + } +} + +func usage() { + fmt.Fprint(os.Stderr, `usage: harness [flags] + +commands: + verify determinism and correctness checks against a reference image + bench [-image ref] cold vs warm indexing benchmark against a real image +`) +} diff --git a/cmd/harness/verify.go b/cmd/harness/verify.go new file mode 100644 index 0000000..620fbd8 --- /dev/null +++ b/cmd/harness/verify.go @@ -0,0 +1,169 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "path/filepath" + "sort" + + "github.com/beam-cloud/clip/pkg/clip" + "github.com/beam-cloud/clip/pkg/common" + "github.com/beam-cloud/clip/pkg/storage" +) + +// runVerify asserts that indexing the reference image is deterministic and +// correct across sequential, parallel, and layer-index-cached runs. +func runVerify(args []string) error { + fs := flag.NewFlagSet("verify", flag.ExitOnError) + verbose := fs.Bool("v", false, "verbose clip logging") + if err := fs.Parse(args); err != nil { + return err + } + if !*verbose { + clip.SetLogLevel("warn") + } + + ctx := context.Background() + + workDir, err := os.MkdirTemp("", "clip-harness-verify-*") + if err != nil { + return err + } + defer os.RemoveAll(workDir) + + fmt.Println("==> building reference image and starting in-memory registry") + reg, err := startLocalRegistry() + if err != nil { + return err + } + defer reg.close() + + imageRef, err := pushReferenceImage(reg) + if err != nil { + return err + } + fmt.Printf(" image: %s\n", imageRef) + + fmt.Println("==> run 1: cold index, sequential (concurrency=1)") + sequential, err := runIndex(ctx, "cold-sequential", imageRef, workDir, nil, 1) + if err != nil { + return err + } + fmt.Printf(" %v, index=%d bytes\n", sequential.duration, len(sequential.indexBytes)) + + fmt.Println("==> run 2: cold index, parallel (concurrency=8)") + parallel, err := runIndex(ctx, "cold-parallel", imageRef, workDir, nil, 8) + if err != nil { + return err + } + fmt.Printf(" %v, index=%d bytes\n", parallel.duration, len(parallel.indexBytes)) + + if err := compareRuns(sequential, parallel); err != nil { + return fmt.Errorf("parallel indexing is not deterministic: %w", err) + } + fmt.Println(" OK: sequential and parallel runs are identical") + + diskCache, err := storage.NewDiskLayerIndexCache(filepath.Join(workDir, "layer-cache")) + if err != nil { + return err + } + + fmt.Println("==> run 3: cold index, populating layer index cache") + populate, err := runIndex(ctx, "cache-populate", imageRef, workDir, &instrumentedLayerCache{inner: diskCache}, 8) + if err != nil { + return err + } + fmt.Printf(" %v, cache hits=%d puts=%d\n", populate.duration, populate.cacheHits, populate.cachePuts) + if populate.cacheHits != 0 { + return fmt.Errorf("expected 0 cache hits on populate run, got %d", populate.cacheHits) + } + if populate.cachePuts == 0 { + return fmt.Errorf("expected layer artifacts to be stored on populate run") + } + if err := compareRuns(sequential, populate); err != nil { + return fmt.Errorf("cache-populating run differs from cold run: %w", err) + } + + fmt.Println("==> run 4: warm index from layer index cache") + warm, err := runIndex(ctx, "cache-warm", imageRef, workDir, &instrumentedLayerCache{inner: diskCache}, 8) + if err != nil { + return err + } + fmt.Printf(" %v, cache hits=%d puts=%d\n", warm.duration, warm.cacheHits, warm.cachePuts) + if warm.cacheHits != populate.cachePuts { + return fmt.Errorf("expected %d cache hits on warm run, got %d", populate.cachePuts, warm.cacheHits) + } + if warm.cachePuts != 0 { + return fmt.Errorf("expected no cache puts on warm run, got %d", warm.cachePuts) + } + if err := compareRuns(sequential, warm); err != nil { + return fmt.Errorf("cache-warm run differs from cold run: %w", err) + } + fmt.Println(" OK: warm run identical to cold run, all layers served from cache") + + fmt.Println("==> verifying index against independently computed ground truth") + if err := verifyIndexAgainstExpectedTree(warm.metadata); err != nil { + return err + } + fmt.Println(" OK: index matches ground truth") + + fmt.Println("PASS: all verification checks succeeded") + return nil +} + +// verifyIndexAgainstExpectedTree compares the decoded clip index with overlay +// semantics computed directly from the reference layer definitions. +func verifyIndexAgainstExpectedTree(metadata *common.ClipArchiveMetadata) error { + expected := expectedTree() + + actual := make(map[string]*common.ClipNode, len(expected)) + metadata.Index.Ascend(metadata.Index.Min(), func(item interface{}) bool { + node := item.(*common.ClipNode) + actual[node.Path] = node + return true + }) + + paths := make([]string, 0, len(expected)) + for p := range expected { + paths = append(paths, p) + } + sort.Strings(paths) + + for _, p := range paths { + want := expected[p] + node, ok := actual[p] + if !ok { + return fmt.Errorf("ground truth mismatch: missing path %s", p) + } + + switch want.kind { + case "dir": + if node.NodeType != common.DirNode { + return fmt.Errorf("ground truth mismatch: %s should be dir, got %s", p, node.NodeType) + } + case "file": + if node.NodeType != common.FileNode { + return fmt.Errorf("ground truth mismatch: %s should be file, got %s", p, node.NodeType) + } + if int64(node.Attr.Size) != want.size { + return fmt.Errorf("ground truth mismatch: %s size %d, want %d", p, node.Attr.Size, want.size) + } + case "symlink": + if node.NodeType != common.SymLinkNode { + return fmt.Errorf("ground truth mismatch: %s should be symlink, got %s", p, node.NodeType) + } + if node.Target != want.target { + return fmt.Errorf("ground truth mismatch: %s target %q, want %q", p, node.Target, want.target) + } + } + } + + for p := range actual { + if _, ok := expected[p]; !ok { + return fmt.Errorf("ground truth mismatch: unexpected path %s in index", p) + } + } + return nil +} diff --git a/go.mod b/go.mod index 61f67e6..e6aa407 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/compress v1.17.11 github.com/lufia/plan9stats v0.0.0-20240226150601-1dcf7310316a // indirect github.com/magiconair/properties v1.8.9 // indirect github.com/mattn/go-colorable v0.1.13 // indirect diff --git a/go.sum b/go.sum index 5f979b1..eb4a750 100644 --- a/go.sum +++ b/go.sum @@ -225,6 +225,8 @@ golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -266,6 +268,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8= +golang.org/x/tools v0.11.0/go.mod h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/clip/clip.go b/pkg/clip/clip.go index 2d27d0b..522d384 100644 --- a/pkg/clip/clip.go +++ b/pkg/clip/clip.go @@ -273,23 +273,25 @@ func StoreS3(storeS3Opts StoreS3Options) error { // CreateFromOCIImageOptions configures OCI image indexing type CreateFromOCIImageOptions struct { - ImageRef string // Source image to index (can be local) - StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef) - OutputPath string // Path for the metadata-only .clip archive - CheckpointMiB int64 // Gzip checkpoint interval - CredProvider interface{} // Optional registry credential provider - ProgressChan chan<- OCIIndexProgress - Platform *v1.Platform - ContentCache storage.ContentCache // Optional cache to warm with decompressed layer streams - ContentCacheDir string // Optional temp directory for layer cache upload spooling + ImageRef string // Source image to index (can be local) + StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef) + OutputPath string // Path for the metadata-only .clip archive + CheckpointMiB int64 // Gzip checkpoint interval + CredProvider interface{} // Optional registry credential provider + ProgressChan chan<- OCIIndexProgress + Platform *v1.Platform + ContentCache storage.ContentCache // Optional cache to warm with decompressed layer streams + ContentCacheDir string // Optional temp directory for layer cache upload spooling + LayerIndexCache storage.LayerIndexCache // Optional per-layer index artifact cache (skips pull+index on hit) + IndexConcurrency int // Max layers indexed concurrently (default 4) } // CreateFromOCIImage creates a metadata-only index (.clip) file from an OCI image func CreateFromOCIImage(ctx context.Context, options CreateFromOCIImageOptions) error { if options.StorageImageRef != "" && options.StorageImageRef != options.ImageRef { - log.Info().Msgf("creating OCI archive index: indexing from %s, storing reference to %s", options.ImageRef, options.StorageImageRef) + log.Debug().Msgf("creating OCI archive index: indexing from %s, storing reference to %s", options.ImageRef, options.StorageImageRef) } else { - log.Info().Msgf("creating OCI archive index from %s to %s", options.ImageRef, options.OutputPath) + log.Debug().Msgf("creating OCI archive index from %s to %s", options.ImageRef, options.OutputPath) } if options.CheckpointMiB == 0 { @@ -306,28 +308,30 @@ func CreateFromOCIImage(ctx context.Context, options CreateFromOCIImageOptions) archiver := NewClipArchiver() err := archiver.CreateFromOCI(ctx, IndexOCIImageOptions{ - ImageRef: options.ImageRef, - StorageImageRef: options.StorageImageRef, - CheckpointMiB: options.CheckpointMiB, - CredProvider: credProvider, - ProgressChan: options.ProgressChan, - Platform: options.Platform, - ContentCache: options.ContentCache, - ContentCacheDir: options.ContentCacheDir, + ImageRef: options.ImageRef, + StorageImageRef: options.StorageImageRef, + CheckpointMiB: options.CheckpointMiB, + CredProvider: credProvider, + ProgressChan: options.ProgressChan, + Platform: options.Platform, + ContentCache: options.ContentCache, + ContentCacheDir: options.ContentCacheDir, + LayerIndexCache: options.LayerIndexCache, + IndexConcurrency: options.IndexConcurrency, }, options.OutputPath) if err != nil { return err } - log.Info().Msg("OCI archive index created successfully") + log.Debug().Msg("OCI archive index created successfully") return nil } // CreateAndUploadOCIArchive creates an OCI index and uploads metadata to S3 // This combines indexing with remote storage upload func CreateAndUploadOCIArchive(ctx context.Context, options CreateFromOCIImageOptions, si common.ClipStorageInfo) error { - log.Info().Msgf("creating and uploading OCI archive index from %s", options.ImageRef) + log.Debug().Msgf("creating and uploading OCI archive index from %s", options.ImageRef) // Create the OCI index locally err := CreateFromOCIImage(ctx, options) @@ -355,7 +359,7 @@ func CreateAndUploadOCIArchive(ctx context.Context, options CreateFromOCIImageOp return fmt.Errorf("failed to create remote archive: %w", err) } - log.Info().Msg("OCI archive index uploaded successfully") + log.Debug().Msg("OCI archive index uploaded successfully") } return nil diff --git a/pkg/clip/layer_artifact.go b/pkg/clip/layer_artifact.go new file mode 100644 index 0000000..b67d028 --- /dev/null +++ b/pkg/clip/layer_artifact.go @@ -0,0 +1,127 @@ +package clip + +import ( + "bytes" + "encoding/gob" + "fmt" + "strings" + + "github.com/beam-cloud/clip/pkg/common" + "github.com/tidwall/btree" +) + +// LayerArtifactVersion is baked into layer-index cache keys so that any change +// to the artifact format or indexing semantics invalidates previously cached +// artifacts. +const LayerArtifactVersion = 1 + +// LayerEntryKind describes the type of operation a layer entry applies to the +// merged image index. +type LayerEntryKind uint8 + +const ( + // LayerEntryNode adds (or replaces) a file/dir/symlink node. + LayerEntryNode LayerEntryKind = iota + // LayerEntryWhiteout removes a path (and any children) added by lower layers. + LayerEntryWhiteout + // LayerEntryOpaqueWhiteout removes all children of a directory added by lower layers. + LayerEntryOpaqueWhiteout + // LayerEntryHardLink adds a node whose attributes are copied from an + // existing path in the index at merge time. + LayerEntryHardLink +) + +// LayerEntry is a single ordered operation recorded while walking a layer tar +// stream. Replaying entries in order against an index reproduces exactly the +// same result as indexing the layer directly. +type LayerEntry struct { + Kind LayerEntryKind + Node *common.ClipNode // set for LayerEntryNode + Path string // whiteout victim, opaque dir, or hardlink path + Target string // hardlink target path +} + +// LayerArtifact is a self-contained, serializable result of indexing a single +// OCI layer. It is fully deterministic given the compressed layer blob and the +// checkpoint interval, which makes it safe to cache keyed by layer digest. +type LayerArtifact struct { + Version int + LayerDigest string + CheckpointMiB int64 + Entries []LayerEntry + Checkpoints []common.GzipCheckpoint + DecompressedHash string + UncompressedSize int64 +} + +// LayerArtifactCacheKey returns the deterministic cache key for a layer's +// index artifact. The key incorporates the artifact format version and the +// checkpoint interval, since both affect artifact contents. +func LayerArtifactCacheKey(layerDigest string, checkpointMiB int64) string { + digest := strings.ReplaceAll(layerDigest, ":", "_") + return fmt.Sprintf("clip-layer-index/v%d/cp%d/%s", LayerArtifactVersion, checkpointMiB, digest) +} + +// EncodeLayerArtifact serializes an artifact with gob. The artifact contains +// only slices and fixed-size structs (no maps), so encoding is deterministic. +func EncodeLayerArtifact(artifact *LayerArtifact) ([]byte, error) { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(artifact); err != nil { + return nil, fmt.Errorf("failed to encode layer artifact: %w", err) + } + return buf.Bytes(), nil +} + +// DecodeLayerArtifact deserializes an artifact and validates it against the +// expected layer digest and checkpoint interval. Returns an error for any +// mismatch so callers can treat corrupt/stale cache entries as misses. +func DecodeLayerArtifact(data []byte, expectedDigest string, expectedCheckpointMiB int64) (*LayerArtifact, error) { + var artifact LayerArtifact + if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&artifact); err != nil { + return nil, fmt.Errorf("failed to decode layer artifact: %w", err) + } + if artifact.Version != LayerArtifactVersion { + return nil, fmt.Errorf("layer artifact version mismatch: got %d, want %d", artifact.Version, LayerArtifactVersion) + } + if artifact.LayerDigest != expectedDigest { + return nil, fmt.Errorf("layer artifact digest mismatch: got %s, want %s", artifact.LayerDigest, expectedDigest) + } + if artifact.CheckpointMiB != expectedCheckpointMiB { + return nil, fmt.Errorf("layer artifact checkpoint interval mismatch: got %d, want %d", artifact.CheckpointMiB, expectedCheckpointMiB) + } + if artifact.DecompressedHash == "" { + return nil, fmt.Errorf("layer artifact missing decompressed hash") + } + return &artifact, nil +} + +// applyLayerArtifact replays a layer's entries, in order, against the shared +// image index. This reproduces the exact semantics of indexing the layer +// directly: whiteouts delete lower-layer paths, later entries replace earlier +// ones, and hard links copy attributes from the current index state. +func (ca *ClipArchiver) applyLayerArtifact(index *btree.BTree, artifact *LayerArtifact) { + for i := range artifact.Entries { + entry := &artifact.Entries[i] + switch entry.Kind { + case LayerEntryNode: + if entry.Node != nil { + index.Set(entry.Node) + } + case LayerEntryWhiteout: + ca.deleteNode(index, entry.Path) + case LayerEntryOpaqueWhiteout: + ca.deleteRange(index, entry.Path+"/") + case LayerEntryHardLink: + targetNode := index.Get(&common.ClipNode{Path: entry.Target}) + if targetNode != nil { + tn := targetNode.(*common.ClipNode) + index.Set(&common.ClipNode{ + Path: entry.Path, + NodeType: common.FileNode, + Attr: tn.Attr, + Remote: tn.Remote, + }) + } + } + } +} diff --git a/pkg/clip/layer_artifact_test.go b/pkg/clip/layer_artifact_test.go new file mode 100644 index 0000000..cc24280 --- /dev/null +++ b/pkg/clip/layer_artifact_test.go @@ -0,0 +1,181 @@ +package clip + +import ( + "archive/tar" + "bytes" + "compress/gzip" + "context" + "io" + "testing" + + "github.com/beam-cloud/clip/pkg/common" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type tarEntry struct { + name string + typeflag byte + content string + linkname string +} + +func buildLayer(t *testing.T, entries []tarEntry) []byte { + t.Helper() + + var tarBuf bytes.Buffer + tw := tar.NewWriter(&tarBuf) + for _, e := range entries { + hdr := &tar.Header{ + Name: e.name, + Typeflag: e.typeflag, + Mode: 0644, + Linkname: e.linkname, + } + if e.typeflag == tar.TypeDir { + hdr.Mode = 0755 + } + if e.typeflag == tar.TypeReg { + hdr.Size = int64(len(e.content)) + } + require.NoError(t, tw.WriteHeader(hdr)) + if e.typeflag == tar.TypeReg && len(e.content) > 0 { + _, err := tw.Write([]byte(e.content)) + require.NoError(t, err) + } + } + require.NoError(t, tw.Close()) + + var gzBuf bytes.Buffer + gzw := gzip.NewWriter(&gzBuf) + _, err := io.Copy(gzw, &tarBuf) + require.NoError(t, err) + require.NoError(t, gzw.Close()) + + return gzBuf.Bytes() +} + +func indexLayerHelper(t *testing.T, archiver *ClipArchiver, compressed []byte, digest string) *LayerArtifact { + t.Helper() + artifact, err := archiver.indexLayerToArtifact( + context.Background(), + io.NopCloser(bytes.NewReader(compressed)), + digest, + IndexOCIImageOptions{CheckpointMiB: 2}, + ) + require.NoError(t, err) + return artifact +} + +func indexPaths(index interface { + Ascend(pivot interface{}, iter func(item interface{}) bool) + Min() interface{} +}) map[string]*common.ClipNode { + nodes := map[string]*common.ClipNode{} + index.Ascend(index.Min(), func(a interface{}) bool { + n := a.(*common.ClipNode) + nodes[n.Path] = n + return true + }) + return nodes +} + +func TestLayerArtifactRoundTripDeterminism(t *testing.T) { + archiver := NewClipArchiver() + + layer := buildLayer(t, []tarEntry{ + {name: "dir/", typeflag: tar.TypeDir}, + {name: "dir/a.txt", typeflag: tar.TypeReg, content: "hello"}, + {name: "dir/b.txt", typeflag: tar.TypeReg, content: "world"}, + {name: "link", typeflag: tar.TypeSymlink, linkname: "dir/a.txt"}, + {name: "hard", typeflag: tar.TypeLink, linkname: "dir/a.txt"}, + }) + + artifact := indexLayerHelper(t, archiver, layer, "sha256:layer1") + + encoded1, err := EncodeLayerArtifact(artifact) + require.NoError(t, err) + encoded2, err := EncodeLayerArtifact(artifact) + require.NoError(t, err) + assert.Equal(t, encoded1, encoded2, "artifact encoding must be deterministic") + + decoded, err := DecodeLayerArtifact(encoded1, "sha256:layer1", 2) + require.NoError(t, err) + assert.Equal(t, artifact.DecompressedHash, decoded.DecompressedHash) + assert.Equal(t, artifact.UncompressedSize, decoded.UncompressedSize) + assert.Equal(t, len(artifact.Entries), len(decoded.Entries)) + assert.Equal(t, artifact.Checkpoints, decoded.Checkpoints) + + // Applying the original and the decoded artifact must produce identical indexes + idx1 := archiver.newIndex() + archiver.applyLayerArtifact(idx1, artifact) + idx2 := archiver.newIndex() + archiver.applyLayerArtifact(idx2, decoded) + + bytes1, err := archiver.EncodeIndex(idx1) + require.NoError(t, err) + bytes2, err := archiver.EncodeIndex(idx2) + require.NoError(t, err) + assert.Equal(t, bytes1, bytes2, "replayed index must be byte-identical") + + // Validate index content + nodes := indexPaths(idx1) + require.Contains(t, nodes, "/dir/a.txt") + require.Contains(t, nodes, "/hard") + assert.Equal(t, nodes["/dir/a.txt"].Attr.Ino, nodes["/hard"].Attr.Ino, "hardlink copies target attrs") + assert.Equal(t, common.SymLinkNode, nodes["/link"].NodeType) +} + +func TestLayerArtifactDecodeValidation(t *testing.T) { + archiver := NewClipArchiver() + layer := buildLayer(t, []tarEntry{{name: "f.txt", typeflag: tar.TypeReg, content: "x"}}) + artifact := indexLayerHelper(t, archiver, layer, "sha256:abc") + + encoded, err := EncodeLayerArtifact(artifact) + require.NoError(t, err) + + _, err = DecodeLayerArtifact(encoded, "sha256:other", 2) + assert.Error(t, err, "digest mismatch must be rejected") + + _, err = DecodeLayerArtifact(encoded, "sha256:abc", 4) + assert.Error(t, err, "checkpoint interval mismatch must be rejected") + + _, err = DecodeLayerArtifact([]byte("garbage"), "sha256:abc", 2) + assert.Error(t, err, "corrupt data must be rejected") +} + +func TestLayerArtifactWhiteoutSemantics(t *testing.T) { + archiver := NewClipArchiver() + + lower := buildLayer(t, []tarEntry{ + {name: "etc/", typeflag: tar.TypeDir}, + {name: "etc/old.conf", typeflag: tar.TypeReg, content: "old"}, + {name: "opt/", typeflag: tar.TypeDir}, + {name: "opt/keep.txt", typeflag: tar.TypeReg, content: "keep"}, + {name: "opt/sub/", typeflag: tar.TypeDir}, + {name: "opt/sub/x.txt", typeflag: tar.TypeReg, content: "x"}, + }) + upper := buildLayer(t, []tarEntry{ + // Whiteout removes /etc/old.conf + {name: "etc/.wh.old.conf", typeflag: tar.TypeReg}, + // Opaque whiteout clears /opt then re-adds one file + {name: "opt/.wh..wh..opq", typeflag: tar.TypeReg}, + {name: "opt/new.txt", typeflag: tar.TypeReg, content: "new"}, + }) + + lowerArt := indexLayerHelper(t, archiver, lower, "sha256:lower") + upperArt := indexLayerHelper(t, archiver, upper, "sha256:upper") + + index := archiver.newIndex() + archiver.applyLayerArtifact(index, lowerArt) + archiver.applyLayerArtifact(index, upperArt) + + nodes := indexPaths(index) + assert.NotContains(t, nodes, "/etc/old.conf", "whiteout must remove lower-layer file") + assert.Contains(t, nodes, "/etc") + assert.NotContains(t, nodes, "/opt/keep.txt", "opaque whiteout must clear lower-layer dir contents") + assert.NotContains(t, nodes, "/opt/sub/x.txt") + assert.NotContains(t, nodes, "/opt/sub") + assert.Contains(t, nodes, "/opt/new.txt", "same-layer entries after opaque whiteout survive") + assert.Contains(t, nodes, "/opt") +} diff --git a/pkg/clip/layer_blob_cache.go b/pkg/clip/layer_blob_cache.go new file mode 100644 index 0000000..b639d19 --- /dev/null +++ b/pkg/clip/layer_blob_cache.go @@ -0,0 +1,221 @@ +package clip + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "hash" + "io" + "strings" + + "github.com/beam-cloud/clip/pkg/storage" + v1 "github.com/google/go-containerregistry/pkg/v1" + log "github.com/rs/zerolog/log" +) + +const contentCacheBlobReadChunkSize = 4 * 1024 * 1024 + +// compressedLayerCacheKey returns the content-cache key for a compressed +// layer blob. OCI layer digests are the sha256 of the compressed bytes, so +// the hex digest doubles as the content-addressed cache key. +func compressedLayerCacheKey(layerDigest string) string { + return strings.TrimPrefix(layerDigest, "sha256:") +} + +// contentCacheExistsWithSize performs a size-aware existence check when the +// cache supports it. Only a size-aware positive answer is trusted, since it +// guarantees the cached blob is complete. +func contentCacheExistsWithSize(cache storage.ContentCache, key string, size int64) bool { + if cache == nil || size <= 0 { + return false + } + sizeCache, ok := cache.(storage.ContentCacheExistsWithSize) + if !ok { + return false + } + exists, err := sizeCache.ContentExistsWithSize(key, size, struct{ RoutingKey string }{RoutingKey: key}) + if err != nil { + log.Debug().Err(err).Str("key", key).Msg("content cache exists check failed") + return false + } + return exists +} + +// contentCacheBlobReader streams a content-cache blob sequentially with +// read-ahead buffering, so the gzip reader's small reads don't translate into +// per-read cache RPCs. +type contentCacheBlobReader struct { + cache storage.ContentCache + readInto storage.ContentCacheReadInto + key string + size int64 + offset int64 + buf []byte + bufPos int +} + +func newContentCacheBlobReader(cache storage.ContentCache, key string, size int64) *contentCacheBlobReader { + r := &contentCacheBlobReader{cache: cache, key: key, size: size} + if ri, ok := cache.(storage.ContentCacheReadInto); ok { + r.readInto = ri + } + return r +} + +func (r *contentCacheBlobReader) refill() error { + remaining := r.size - r.offset + if remaining <= 0 { + return io.EOF + } + length := int64(contentCacheBlobReadChunkSize) + if length > remaining { + length = remaining + } + + opts := struct{ RoutingKey string }{RoutingKey: r.key} + if r.readInto != nil { + if cap(r.buf) < int(length) { + r.buf = make([]byte, length) + } + r.buf = r.buf[:length] + n, err := r.readInto.ReadContentInto(r.key, r.offset, r.buf, opts) + if err != nil { + return err + } + if n != length { + return fmt.Errorf("short content cache read: expected %d bytes, got %d", length, n) + } + } else { + data, err := r.cache.GetContent(r.key, r.offset, length, opts) + if err != nil { + return err + } + if int64(len(data)) != length { + return fmt.Errorf("short content cache read: expected %d bytes, got %d", length, len(data)) + } + r.buf = data + } + + r.offset += length + r.bufPos = 0 + return nil +} + +func (r *contentCacheBlobReader) Read(p []byte) (int, error) { + if r.bufPos >= len(r.buf) { + if err := r.refill(); err != nil { + return 0, err + } + } + n := copy(p, r.buf[r.bufPos:]) + r.bufPos += n + return n, nil +} + +func (r *contentCacheBlobReader) Close() error { return nil } + +// hashingReadCloser hashes every byte read from the underlying reader so the +// stream can be verified against the layer digest after consumption. +type hashingReadCloser struct { + rc io.ReadCloser + hasher hash.Hash +} + +func newHashingReadCloser(rc io.ReadCloser) *hashingReadCloser { + return &hashingReadCloser{rc: rc, hasher: sha256.New()} +} + +func (h *hashingReadCloser) Read(p []byte) (int, error) { + n, err := h.rc.Read(p) + if n > 0 { + h.hasher.Write(p[:n]) + } + return n, err +} + +func (h *hashingReadCloser) Close() error { return h.rc.Close() } + +func (h *hashingReadCloser) sum() string { + return hex.EncodeToString(h.hasher.Sum(nil)) +} + +// indexLayerFromBestSource indexes a layer using the cheapest available +// source of its compressed bytes: +// +// 1. The content cache, keyed by the layer digest (the same content-addressed +// mechanism used for layer caching at runtime). Hit -> no registry pull. +// 2. The registry. The compressed bytes are then warmed into the content +// cache so other workers skip the registry pull next time. +func (ca *ClipArchiver) indexLayerFromBestSource( + ctx context.Context, + layer v1.Layer, + layerDigest string, + opts IndexOCIImageOptions, +) (*LayerArtifact, error) { + blobKey := compressedLayerCacheKey(layerDigest) + + // Source 1: compressed blob from the content cache + if opts.ContentCache != nil { + compressedSize, err := layer.Size() + if err == nil && contentCacheExistsWithSize(opts.ContentCache, blobKey, compressedSize) { + reader := newHashingReadCloser(newContentCacheBlobReader(opts.ContentCache, blobKey, compressedSize)) + artifact, err := ca.indexLayerToArtifact(ctx, reader, layerDigest, opts) + if err == nil { + // Consume any trailing bytes the gzip reader didn't request, + // then verify the full blob hash against the layer digest. + if _, drainErr := io.Copy(io.Discard, reader); drainErr == nil && reader.sum() == blobKey { + log.Debug(). + Str("layer_digest", layerDigest). + Int64("compressed_bytes", compressedSize). + Msg("compressed layer cache hit: indexed without registry pull") + return artifact, nil + } + log.Warn(). + Str("layer_digest", layerDigest). + Msg("compressed layer cache content failed digest verification; falling back to registry") + } else { + log.Warn(). + Err(err). + Str("layer_digest", layerDigest). + Msg("failed to index layer from content cache; falling back to registry") + } + } + } + + // Source 2: registry pull (+ warm the compressed blob into the cache) + compressedRC, err := layer.Compressed() + if err != nil { + return nil, fmt.Errorf("failed to get compressed layer: %w", err) + } + defer compressedRC.Close() + + var blobSpool *indexedLayerContentCacheSpool + var source io.Reader = compressedRC + if opts.ContentCache != nil { + blobSpool = newIndexedLayerContentCacheSpool(opts.ContentCacheDir, layerDigest) + if blobSpool != nil { + defer blobSpool.closeAndRemove() + source = io.TeeReader(compressedRC, blobSpool) + } + } + + artifact, err := ca.indexLayerToArtifact(ctx, io.NopCloser(source), layerDigest, opts) + if err != nil { + return nil, err + } + + if blobSpool != nil && blobSpool.err == nil && blobSpool.path != "" { + // Drain trailing compressed bytes the gzip reader didn't consume so + // the spool contains the complete blob. + if _, err := io.Copy(io.Discard, source); err != nil { + log.Warn().Err(err).Str("layer_digest", layerDigest).Msg("failed to drain compressed layer stream for cache warm") + } else if err := blobSpool.close(); err != nil { + log.Warn().Err(err).Str("layer_digest", layerDigest).Msg("failed to finalize compressed layer spool") + } else if err := ca.storeLayerBlobInContentCache(ctx, opts.ContentCache, blobSpool.path, blobKey, layerDigest, "compressed layer"); err != nil { + log.Warn().Err(err).Str("layer_digest", layerDigest).Msg("failed to store compressed layer in content cache") + } + } + + return artifact, nil +} diff --git a/pkg/clip/layer_blob_cache_test.go b/pkg/clip/layer_blob_cache_test.go new file mode 100644 index 0000000..4fa5520 --- /dev/null +++ b/pkg/clip/layer_blob_cache_test.go @@ -0,0 +1,188 @@ +package clip + +import ( + "archive/tar" + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "io" + "log" + "net/http" + "net/http/httptest" + "os" + "strings" + "sync" + "testing" + + "github.com/google/go-containerregistry/pkg/name" + "github.com/google/go-containerregistry/pkg/registry" + v1 "github.com/google/go-containerregistry/pkg/v1" + "github.com/google/go-containerregistry/pkg/v1/empty" + "github.com/google/go-containerregistry/pkg/v1/mutate" + "github.com/google/go-containerregistry/pkg/v1/remote" + "github.com/google/go-containerregistry/pkg/v1/tarball" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// fakeBlobContentCache is an in-memory content-addressed cache implementing +// the optional extensions clip uses during indexing. +type fakeBlobContentCache struct { + mu sync.Mutex + blobs map[string][]byte +} + +func newFakeBlobContentCache() *fakeBlobContentCache { + return &fakeBlobContentCache{blobs: map[string][]byte{}} +} + +func (c *fakeBlobContentCache) GetContent(hash string, offset int64, length int64, opts struct{ RoutingKey string }) ([]byte, error) { + c.mu.Lock() + defer c.mu.Unlock() + data, ok := c.blobs[hash] + if !ok || offset+length > int64(len(data)) { + return nil, fmt.Errorf("content not found: %s", hash) + } + return data[offset : offset+length], nil +} + +func (c *fakeBlobContentCache) ReadContentInto(hash string, offset int64, dest []byte, opts struct{ RoutingKey string }) (int64, error) { + data, err := c.GetContent(hash, offset, int64(len(dest)), opts) + if err != nil { + return 0, err + } + return int64(copy(dest, data)), nil +} + +func (c *fakeBlobContentCache) StoreContent(chunks chan []byte, hash string, opts struct{ RoutingKey string }) (string, error) { + var buf bytes.Buffer + for chunk := range chunks { + buf.Write(chunk) + } + return c.put(buf.Bytes()), nil +} + +func (c *fakeBlobContentCache) StoreContentFromLocalPath(path string, hash string, opts struct{ RoutingKey string }) (string, error) { + data, err := os.ReadFile(path) + if err != nil { + return "", err + } + return c.put(data), nil +} + +func (c *fakeBlobContentCache) ContentExistsWithSize(hash string, size int64, opts struct{ RoutingKey string }) (bool, error) { + c.mu.Lock() + defer c.mu.Unlock() + data, ok := c.blobs[hash] + return ok && int64(len(data)) == size, nil +} + +func (c *fakeBlobContentCache) put(data []byte) string { + sum := sha256.Sum256(data) + hash := hex.EncodeToString(sum[:]) + c.mu.Lock() + defer c.mu.Unlock() + c.blobs[hash] = append([]byte(nil), data...) + return hash +} + +func (c *fakeBlobContentCache) has(hash string) bool { + c.mu.Lock() + defer c.mu.Unlock() + _, ok := c.blobs[hash] + return ok +} + +// TestCompressedLayerContentCacheReadThrough verifies that indexing warms +// compressed layer blobs into the content cache (keyed by layer digest) and +// that subsequent indexing reads layers from the cache instead of the +// registry, producing identical output. +func TestCompressedLayerContentCacheReadThrough(t *testing.T) { + // Build a 2-layer synthetic image + img := empty.Image + for layerIdx := 0; layerIdx < 2; layerIdx++ { + entries := []tarEntry{ + {name: fmt.Sprintf("dir%d/", layerIdx), typeflag: tar.TypeDir}, + {name: fmt.Sprintf("dir%d/file.txt", layerIdx), typeflag: tar.TypeReg, content: strings.Repeat("x", 4096)}, + } + blob := buildLayer(t, entries) + layer, err := tarball.LayerFromOpener(func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(blob)), nil + }) + require.NoError(t, err) + img, err = mutate.AppendLayers(img, layer) + require.NoError(t, err) + } + + layers, err := img.Layers() + require.NoError(t, err) + layerDigests := make([]string, 0, len(layers)) + for _, l := range layers { + d, err := l.Digest() + require.NoError(t, err) + layerDigests = append(layerDigests, d.String()) + } + + // Serve via in-memory registry, counting layer blob fetches + var mu sync.Mutex + layerBlobGets := 0 + inner := registry.New(registry.Logger(log.New(io.Discard, "", 0))) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodGet && strings.Contains(r.URL.Path, "/blobs/") { + for _, d := range layerDigests { + if strings.Contains(r.URL.Path, d) { + mu.Lock() + layerBlobGets++ + mu.Unlock() + } + } + } + inner.ServeHTTP(w, r) + })) + defer srv.Close() + + imageRef := strings.TrimPrefix(srv.URL, "http://") + "/test/blobcache:latest" + ref, err := name.ParseReference(imageRef) + require.NoError(t, err) + require.NoError(t, remote.Write(ref, img)) + + archiver := NewClipArchiver() + cache := newFakeBlobContentCache() + opts := IndexOCIImageOptions{ + ImageRef: imageRef, + CheckpointMiB: 2, + ContentCache: cache, + ContentCacheDir: t.TempDir(), + Platform: &v1.Platform{OS: "linux", Architecture: "amd64"}, + } + + // Run 1: cold — layers pulled from registry, compressed blobs warmed + idx1, _, _, hashes1, _, _, _, _, err := archiver.IndexOCIImage(context.Background(), opts) + require.NoError(t, err) + mu.Lock() + getsAfterCold := layerBlobGets + mu.Unlock() + require.Greater(t, getsAfterCold, 0, "cold run must pull layers from registry") + + for _, d := range layerDigests { + assert.True(t, cache.has(compressedLayerCacheKey(d)), "compressed blob %s must be warmed into content cache", d) + } + + // Run 2: layers must come from the content cache, not the registry + idx2, _, _, hashes2, _, _, _, _, err := archiver.IndexOCIImage(context.Background(), opts) + require.NoError(t, err) + mu.Lock() + getsAfterWarm := layerBlobGets + mu.Unlock() + assert.Equal(t, getsAfterCold, getsAfterWarm, "warm run must not fetch layer blobs from registry") + + // Outputs identical + bytes1, err := archiver.EncodeIndex(idx1) + require.NoError(t, err) + bytes2, err := archiver.EncodeIndex(idx2) + require.NoError(t, err) + assert.Equal(t, bytes1, bytes2, "index must be identical regardless of layer source") + assert.Equal(t, hashes1, hashes2) +} diff --git a/pkg/clip/oci_hash_consistency_test.go b/pkg/clip/oci_hash_consistency_test.go index ea9a6b3..e1b2ccc 100644 --- a/pkg/clip/oci_hash_consistency_test.go +++ b/pkg/clip/oci_hash_consistency_test.go @@ -193,16 +193,17 @@ func TestRealLayerHashConsistency(t *testing.T) { // Test using the actual indexing code archiver := &ClipArchiver{} index := archiver.newIndex() - - gzipIndex, indexedHash, err := archiver.indexLayerOptimized( + + artifact, err := archiver.indexLayerToArtifact( context.Background(), io.NopCloser(bytes.NewReader(compressedData)), "sha256:test123", - index, IndexOCIImageOptions{CheckpointMiB: 2}, ) require.NoError(t, err) - require.NotNil(t, gzipIndex) + require.NotNil(t, artifact) + archiver.applyLayerArtifact(index, artifact) + indexedHash := artifact.DecompressedHash t.Logf("Indexed hash: %s", indexedHash) // Now decompress and verify diff --git a/pkg/clip/oci_indexer.go b/pkg/clip/oci_indexer.go index c5dbdbc..5a215ae 100644 --- a/pkg/clip/oci_indexer.go +++ b/pkg/clip/oci_indexer.go @@ -2,7 +2,9 @@ package clip import ( "archive/tar" - "compress/gzip" + // klauspost/compress gunzip is substantially faster than stdlib and is the + // dominant cost when indexing layers (decompress + hash of every byte) + "github.com/klauspost/compress/gzip" "context" "crypto/sha256" "encoding/hex" @@ -25,6 +27,7 @@ import ( "github.com/hanwen/go-fuse/v2/fuse" log "github.com/rs/zerolog/log" "github.com/tidwall/btree" + "golang.org/x/sync/errgroup" ) // OCIIndexProgress represents a progress update during OCI image indexing @@ -39,16 +42,20 @@ type OCIIndexProgress struct { // IndexOCIImageOptions configures the OCI indexer type IndexOCIImageOptions struct { - ImageRef string // Source image to index (can be local) - StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef) - CheckpointMiB int64 // Checkpoint every N MiB (default 2) - CredProvider common.RegistryCredentialProvider // optional credential provider for registry authentication - ProgressChan chan<- OCIIndexProgress // optional channel for progress updates - Platform *v1.Platform // Target platform (defaults to linux/runtime.GOARCH) - ContentCache storage.ContentCache // optional remote cache for fully decompressed layers - ContentCacheDir string // optional temp directory for cache upload spooling + ImageRef string // Source image to index (can be local) + StorageImageRef string // Optional: image reference to store in metadata (defaults to ImageRef) + CheckpointMiB int64 // Checkpoint every N MiB (default 2) + CredProvider common.RegistryCredentialProvider // optional credential provider for registry authentication + ProgressChan chan<- OCIIndexProgress // optional channel for progress updates + Platform *v1.Platform // Target platform (defaults to linux/runtime.GOARCH) + ContentCache storage.ContentCache // optional remote cache for fully decompressed layers + ContentCacheDir string // optional temp directory for cache upload spooling + LayerIndexCache storage.LayerIndexCache // optional cache of per-layer index artifacts (skips pull+index on hit) + IndexConcurrency int // max layers indexed concurrently (default 4) } +const defaultIndexConcurrency = 4 + const indexedLayerContentCacheChunkSize = 4 * 1024 * 1024 type indexedLayerContentCacheSpool struct { @@ -164,7 +171,7 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt // Log the indexing strategy if storageRef != opts.ImageRef { - log.Info().Msgf("Indexing from local: %s, will store reference to: %s", opts.ImageRef, storageRef) + log.Debug().Msgf("Indexing from local: %s, will store reference to: %s", opts.ImageRef, storageRef) } // Determine which credential provider to use @@ -252,8 +259,13 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt gzipIdx = make(map[string]*common.GzipIndex) decompressedHashes = make(map[string]string) - // Create root node with complete FUSE attributes - now := time.Now() + // Create the root node with deterministic timestamps derived from the + // image config's created time, so indexing the same image digest always + // produces byte-identical metadata. + rootTime := time.Unix(0, 0) + if imageMetadata != nil && !imageMetadata.Created.IsZero() { + rootTime = imageMetadata.Created + } root := &common.ClipNode{ Path: "/", NodeType: common.DirNode, @@ -261,12 +273,12 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt Ino: 1, Size: 0, Blocks: 0, - Atime: uint64(now.Unix()), - Atimensec: uint32(now.Nanosecond()), - Mtime: uint64(now.Unix()), - Mtimensec: uint32(now.Nanosecond()), - Ctime: uint64(now.Unix()), - Ctimensec: uint32(now.Nanosecond()), + Atime: uint64(rootTime.Unix()), + Atimensec: uint32(rootTime.Nanosecond()), + Mtime: uint64(rootTime.Unix()), + Mtimensec: uint32(rootTime.Nanosecond()), + Ctime: uint64(rootTime.Unix()), + Ctimensec: uint32(rootTime.Nanosecond()), Mode: uint32(syscall.S_IFDIR | 0755), Nlink: 2, // Directories start with link count of 2 (. and ..) Owner: fuse.Owner{ @@ -277,48 +289,110 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt } index.Set(root) - log.Info().Msgf("Indexing %d layers from %s", len(layers), opts.ImageRef) - - // Process each layer in order (bottom to top) - for i, layer := range layers { + // Resolve all layer digests up front (cheap; manifest is already fetched) + for _, layer := range layers { digest, err := layer.Digest() if err != nil { return nil, nil, nil, nil, "", "", "", nil, fmt.Errorf("failed to get layer digest: %w", err) } + layerDigests = append(layerDigests, digest.String()) + } - layerDigestStr := digest.String() - layerDigests = append(layerDigests, layerDigestStr) + log.Info().Msgf("Indexing %d layers from %s", len(layers), opts.ImageRef) - log.Info().Msgf("Processing layer %d/%d: %s", i+1, len(layers), layerDigestStr) + // Index layers concurrently. Each layer produces a self-contained, + // deterministic artifact; artifacts are merged sequentially in layer + // order below, which preserves overlay (whiteout/replace) semantics. + artifacts := make([]*LayerArtifact, len(layers)) + cacheHits := make([]bool, len(layers)) - // Send progress update: starting layer - if opts.ProgressChan != nil { - opts.ProgressChan <- OCIIndexProgress{ - LayerIndex: i + 1, - TotalLayers: len(layers), - LayerDigest: layerDigestStr, - Stage: "starting", - Message: fmt.Sprintf("Processing layer %d/%d", i+1, len(layers)), + concurrency := opts.IndexConcurrency + if concurrency <= 0 { + concurrency = defaultIndexConcurrency + } + if concurrency > len(layers) { + concurrency = len(layers) + } + + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(concurrency) + + for i := range layers { + i := i + layer := layers[i] + layerDigestStr := layerDigests[i] + + g.Go(func() error { + if opts.ProgressChan != nil { + opts.ProgressChan <- OCIIndexProgress{ + LayerIndex: i + 1, + TotalLayers: len(layers), + LayerDigest: layerDigestStr, + Stage: "starting", + Message: fmt.Sprintf("Processing layer %d/%d", i+1, len(layers)), + } } - } - // Get compressed layer stream - compressedRC, err := layer.Compressed() - if err != nil { - return nil, nil, nil, nil, "", "", "", nil, fmt.Errorf("failed to get compressed layer: %w", err) - } + // Fast path: reuse a cached layer index artifact and skip the + // registry pull + decompression entirely. + cacheKey := LayerArtifactCacheKey(layerDigestStr, opts.CheckpointMiB) + if opts.LayerIndexCache != nil { + data, err := opts.LayerIndexCache.GetLayerIndex(gctx, cacheKey) + if err != nil { + log.Warn().Err(err).Str("layer_digest", layerDigestStr).Msg("layer index cache lookup failed") + } else if data != nil { + artifact, err := DecodeLayerArtifact(data, layerDigestStr, opts.CheckpointMiB) + if err != nil { + log.Warn().Err(err).Str("layer_digest", layerDigestStr).Msg("discarding invalid cached layer index artifact") + } else { + artifacts[i] = artifact + cacheHits[i] = true + log.Debug(). + Str("layer_digest", layerDigestStr). + Int("entries", len(artifact.Entries)). + Msg("layer index cache hit: skipping layer pull") + return nil + } + } + } - // Index this layer with optimizations - gzipIndex, decompressedHash, err := ca.indexLayerOptimized(ctx, compressedRC, layerDigestStr, index, opts) - compressedRC.Close() - if err != nil { - return nil, nil, nil, nil, "", "", "", nil, fmt.Errorf("failed to index layer %s: %w", layerDigestStr, err) - } + log.Debug().Msgf("Processing layer %d/%d: %s", i+1, len(layers), layerDigestStr) + + artifact, err := ca.indexLayerFromBestSource(gctx, layer, layerDigestStr, opts) + if err != nil { + return fmt.Errorf("failed to index layer %s: %w", layerDigestStr, err) + } + artifacts[i] = artifact + + if opts.LayerIndexCache != nil { + data, err := EncodeLayerArtifact(artifact) + if err != nil { + log.Warn().Err(err).Str("layer_digest", layerDigestStr).Msg("failed to encode layer index artifact") + } else if err := opts.LayerIndexCache.PutLayerIndex(gctx, cacheKey, data); err != nil { + log.Warn().Err(err).Str("layer_digest", layerDigestStr).Msg("failed to store layer index artifact in cache") + } + } + return nil + }) + } + + if err := g.Wait(); err != nil { + return nil, nil, nil, nil, "", "", "", nil, err + } + + // Merge artifacts strictly in layer order (bottom to top). This is the + // only step that mutates the shared index, so the result is identical to + // sequential indexing regardless of the concurrency above. + for i, artifact := range artifacts { + layerDigestStr := layerDigests[i] + ca.applyLayerArtifact(index, artifact) - gzipIdx[layerDigestStr] = gzipIndex - decompressedHashes[layerDigestStr] = decompressedHash + gzipIdx[layerDigestStr] = &common.GzipIndex{ + LayerDigest: layerDigestStr, + Checkpoints: artifact.Checkpoints, + } + decompressedHashes[layerDigestStr] = artifact.DecompressedHash - // Send progress update: completed layer if opts.ProgressChan != nil { opts.ProgressChan <- OCIIndexProgress{ LayerIndex: i + 1, @@ -331,31 +405,43 @@ func (ca *ClipArchiver) IndexOCIImage(ctx context.Context, opts IndexOCIImageOpt } } - log.Info().Msgf("Successfully indexed image with %d files", index.Len()) + cachedLayers := 0 + for _, hit := range cacheHits { + if hit { + cachedLayers++ + } + } + log.Info(). + Int("layers", len(layers)). + Int("layer_index_cache_hits", cachedLayers). + Int("files", index.Len()). + Msg("Successfully indexed image") return index, layerDigests, gzipIdx, decompressedHashes, registryURL, repository, reference, imageMetadata, nil } -// indexLayerOptimized processes a single layer using streaming I/O with zero memory overhead. +// indexLayerToArtifact processes a single layer using streaming I/O with zero memory overhead. // // Performance characteristics: // - Zero-copy streaming: TeeReader hashes data as it flows to tar.Reader // - Constant memory: O(checkpoint_size) ~2MB, independent of layer size // - Single pass: Reads compressed stream exactly once // -// Returns gzip index and SHA256 hash of complete decompressed stream. -func (ca *ClipArchiver) indexLayerOptimized( +// The result is a self-contained, deterministic LayerArtifact (ordered entry +// operations + gzip checkpoints + decompressed stream hash) that can be cached +// keyed by the layer digest and replayed against an index with +// applyLayerArtifact. +func (ca *ClipArchiver) indexLayerToArtifact( ctx context.Context, compressedRC io.ReadCloser, layerDigest string, - index *btree.BTree, opts IndexOCIImageOptions, -) (*common.GzipIndex, string, error) { +) (*LayerArtifact, error) { compressedCounter := &countingReader{r: compressedRC} gzr, err := gzip.NewReader(compressedCounter) if err != nil { - return nil, "", fmt.Errorf("failed to create gzip reader: %w", err) + return nil, fmt.Errorf("failed to create gzip reader: %w", err) } defer gzr.Close() @@ -382,6 +468,8 @@ func (ca *ClipArchiver) indexLayerOptimized( checkpointInterval := opts.CheckpointMiB * 1024 * 1024 lastCheckpoint := int64(0) + entries := make([]LayerEntry, 0, 256) + // Process tar entries for { hdr, err := tr.Next() @@ -389,7 +477,7 @@ func (ca *ClipArchiver) indexLayerOptimized( break } if err != nil { - return nil, "", fmt.Errorf("failed to read tar header: %w", err) + return nil, fmt.Errorf("failed to read tar header: %w", err) } // Record checkpoint periodically (before processing file data) @@ -407,22 +495,30 @@ func (ca *ClipArchiver) indexLayerOptimized( cleanPath = path.Clean(cleanPath) // Handle OCI whiteouts (fast path: check prefix before full processing) - if ca.handleWhiteout(index, cleanPath) { + if entry, isWhiteout := whiteoutEntry(cleanPath); isWhiteout { + entries = append(entries, entry) continue } // Process based on type (most common first for branch prediction) switch hdr.Typeflag { case tar.TypeReg, tar.TypeRegA: - if err := ca.processRegularFile(index, tr, hdr, cleanPath, layerDigest, compressedCounter, uncompressedCounter, &checkpoints, &lastCheckpoint); err != nil { - return nil, "", err + entry, err := ca.fileEntry(tr, hdr, cleanPath, layerDigest, compressedCounter, uncompressedCounter, &checkpoints, &lastCheckpoint) + if err != nil { + return nil, err } + entries = append(entries, entry) case tar.TypeDir: - ca.processDirectory(index, hdr, cleanPath, layerDigest) + entries = append(entries, ca.directoryEntry(hdr, cleanPath, layerDigest)) case tar.TypeSymlink: - ca.processSymlink(index, hdr, cleanPath, layerDigest) + entries = append(entries, ca.symlinkEntry(hdr, cleanPath, layerDigest)) case tar.TypeLink: - ca.processHardLink(index, hdr, cleanPath) + targetPath := path.Clean("/" + strings.TrimPrefix(hdr.Linkname, "./")) + entries = append(entries, LayerEntry{ + Kind: LayerEntryHardLink, + Path: cleanPath, + Target: targetPath, + }) } } @@ -435,7 +531,7 @@ func (ca *ClipArchiver) indexLayerOptimized( // These bytes ARE present in decompressed stream and MUST be hashed to match disk cache. _, err = io.Copy(io.Discard, uncompressedCounter) if err != nil && err != io.EOF { - return nil, "", fmt.Errorf("failed to consume trailing tar bytes: %w", err) + return nil, fmt.Errorf("failed to consume trailing tar bytes: %w", err) } // Finalize hash (includes all bytes: file contents + tar headers + padding) @@ -443,7 +539,7 @@ func (ca *ClipArchiver) indexLayerOptimized( if opts.ContentCache != nil && cacheSpool != nil && cacheSpool.err == nil && cacheSpool.path != "" { if err := cacheSpool.close(); err != nil { - return nil, "", fmt.Errorf("failed to close layer content cache temp file: %w", err) + return nil, fmt.Errorf("failed to close layer content cache temp file: %w", err) } if err := ca.storeIndexedLayerInContentCache(ctx, opts.ContentCache, cacheSpool.path, decompressedHash, layerDigest); err != nil { @@ -460,59 +556,69 @@ func (ca *ClipArchiver) indexLayerOptimized( Msg("skipping indexed layer content cache store after spool write failure") } - // Return gzip index and decompressed hash - return &common.GzipIndex{ - LayerDigest: layerDigest, - Checkpoints: checkpoints, - }, decompressedHash, nil + return &LayerArtifact{ + Version: LayerArtifactVersion, + LayerDigest: layerDigest, + CheckpointMiB: opts.CheckpointMiB, + Entries: entries, + Checkpoints: checkpoints, + DecompressedHash: decompressedHash, + UncompressedSize: uncompressedCounter.n, + }, nil } func (ca *ClipArchiver) storeIndexedLayerInContentCache(ctx context.Context, contentCache storage.ContentCache, filePath, decompressedHash, layerDigest string) error { + return ca.storeLayerBlobInContentCache(ctx, contentCache, filePath, decompressedHash, layerDigest, "indexed layer") +} + +// storeLayerBlobInContentCache stores a file's bytes in the content cache +// under the given content hash, skipping the store if a size-aware existence +// check reports the blob is already complete. +func (ca *ClipArchiver) storeLayerBlobInContentCache(ctx context.Context, contentCache storage.ContentCache, filePath, contentHash, layerDigest, kind string) error { if contentCache == nil { return nil } - if existsCache, ok := contentCache.(storage.ContentCacheExists); ok { - exists, err := existsCache.ContentExists(decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) - if err != nil { - log.Warn(). - Err(err). - Str("layer_digest", layerDigest). - Str("decompressed_hash", decompressedHash). - Msg("failed to check indexed layer content cache") - } else if exists { - log.Info(). - Str("layer_digest", layerDigest). - Str("decompressed_hash", decompressedHash). - Msg("indexed layer already present in content cache") - return nil + // Skip the store when the blob is already complete in the cache, + // preferring the size-aware check (a positive answer guarantees the + // cached blob is complete). + exists := false + if info, err := os.Stat(filePath); err == nil { + exists = contentCacheExistsWithSize(contentCache, contentHash, info.Size()) + } + if !exists { + if existsCache, ok := contentCache.(storage.ContentCacheExists); ok { + if found, err := existsCache.ContentExists(contentHash, struct{ RoutingKey string }{RoutingKey: contentHash}); err == nil && found { + exists = true + } } } + if exists { + log.Debug().Str("layer_digest", layerDigest).Msgf("%s already present in content cache", kind) + return nil + } if localStore, ok := contentCache.(storage.ContentCacheStoreLocalPath); ok && localStore != nil { - actualHash, err := localStore.StoreContentFromLocalPath(filePath, decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) + actualHash, err := localStore.StoreContentFromLocalPath(filePath, contentHash, struct{ RoutingKey string }{RoutingKey: contentHash}) if err != nil { return err } - if actualHash != "" && actualHash != decompressedHash { - return fmt.Errorf("indexed layer content cache hash mismatch: expected %s, got %s", decompressedHash, actualHash) + if actualHash != "" && actualHash != contentHash { + return fmt.Errorf("%s content cache hash mismatch: expected %s, got %s", kind, contentHash, actualHash) } - log.Info(). - Str("layer_digest", layerDigest). - Str("decompressed_hash", decompressedHash). - Msg("stored indexed layer in content cache") + log.Debug().Str("layer_digest", layerDigest).Msgf("stored %s in content cache", kind) return nil } file, err := os.Open(filePath) if err != nil { - return fmt.Errorf("failed to open indexed layer temp file: %w", err) + return fmt.Errorf("failed to open %s temp file: %w", kind, err) } defer file.Close() info, err := file.Stat() if err != nil { - return fmt.Errorf("failed to stat indexed layer temp file: %w", err) + return fmt.Errorf("failed to stat %s temp file: %w", kind, err) } storeCtx, cancel := context.WithCancel(ctx) @@ -547,7 +653,7 @@ func (ca *ClipArchiver) storeIndexedLayerInContentCache(ctx context.Context, con }() started := time.Now() - actualHash, storeErr := contentCache.StoreContent(chunks, decompressedHash, struct{ RoutingKey string }{RoutingKey: decompressedHash}) + actualHash, storeErr := contentCache.StoreContent(chunks, contentHash, struct{ RoutingKey string }{RoutingKey: contentHash}) cancel() readErr := <-readErrCh if storeErr != nil { @@ -556,42 +662,39 @@ func (ca *ClipArchiver) storeIndexedLayerInContentCache(ctx context.Context, con if readErr != nil { return readErr } - if actualHash != "" && actualHash != decompressedHash { - return fmt.Errorf("indexed layer content cache hash mismatch: expected %s, got %s", decompressedHash, actualHash) + if actualHash != "" && actualHash != contentHash { + return fmt.Errorf("%s content cache hash mismatch: expected %s, got %s", kind, contentHash, actualHash) } - log.Info(). + log.Debug(). Str("layer_digest", layerDigest). - Str("decompressed_hash", decompressedHash). Int64("bytes", info.Size()). Dur("duration", time.Since(started)). - Msg("stored indexed layer in content cache") + Msgf("stored %s in content cache", kind) return nil } -// handleWhiteout processes OCI whiteout files -func (ca *ClipArchiver) handleWhiteout(index *btree.BTree, fullPath string) bool { +// whiteoutEntry detects OCI whiteout files and returns the corresponding +// layer entry operation. +func whiteoutEntry(fullPath string) (LayerEntry, bool) { dir := path.Dir(fullPath) base := path.Base(fullPath) // Opaque whiteout: .wh..wh..opq if base == ".wh..wh..opq" { - // Remove all entries under this directory from lower layers - ca.deleteRange(index, dir+"/") log.Debug().Msgf(" Opaque whiteout: %s", dir) - return true + return LayerEntry{Kind: LayerEntryOpaqueWhiteout, Path: dir}, true } // Regular whiteout: .wh. if strings.HasPrefix(base, ".wh.") { victim := path.Join(dir, strings.TrimPrefix(base, ".wh.")) - ca.deleteNode(index, victim) log.Debug().Msgf(" Whiteout: %s", victim) - return true + return LayerEntry{Kind: LayerEntryWhiteout, Path: victim}, true } - return false + return LayerEntry{}, false } // deleteNode removes a node and all its children from the index @@ -705,16 +808,16 @@ func (ca *ClipArchiver) CreateFromOCI(ctx context.Context, opts IndexOCIImageOpt return fmt.Errorf("failed to create remote archive: %w", err) } - log.Info().Msgf("Created metadata-only clip file: %s", clipOut) - log.Info().Msgf(" Files indexed: %d", index.Len()) - log.Info().Msgf(" Layers: %d", len(layers)) - - // Calculate total checkpoint size totalCheckpoints := 0 for _, idx := range gzipIdx { totalCheckpoints += len(idx.Checkpoints) } - log.Info().Msgf(" Gzip checkpoints: %d", totalCheckpoints) + log.Debug(). + Str("path", clipOut). + Int("files", index.Len()). + Int("layers", len(layers)). + Int("gzip_checkpoints", totalCheckpoints). + Msg("created metadata-only clip file") return nil } @@ -726,10 +829,9 @@ func (ca *ClipArchiver) addCheckpoint(checkpoints *[]common.GzipCheckpoint, cOff *lastCheckpoint = uOff } -// processRegularFile processes a regular file entry from tar. +// fileEntry processes a regular file entry from tar. // Uses io.CopyN for efficient content skipping (streaming, no allocation). -func (ca *ClipArchiver) processRegularFile( - index *btree.BTree, +func (ca *ClipArchiver) fileEntry( tr *tar.Reader, hdr *tar.Header, cleanPath string, @@ -738,7 +840,7 @@ func (ca *ClipArchiver) processRegularFile( uncompressedCounter *countingReader, checkpoints *[]common.GzipCheckpoint, lastCheckpoint *int64, -) error { +) (LayerEntry, error) { dataStart := uncompressedCounter.n // Content-defined checkpoint for large files (>512KB) @@ -754,10 +856,10 @@ func (ca *ClipArchiver) processRegularFile( if hdr.Size > 0 { n, err := io.CopyN(io.Discard, tr, hdr.Size) if err != nil && err != io.EOF { - return fmt.Errorf("failed to skip file content: %w", err) + return LayerEntry{}, fmt.Errorf("failed to skip file content: %w", err) } if n != hdr.Size { - return fmt.Errorf("incomplete file read: want %d, got %d", hdr.Size, n) + return LayerEntry{}, fmt.Errorf("incomplete file read: want %d, got %d", hdr.Size, n) } } @@ -788,13 +890,12 @@ func (ca *ClipArchiver) processRegularFile( }, } - index.Set(node) log.Debug().Str("path", cleanPath).Int64("size", hdr.Size).Int64("uoff", dataStart).Msg("File") - return nil + return LayerEntry{Kind: LayerEntryNode, Node: node}, nil } -// processSymlink processes a symlink entry from tar -func (ca *ClipArchiver) processSymlink(index *btree.BTree, hdr *tar.Header, cleanPath, layerDigest string) { +// symlinkEntry processes a symlink entry from tar +func (ca *ClipArchiver) symlinkEntry(hdr *tar.Header, cleanPath, layerDigest string) LayerEntry { target := hdr.Linkname if target == "" { log.Warn().Msgf("Empty symlink target for %s", cleanPath) @@ -823,12 +924,12 @@ func (ca *ClipArchiver) processSymlink(index *btree.BTree, hdr *tar.Header, clea }, } - index.Set(node) log.Debug().Str("path", cleanPath).Str("target", target).Msg("Symlink") + return LayerEntry{Kind: LayerEntryNode, Node: node} } -// processDirectory processes a directory entry from tar -func (ca *ClipArchiver) processDirectory(index *btree.BTree, hdr *tar.Header, cleanPath, layerDigest string) { +// directoryEntry processes a directory entry from tar +func (ca *ClipArchiver) directoryEntry(hdr *tar.Header, cleanPath, layerDigest string) LayerEntry { node := &common.ClipNode{ Path: cleanPath, NodeType: common.DirNode, @@ -851,24 +952,8 @@ func (ca *ClipArchiver) processDirectory(index *btree.BTree, hdr *tar.Header, cl }, } - index.Set(node) log.Debug().Str("path", cleanPath).Int64("mode", hdr.Mode).Int64("mtime", hdr.ModTime.Unix()).Msg("Dir") -} - -// processHardLink processes a hard link entry from tar -func (ca *ClipArchiver) processHardLink(index *btree.BTree, hdr *tar.Header, cleanPath string) { - targetPath := path.Clean("/" + strings.TrimPrefix(hdr.Linkname, "./")) - targetNode := index.Get(&common.ClipNode{Path: targetPath}) - if targetNode != nil { - tn := targetNode.(*common.ClipNode) - node := &common.ClipNode{ - Path: cleanPath, - NodeType: common.FileNode, - Attr: tn.Attr, - Remote: tn.Remote, - } - index.Set(node) - } + return LayerEntry{Kind: LayerEntryNode, Node: node} } // extractImageMetadata extracts comprehensive metadata from an OCI image @@ -958,7 +1043,7 @@ func (ca *ClipArchiver) extractImageMetadata(imgInterface interface{}, imageRef LayersData: layersData, } - log.Info(). + log.Debug(). Str("architecture", metadata.Architecture). Str("os", metadata.Os). Time("created", metadata.Created). diff --git a/pkg/storage/layer_index_cache.go b/pkg/storage/layer_index_cache.go new file mode 100644 index 0000000..8b2f3c8 --- /dev/null +++ b/pkg/storage/layer_index_cache.go @@ -0,0 +1,83 @@ +package storage + +import ( + "context" + "fmt" + "os" + "path/filepath" +) + +// LayerIndexCache stores serialized per-layer index artifacts keyed by a +// deterministic string (see clip.LayerArtifactCacheKey). A hit allows the +// indexer to skip pulling and decompressing that layer entirely. +// +// Implementations must treat the cache as best-effort: Get returns +// (nil, nil) on a miss, and Put failures should not fail the build. +type LayerIndexCache interface { + GetLayerIndex(ctx context.Context, key string) ([]byte, error) + PutLayerIndex(ctx context.Context, key string, data []byte) error +} + +// ContentCacheExistsWithSize is an optional ContentCache extension that +// performs a size-aware completeness check. Unlike ContentCacheExists, a +// positive response guarantees the cached content is complete (not a stale, +// partially-published entry), so callers can safely skip re-storing it. +type ContentCacheExistsWithSize interface { + ContentExistsWithSize(hash string, size int64, opts struct{ RoutingKey string }) (bool, error) +} + +// DiskLayerIndexCache is a simple local-filesystem LayerIndexCache. It is used +// by the test harness and can serve as a worker-local fallback cache. +type DiskLayerIndexCache struct { + rootDir string +} + +func NewDiskLayerIndexCache(rootDir string) (*DiskLayerIndexCache, error) { + if rootDir == "" { + return nil, fmt.Errorf("disk layer index cache root dir is required") + } + if err := os.MkdirAll(rootDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create layer index cache dir: %w", err) + } + return &DiskLayerIndexCache{rootDir: rootDir}, nil +} + +func (c *DiskLayerIndexCache) entryPath(key string) string { + return filepath.Join(c.rootDir, filepath.FromSlash(key)) +} + +func (c *DiskLayerIndexCache) GetLayerIndex(ctx context.Context, key string) ([]byte, error) { + data, err := os.ReadFile(c.entryPath(key)) + if os.IsNotExist(err) { + return nil, nil + } + if err != nil { + return nil, err + } + return data, nil +} + +func (c *DiskLayerIndexCache) PutLayerIndex(ctx context.Context, key string, data []byte) error { + path := c.entryPath(key) + if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { + return err + } + + // Write via temp file + rename so concurrent readers never observe a + // partially written artifact. + tmp, err := os.CreateTemp(filepath.Dir(path), ".layer-index-*") + if err != nil { + return err + } + tmpPath := tmp.Name() + if _, err := tmp.Write(data); err != nil { + tmp.Close() + os.Remove(tmpPath) + return err + } + if err := tmp.Close(); err != nil { + os.Remove(tmpPath) + return err + } + return os.Rename(tmpPath, path) +} diff --git a/pkg/storage/oci.go b/pkg/storage/oci.go index fd36a76..6988c8d 100644 --- a/pkg/storage/oci.go +++ b/pkg/storage/oci.go @@ -1,7 +1,9 @@ package storage import ( - "compress/gzip" + // klauspost/compress gunzip is substantially faster than stdlib for + // full-layer decompression on cache misses + "github.com/klauspost/compress/gzip" "context" "errors" "fmt"