Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/registry-facade/pkg/registry/blob_test.go
2499 views
1
// Copyright (c) 2023 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package registry
6
7
import (
8
"bytes"
9
"context"
10
"errors"
11
"fmt"
12
"io"
13
"net/http"
14
"net/http/httptest"
15
"os"
16
"path/filepath"
17
"strings"
18
"sync"
19
"syscall"
20
"testing"
21
"time"
22
23
"github.com/alicebob/miniredis/v2"
24
"github.com/containerd/containerd/remotes"
25
"github.com/containerd/containerd/remotes/docker"
26
httpapi "github.com/ipfs/kubo/client/rpc"
27
oldcmds "github.com/ipfs/kubo/commands"
28
config "github.com/ipfs/kubo/config"
29
"github.com/ipfs/kubo/core"
30
"github.com/ipfs/kubo/core/corehttp"
31
"github.com/ipfs/kubo/plugin/loader"
32
"github.com/ipfs/kubo/repo/fsrepo"
33
ma "github.com/multiformats/go-multiaddr"
34
"github.com/opencontainers/go-digest"
35
redis "github.com/redis/go-redis/v9"
36
"github.com/stretchr/testify/assert"
37
"github.com/stretchr/testify/require"
38
"k8s.io/apimachinery/pkg/util/wait"
39
40
rfapi "github.com/gitpod-io/gitpod/registry-facade/api"
41
)
42
43
// loadPluginsOnce guards the setupPlugins call in TestIPFSBlobCache so that
// plugin loading is attempted at most once per test binary.
var loadPluginsOnce sync.Once
44
45
// setupPlugins load plugins
46
func setupPlugins(externalPluginsPath string) error {
47
// Load any external plugins if available on externalPluginsPath
48
plugins, err := loader.NewPluginLoader(filepath.Join(externalPluginsPath, "plugins"))
49
if err != nil {
50
return fmt.Errorf("error loading plugins: %s", err)
51
}
52
53
// Load preloaded and external plugins
54
if err := plugins.Initialize(); err != nil {
55
return fmt.Errorf("error initializing plugins: %s", err)
56
}
57
58
if err := plugins.Inject(); err != nil {
59
return fmt.Errorf("error initializing plugins: %s", err)
60
}
61
62
return nil
63
}
64
65
// createIPFSConfig creates a config with default options and a 2048 bit key
66
func createIPFSConfig() (*config.Config, error) {
67
return config.Init(io.Discard, 2048)
68
}
69
70
// createTempRepo creates the repo according to the config
71
func createTempRepo(cfg *config.Config) (string, error) {
72
// Create temporal directory
73
repoPath, err := os.MkdirTemp("", "ipfs-shell")
74
if err != nil {
75
return "", fmt.Errorf("failed to create temp dir: %s", err)
76
}
77
78
// Create the repo with the config
79
err = fsrepo.Init(repoPath, cfg)
80
if err != nil {
81
return "", fmt.Errorf("failed to init ephemeral node: %s", err)
82
}
83
return repoPath, nil
84
}
85
86
func TestIPFSBlobCache(t *testing.T) {
87
ctx, cancel := context.WithCancel(context.Background())
88
defer cancel()
89
90
// Create IPFS configuration
91
ipfsCfg, err := createIPFSConfig()
92
if err != nil {
93
t.Fatalf("fail to create ipfs configuration: %v", err)
94
}
95
if len(ipfsCfg.Addresses.API) == 0 {
96
t.Fatal("the configuration must have api address")
97
}
98
ipfsAPIAddr := ipfsCfg.Addresses.API[0]
99
100
// Load plugins
101
var onceErr error
102
loadPluginsOnce.Do(func() {
103
onceErr = setupPlugins("")
104
})
105
if onceErr != nil {
106
t.Fatalf("fail to setup plugins: %v", onceErr)
107
}
108
109
// Create a temporal repo
110
ipfsRepoPath, err := createTempRepo(ipfsCfg)
111
if err != nil {
112
t.Fatalf("fail to create temp repo: %v", err)
113
}
114
defer os.RemoveAll(ipfsRepoPath)
115
116
cctx := &oldcmds.Context{
117
ConfigRoot: ipfsRepoPath,
118
ReqLog: &oldcmds.ReqLog{},
119
// Plugins: plugins,
120
ConstructNode: func() (n *core.IpfsNode, err error) {
121
r, err := fsrepo.Open(ipfsRepoPath)
122
if err != nil { // repo is owned by the node
123
return nil, err
124
}
125
126
// ok everything is good. set it on the invocation (for ownership)
127
// and return it.
128
n, err = core.NewNode(ctx, &core.BuildCfg{
129
Online: true,
130
Repo: r,
131
})
132
if err != nil {
133
return nil, err
134
}
135
136
return n, nil
137
},
138
}
139
140
// Construct IPFS node
141
node, err := cctx.ConstructNode()
142
if err != nil {
143
t.Fatalf("fail to construct node: %v", err)
144
}
145
defer node.Close()
146
147
go func() {
148
// Create a IPFS server
149
t.Logf("HTTP API server listening on %s\n", ipfsAPIAddr)
150
corehttp.ListenAndServe(node, ipfsAPIAddr, corehttp.CommandsOption(*cctx))
151
}()
152
153
// Init HTTP client connects to IPFS server
154
ipfsAPIMaddr, err := ma.NewMultiaddr(ipfsAPIAddr)
155
if err != nil {
156
t.Fatalf("fail to new multi address: %v", err)
157
}
158
159
api, err := httpapi.NewApiWithClient(ipfsAPIMaddr, NewRetryableHTTPClient())
160
if err != nil {
161
t.Fatal(err)
162
}
163
164
// Running unit tests
165
redisServer, err := miniredis.Run()
166
if err != nil {
167
t.Fatalf("cannot run mini redis server: %v", err)
168
}
169
defer redisServer.Close()
170
171
redisC := redis.NewClient(&redis.Options{Addr: redisServer.Addr()})
172
173
redisBlobStore := &RedisBlobStore{Client: redisC}
174
ipfsBlobCache := &IPFSBlobCache{Redis: redisC, IPFS: api}
175
ipfsBlobSrc := ipfsBlobSource{source: ipfsBlobCache}
176
imageSpec := &rfapi.ImageSpec{BaseRef: "docker.io/library/alpine@sha256:7580ece7963bfa863801466c0a488f11c86f85d9988051a9f9c68cb27f6b7872"}
177
dgst := digest.NewDigestFromBytes(digest.SHA256, []byte("7580ece7963bfa863801466c0a488f11c86f85d9988051a9f9c68cb27f6b7872"))
178
mediaType := "application/vnd.docker.image.rootfs.diff.tar.gzip"
179
180
err = ipfsBlobCache.Store(ctx, dgst, io.NopCloser(bytes.NewReader([]byte("foobar"))), mediaType)
181
if err != nil {
182
t.Fatalf("cannot store to ipfs blobcache: %v", err)
183
}
184
185
exist := ipfsBlobSrc.HasBlob(ctx, imageSpec, dgst)
186
if !exist {
187
t.Fatal("the digest should exists")
188
}
189
190
resolverFactory := func() remotes.Resolver {
191
client := NewRetryableHTTPClient()
192
resolverOpts := docker.ResolverOptions{Client: client}
193
return docker.NewResolver(resolverOpts)
194
}
195
196
blobHandler := &blobHandler{
197
Context: ctx,
198
Digest: dgst,
199
Name: "unittest",
200
201
Spec: imageSpec,
202
Resolver: resolverFactory(),
203
Store: redisBlobStore,
204
IPFS: ipfsBlobCache,
205
}
206
207
req := httptest.NewRequest("", "http://example.com", nil)
208
w := newFailFirstResponseWriter()
209
210
blobHandler.getBlob(w, req)
211
}
212
213
// failFirstResponseWriter is an http.ResponseWriter test double whose first
// two Write calls fail; every later Write succeeds and is captured in body.
type failFirstResponseWriter struct {
	// code is the status most recently passed to WriteHeader.
	code int
	// headerMap backs Header; it is created lazily when nil.
	headerMap http.Header
	// body collects the bytes of successful writes; a nil body drops them.
	body *bytes.Buffer

	// requests counts the Write calls made so far, failed ones included.
	requests int
}

