Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/registry-facade/pkg/registry/blob.go
2499 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package registry
6
7
import (
8
"bytes"
9
"context"
10
"encoding/json"
11
"errors"
12
"io"
13
"net/http"
14
"sync"
15
"syscall"
16
"time"
17
18
"github.com/containerd/containerd/content"
19
"github.com/containerd/containerd/errdefs"
20
"github.com/containerd/containerd/remotes"
21
distv2 "github.com/docker/distribution/registry/api/v2"
22
"github.com/gorilla/handlers"
23
files "github.com/ipfs/boxo/files"
24
icorepath "github.com/ipfs/boxo/path"
25
"github.com/ipfs/go-cid"
26
"github.com/opencontainers/go-digest"
27
ociv1 "github.com/opencontainers/image-spec/specs-go/v1"
28
"github.com/opentracing/opentracing-go"
29
"golang.org/x/xerrors"
30
"k8s.io/apimachinery/pkg/util/wait"
31
32
"github.com/gitpod-io/gitpod/common-go/log"
33
"github.com/gitpod-io/gitpod/common-go/tracing"
34
"github.com/gitpod-io/gitpod/registry-facade/api"
35
)
36
37
// retrievalBackoffParams defines the backoff parameters for blob retrieval.
38
// Aiming at ~10 seconds total time for retries
39
var retrievalBackoffParams = wait.Backoff{
40
Duration: 1 * time.Second,
41
Factor: 1.2,
42
Jitter: 0.2,
43
Steps: 5,
44
}
45
46
func (reg *Registry) handleBlob(ctx context.Context, r *http.Request) http.Handler {
47
spname, name := getSpecProviderName(ctx)
48
sp, ok := reg.SpecProvider[spname]
49
if !ok {
50
log.WithField("specProvName", spname).Error("unknown spec provider")
51
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
52
respondWithError(w, distv2.ErrorCodeManifestUnknown)
53
})
54
}
55
spec, err := sp.GetSpec(ctx, name)
56
if err != nil {
57
log.WithError(err).WithField("specProvName", spname).WithField("name", name).Error("cannot get spec")
58
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
59
respondWithError(w, distv2.ErrorCodeManifestUnknown)
60
})
61
}
62
63
dgst, err := digest.Parse(getDigest(ctx))
64
if err != nil {
65
log.WithError(err).WithField("instanceId", name).Error("cannot get workspace details")
66
}
67
68
blobHandler := &blobHandler{
69
Context: ctx,
70
Digest: dgst,
71
Name: name,
72
73
Spec: spec,
74
Resolver: reg.Resolver(),
75
Store: reg.Store,
76
IPFS: reg.IPFS,
77
AdditionalSources: []BlobSource{
78
reg.LayerSource,
79
},
80
ConfigModifier: reg.ConfigModifier,
81
82
Metrics: reg.metrics,
83
}
84
85
mhandler := handlers.MethodHandler{
86
"GET": http.HandlerFunc(blobHandler.getBlob),
87
"HEAD": http.HandlerFunc(blobHandler.getBlob),
88
}
89
res := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
90
reg.metrics.BlobCounter.Inc()
91
mhandler.ServeHTTP(w, r)
92
})
93
94
return res
95
}
96
97
type blobHandler struct {
98
Context context.Context
99
Digest digest.Digest
100
Name string
101
102
Spec *api.ImageSpec
103
Resolver remotes.Resolver
104
Store BlobStore
105
IPFS *IPFSBlobCache
106
AdditionalSources []BlobSource
107
ConfigModifier ConfigModifier
108
109
Metrics *metrics
110
}
111
112
// bufPool hands out *[]byte scratch buffers for copying blob data.
var bufPool = sync.Pool{
	New: func() interface{} {
		// Sized to PIPE_BUF (4096 bytes), see
		// http://man7.org/linux/man-pages/man7/pipe.7.html
		const pipeBufSize = 4096
		scratch := make([]byte, pipeBufSize)
		return &scratch
	},
}
120
121
// getBlob serves the blob bh.Digest for GET and HEAD requests.
//
// It resolves the manifest of bh.Spec.BaseRef and then tries the blob sources
// in order: local store, IPFS (if configured), upstream registry, the
// (modified) image config, and finally bh.AdditionalSources. The first source
// that claims the blob and delivers it wins. If no source claims the blob the
// client receives BLOB_UNKNOWN. Unless the winning source asked not to be
// cached, and only if an IPFS cache is configured, the blob is afterwards
// pushed to IPFS in the background.
func (bh *blobHandler) getBlob(w http.ResponseWriter, r *http.Request) {
	span, _ := opentracing.StartSpanFromContext(r.Context(), "getBlob")

	// Retrieval runs on a fresh background context rather than r.Context(), so
	// it is not cancelled together with the request (NOTE(review): presumably
	// intentional — confirm). The span is attached so that downstream calls can
	// still create child spans.
	ctx, cancel := context.WithCancel(opentracing.ContextWithSpan(context.Background(), span))
	defer cancel()

	err := func() error {
		// TODO: rather than download the same manifest over and over again,
		// we should add it to the store and try and fetch it from there.
		// Only if the store fetch fails should we attempt to download it.
		manifest, fetcher, err := bh.downloadManifest(ctx, bh.Spec.BaseRef)
		if err != nil {
			return xerrors.Errorf("cannot fetch the manifest: %w", err)
		}

		// 1. local store (fastest)
		srcs := []BlobSource{storeBlobSource{Store: bh.Store}}

		// 2. IPFS (if configured)
		if bh.IPFS != nil {
			srcs = append(srcs, ipfsBlobSource{source: bh.IPFS})
		}

		// 3. upstream registry, 4. (modified) image config, 5. additional sources
		srcs = append(srcs,
			proxyingBlobSource{Fetcher: fetcher, Blobs: manifest.Layers},
			&configBlobSource{Fetcher: fetcher, Spec: bh.Spec, Manifest: manifest, ConfigModifier: bh.ConfigModifier},
		)
		srcs = append(srcs, bh.AdditionalSources...)

		w.Header().Set("Etag", bh.Digest.String())

		var (
			retrieved bool
			src       BlobSource
			dontCache bool
		)
		for _, s := range srcs {
			if !s.HasBlob(ctx, bh.Spec, bh.Digest) {
				continue
			}

			retrieved, dontCache, err = bh.retrieveFromSource(ctx, s, w, r)
			if err != nil {
				log.WithField("src", s.Name()).WithError(err).Error("unable to retrieve blob")
			}

			if retrieved {
				src = s
				break
			}
		}

		if !retrieved {
			if err == nil {
				// No source claimed the blob at all: report it as unknown instead
				// of wrapping a nil error into a generic failure.
				log.WithField("baseRef", bh.Spec.BaseRef).WithField("digest", bh.Digest).Error("unable to return blob")
				return distv2.ErrorCodeBlobUnknown.WithDetail(bh.Digest)
			}
			log.WithField("baseRef", bh.Spec.BaseRef).WithError(err).Error("unable to return blob")
			return xerrors.Errorf("unable to return blob: %w", err)
		}

		// Nothing to push when the source opted out of caching, and nowhere to
		// push to when IPFS is not configured (bh.IPFS is nil; calling Store on
		// it from the goroutine below would not be safe).
		if dontCache || bh.IPFS == nil {
			return nil
		}

		go func() {
			// we can do this only after the blob has been written to the client above.
			// Otherwise we might expect the blob to be in the blobstore when in reality it isn't.
			_, mediaType, _, rc, err := src.GetBlob(context.Background(), bh.Spec, bh.Digest)
			if err != nil {
				log.WithError(err).WithField("digest", bh.Digest).Warn("cannot push to IPFS - unable to get blob")
				return
			}
			if rc == nil {
				log.WithField("digest", bh.Digest).Warn("cannot push to IPFS - blob is nil")
				return
			}

			defer rc.Close()

			err = bh.IPFS.Store(context.Background(), bh.Digest, rc, mediaType)
			if err != nil {
				log.WithError(err).WithField("digest", bh.Digest).Warn("cannot push to IPFS")
			}
		}()

		return nil
	}()

	if err != nil {
		log.WithError(err).Error("cannot get blob")
		respondWithError(w, err)
	}
	tracing.FinishSpan(span, &err)
}
215
216
// retrieveFromSource fetches bh.Digest from src and writes it to w.
//
// Fetching and reading the whole blob into memory both happen inside the retry
// loop (retrievalBackoffParams). A connection reset or broken pipe while
// streaming therefore restarts the download instead of sending a truncated
// blob. Nothing is written to w until the blob is fully buffered. If src
// returns a URL instead of data, the client is redirected there and the blob
// is marked dontCache.
//
// handled reports whether a response was written to w. dontCache is the
// source's caching hint and is always true for redirects and failures. On
// failure, err carries the reason; if the retries were exhausted, it includes
// the last attempt's error.
func (bh *blobHandler) retrieveFromSource(ctx context.Context, src BlobSource, w http.ResponseWriter, r *http.Request) (handled, dontCache bool, err error) {
	log.Debugf("retrieving blob %s from %s", bh.Digest, src.Name())

	var (
		n              int64
		t0             = time.Now()
		body           bytes.Buffer
		finalMediaType string
		redirected     bool
		// lastErr holds the error of the most recent retryable failure. It is
		// cleared whenever an attempt ends the loop (success or fatal error).
		lastErr error
	)

	// The entire operation (fetch + copy) is inside the backoff loop.
	err = wait.ExponentialBackoffWithContext(ctx, retrievalBackoffParams, func(ctx context.Context) (done bool, err error) {
		// Drop whatever an earlier, interrupted attempt buffered so it can never
		// end up in the response.
		body.Reset()
		n = 0

		var url string
		var rc io.ReadCloser
		dontCache, finalMediaType, url, rc, err = src.GetBlob(ctx, bh.Spec, bh.Digest)
		if err != nil {
			log.WithField("blobSource", src.Name()).WithError(err).Warn("error fetching blob from source, retrying...")
			lastErr = err
			return false, nil
		}
		if rc != nil {
			defer rc.Close()
		}

		if url != "" {
			http.Redirect(w, r, url, http.StatusPermanentRedirect)
			dontCache = true
			redirected = true
			lastErr = nil
			return true, nil
		}
		if rc == nil {
			// Neither data nor a URL: copying from a nil reader would panic.
			lastErr = nil
			return true, xerrors.Errorf("blob source %s returned neither data nor a URL", src.Name())
		}

		bp := bufPool.Get().(*[]byte)
		defer bufPool.Put(bp)

		// NOTE(review): bytes.Buffer implements io.ReaderFrom, so io.CopyBuffer
		// delegates to body.ReadFrom and the pooled buffer is not actually used.
		n, err = io.CopyBuffer(&body, rc, *bp)
		if err == nil {
			lastErr = nil
			return true, nil
		}

		// Check for retryable errors during copy
		if errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.EPIPE) {
			// TODO(gpl): current error seem to be captured by this - but does it make sense to widen this condition?
			log.WithField("blobSource", src.Name()).WithField("baseRef", bh.Spec.BaseRef).WithError(err).Warn("retry get blob because of streaming error")
			lastErr = err
			return false, nil
		}

		lastErr = nil
		return true, err
	})
	if err != nil && lastErr != nil {
		// The loop gave up (retries exhausted or ctx done); keep the real cause
		// instead of only the generic backoff/context error.
		err = xerrors.Errorf("%v, last attempt failed: %w", err, lastErr)
	}

	if err != nil {
		if bh.Metrics != nil {
			bh.Metrics.BlobDownloadCounter.WithLabelValues(src.Name(), "false").Inc()
		}
		return false, true, err
	}

	if redirected {
		// The redirect response has already been written; there is no body to
		// send and no transfer size or speed to record.
		if bh.Metrics != nil {
			bh.Metrics.BlobDownloadCounter.WithLabelValues(src.Name(), "true").Inc()
		}
		return true, true, nil
	}

	w.Header().Set("Content-Type", finalMediaType)
	if _, werr := w.Write(body.Bytes()); werr != nil {
		// The response has started, so no other source can take over; just
		// record that the client did not receive the complete blob.
		log.WithField("blobSource", src.Name()).WithError(werr).Warn("cannot write blob to client")
	}

	if bh.Metrics != nil {
		bh.Metrics.BlobDownloadCounter.WithLabelValues(src.Name(), "true").Inc()
		bh.Metrics.BlobDownloadSpeedHist.WithLabelValues(src.Name()).Observe(float64(n) / time.Since(t0).Seconds())
		bh.Metrics.BlobDownloadSizeCounter.WithLabelValues(src.Name()).Add(float64(n))
	}

	return true, dontCache, nil
}
282
283
// downloadManifest resolves ref and downloads its manifest, using bh.Store as
// a cache. It also returns the fetcher created for ref so the caller can fetch
// blobs from the same upstream. Resolution errors (e.g. invalid authorization)
// are returned as-is. A failure to obtain a fetcher is logged and returned.
func (bh *blobHandler) downloadManifest(ctx context.Context, ref string) (res *ociv1.Manifest, fetcher remotes.Fetcher, err error) {
	_, desc, resolveErr := bh.Resolver.Resolve(ctx, ref)
	if resolveErr != nil {
		// ErrInvalidAuthorization
		return nil, nil, resolveErr
	}

	upstream, fetcherErr := bh.Resolver.Fetcher(ctx, ref)
	if fetcherErr != nil {
		log.WithError(fetcherErr).WithField("ref", ref).WithField("instanceId", bh.Name).Error("cannot get fetcher")
		return nil, nil, fetcherErr
	}

	manifest, _, downloadErr := DownloadManifest(ctx, AsFetcherFunc(upstream), desc, WithStore(bh.Store))
	return manifest, upstream, downloadErr
}
298
299
type reader struct {
300
content.ReaderAt
301
off int64
302
}
303
304
// Read implements io.Reader. It reads at the current offset, advances the
// offset by however many bytes were read, and passes the ReadAt error
// (including io.EOF) through unchanged.
func (r *reader) Read(b []byte) (n int, err error) {
	read, readErr := r.ReaderAt.ReadAt(b, r.off)
	r.off += int64(read)
	return read, readErr
}
309
310
// BlobSource can provide blobs for download
311
type BlobSource interface {
312
// HasBlob checks if a digest can be served by this blob source
313
HasBlob(ctx context.Context, details *api.ImageSpec, dgst digest.Digest) bool
314
315
// GetBlob provides access to a blob. If a ReadCloser is returned the receiver is expected to
316
// call close on it eventually.
317
GetBlob(ctx context.Context, details *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error)
318
319
// Name identifies the blob source in metrics
320
Name() string
321
}
322
323
type storeBlobSource struct {
324
Store BlobStore
325
}
326
327
func (sbs storeBlobSource) Name() string {
328
return "blobstore"
329
}
330
331
func (sbs storeBlobSource) HasBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) bool {
332
_, err := sbs.Store.Info(ctx, dgst)
333
return err == nil
334
}
335
336
func (sbs storeBlobSource) GetBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
337
info, err := sbs.Store.Info(ctx, dgst)
338
if err != nil {
339
return
340
}
341
342
r, err := sbs.Store.ReaderAt(ctx, ociv1.Descriptor{Digest: dgst})
343
if err != nil {
344
return
345
}
346
347
return false, info.Labels["Content-Type"], "", &reader{ReaderAt: r}, nil
348
}
349
350
type proxyingBlobSource struct {
351
Fetcher remotes.Fetcher
352
Blobs []ociv1.Descriptor
353
}
354
355
func (sbs proxyingBlobSource) Name() string {
356
return "proxy"
357
}
358
359
func (pbs proxyingBlobSource) HasBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) bool {
360
for _, b := range pbs.Blobs {
361
if b.Digest == dgst {
362
return true
363
}
364
}
365
return false
366
}
367
368
func (pbs proxyingBlobSource) GetBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
369
var src ociv1.Descriptor
370
for _, b := range pbs.Blobs {
371
if b.Digest == dgst {
372
src = b
373
break
374
}
375
}
376
if src.Digest == "" {
377
err = errdefs.ErrNotFound
378
return
379
}
380
381
r, err := pbs.Fetcher.Fetch(ctx, src)
382
if err != nil {
383
return
384
}
385
return false, src.MediaType, "", r, nil
386
}
387
388
type configBlobSource struct {
389
Fetcher remotes.Fetcher
390
Spec *api.ImageSpec
391
Manifest *ociv1.Manifest
392
ConfigModifier ConfigModifier
393
}
394
395
func (sbs configBlobSource) Name() string {
396
return "config"
397
}
398
399
func (pbs *configBlobSource) HasBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) bool {
400
cfg, err := pbs.getConfig(ctx)
401
if err != nil {
402
log.WithError(err).Error("cannot (re-)produce image config")
403
return false
404
}
405
406
cfgDgst := digest.FromBytes(cfg)
407
return cfgDgst == dgst
408
}
409
410
func (pbs *configBlobSource) GetBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
411
if !pbs.HasBlob(ctx, spec, dgst) {
412
err = distv2.ErrorCodeBlobUnknown
413
return
414
}
415
416
cfg, err := pbs.getConfig(ctx)
417
if err != nil {
418
return
419
}
420
mediaType = pbs.Manifest.Config.MediaType
421
data = io.NopCloser(bytes.NewReader(cfg))
422
return
423
}
424
425
func (pbs *configBlobSource) getConfig(ctx context.Context) (rawCfg []byte, err error) {
426
manifest := *pbs.Manifest
427
cfg, err := DownloadConfig(ctx, AsFetcherFunc(pbs.Fetcher), "", manifest.Config)
428
if err != nil {
429
return
430
}
431
432
_, err = pbs.ConfigModifier(ctx, pbs.Spec, cfg)
433
if err != nil {
434
return
435
}
436
437
rawCfg, err = json.Marshal(cfg)
438
return
439
}
440
441
type ipfsBlobSource struct {
442
source *IPFSBlobCache
443
}
444
445
func (sbs ipfsBlobSource) Name() string {
446
return "ipfs"
447
}
448
449
func (sbs ipfsBlobSource) HasBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) bool {
450
_, err := sbs.source.Redis.Get(ctx, dgst.String()).Result()
451
return err == nil
452
}
453
454
func (sbs ipfsBlobSource) GetBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
455
log := log.WithField("digest", dgst)
456
457
ipfsCID, err := sbs.source.Redis.Get(ctx, dgst.String()).Result()
458
if err != nil {
459
log.WithError(err).Error("unable to get blob details from Redis")
460
err = distv2.ErrorCodeBlobUnknown
461
return
462
}
463
464
c, err := cid.Decode(ipfsCID)
465
if err != nil {
466
log.WithError(err).Error("unable to decode CID")
467
err = distv2.ErrorCodeBlobUnknown
468
return
469
}
470
471
ipfsFile, err := sbs.source.IPFS.Unixfs().Get(ctx, icorepath.FromCid(c))
472
if err != nil {
473
log.WithError(err).Error("unable to get blob from IPFS")
474
err = distv2.ErrorCodeBlobUnknown
475
return
476
}
477
478
f, ok := ipfsFile.(interface {
479
files.File
480
io.ReaderAt
481
})
482
if !ok {
483
log.WithError(err).Error("IPFS file does not support io.ReaderAt")
484
err = distv2.ErrorCodeBlobUnknown
485
return
486
}
487
488
mediaType, err = sbs.source.Redis.Get(ctx, mediaTypeKeyFromDigest(dgst)).Result()
489
if err != nil {
490
log.WithError(err).Error("cannot get media type from Redis")
491
err = distv2.ErrorCodeBlobUnknown
492
return
493
}
494
495
log.Debug("returning blob from IPFS")
496
return true, mediaType, "", f, nil
497
}
498
499
// mediaTypeKeyFromDigest returns the Redis key under which the media type of
// the blob dgst is stored: the digest string prefixed with "mtype:".
func mediaTypeKeyFromDigest(dgst digest.Digest) string {
	const keyPrefix = "mtype:"
	return keyPrefix + dgst.String()
}
502
503