Path: blob/main/components/registry-facade/pkg/registry/blob_test.go
2499 views
// Copyright (c) 2023 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package registry56import (7"bytes"8"context"9"errors"10"fmt"11"io"12"net/http"13"net/http/httptest"14"os"15"path/filepath"16"strings"17"sync"18"syscall"19"testing"20"time"2122"github.com/alicebob/miniredis/v2"23"github.com/containerd/containerd/remotes"24"github.com/containerd/containerd/remotes/docker"25httpapi "github.com/ipfs/kubo/client/rpc"26oldcmds "github.com/ipfs/kubo/commands"27config "github.com/ipfs/kubo/config"28"github.com/ipfs/kubo/core"29"github.com/ipfs/kubo/core/corehttp"30"github.com/ipfs/kubo/plugin/loader"31"github.com/ipfs/kubo/repo/fsrepo"32ma "github.com/multiformats/go-multiaddr"33"github.com/opencontainers/go-digest"34redis "github.com/redis/go-redis/v9"35"github.com/stretchr/testify/assert"36"github.com/stretchr/testify/require"37"k8s.io/apimachinery/pkg/util/wait"3839rfapi "github.com/gitpod-io/gitpod/registry-facade/api"40)4142var loadPluginsOnce sync.Once4344// setupPlugins load plugins45func setupPlugins(externalPluginsPath string) error {46// Load any external plugins if available on externalPluginsPath47plugins, err := loader.NewPluginLoader(filepath.Join(externalPluginsPath, "plugins"))48if err != nil {49return fmt.Errorf("error loading plugins: %s", err)50}5152// Load preloaded and external plugins53if err := plugins.Initialize(); err != nil {54return fmt.Errorf("error initializing plugins: %s", err)55}5657if err := plugins.Inject(); err != nil {58return fmt.Errorf("error initializing plugins: %s", err)59}6061return nil62}6364// createIPFSConfig creates a config with default options and a 2048 bit key65func createIPFSConfig() (*config.Config, error) {66return config.Init(io.Discard, 2048)67}6869// createTempRepo creates the repo according to the config70func createTempRepo(cfg *config.Config) (string, error) {71// Create temporal directory72repoPath, err := os.MkdirTemp("", "ipfs-shell")73if err != nil {74return "", fmt.Errorf("failed to create temp dir: %s", err)75}7677// Create the repo with the config78err = fsrepo.Init(repoPath, cfg)79if err != nil {80return "", fmt.Errorf("failed to init ephemeral node: %s", err)81}82return repoPath, nil83}8485func TestIPFSBlobCache(t *testing.T) {86ctx, cancel := context.WithCancel(context.Background())87defer cancel()8889// Create IPFS configuration90ipfsCfg, err := createIPFSConfig()91if err != nil {92t.Fatalf("fail to create ipfs configuration: %v", err)93}94if len(ipfsCfg.Addresses.API) == 0 {95t.Fatal("the configuration must have api address")96}97ipfsAPIAddr := ipfsCfg.Addresses.API[0]9899// Load plugins100var onceErr error101loadPluginsOnce.Do(func() {102onceErr = setupPlugins("")103})104if onceErr != nil {105t.Fatalf("fail to setup plugins: %v", onceErr)106}107108// Create a temporal repo109ipfsRepoPath, err := createTempRepo(ipfsCfg)110if err != nil {111t.Fatalf("fail to create temp repo: %v", err)112}113defer os.RemoveAll(ipfsRepoPath)114115cctx := &oldcmds.Context{116ConfigRoot: ipfsRepoPath,117ReqLog: &oldcmds.ReqLog{},118// Plugins: plugins,119ConstructNode: func() (n *core.IpfsNode, err error) {120r, err := fsrepo.Open(ipfsRepoPath)121if err != nil { // repo is owned by the node122return nil, err123}124125// ok everything is good. set it on the invocation (for ownership)126// and return it.127n, err = core.NewNode(ctx, &core.BuildCfg{128Online: true,129Repo: r,130})131if err != nil {132return nil, err133}134135return n, nil136},137}138139// Construct IPFS node140node, err := cctx.ConstructNode()141if err != nil {142t.Fatalf("fail to construct node: %v", err)143}144defer node.Close()145146go func() {147// Create a IPFS server148t.Logf("HTTP API server listening on %s\n", ipfsAPIAddr)149corehttp.ListenAndServe(node, ipfsAPIAddr, corehttp.CommandsOption(*cctx))150}()151152// Init HTTP client connects to IPFS server153ipfsAPIMaddr, err := ma.NewMultiaddr(ipfsAPIAddr)154if err != nil {155t.Fatalf("fail to new multi address: %v", err)156}157158api, err := httpapi.NewApiWithClient(ipfsAPIMaddr, NewRetryableHTTPClient())159if err != nil {160t.Fatal(err)161}162163// Running unit tests164redisServer, err := miniredis.Run()165if err != nil {166t.Fatalf("cannot run mini redis server: %v", err)167}168defer redisServer.Close()169170redisC := redis.NewClient(&redis.Options{Addr: redisServer.Addr()})171172redisBlobStore := &RedisBlobStore{Client: redisC}173ipfsBlobCache := &IPFSBlobCache{Redis: redisC, IPFS: api}174ipfsBlobSrc := ipfsBlobSource{source: ipfsBlobCache}175imageSpec := &rfapi.ImageSpec{BaseRef: "docker.io/library/alpine@sha256:7580ece7963bfa863801466c0a488f11c86f85d9988051a9f9c68cb27f6b7872"}176dgst := digest.NewDigestFromBytes(digest.SHA256, []byte("7580ece7963bfa863801466c0a488f11c86f85d9988051a9f9c68cb27f6b7872"))177mediaType := "application/vnd.docker.image.rootfs.diff.tar.gzip"178179err = ipfsBlobCache.Store(ctx, dgst, io.NopCloser(bytes.NewReader([]byte("foobar"))), mediaType)180if err != nil {181t.Fatalf("cannot store to ipfs blobcache: %v", err)182}183184exist := ipfsBlobSrc.HasBlob(ctx, imageSpec, dgst)185if !exist {186t.Fatal("the digest should exists")187}188189resolverFactory := func() remotes.Resolver {190client := NewRetryableHTTPClient()191resolverOpts := docker.ResolverOptions{Client: client}192return docker.NewResolver(resolverOpts)193}194195blobHandler := &blobHandler{196Context: ctx,197Digest: dgst,198Name: "unittest",199200Spec: imageSpec,201Resolver: resolverFactory(),202Store: redisBlobStore,203IPFS: ipfsBlobCache,204}205206req := httptest.NewRequest("", "http://example.com", nil)207w := newFailFirstResponseWriter()208209blobHandler.getBlob(w, req)210}211212type failFirstResponseWriter struct {213code int214headerMap http.Header215body *bytes.Buffer216217requests int218}219220func newFailFirstResponseWriter() *failFirstResponseWriter {221return &failFirstResponseWriter{222headerMap: make(http.Header),223body: new(bytes.Buffer),224code: 200,225}226}227228func (rw *failFirstResponseWriter) Header() http.Header {229m := rw.headerMap230if m == nil {231m = make(http.Header)232rw.headerMap = m233}234return m235}236237func (rw *failFirstResponseWriter) Write(buf []byte) (int, error) {238defer func() {239rw.requests += 1240}()241242if rw.requests == 0 {243return 0, syscall.ECONNRESET244}245if rw.requests == 1 {246return 0, syscall.EPIPE247}248249if rw.body != nil {250rw.body.Write(buf)251}252return len(buf), nil253}254255func (rw *failFirstResponseWriter) WriteHeader(code int) {256rw.code = code257}258259// mockBlobSource allows faking BlobSource behavior for tests.260type mockBlobSource struct {261// How many times GetBlob should fail before succeeding.262failCount int263// The error to return on failure.264failError error265266// Internal counter for calls.267callCount int268// The data to return on success.269successData string270271// Whether to use a reader that fails mid-stream on the first call.272failReaderOnFirstCall bool273// The number of bytes to read successfully before the reader fails.274failAfterBytes int275}276277func (m *mockBlobSource) Name() string { return "mock" }278func (m *mockBlobSource) HasBlob(ctx context.Context, details *rfapi.ImageSpec, dgst digest.Digest) bool {279return true280}281282func (m *mockBlobSource) GetBlob(ctx context.Context, details *rfapi.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {283m.callCount++284if m.callCount <= m.failCount {285return false, "", "", nil, m.failError286}287288if m.failReaderOnFirstCall && m.callCount == 1 {289return false, "application/octet-stream", "", io.NopCloser(&failingReader{290reader: strings.NewReader(m.successData),291failAfterBytes: m.failAfterBytes,292failError: m.failError,293}), nil294}295296return false, "application/octet-stream", "", io.NopCloser(strings.NewReader(m.successData)), nil297}298299// failingReader is a reader that fails after a certain point.300type failingReader struct {301reader io.Reader302failAfterBytes int303failError error304bytesRead int305}306307func (fr *failingReader) Read(p []byte) (n int, err error) {308if fr.bytesRead >= fr.failAfterBytes {309return 0, fr.failError310}311n, err = fr.reader.Read(p)312if err != nil {313return n, err314}315fr.bytesRead += n316if fr.bytesRead >= fr.failAfterBytes {317// Return the error, but also the bytes read in this call.318return n, fr.failError319}320return n, nil321}322323func TestRetrieveFromSource_RetryOnGetBlob(t *testing.T) {324// Arrange325mockSource := &mockBlobSource{326failCount: 2,327failError: errors.New("transient network error"),328successData: "hello world",329}330331bh := &blobHandler{332Digest: "sha256:dummy",333Spec: &rfapi.ImageSpec{},334}335336// Use short backoff for testing337originalBackoff := retrievalBackoffParams338retrievalBackoffParams = wait.Backoff{339Duration: 1 * time.Millisecond,340Steps: 3,341}342defer func() { retrievalBackoffParams = originalBackoff }()343344w := httptest.NewRecorder()345r := httptest.NewRequest("GET", "/v2/...", nil)346347// Act348handled, dontCache, err := bh.retrieveFromSource(context.Background(), mockSource, w, r)349350// Assert351require.NoError(t, err)352assert.True(t, handled)353assert.False(t, dontCache)354assert.Equal(t, "hello world", w.Body.String())355assert.Equal(t, 3, mockSource.callCount, "Expected GetBlob to be called 3 times (2 failures + 1 success)")356}357358func TestRetrieveFromSource_RetryOnCopy(t *testing.T) {359// Arrange360mockSource := &mockBlobSource{361failCount: 0, // GetBlob succeeds immediately362failReaderOnFirstCall: true,363failAfterBytes: 5,364failError: syscall.EPIPE,365successData: "hello world",366}367368bh := &blobHandler{369Digest: "sha256:dummy",370Spec: &rfapi.ImageSpec{},371}372373// Use short backoff for testing374originalBackoff := retrievalBackoffParams375retrievalBackoffParams = wait.Backoff{376Duration: 1 * time.Millisecond,377Steps: 3,378}379defer func() { retrievalBackoffParams = originalBackoff }()380381w := httptest.NewRecorder()382r := httptest.NewRequest("GET", "/v2/...", nil)383384// Act385handled, dontCache, err := bh.retrieveFromSource(context.Background(), mockSource, w, r)386387// Assert388require.NoError(t, err)389assert.True(t, handled)390assert.False(t, dontCache)391assert.Equal(t, "hello world", w.Body.String())392assert.Equal(t, 2, mockSource.callCount, "Expected GetBlob to be called twice (1st succeeds, copy fails, 2nd succeeds)")393}394395396