// newFailFirstResponseWriter returns a writer with an empty header map, an
// empty body buffer and status code 200.
func newFailFirstResponseWriter() *failFirstResponseWriter {
	rw := new(failFirstResponseWriter)
	rw.code = http.StatusOK
	rw.headerMap = http.Header{}
	rw.body = &bytes.Buffer{}
	return rw
}

// Header returns the header map, allocating it first if it is nil.
func (rw *failFirstResponseWriter) Header() http.Header {
	if rw.headerMap == nil {
		rw.headerMap = make(http.Header)
	}
	return rw.headerMap
}

// Write fails the first call with ECONNRESET and the second with EPIPE,
// writing nothing in either case. From the third call on it appends buf to
// body (when body is non-nil) and reports len(buf) bytes written. Every call
// increments requests.
func (rw *failFirstResponseWriter) Write(buf []byte) (int, error) {
	attempt := rw.requests
	rw.requests++

	switch attempt {
	case 0:
		return 0, syscall.ECONNRESET
	case 1:
		return 0, syscall.EPIPE
	}

	if rw.body != nil {
		rw.body.Write(buf)
	}
	return len(buf), nil
}

// WriteHeader records code as the response status.
func (rw *failFirstResponseWriter) WriteHeader(code int) {
	rw.code = code
}
259
260
// mockBlobSource allows faking BlobSource behavior for tests.
261
type mockBlobSource struct {
262
// How many times GetBlob should fail before succeeding.
263
failCount int
264
// The error to return on failure.
265
failError error
266
267
// Internal counter for calls.
268
callCount int
269
// The data to return on success.
270
successData string
271
272
// Whether to use a reader that fails mid-stream on the first call.
273
failReaderOnFirstCall bool
274
// The number of bytes to read successfully before the reader fails.
275
failAfterBytes int
276
}
277
278
func (m *mockBlobSource) Name() string { return "mock" }
279
func (m *mockBlobSource) HasBlob(ctx context.Context, details *rfapi.ImageSpec, dgst digest.Digest) bool {
280
return true
281
}
282
283
func (m *mockBlobSource) GetBlob(ctx context.Context, details *rfapi.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
284
m.callCount++
285
if m.callCount <= m.failCount {
286
return false, "", "", nil, m.failError
287
}
288
289
if m.failReaderOnFirstCall && m.callCount == 1 {
290
return false, "application/octet-stream", "", io.NopCloser(&failingReader{
291
reader: strings.NewReader(m.successData),
292
failAfterBytes: m.failAfterBytes,
293
failError: m.failError,
294
}), nil
295
}
296
297
return false, "application/octet-stream", "", io.NopCloser(strings.NewReader(m.successData)), nil
298
}
299
300
// failingReader wraps reader and starts returning failError once
// failAfterBytes bytes have been delivered. It never delivers more than
// failAfterBytes bytes in total, so the failure really happens mid-stream
// even when the caller reads with a large buffer.
type failingReader struct {
	// reader is the underlying data source.
	reader io.Reader
	// failAfterBytes is the total number of bytes delivered before failing.
	failAfterBytes int
	// failError is returned once the byte budget is used up.
	failError error
	// bytesRead is the number of bytes delivered so far.
	bytesRead int
}

