Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/registry-facade/pkg/registry/registry.go
2499 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package registry
6
7
import (
8
"bytes"
9
"context"
10
"crypto/tls"
11
"encoding/json"
12
"fmt"
13
"io/ioutil"
14
stdlog "log"
15
"net"
16
"net/http"
17
"os"
18
"path/filepath"
19
"strings"
20
"time"
21
22
common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"
23
"github.com/gitpod-io/gitpod/common-go/log"
24
"github.com/gitpod-io/gitpod/registry-facade/api"
25
"github.com/gitpod-io/gitpod/registry-facade/api/config"
26
"k8s.io/apimachinery/pkg/util/wait"
27
28
"github.com/containerd/containerd/content/local"
29
"github.com/containerd/containerd/remotes"
30
"github.com/distribution/reference"
31
"github.com/docker/distribution"
32
"github.com/docker/distribution/registry/api/errcode"
33
distv2 "github.com/docker/distribution/registry/api/v2"
34
"github.com/golang/protobuf/jsonpb"
35
"github.com/gorilla/mux"
36
httpapi "github.com/ipfs/kubo/client/rpc"
37
ma "github.com/multiformats/go-multiaddr"
38
"github.com/prometheus/client_golang/prometheus"
39
"github.com/redis/go-redis/v9"
40
"golang.org/x/xerrors"
41
"google.golang.org/grpc"
42
"google.golang.org/grpc/credentials"
43
"google.golang.org/grpc/credentials/insecure"
44
)
45
46
// BuildStaticLayer builds a layer set from a static layer configuration
47
func buildStaticLayer(ctx context.Context, cfg []config.StaticLayerCfg, newResolver ResolverProvider) (CompositeLayerSource, error) {
48
var l CompositeLayerSource
49
for _, sl := range cfg {
50
switch sl.Type {
51
case "file":
52
src, err := NewFileLayerSource(ctx, sl.Ref)
53
if err != nil {
54
return nil, xerrors.Errorf("cannot source layer from %s: %w", sl.Ref, err)
55
}
56
l = append(l, src)
57
case "image":
58
src, err := NewStaticSourceFromImage(ctx, newResolver, sl.Ref)
59
if err != nil {
60
return nil, xerrors.Errorf("cannot source layer from %s: %w", sl.Ref, err)
61
}
62
l = append(l, src)
63
default:
64
return nil, xerrors.Errorf("unknown static layer type: %s", sl.Type)
65
}
66
}
67
return l, nil
68
}
69
70
// ResolverProvider provides a new resolver. It is a factory function: every
// call is expected to hand back a remotes.Resolver for the caller to use.
type ResolverProvider func() remotes.Resolver
72
73
// Registry acts as registry facade. Instances are created with NewRegistry
// and started with Serve or MustServe.
type Registry struct {
	// Config is the configuration the registry was created with. Serve reads
	// the route prefix, port, TLS and authentication settings from it.
	Config config.Config
	// Resolver produces the resolvers used when (re)building static layers.
	Resolver ResolverProvider
	// Store caches manifests and config, backed by either Redis or the local
	// filesystem depending on the configuration.
	Store BlobStore
	// IPFS is the optional IPFS blob cache; nil unless IPFS caching is enabled.
	IPFS *IPFSBlobCache
	// LayerSource is the composite of all layer sources (static, IDE, content).
	LayerSource LayerSource
	// ConfigModifier is derived from LayerSource in NewRegistry.
	ConfigModifier ConfigModifier
	// SpecProvider maps a provider prefix to the image spec provider for it.
	SpecProvider map[string]ImageSpecProvider

	// staticLayerSource is the static part of LayerSource; UpdateStaticLayer
	// swaps its content.
	staticLayerSource *RevisioningLayerSource
	// metrics holds the metrics created in NewRegistry.
	metrics *metrics
	// srv is the HTTP server; it is only set once Serve has been called.
	srv *http.Server
}
87
88
// NewRegistry creates a new registry
89
func NewRegistry(cfg config.Config, newResolver ResolverProvider, reg prometheus.Registerer) (*Registry, error) {
90
var mfStore BlobStore
91
92
if cfg.IPFSCache != nil && cfg.IPFSCache.Enabled {
93
if cfg.RedisCache == nil || !cfg.RedisCache.Enabled {
94
return nil, xerrors.Errorf("IPFS cache requires Redis")
95
}
96
}
97
98
if cfg.RedisCache != nil && cfg.RedisCache.Enabled {
99
rdc, err := getRedisClient(cfg.RedisCache)
100
if err != nil {
101
return nil, xerrors.Errorf("cannot connect to Redis: %w", err)
102
}
103
104
mfStore = &RedisBlobStore{Client: rdc}
105
log.Info("using redis to cache manifests and config")
106
107
resolverFactory := &RedisCachedResolver{
108
Client: rdc,
109
Provider: newResolver,
110
}
111
newResolver = resolverFactory.Factory
112
log.Info("using redis to cache references")
113
} else {
114
storePath := cfg.Store
115
if tproot := os.Getenv("TELEPRESENCE_ROOT"); tproot != "" {
116
storePath = filepath.Join(tproot, storePath)
117
}
118
var err error
119
mfStore, err = local.NewStore(storePath)
120
if err != nil {
121
return nil, err
122
}
123
log.WithField("storePath", storePath).Info("using local filesystem to cache manifests and config")
124
// TODO(cw): GC the store
125
}
126
127
ctx, cancel := context.WithCancel(context.Background())
128
defer cancel()
129
130
metrics, err := newMetrics(reg, true)
131
if err != nil {
132
return nil, err
133
}
134
135
var layerSources []LayerSource
136
137
// static layers
138
log.Info("preparing static layer")
139
staticLayer := NewRevisioningLayerSource(CompositeLayerSource{})
140
layerSources = append(layerSources, staticLayer)
141
if len(cfg.StaticLayer) > 0 {
142
l, err := buildStaticLayer(ctx, cfg.StaticLayer, newResolver)
143
if err != nil {
144
return nil, err
145
}
146
staticLayer.Update(l)
147
}
148
149
// ide layer
150
ideRefSource := func(s *api.ImageSpec) (ref []string, err error) {
151
ref = append(ref, s.IdeRef, s.SupervisorRef)
152
ref = append(ref, s.IdeLayerRef...)
153
return ref, nil
154
}
155
ideLayerSource, err := NewSpecMappedImageSource(newResolver, ideRefSource)
156
if err != nil {
157
return nil, err
158
}
159
layerSources = append(layerSources, ideLayerSource)
160
161
// content layer
162
clsrc, err := NewContentLayerSource()
163
if err != nil {
164
return nil, xerrors.Errorf("cannot create content layer source: %w", err)
165
}
166
layerSources = append(layerSources, clsrc)
167
168
specProvider := map[string]ImageSpecProvider{}
169
if cfg.RemoteSpecProvider != nil {
170
var providers []ImageSpecProvider
171
for _, providerCfg := range cfg.RemoteSpecProvider {
172
rsp, err := createRemoteSpecProvider(providerCfg)
173
if err != nil {
174
return nil, err
175
}
176
177
providers = append(providers, rsp)
178
}
179
180
specProvider[api.ProviderPrefixRemote] = NewCompositeSpecProvider(providers...)
181
}
182
183
if cfg.FixedSpecProvider != "" {
184
fc, err := ioutil.ReadFile(cfg.FixedSpecProvider)
185
if err != nil {
186
return nil, xerrors.Errorf("cannot read fixed spec: %w", err)
187
}
188
189
f := make(map[string]json.RawMessage)
190
err = json.Unmarshal(fc, &f)
191
if err != nil {
192
return nil, xerrors.Errorf("cannot unmarshal fixed spec: %w", err)
193
}
194
195
prov := make(FixedImageSpecProvider)
196
for k, v := range f {
197
var spec api.ImageSpec
198
err = jsonpb.UnmarshalString(string(v), &spec)
199
if err != nil {
200
return nil, xerrors.Errorf("cannot unmarshal fixed spec: %w", err)
201
}
202
prov[k] = &spec
203
}
204
specProvider[api.ProviderPrefixFixed] = prov
205
}
206
207
var ipfs *IPFSBlobCache
208
if cfg.IPFSCache != nil && cfg.IPFSCache.Enabled {
209
addr := cfg.IPFSCache.IPFSAddr
210
if ipfsHost := os.Getenv("IPFS_HOST"); ipfsHost != "" {
211
addr = strings.ReplaceAll(addr, "$IPFS_HOST", ipfsHost)
212
}
213
214
maddr, err := ma.NewMultiaddr(strings.TrimSpace(addr))
215
if err != nil {
216
return nil, xerrors.Errorf("cannot connect to IPFS: %w", err)
217
}
218
219
core, err := httpapi.NewApiWithClient(maddr, NewRetryableHTTPClient())
220
if err != nil {
221
return nil, xerrors.Errorf("cannot connect to IPFS: %w", err)
222
}
223
rdc, err := getRedisClient(cfg.RedisCache)
224
if err != nil {
225
return nil, xerrors.Errorf("cannot connect to Redis: %w", err)
226
}
227
228
ipfs = &IPFSBlobCache{
229
Redis: rdc,
230
IPFS: core,
231
}
232
log.WithField("config", cfg.IPFSCache).Info("enabling IPFS caching")
233
}
234
235
layerSource := CompositeLayerSource(layerSources)
236
return &Registry{
237
Config: cfg,
238
Resolver: newResolver,
239
Store: mfStore,
240
IPFS: ipfs,
241
SpecProvider: specProvider,
242
LayerSource: layerSource,
243
staticLayerSource: staticLayer,
244
ConfigModifier: NewConfigModifierFromLayerSource(layerSource),
245
metrics: metrics,
246
}, nil
247
}
248
249
func createRemoteSpecProvider(cfg *config.RSProvider) (ImageSpecProvider, error) {
250
grpcOpts := common_grpc.DefaultClientOptions()
251
if cfg.TLS != nil {
252
tlsConfig, err := common_grpc.ClientAuthTLSConfig(
253
cfg.TLS.Authority, cfg.TLS.Certificate, cfg.TLS.PrivateKey,
254
common_grpc.WithSetRootCAs(true),
255
common_grpc.WithServerName("ws-manager"),
256
)
257
if err != nil {
258
log.WithField("config", cfg.TLS).Error("Cannot load ws-manager certs - this is a configuration issue.")
259
return nil, xerrors.Errorf("cannot load ws-manager certs: %w", err)
260
}
261
262
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
263
} else {
264
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
265
}
266
267
specprov, err := NewCachingSpecProvider(128, NewRemoteSpecProvider(cfg.Addr, grpcOpts))
268
if err != nil {
269
return nil, xerrors.Errorf("cannot create caching spec provider: %w", err)
270
}
271
272
return specprov, nil
273
}
274
275
func getRedisClient(cfg *config.RedisCacheConfig) (*redis.Client, error) {
276
if cfg.SingleHostAddress == "" {
277
return nil, xerrors.Errorf("registry-facade setting 'singleHostAddr' is missing")
278
}
279
280
opts := &redis.Options{
281
Addr: cfg.SingleHostAddress,
282
Username: "default",
283
Password: cfg.Password,
284
}
285
286
if cfg.Username != "" {
287
opts.Username = cfg.Username
288
}
289
290
if cfg.UseTLS {
291
opts.TLSConfig = &tls.Config{
292
// golang tls does not support verify certificate without any SANs
293
InsecureSkipVerify: cfg.InsecureSkipVerify,
294
}
295
}
296
297
log.WithField("addr", cfg.SingleHostAddress).WithField("username", cfg.Username).WithField("tls", cfg.UseTLS).Info("connecting to Redis")
298
299
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
300
defer cancel()
301
302
rdc := redis.NewClient(opts)
303
304
var lastError error
305
waitErr := wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
306
Steps: 5,
307
Duration: 50 * time.Millisecond,
308
Factor: 2.0,
309
Jitter: 0.2,
310
}, func(ctx context.Context) (bool, error) {
311
_, err := rdc.Ping(ctx).Result()
312
if err != nil {
313
lastError = err
314
return false, nil
315
}
316
317
return true, nil
318
})
319
if waitErr != nil {
320
if waitErr == wait.ErrWaitTimeout {
321
return nil, xerrors.Errorf("cannot check Redis connection: %w", lastError)
322
}
323
324
return nil, waitErr
325
}
326
327
return rdc, nil
328
}
329
330
// UpdateStaticLayer updates the static layer a registry-facade adds
331
func (reg *Registry) UpdateStaticLayer(ctx context.Context, cfg []config.StaticLayerCfg) error {
332
l, err := buildStaticLayer(ctx, cfg, reg.Resolver)
333
if err != nil {
334
return err
335
}
336
reg.staticLayerSource.Update(l)
337
return nil
338
}
339
340
// Serve serves the registry on the given port
341
func (reg *Registry) Serve() error {
342
routes := distv2.RouterWithPrefix(reg.Config.Prefix)
343
reg.registerHandler(routes)
344
345
var handler http.Handler = routes
346
if reg.Config.RequireAuth {
347
handler = reg.requireAuthentication(routes)
348
}
349
mux := http.NewServeMux()
350
mux.Handle("/", handler)
351
352
if addr := os.Getenv("REGFAC_NO_TLS_DEBUG"); addr != "" {
353
// Gitpod port-forwarding also does SSL termination. If we only served the HTTPS service
354
// when using telepresence we could not make any requests to the registry facade directly,
355
// e.g. using curl or another Docker daemon. Using the env var we can enable an additional
356
// HTTP service.
357
//
358
// Note: this is just meant for a telepresence setup
359
go func() {
360
err := http.ListenAndServe(addr, mux)
361
if err != nil {
362
log.WithError(err).Error("start of registry server failed")
363
}
364
}()
365
}
366
367
addr := fmt.Sprintf(":%d", reg.Config.Port)
368
l, err := net.Listen("tcp", addr)
369
if err != nil {
370
return err
371
}
372
373
reg.srv = &http.Server{
374
Addr: addr,
375
Handler: mux,
376
ErrorLog: stdlog.New(logrusErrorWriter{}, "", 0),
377
}
378
379
if reg.Config.TLS != nil {
380
log.WithField("addr", addr).Info("HTTPS registry server listening")
381
382
cert, key := reg.Config.TLS.Certificate, reg.Config.TLS.PrivateKey
383
if tproot := os.Getenv("TELEPRESENCE_ROOT"); tproot != "" {
384
cert = filepath.Join(tproot, cert)
385
key = filepath.Join(tproot, key)
386
}
387
388
return reg.srv.ServeTLS(l, cert, key)
389
}
390
391
log.WithField("addr", addr).Info("HTTP registry server listening")
392
return reg.srv.Serve(l)
393
}
394
395
// MustServe calls serve and logs any error as Fatal
396
func (reg *Registry) MustServe() {
397
err := reg.Serve()
398
if err != nil {
399
log.WithError(err).Fatal("cannot serve registry")
400
}
401
}
402
403
// Shutdowner is a process that can be shut down.
type Shutdowner interface {
	// Shutdown stops the process. It receives a context and reports failure
	// through the returned error.
	Shutdown(context.Context) error
}
407
408
// requireAuthentication wraps h so that only requests carrying HTTP basic
// auth credentials reach it. Requests without such credentials are answered
// with 401 Unauthorized and a "WWW-Authenticate: Basic" challenge.
//
// Note that the credentials themselves are not validated yet; their mere
// presence is enough to pass.
func (reg *Registry) requireAuthentication(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if _, _, hasCredentials := r.BasicAuth(); !hasCredentials {
			w.Header().Add("WWW-Authenticate", "Basic")
			w.WriteHeader(http.StatusUnauthorized)
			return
		}

		// todo: implement auth

		h.ServeHTTP(w, r)
	})
}
426
427
// registerHandler registers the handle* functions with the corresponding routes
428
func (reg *Registry) registerHandler(routes *mux.Router) {
429
routes.Get(distv2.RouteNameBase).HandlerFunc(reg.handleAPIBase)
430
routes.Get(distv2.RouteNameManifest).Handler(dispatcher(reg.handleManifest))
431
// routes.Get(v2.RouteNameCatalog).Handler(dispatcher(reg.handleCatalog))
432
// routes.Get(v2.RouteNameTags).Handler(dispatcher(reg.handleTags))
433
routes.Get(distv2.RouteNameBlob).Handler(dispatcher(reg.handleBlob))
434
// routes.Get(v2.RouteNameBlobUpload).Handler(dispatcher(reg.handleBlobUpload))
435
// routes.Get(v2.RouteNameBlobUploadChunk).Handler(dispatcher(reg.handleBlobUploadChunk))
436
routes.NotFoundHandler = http.HandlerFunc(reg.handleAPIBase)
437
}
438
439
// handleApiBase implements a simple yes-man for doing overall checks against the
440
// api. This can support auth roundtrips to support docker login.
441
func (reg *Registry) handleAPIBase(w http.ResponseWriter, r *http.Request) {
442
const emptyJSON = "{}"
443
// Provide a simple /v2/ 200 OK response with empty json response.
444
w.Header().Set("Content-Type", "application/json")
445
w.Header().Set("Content-Length", fmt.Sprint(len(emptyJSON)))
446
447
fmt.Fprint(w, emptyJSON)
448
}
449
450
// dispatchFunc selects the http.Handler that should serve request r. The
// context passed in by dispatcher carries the route variables of the request.
type dispatchFunc func(ctx context.Context, r *http.Request) http.Handler
451
452
// dispatcher wraps a dispatchFunc and provides context
453
func dispatcher(d dispatchFunc) http.Handler {
454
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
455
//fc, _ := httputil.DumpRequest(r, false)
456
//log.WithField("req", string(fc)).Debug("dispatching request")
457
458
// Get context from request, add vars and other info and sync back
459
ctx := r.Context()
460
ctx = &muxVarsContext{
461
Context: ctx,
462
vars: mux.Vars(r),
463
}
464
r = r.WithContext(ctx)
465
466
if nameRequired(r) {
467
nameRef, err := reference.WithName(getName(ctx))
468
if err != nil {
469
log.WithError(err).WithField("nameRef", nameRef).Errorf("error parsing reference from context")
470
respondWithError(w, distribution.ErrRepositoryNameInvalid{
471
Name: nameRef.Name(),
472
Reason: err,
473
})
474
return
475
}
476
}
477
478
d(ctx, r).ServeHTTP(w, r)
479
})
480
}
481
482
func respondWithError(w http.ResponseWriter, terr error) {
483
err := errcode.ServeJSON(w, terr)
484
if err != nil {
485
log.WithError(err).WithField("orignalErr", terr).Errorf("error serving error json")
486
}
487
}
488
489
// nameRequired returns true if the route requires a name.
490
func nameRequired(r *http.Request) bool {
491
route := mux.CurrentRoute(r)
492
if route == nil {
493
return true
494
}
495
routeName := route.GetName()
496
return routeName != distv2.RouteNameBase && routeName != distv2.RouteNameCatalog
497
}
498
499
// muxVarsContext is a context that additionally exposes a request's mux route
// variables through Value.
type muxVarsContext struct {
	context.Context
	vars map[string]string
}

