Path: blob/main/components/registry-facade/pkg/registry/blob.go
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
// Licensed under the GNU Affero General Public License (AGPL).
// See License.AGPL.txt in the project root for license information.

package registry

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"sync"
	"syscall"
	"time"

	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/remotes"
	distv2 "github.com/docker/distribution/registry/api/v2"
	"github.com/gorilla/handlers"
	files "github.com/ipfs/boxo/files"
	icorepath "github.com/ipfs/boxo/path"
	"github.com/ipfs/go-cid"
	"github.com/opencontainers/go-digest"
	ociv1 "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/opentracing/opentracing-go"
	"golang.org/x/xerrors"
	"k8s.io/apimachinery/pkg/util/wait"

	"github.com/gitpod-io/gitpod/common-go/log"
	"github.com/gitpod-io/gitpod/common-go/tracing"
	"github.com/gitpod-io/gitpod/registry-facade/api"
)

// retrievalBackoffParams defines the backoff parameters for blob retrieval.
// Aiming at ~10 seconds total time for retries
var retrievalBackoffParams = wait.Backoff{
	Duration: 1 * time.Second,
	Factor:   1.2,
	Jitter:   0.2,
	Steps:    5,
}

func (reg *Registry) handleBlob(ctx context.Context, r *http.Request) http.Handler {
	spname, name := getSpecProviderName(ctx)
	sp, ok := reg.SpecProvider[spname]
	if !ok {
		log.WithField("specProvName", spname).Error("unknown spec provider")
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			respondWithError(w, distv2.ErrorCodeManifestUnknown)
		})
	}
	spec, err := sp.GetSpec(ctx, name)
	if err != nil {
		log.WithError(err).WithField("specProvName", spname).WithField("name", name).Error("cannot get spec")
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			respondWithError(w, distv2.ErrorCodeManifestUnknown)
		})
	}

	dgst, err := digest.Parse(getDigest(ctx))
	if err != nil {
		log.WithError(err).WithField("instanceId", name).Error("cannot get workspace details")
	}

	blobHandler := &blobHandler{
		Context: ctx,
		Digest:  dgst,
		Name:    name,

		Spec:     spec,
		Resolver: reg.Resolver(),
		Store:    reg.Store,
		IPFS:     reg.IPFS,
		AdditionalSources: []BlobSource{
			reg.LayerSource,
		},
		ConfigModifier: reg.ConfigModifier,

		Metrics: reg.metrics,
	}

	mhandler := handlers.MethodHandler{
		"GET":  http.HandlerFunc(blobHandler.getBlob),
		"HEAD": http.HandlerFunc(blobHandler.getBlob),
	}
	res := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		reg.metrics.BlobCounter.Inc()
		mhandler.ServeHTTP(w, r)
	})

	return res
}

// blobHandler serves a single blob of one image spec.
type blobHandler struct {
	Context context.Context
	Digest  digest.Digest
	Name    string

	Spec              *api.ImageSpec
	Resolver          remotes.Resolver
	Store             BlobStore
	IPFS              *IPFSBlobCache
	AdditionalSources []BlobSource
	ConfigModifier    ConfigModifier

	Metrics *metrics
}

var bufPool = sync.Pool{
	New: func() interface{} {
		// setting to 4096 to align with PIPE_BUF
		// http://man7.org/linux/man-pages/man7/pipe.7.html
		buffer := make([]byte, 4096)
		return &buffer
	},
}

func (bh *blobHandler) getBlob(w http.ResponseWriter, r *http.Request) {
	// v2.ErrorCodeBlobUnknown.WithDetail(bh.Digest)
	//nolint:staticcheck,ineffassign
	span, ctx := opentracing.StartSpanFromContext(r.Context(), "getBlob")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	err := func() error {
		// TODO: rather than download the same manifest over and over again,
		// we should add it to the store and try and fetch it from there.
		// Only if the store fetch fails should we attempt to download it.
		manifest, fetcher, err := bh.downloadManifest(ctx, bh.Spec.BaseRef)
		if err != nil {
			return xerrors.Errorf("cannot fetch the manifest: %w", err)
		}

		var srcs []BlobSource

		// 1. local store (faster)
		srcs = append(srcs, storeBlobSource{Store: bh.Store})

		// 2. IPFS (if configured)
		if bh.IPFS != nil {
			ipfsSrc := ipfsBlobSource{source: bh.IPFS}
			srcs = append(srcs, ipfsSrc)
		}

		// 3. upstream registry
		srcs = append(srcs, proxyingBlobSource{Fetcher: fetcher, Blobs: manifest.Layers})

		srcs = append(srcs, &configBlobSource{Fetcher: fetcher, Spec: bh.Spec, Manifest: manifest, ConfigModifier: bh.ConfigModifier})
		srcs = append(srcs, bh.AdditionalSources...)

		w.Header().Set("Etag", bh.Digest.String())

		var retrieved bool
		var src BlobSource
		var dontCache bool
		for _, s := range srcs {
			if !s.HasBlob(ctx, bh.Spec, bh.Digest) {
				continue
			}

			retrieved, dontCache, err = bh.retrieveFromSource(ctx, s, w, r)
			if err != nil {
				log.WithField("src", s.Name()).WithError(err).Error("unable to retrieve blob")
			}

			if retrieved {
				src = s
				break
			}
		}

		if !retrieved {
			log.WithField("baseRef", bh.Spec.BaseRef).WithError(err).Error("unable to return blob")
			return xerrors.Errorf("unable to return blob: %w", err)
		}

		if dontCache {
			return nil
		}

		go func() {
			// we can do this only after the io.Copy above. Otherwise we might expect the blob
			// to be in the blobstore when in reality it isn't.
			_, mediaType, _, rc, err := src.GetBlob(context.Background(), bh.Spec, bh.Digest)
			if err != nil {
				log.WithError(err).WithField("digest", bh.Digest).Warn("cannot push to IPFS - unable to get blob")
				return
			}
			if rc == nil {
				log.WithField("digest", bh.Digest).Warn("cannot push to IPFS - blob is nil")
				return
			}

			defer rc.Close()

			err = bh.IPFS.Store(context.Background(), bh.Digest, rc, mediaType)
			if err != nil {
				log.WithError(err).WithField("digest", bh.Digest).Warn("cannot push to IPFS")
			}
		}()

		return nil
	}()

	if err != nil {
		log.WithError(err).Error("cannot get blob")
		respondWithError(w, err)
	}
	tracing.FinishSpan(span, &err)
}

func (bh *blobHandler) retrieveFromSource(ctx context.Context, src BlobSource, w http.ResponseWriter, r *http.Request) (handled, dontCache bool, err error) {
	log.Debugf("retrieving blob %s from %s", bh.Digest, src.Name())

	var n int64
	t0 := time.Now()
	var body bytes.Buffer
	var finalMediaType string

	// The entire operation is now inside the backoff loop
	err = wait.ExponentialBackoffWithContext(ctx, retrievalBackoffParams, func(ctx context.Context) (done bool, err error) {
		// 1. GetBlob is now INSIDE the retry loop
		var url string
		var rc io.ReadCloser
		dontCache, finalMediaType, url, rc, err = src.GetBlob(ctx, bh.Spec, bh.Digest)
		if err != nil {
			log.WithField("blobSource", src.Name()).WithError(err).Warn("error fetching blob from source, retrying...")
			return false, nil
		}
		if rc != nil {
			defer rc.Close()
		}

		if url != "" {
			http.Redirect(w, r, url, http.StatusPermanentRedirect)
			dontCache = true
			return true, nil
		}

		body.Reset()
		bp := bufPool.Get().(*[]byte)
		defer bufPool.Put(bp)

		// 2. CopyBuffer is also inside the retry loop
		n, err = io.CopyBuffer(&body, rc, *bp)
		if err == nil {
			return true, nil
		}

		// Check for retryable errors during copy
		if errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.EPIPE) {
			// TODO(gpl): current errors seem to be captured by this - but does it make sense to widen this condition?
			log.WithField("blobSource", src.Name()).WithField("baseRef", bh.Spec.BaseRef).WithError(err).Warn("retry get blob because of streaming error")
			return false, nil
		}

		return true, err
	})

	if err != nil {
		if bh.Metrics != nil {
			bh.Metrics.BlobDownloadCounter.WithLabelValues(src.Name(), "false").Inc()
		}
		return false, true, err
	}

	w.Header().Set("Content-Type", finalMediaType)
	w.Write(body.Bytes())

	if bh.Metrics != nil {
		bh.Metrics.BlobDownloadCounter.WithLabelValues(src.Name(), "true").Inc()
		bh.Metrics.BlobDownloadSpeedHist.WithLabelValues(src.Name()).Observe(float64(n) / time.Since(t0).Seconds())
		bh.Metrics.BlobDownloadSizeCounter.WithLabelValues(src.Name()).Add(float64(n))
	}

	return true, dontCache, nil
}

func (bh *blobHandler) downloadManifest(ctx context.Context, ref string) (res *ociv1.Manifest, fetcher remotes.Fetcher, err error) {
	_, desc, err := bh.Resolver.Resolve(ctx, ref)
	if err != nil {
		// ErrInvalidAuthorization
		return nil, nil, err
	}

	fetcher, err = bh.Resolver.Fetcher(ctx, ref)
	if err != nil {
		log.WithError(err).WithField("ref", ref).WithField("instanceId", bh.Name).Error("cannot get fetcher")
		return nil, nil, err
	}
	res, _, err = DownloadManifest(ctx, AsFetcherFunc(fetcher), desc, WithStore(bh.Store))
	return
}

// reader adapts a content.ReaderAt to an io.Reader by keeping track of the read offset.
type reader struct {
	content.ReaderAt
	off int64
}

func (r *reader) Read(b []byte) (n int, err error) {
	n, err = r.ReadAt(b, r.off)
	r.off += int64(n)
	return
}

// BlobSource can provide blobs for download
type BlobSource interface {
	// HasBlob checks if a digest can be served by this blob source
	HasBlob(ctx context.Context, details *api.ImageSpec, dgst digest.Digest) bool

	// GetBlob provides access to a blob. If a ReadCloser is returned the receiver is expected to
	// call close on it eventually.
	GetBlob(ctx context.Context, details *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error)

	// Name identifies the blob source in metrics
	Name() string
}

// storeBlobSource serves blobs from the local blob store.
type storeBlobSource struct {
	Store BlobStore
}

func (sbs storeBlobSource) Name() string {
	return "blobstore"
}

func (sbs storeBlobSource) HasBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) bool {
	_, err := sbs.Store.Info(ctx, dgst)
	return err == nil
}

func (sbs storeBlobSource) GetBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
	info, err := sbs.Store.Info(ctx, dgst)
	if err != nil {
		return
	}

	r, err := sbs.Store.ReaderAt(ctx, ociv1.Descriptor{Digest: dgst})
	if err != nil {
		return
	}

	return false, info.Labels["Content-Type"], "", &reader{ReaderAt: r}, nil
}

// proxyingBlobSource fetches layer blobs from the upstream registry.
type proxyingBlobSource struct {
	Fetcher remotes.Fetcher
	Blobs   []ociv1.Descriptor
}

func (sbs proxyingBlobSource) Name() string {
	return "proxy"
}

func (pbs proxyingBlobSource) HasBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) bool {
	for _, b := range pbs.Blobs {
		if b.Digest == dgst {
			return true
		}
	}
	return false
}

func (pbs proxyingBlobSource) GetBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
	var src ociv1.Descriptor
	for _, b := range pbs.Blobs {
		if b.Digest == dgst {
			src = b
			break
		}
	}
	if src.Digest == "" {
		err = errdefs.ErrNotFound
		return
	}

	r, err := pbs.Fetcher.Fetch(ctx, src)
	if err != nil {
		return
	}
	return false, src.MediaType, "", r, nil
}

// configBlobSource serves the (modified) image config as a blob.
type configBlobSource struct {
	Fetcher        remotes.Fetcher
	Spec           *api.ImageSpec
	Manifest       *ociv1.Manifest
	ConfigModifier ConfigModifier
}

func (sbs configBlobSource) Name() string {
	return "config"
}

func (pbs *configBlobSource) HasBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) bool {
	cfg, err := pbs.getConfig(ctx)
	if err != nil {
		log.WithError(err).Error("cannot (re-)produce image config")
		return false
	}

	cfgDgst := digest.FromBytes(cfg)
	return cfgDgst == dgst
}

func (pbs *configBlobSource) GetBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
	if !pbs.HasBlob(ctx, spec, dgst) {
		err = distv2.ErrorCodeBlobUnknown
		return
	}

	cfg, err := pbs.getConfig(ctx)
	if err != nil {
		return
	}
	mediaType = pbs.Manifest.Config.MediaType
	data = io.NopCloser(bytes.NewReader(cfg))
	return
}

func (pbs *configBlobSource) getConfig(ctx context.Context) (rawCfg []byte, err error) {
	manifest := *pbs.Manifest
	cfg, err := DownloadConfig(ctx, AsFetcherFunc(pbs.Fetcher), "", manifest.Config)
	if err != nil {
		return
	}

	_, err = pbs.ConfigModifier(ctx, pbs.Spec, cfg)
	if err != nil {
		return
	}

	rawCfg, err = json.Marshal(cfg)
	return
}

// ipfsBlobSource serves blobs from the IPFS cache, using Redis to map digests to CIDs and media types.
type ipfsBlobSource struct {
	source *IPFSBlobCache
}

func (sbs ipfsBlobSource) Name() string {
	return "ipfs"
}

func (sbs ipfsBlobSource) HasBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) bool {
	_, err := sbs.source.Redis.Get(ctx, dgst.String()).Result()
	return err == nil
}

func (sbs ipfsBlobSource) GetBlob(ctx context.Context, spec *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
	log := log.WithField("digest", dgst)

	ipfsCID, err := sbs.source.Redis.Get(ctx, dgst.String()).Result()
	if err != nil {
		log.WithError(err).Error("unable to get blob details from Redis")
		err = distv2.ErrorCodeBlobUnknown
		return
	}

	c, err := cid.Decode(ipfsCID)
	if err != nil {
		log.WithError(err).Error("unable to decode CID")
		err = distv2.ErrorCodeBlobUnknown
		return
	}

	ipfsFile, err := sbs.source.IPFS.Unixfs().Get(ctx, icorepath.FromCid(c))
	if err != nil {
		log.WithError(err).Error("unable to get blob from IPFS")
		err = distv2.ErrorCodeBlobUnknown
		return
	}

	f, ok := ipfsFile.(interface {
		files.File
		io.ReaderAt
	})
	if !ok {
		log.WithError(err).Error("IPFS file does not support io.ReaderAt")
		err = distv2.ErrorCodeBlobUnknown
		return
	}

	mediaType, err = sbs.source.Redis.Get(ctx, mediaTypeKeyFromDigest(dgst)).Result()
	if err != nil {
		log.WithError(err).Error("cannot get media type from Redis")
		err = distv2.ErrorCodeBlobUnknown
		return
	}

	log.Debug("returning blob from IPFS")
	return true, mediaType, "", f, nil
}

func mediaTypeKeyFromDigest(dgst digest.Digest) string {
	return "mtype:" + dgst.String()
}
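
For orientation, here is a minimal sketch of what an additional BlobSource (the kind passed in via AdditionalSources) could look like. It is not part of blob.go: directoryBlobSource, its digest-based file layout, and the application/octet-stream fallback media type are assumptions made purely for illustration of the interface contract - a cheap HasBlob check and a GetBlob that hands ownership of the ReadCloser to the caller.

// Hypothetical example, not part of the original file: a BlobSource that
// serves blobs from a local directory, with one file per blob named after
// its digest ("<algorithm>_<hex>").
package registry

import (
	"context"
	"io"
	"os"
	"path/filepath"

	distv2 "github.com/docker/distribution/registry/api/v2"
	"github.com/opencontainers/go-digest"

	"github.com/gitpod-io/gitpod/registry-facade/api"
)

type directoryBlobSource struct {
	// Dir is the directory holding the blob files.
	Dir string
}

func (dbs directoryBlobSource) Name() string {
	return "directory"
}

// blobPath maps a digest to the on-disk file name used by this example.
func (dbs directoryBlobSource) blobPath(dgst digest.Digest) string {
	return filepath.Join(dbs.Dir, dgst.Algorithm().String()+"_"+dgst.Encoded())
}

func (dbs directoryBlobSource) HasBlob(ctx context.Context, details *api.ImageSpec, dgst digest.Digest) bool {
	_, err := os.Stat(dbs.blobPath(dgst))
	return err == nil
}

func (dbs directoryBlobSource) GetBlob(ctx context.Context, details *api.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
	f, err := os.Open(dbs.blobPath(dgst))
	if err != nil {
		err = distv2.ErrorCodeBlobUnknown
		return
	}
	// The caller (retrieveFromSource) closes the returned ReadCloser.
	return false, "application/octet-stream", "", f, nil
}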