// Read delivers at most the remaining byte budget from the underlying reader.
//
// It returns (0, failError) once the budget is exhausted. The call that uses
// up the last of the budget returns those bytes together with failError.
// Errors from the underlying reader (for example io.EOF) are passed through
// unchanged.
func (fr *failingReader) Read(p []byte) (n int, err error) {
	remaining := fr.failAfterBytes - fr.bytesRead
	if remaining <= 0 {
		return 0, fr.failError
	}

	// Cap the read so a large caller buffer cannot pull the whole stream
	// through before the failure is reported.
	if len(p) > remaining {
		p = p[:remaining]
	}

	n, err = fr.reader.Read(p)
	// Count bytes even when the underlying reader also reports an error, as
	// io.Reader allows n > 0 alongside err != nil.
	fr.bytesRead += n
	if err != nil {
		return n, err
	}
	if fr.bytesRead >= fr.failAfterBytes {
		// Return the error, but also the bytes read in this call.
		return n, fr.failError
	}
	return n, nil
}
323
324
func TestRetrieveFromSource_RetryOnGetBlob(t *testing.T) {
325
// Arrange
326
mockSource := &mockBlobSource{
327
failCount: 2,
328
failError: errors.New("transient network error"),
329
successData: "hello world",
330
}
331
332
bh := &blobHandler{
333
Digest: "sha256:dummy",
334
Spec: &rfapi.ImageSpec{},
335
}
336
337
// Use short backoff for testing
338
originalBackoff := retrievalBackoffParams
339
retrievalBackoffParams = wait.Backoff{
340
Duration: 1 * time.Millisecond,
341
Steps: 3,
342
}
343
defer func() { retrievalBackoffParams = originalBackoff }()
344
345
w := httptest.NewRecorder()
346
r := httptest.NewRequest("GET", "/v2/...", nil)
347
348
// Act
349
handled, dontCache, err := bh.retrieveFromSource(context.Background(), mockSource, w, r)
350
351
// Assert
352
require.NoError(t, err)
353
assert.True(t, handled)
354
assert.False(t, dontCache)
355
assert.Equal(t, "hello world", w.Body.String())
356
assert.Equal(t, 3, mockSource.callCount, "Expected GetBlob to be called 3 times (2 failures + 1 success)")
357
}
358
359
func TestRetrieveFromSource_RetryOnCopy(t *testing.T) {
360
// Arrange
361
mockSource := &mockBlobSource{
362
failCount: 0, // GetBlob succeeds immediately
363
failReaderOnFirstCall: true,
364
failAfterBytes: 5,
365
failError: syscall.EPIPE,
366
successData: "hello world",
367
}
368
369
bh := &blobHandler{
370
Digest: "sha256:dummy",
371
Spec: &rfapi.ImageSpec{},
372
}
373
374
// Use short backoff for testing
375
originalBackoff := retrievalBackoffParams
376
retrievalBackoffParams = wait.Backoff{
377
Duration: 1 * time.Millisecond,
378
Steps: 3,
379
}
380
defer func() { retrievalBackoffParams = originalBackoff }()
381
382
w := httptest.NewRecorder()
383
r := httptest.NewRequest("GET", "/v2/...", nil)
384
385
// Act
386
handled, dontCache, err := bh.retrieveFromSource(context.Background(), mockSource, w, r)
387
388
// Assert
389
require.NoError(t, err)
390
assert.True(t, handled)
391
assert.False(t, dontCache)
392
assert.Equal(t, "hello world", w.Body.String())
393
assert.Equal(t, 2, mockSource.callCount, "Expected GetBlob to be called twice (1st succeeds, copy fails, 2nd succeeds)")
394
}
395
396