Path: blob/main/components/registry-facade/pkg/registry/cache.go
2499 views
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package registry56import (7"bytes"8"context"9"encoding/json"10"fmt"11"io"12"time"1314"github.com/containerd/containerd/content"15"github.com/containerd/containerd/errdefs"16"github.com/gitpod-io/gitpod/common-go/log"17files "github.com/ipfs/boxo/files"18ipfs "github.com/ipfs/kubo/core/coreiface"19"github.com/ipfs/kubo/core/coreiface/options"20"github.com/opencontainers/go-digest"21ociv1 "github.com/opencontainers/image-spec/specs-go/v1"22redis "github.com/redis/go-redis/v9"23"golang.org/x/xerrors"24)2526// IPFSBlobCache can cache blobs in IPFS27type IPFSBlobCache struct {28Redis *redis.Client29IPFS ipfs.CoreAPI30}3132// Get retrieves the IPFS URL for a previously stored blob.33// Returns an error if the blob is not stored in IPFS yet.34func (store *IPFSBlobCache) Get(ctx context.Context, dgst digest.Digest) (ipfsURL string, err error) {35if store == nil || store.IPFS == nil || store.Redis == nil {36return "", nil37}3839res, err := store.Redis.Get(ctx, dgst.String()).Result()40if err != nil {41return "", err42}4344return "ipfs://" + res, nil45}4647// Store stores a blob in IPFS. Will happily overwrite/re-upload a blob.48func (store *IPFSBlobCache) Store(ctx context.Context, dgst digest.Digest, content io.Reader, mediaType string) (err error) {49if store == nil || store.IPFS == nil || store.Redis == nil {50return nil51}5253opts := []options.UnixfsAddOption{54options.Unixfs.Pin(true),55options.Unixfs.CidVersion(1),56options.Unixfs.RawLeaves(true),57options.Unixfs.FsCache(true),58}5960p, err := store.IPFS.Unixfs().Add(ctx, files.NewReaderFile(content), opts...)61if err != nil {62return err63}6465res := store.Redis.MSet(ctx,66dgst.String(), p.RootCid().String(),67mediaTypeKeyFromDigest(dgst), mediaType,68)69if err := res.Err(); err != nil {70return err71}7273log.WithField("digest", dgst.String()).WithField("cid", p.RootCid().String()).Debug("pushed to IPFS")7475return nil76}7778type RedisBlobStore struct {79Client *redis.Client80}8182var _ BlobStore = &RedisBlobStore{}8384// Info will return metadata about content available in the content store.85//86// If the content is not present, ErrNotFound will be returned.87func (rbs *RedisBlobStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {88res, err := rbs.Client.Get(ctx, "nfo."+string(dgst)).Result()89if err == redis.Nil {90return content.Info{}, errdefs.ErrNotFound91}9293var redisInfo redisBlobInfo94err = json.Unmarshal([]byte(res), &redisInfo)95if err != nil {96return content.Info{}, xerrors.Errorf("cannot unmarshal blob info: %w", err)97}9899return content.Info{100Digest: digest.Digest(redisInfo.Digest),101Size: redisInfo.Size,102CreatedAt: time.Unix(redisInfo.CreatedAt, 0),103UpdatedAt: time.Unix(redisInfo.UpdatedAt, 0),104Labels: redisInfo.Labels,105}, nil106}107108func (rbs *RedisBlobStore) ReaderAt(ctx context.Context, desc ociv1.Descriptor) (content.ReaderAt, error) {109res, err := rbs.Client.Get(ctx, "cnt."+string(desc.Digest)).Result()110if err == redis.Nil {111return nil, errdefs.ErrNotFound112}113114return stringReader(res), nil115}116117type stringReader string118119var _ content.ReaderAt = stringReader("")120121func (r stringReader) Size() int64 { return int64(len(r)) }122func (r stringReader) Close() error { return nil }123func (r stringReader) ReadAt(p []byte, off int64) (n int, err error) {124n = copy(p, r[off:])125if n < len(p) {126return n, io.EOF127}128return129}130131// Some implementations require WithRef to be included in opts.132func (rbs *RedisBlobStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {133var wOpts content.WriterOpts134for _, opt := range opts {135if err := opt(&wOpts); err != nil {136return nil, err137}138}139if wOpts.Desc.Digest == "" {140return nil, xerrors.Errorf("desc.digest must not be empty: %w", errdefs.ErrInvalidArgument)141}142143return newRedisBlobWriter(wOpts.Desc.Digest, rbs.Client), nil144}145146type redisBlobWriter struct {147buf *bytes.Buffer148digest digest.Digest149client *redis.Client150151forTestingOnlyTime time.Time152}153154func newRedisBlobWriter(digest digest.Digest, client *redis.Client) *redisBlobWriter {155return &redisBlobWriter{156buf: bytes.NewBuffer(make([]byte, 0, 4096)),157digest: digest,158client: client,159}160}161162var _ content.Writer = &redisBlobWriter{}163164func (w *redisBlobWriter) Write(b []byte) (n int, err error) {165return w.buf.Write(b)166}167168func (w *redisBlobWriter) Close() error {169return nil170}171172// Digest may return empty digest or panics until committed.173func (w *redisBlobWriter) Digest() digest.Digest {174return w.digest175}176177type redisBlobInfo struct {178Digest string179Size int64180CreatedAt int64181UpdatedAt int64182Labels map[string]string183}184185// Commit commits the blob (but no roll-back is guaranteed on an error).186// size and expected can be zero-value when unknown.187// Commit always closes the writer, even on error.188// ErrAlreadyExists aborts the writer.189func (w *redisBlobWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {190act := digest.FromBytes(w.buf.Bytes())191if expected != "" && expected != act {192return fmt.Errorf("unexpected commit digest %s, expected %s: %w", act, expected, errdefs.ErrFailedPrecondition)193}194195var base content.Info196for _, opt := range opts {197if err := opt(&base); err != nil {198return err199}200}201202var (203createdAt int64204updatedAt int64205)206if !w.forTestingOnlyTime.IsZero() {207createdAt = w.forTestingOnlyTime.Unix()208updatedAt = w.forTestingOnlyTime.Unix()209} else {210createdAt = time.Now().Unix()211updatedAt = time.Now().Unix()212}213214rnfo, err := json.Marshal(redisBlobInfo{215Digest: string(expected),216Size: int64(w.buf.Len()),217CreatedAt: createdAt,218UpdatedAt: updatedAt,219Labels: base.Labels,220})221if err != nil {222return err223}224225var (226kContent = fmt.Sprintf("cnt.%s", w.digest)227kInfo = fmt.Sprintf("nfo.%s", w.digest)228)229230existingKeys, err := w.client.Exists(ctx, kContent, kInfo).Result()231if err != nil {232return err233}234235if existingKeys != 0 {236return nil237}238239err = w.client.MSet(ctx, map[string]interface{}{240kContent: w.buf.String(),241kInfo: string(rnfo),242}).Err()243if err != nil {244return err245}246247return nil248}249250// Status returns the current state of write251func (w *redisBlobWriter) Status() (content.Status, error) {252return content.Status{}, fmt.Errorf("not implemented")253}254255// Truncate updates the size of the target blob256func (w *redisBlobWriter) Truncate(size int64) error {257return fmt.Errorf("not implemented")258}259260261