Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/registry-facade/pkg/registry/cache.go
2499 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package registry
6
7
import (
8
"bytes"
9
"context"
10
"encoding/json"
11
"fmt"
12
"io"
13
"time"
14
15
"github.com/containerd/containerd/content"
16
"github.com/containerd/containerd/errdefs"
17
"github.com/gitpod-io/gitpod/common-go/log"
18
files "github.com/ipfs/boxo/files"
19
ipfs "github.com/ipfs/kubo/core/coreiface"
20
"github.com/ipfs/kubo/core/coreiface/options"
21
"github.com/opencontainers/go-digest"
22
ociv1 "github.com/opencontainers/image-spec/specs-go/v1"
23
redis "github.com/redis/go-redis/v9"
24
"golang.org/x/xerrors"
25
)
26
27
// IPFSBlobCache can cache blobs in IPFS
28
type IPFSBlobCache struct {
29
Redis *redis.Client
30
IPFS ipfs.CoreAPI
31
}
32
33
// Get retrieves the IPFS URL for a previously stored blob.
34
// Returns an error if the blob is not stored in IPFS yet.
35
func (store *IPFSBlobCache) Get(ctx context.Context, dgst digest.Digest) (ipfsURL string, err error) {
36
if store == nil || store.IPFS == nil || store.Redis == nil {
37
return "", nil
38
}
39
40
res, err := store.Redis.Get(ctx, dgst.String()).Result()
41
if err != nil {
42
return "", err
43
}
44
45
return "ipfs://" + res, nil
46
}
47
48
// Store stores a blob in IPFS. Will happily overwrite/re-upload a blob.
49
func (store *IPFSBlobCache) Store(ctx context.Context, dgst digest.Digest, content io.Reader, mediaType string) (err error) {
50
if store == nil || store.IPFS == nil || store.Redis == nil {
51
return nil
52
}
53
54
opts := []options.UnixfsAddOption{
55
options.Unixfs.Pin(true),
56
options.Unixfs.CidVersion(1),
57
options.Unixfs.RawLeaves(true),
58
options.Unixfs.FsCache(true),
59
}
60
61
p, err := store.IPFS.Unixfs().Add(ctx, files.NewReaderFile(content), opts...)
62
if err != nil {
63
return err
64
}
65
66
res := store.Redis.MSet(ctx,
67
dgst.String(), p.RootCid().String(),
68
mediaTypeKeyFromDigest(dgst), mediaType,
69
)
70
if err := res.Err(); err != nil {
71
return err
72
}
73
74
log.WithField("digest", dgst.String()).WithField("cid", p.RootCid().String()).Debug("pushed to IPFS")
75
76
return nil
77
}
78
79
type RedisBlobStore struct {
80
Client *redis.Client
81
}
82
83
var _ BlobStore = &RedisBlobStore{}
84
85
// Info will return metadata about content available in the content store.
86
//
87
// If the content is not present, ErrNotFound will be returned.
88
func (rbs *RedisBlobStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
89
res, err := rbs.Client.Get(ctx, "nfo."+string(dgst)).Result()
90
if err == redis.Nil {
91
return content.Info{}, errdefs.ErrNotFound
92
}
93
94
var redisInfo redisBlobInfo
95
err = json.Unmarshal([]byte(res), &redisInfo)
96
if err != nil {
97
return content.Info{}, xerrors.Errorf("cannot unmarshal blob info: %w", err)
98
}
99
100
return content.Info{
101
Digest: digest.Digest(redisInfo.Digest),
102
Size: redisInfo.Size,
103
CreatedAt: time.Unix(redisInfo.CreatedAt, 0),
104
UpdatedAt: time.Unix(redisInfo.UpdatedAt, 0),
105
Labels: redisInfo.Labels,
106
}, nil
107
}
108
109
func (rbs *RedisBlobStore) ReaderAt(ctx context.Context, desc ociv1.Descriptor) (content.ReaderAt, error) {
110
res, err := rbs.Client.Get(ctx, "cnt."+string(desc.Digest)).Result()
111
if err == redis.Nil {
112
return nil, errdefs.ErrNotFound
113
}
114
115
return stringReader(res), nil
116
}
117
118
type stringReader string
119
120
var _ content.ReaderAt = stringReader("")
121
122
func (r stringReader) Size() int64 { return int64(len(r)) }
123
func (r stringReader) Close() error { return nil }
124
func (r stringReader) ReadAt(p []byte, off int64) (n int, err error) {
125
n = copy(p, r[off:])
126
if n < len(p) {
127
return n, io.EOF
128
}
129
return
130
}
131
132
// Some implementations require WithRef to be included in opts.
133
func (rbs *RedisBlobStore) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
134
var wOpts content.WriterOpts
135
for _, opt := range opts {
136
if err := opt(&wOpts); err != nil {
137
return nil, err
138
}
139
}
140
if wOpts.Desc.Digest == "" {
141
return nil, xerrors.Errorf("desc.digest must not be empty: %w", errdefs.ErrInvalidArgument)
142
}
143
144
return newRedisBlobWriter(wOpts.Desc.Digest, rbs.Client), nil
145
}
146
147
type redisBlobWriter struct {
148
buf *bytes.Buffer
149
digest digest.Digest
150
client *redis.Client
151
152
forTestingOnlyTime time.Time
153
}
154
155
func newRedisBlobWriter(digest digest.Digest, client *redis.Client) *redisBlobWriter {
156
return &redisBlobWriter{
157
buf: bytes.NewBuffer(make([]byte, 0, 4096)),
158
digest: digest,
159
client: client,
160
}
161
}
162
163
var _ content.Writer = &redisBlobWriter{}
164
165
func (w *redisBlobWriter) Write(b []byte) (n int, err error) {
166
return w.buf.Write(b)
167
}
168
169
func (w *redisBlobWriter) Close() error {
170
return nil
171
}
172
173
// Digest may return empty digest or panics until committed.
174
func (w *redisBlobWriter) Digest() digest.Digest {
175
return w.digest
176
}
177
178
// redisBlobInfo is the JSON document stored in Redis as a blob's metadata.
type redisBlobInfo struct {
	// Digest is the blob digest in string form.
	Digest string
	// Size is the blob size in bytes.
	Size int64
	// CreatedAt and UpdatedAt are Unix timestamps in seconds.
	CreatedAt, UpdatedAt int64
	// Labels are the labels attached to the blob.
	Labels map[string]string
}
185
186
// Commit commits the blob (but no roll-back is guaranteed on an error).
187
// size and expected can be zero-value when unknown.
188
// Commit always closes the writer, even on error.
189
// ErrAlreadyExists aborts the writer.
190
func (w *redisBlobWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
191
act := digest.FromBytes(w.buf.Bytes())
192
if expected != "" && expected != act {
193
return fmt.Errorf("unexpected commit digest %s, expected %s: %w", act, expected, errdefs.ErrFailedPrecondition)
194
}
195
196
var base content.Info
197
for _, opt := range opts {
198
if err := opt(&base); err != nil {
199
return err
200
}
201
}
202
203
var (
204
createdAt int64
205
updatedAt int64
206
)
207
if !w.forTestingOnlyTime.IsZero() {
208
createdAt = w.forTestingOnlyTime.Unix()
209
updatedAt = w.forTestingOnlyTime.Unix()
210
} else {
211
createdAt = time.Now().Unix()
212
updatedAt = time.Now().Unix()
213
}
214
215
rnfo, err := json.Marshal(redisBlobInfo{
216
Digest: string(expected),
217
Size: int64(w.buf.Len()),
218
CreatedAt: createdAt,
219
UpdatedAt: updatedAt,
220
Labels: base.Labels,
221
})
222
if err != nil {
223
return err
224
}
225
226
var (
227
kContent = fmt.Sprintf("cnt.%s", w.digest)
228
kInfo = fmt.Sprintf("nfo.%s", w.digest)
229
)
230
231
existingKeys, err := w.client.Exists(ctx, kContent, kInfo).Result()
232
if err != nil {
233
return err
234
}
235
236
if existingKeys != 0 {
237
return nil
238
}
239
240
err = w.client.MSet(ctx, map[string]interface{}{
241
kContent: w.buf.String(),
242
kInfo: string(rnfo),
243
}).Err()
244
if err != nil {
245
return err
246
}
247
248
return nil
249
}
250
251
// Status returns the current state of write
252
func (w *redisBlobWriter) Status() (content.Status, error) {
253
return content.Status{}, fmt.Errorf("not implemented")
254
}
255
256
// Truncate updates the size of the target blob
257
func (w *redisBlobWriter) Truncate(size int64) error {
258
return fmt.Errorf("not implemented")
259
}
260
261