// Value resolves string keys against the route variables before falling back
// to the wrapped context:
//
//   - "vars" returns the complete variable map,
//   - any other string key is looked up in the variables after an optional
//     "vars." prefix has been stripped.
//
// Keys that are not strings, or that name no variable, are answered by the
// wrapped context.
func (ctx *muxVarsContext) Value(key interface{}) interface{} {
	name, isString := key.(string)
	if !isString {
		return ctx.Context.Value(key)
	}

	if name == "vars" {
		return ctx.vars
	}

	if v, found := ctx.vars[strings.TrimPrefix(name, "vars.")]; found {
		return v
	}

	return ctx.Context.Value(key)
}
519
520
// getName extracts the name var from the context which was passed in through the mux route.
// It returns the empty string if the context holds no string under "vars.name".
func getName(ctx context.Context) string {
	if name, ok := ctx.Value("vars.name").(string); ok {
		return name
	}
	return ""
}
529
530
func getSpecProviderName(ctx context.Context) (specProviderName string, remainder string) {
531
name := getName(ctx)
532
segs := strings.Split(name, "/")
533
if len(segs) > 1 {
534
specProviderName = segs[0]
535
remainder = strings.Join(segs[1:], "/")
536
}
537
return
538
}
539
540
// getReference extracts the reference var from the context which was passed in through the mux route.
// It returns the empty string if the context holds no string under "vars.reference".
func getReference(ctx context.Context) string {
	if ref, ok := ctx.Value("vars.reference").(string); ok {
		return ref
	}
	return ""
}
549
550
// getDigest extracts the digest var from the context which was passed in through the mux route.
// It returns the empty string if the context holds no string under "vars.digest".
func getDigest(ctx context.Context) string {
	if dgst, ok := ctx.Value("vars.digest").(string); ok {
		return dgst
	}
	return ""
}
560
561
var tlsHandshakeErrorPrefix = []byte("http: TLS handshake error")
562
563
type logrusErrorWriter struct{}
564
565
func (w logrusErrorWriter) Write(p []byte) (int, error) {
566
if bytes.Contains(p, tlsHandshakeErrorPrefix) {
567
return len(p), nil
568
}
569
570
log.Errorf("%s", string(p))
571
return len(p), nil
572
}
573
574