Path: blob/main/components/registry-facade/pkg/registry/registry.go
2499 views
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package registry56import (7"bytes"8"context"9"crypto/tls"10"encoding/json"11"fmt"12"io/ioutil"13stdlog "log"14"net"15"net/http"16"os"17"path/filepath"18"strings"19"time"2021common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"22"github.com/gitpod-io/gitpod/common-go/log"23"github.com/gitpod-io/gitpod/registry-facade/api"24"github.com/gitpod-io/gitpod/registry-facade/api/config"25"k8s.io/apimachinery/pkg/util/wait"2627"github.com/containerd/containerd/content/local"28"github.com/containerd/containerd/remotes"29"github.com/distribution/reference"30"github.com/docker/distribution"31"github.com/docker/distribution/registry/api/errcode"32distv2 "github.com/docker/distribution/registry/api/v2"33"github.com/golang/protobuf/jsonpb"34"github.com/gorilla/mux"35httpapi "github.com/ipfs/kubo/client/rpc"36ma "github.com/multiformats/go-multiaddr"37"github.com/prometheus/client_golang/prometheus"38"github.com/redis/go-redis/v9"39"golang.org/x/xerrors"40"google.golang.org/grpc"41"google.golang.org/grpc/credentials"42"google.golang.org/grpc/credentials/insecure"43)4445// BuildStaticLayer builds a layer set from a static layer configuration46func buildStaticLayer(ctx context.Context, cfg []config.StaticLayerCfg, newResolver ResolverProvider) (CompositeLayerSource, error) {47var l CompositeLayerSource48for _, sl := range cfg {49switch sl.Type {50case "file":51src, err := NewFileLayerSource(ctx, sl.Ref)52if err != nil {53return nil, xerrors.Errorf("cannot source layer from %s: %w", sl.Ref, err)54}55l = append(l, src)56case "image":57src, err := NewStaticSourceFromImage(ctx, newResolver, sl.Ref)58if err != nil {59return nil, xerrors.Errorf("cannot source layer from %s: %w", sl.Ref, err)60}61l = append(l, src)62default:63return nil, xerrors.Errorf("unknown static layer type: %s", sl.Type)64}65}66return l, nil67}6869// ResolverProvider provides new resolver70type ResolverProvider func() remotes.Resolver7172// Registry acts as registry facade73type Registry struct {74Config config.Config75Resolver ResolverProvider76Store BlobStore77IPFS *IPFSBlobCache78LayerSource LayerSource79ConfigModifier ConfigModifier80SpecProvider map[string]ImageSpecProvider8182staticLayerSource *RevisioningLayerSource83metrics *metrics84srv *http.Server85}8687// NewRegistry creates a new registry88func NewRegistry(cfg config.Config, newResolver ResolverProvider, reg prometheus.Registerer) (*Registry, error) {89var mfStore BlobStore9091if cfg.IPFSCache != nil && cfg.IPFSCache.Enabled {92if cfg.RedisCache == nil || !cfg.RedisCache.Enabled {93return nil, xerrors.Errorf("IPFS cache requires Redis")94}95}9697if cfg.RedisCache != nil && cfg.RedisCache.Enabled {98rdc, err := getRedisClient(cfg.RedisCache)99if err != nil {100return nil, xerrors.Errorf("cannot connect to Redis: %w", err)101}102103mfStore = &RedisBlobStore{Client: rdc}104log.Info("using redis to cache manifests and config")105106resolverFactory := &RedisCachedResolver{107Client: rdc,108Provider: newResolver,109}110newResolver = resolverFactory.Factory111log.Info("using redis to cache references")112} else {113storePath := cfg.Store114if tproot := os.Getenv("TELEPRESENCE_ROOT"); tproot != "" {115storePath = filepath.Join(tproot, storePath)116}117var err error118mfStore, err = local.NewStore(storePath)119if err != nil {120return nil, err121}122log.WithField("storePath", storePath).Info("using local filesystem to cache manifests and config")123// TODO(cw): GC the store124}125126ctx, cancel := context.WithCancel(context.Background())127defer cancel()128129metrics, err := newMetrics(reg, true)130if err != nil {131return nil, err132}133134var layerSources []LayerSource135136// static layers137log.Info("preparing static layer")138staticLayer := NewRevisioningLayerSource(CompositeLayerSource{})139layerSources = append(layerSources, staticLayer)140if len(cfg.StaticLayer) > 0 {141l, err := buildStaticLayer(ctx, cfg.StaticLayer, newResolver)142if err != nil {143return nil, err144}145staticLayer.Update(l)146}147148// ide layer149ideRefSource := func(s *api.ImageSpec) (ref []string, err error) {150ref = append(ref, s.IdeRef, s.SupervisorRef)151ref = append(ref, s.IdeLayerRef...)152return ref, nil153}154ideLayerSource, err := NewSpecMappedImageSource(newResolver, ideRefSource)155if err != nil {156return nil, err157}158layerSources = append(layerSources, ideLayerSource)159160// content layer161clsrc, err := NewContentLayerSource()162if err != nil {163return nil, xerrors.Errorf("cannot create content layer source: %w", err)164}165layerSources = append(layerSources, clsrc)166167specProvider := map[string]ImageSpecProvider{}168if cfg.RemoteSpecProvider != nil {169var providers []ImageSpecProvider170for _, providerCfg := range cfg.RemoteSpecProvider {171rsp, err := createRemoteSpecProvider(providerCfg)172if err != nil {173return nil, err174}175176providers = append(providers, rsp)177}178179specProvider[api.ProviderPrefixRemote] = NewCompositeSpecProvider(providers...)180}181182if cfg.FixedSpecProvider != "" {183fc, err := ioutil.ReadFile(cfg.FixedSpecProvider)184if err != nil {185return nil, xerrors.Errorf("cannot read fixed spec: %w", err)186}187188f := make(map[string]json.RawMessage)189err = json.Unmarshal(fc, &f)190if err != nil {191return nil, xerrors.Errorf("cannot unmarshal fixed spec: %w", err)192}193194prov := make(FixedImageSpecProvider)195for k, v := range f {196var spec api.ImageSpec197err = jsonpb.UnmarshalString(string(v), &spec)198if err != nil {199return nil, xerrors.Errorf("cannot unmarshal fixed spec: %w", err)200}201prov[k] = &spec202}203specProvider[api.ProviderPrefixFixed] = prov204}205206var ipfs *IPFSBlobCache207if cfg.IPFSCache != nil && cfg.IPFSCache.Enabled {208addr := cfg.IPFSCache.IPFSAddr209if ipfsHost := os.Getenv("IPFS_HOST"); ipfsHost != "" {210addr = strings.ReplaceAll(addr, "$IPFS_HOST", ipfsHost)211}212213maddr, err := ma.NewMultiaddr(strings.TrimSpace(addr))214if err != nil {215return nil, xerrors.Errorf("cannot connect to IPFS: %w", err)216}217218core, err := httpapi.NewApiWithClient(maddr, NewRetryableHTTPClient())219if err != nil {220return nil, xerrors.Errorf("cannot connect to IPFS: %w", err)221}222rdc, err := getRedisClient(cfg.RedisCache)223if err != nil {224return nil, xerrors.Errorf("cannot connect to Redis: %w", err)225}226227ipfs = &IPFSBlobCache{228Redis: rdc,229IPFS: core,230}231log.WithField("config", cfg.IPFSCache).Info("enabling IPFS caching")232}233234layerSource := CompositeLayerSource(layerSources)235return &Registry{236Config: cfg,237Resolver: newResolver,238Store: mfStore,239IPFS: ipfs,240SpecProvider: specProvider,241LayerSource: layerSource,242staticLayerSource: staticLayer,243ConfigModifier: NewConfigModifierFromLayerSource(layerSource),244metrics: metrics,245}, nil246}247248func createRemoteSpecProvider(cfg *config.RSProvider) (ImageSpecProvider, error) {249grpcOpts := common_grpc.DefaultClientOptions()250if cfg.TLS != nil {251tlsConfig, err := common_grpc.ClientAuthTLSConfig(252cfg.TLS.Authority, cfg.TLS.Certificate, cfg.TLS.PrivateKey,253common_grpc.WithSetRootCAs(true),254common_grpc.WithServerName("ws-manager"),255)256if err != nil {257log.WithField("config", cfg.TLS).Error("Cannot load ws-manager certs - this is a configuration issue.")258return nil, xerrors.Errorf("cannot load ws-manager certs: %w", err)259}260261grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))262} else {263grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))264}265266specprov, err := NewCachingSpecProvider(128, NewRemoteSpecProvider(cfg.Addr, grpcOpts))267if err != nil {268return nil, xerrors.Errorf("cannot create caching spec provider: %w", err)269}270271return specprov, nil272}273274func getRedisClient(cfg *config.RedisCacheConfig) (*redis.Client, error) {275if cfg.SingleHostAddress == "" {276return nil, xerrors.Errorf("registry-facade setting 'singleHostAddr' is missing")277}278279opts := &redis.Options{280Addr: cfg.SingleHostAddress,281Username: "default",282Password: cfg.Password,283}284285if cfg.Username != "" {286opts.Username = cfg.Username287}288289if cfg.UseTLS {290opts.TLSConfig = &tls.Config{291// golang tls does not support verify certificate without any SANs292InsecureSkipVerify: cfg.InsecureSkipVerify,293}294}295296log.WithField("addr", cfg.SingleHostAddress).WithField("username", cfg.Username).WithField("tls", cfg.UseTLS).Info("connecting to Redis")297298ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)299defer cancel()300301rdc := redis.NewClient(opts)302303var lastError error304waitErr := wait.ExponentialBackoffWithContext(ctx, wait.Backoff{305Steps: 5,306Duration: 50 * time.Millisecond,307Factor: 2.0,308Jitter: 0.2,309}, func(ctx context.Context) (bool, error) {310_, err := rdc.Ping(ctx).Result()311if err != nil {312lastError = err313return false, nil314}315316return true, nil317})318if waitErr != nil {319if waitErr == wait.ErrWaitTimeout {320return nil, xerrors.Errorf("cannot check Redis connection: %w", lastError)321}322323return nil, waitErr324}325326return rdc, nil327}328329// UpdateStaticLayer updates the static layer a registry-facade adds330func (reg *Registry) UpdateStaticLayer(ctx context.Context, cfg []config.StaticLayerCfg) error {331l, err := buildStaticLayer(ctx, cfg, reg.Resolver)332if err != nil {333return err334}335reg.staticLayerSource.Update(l)336return nil337}338339// Serve serves the registry on the given port340func (reg *Registry) Serve() error {341routes := distv2.RouterWithPrefix(reg.Config.Prefix)342reg.registerHandler(routes)343344var handler http.Handler = routes345if reg.Config.RequireAuth {346handler = reg.requireAuthentication(routes)347}348mux := http.NewServeMux()349mux.Handle("/", handler)350351if addr := os.Getenv("REGFAC_NO_TLS_DEBUG"); addr != "" {352// Gitpod port-forwarding also does SSL termination. If we only served the HTTPS service353// when using telepresence we could not make any requests to the registry facade directly,354// e.g. using curl or another Docker daemon. Using the env var we can enable an additional355// HTTP service.356//357// Note: this is just meant for a telepresence setup358go func() {359err := http.ListenAndServe(addr, mux)360if err != nil {361log.WithError(err).Error("start of registry server failed")362}363}()364}365366addr := fmt.Sprintf(":%d", reg.Config.Port)367l, err := net.Listen("tcp", addr)368if err != nil {369return err370}371372reg.srv = &http.Server{373Addr: addr,374Handler: mux,375ErrorLog: stdlog.New(logrusErrorWriter{}, "", 0),376}377378if reg.Config.TLS != nil {379log.WithField("addr", addr).Info("HTTPS registry server listening")380381cert, key := reg.Config.TLS.Certificate, reg.Config.TLS.PrivateKey382if tproot := os.Getenv("TELEPRESENCE_ROOT"); tproot != "" {383cert = filepath.Join(tproot, cert)384key = filepath.Join(tproot, key)385}386387return reg.srv.ServeTLS(l, cert, key)388}389390log.WithField("addr", addr).Info("HTTP registry server listening")391return reg.srv.Serve(l)392}393394// MustServe calls serve and logs any error as Fatal395func (reg *Registry) MustServe() {396err := reg.Serve()397if err != nil {398log.WithError(err).Fatal("cannot serve registry")399}400}401402// Shutdowner is a process that can be shut down403type Shutdowner interface {404Shutdown(context.Context) error405}406407func (reg *Registry) requireAuthentication(h http.Handler) http.Handler {408return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {409fail := func() {410w.Header().Add("WWW-Authenticate", "Basic")411w.WriteHeader(http.StatusUnauthorized)412}413414_, _, ok := r.BasicAuth()415if !ok {416fail()417return418}419420// todo: implement auth421422h.ServeHTTP(w, r)423})424}425426// registerHandler registers the handle* functions with the corresponding routes427func (reg *Registry) registerHandler(routes *mux.Router) {428routes.Get(distv2.RouteNameBase).HandlerFunc(reg.handleAPIBase)429routes.Get(distv2.RouteNameManifest).Handler(dispatcher(reg.handleManifest))430// routes.Get(v2.RouteNameCatalog).Handler(dispatcher(reg.handleCatalog))431// routes.Get(v2.RouteNameTags).Handler(dispatcher(reg.handleTags))432routes.Get(distv2.RouteNameBlob).Handler(dispatcher(reg.handleBlob))433// routes.Get(v2.RouteNameBlobUpload).Handler(dispatcher(reg.handleBlobUpload))434// routes.Get(v2.RouteNameBlobUploadChunk).Handler(dispatcher(reg.handleBlobUploadChunk))435routes.NotFoundHandler = http.HandlerFunc(reg.handleAPIBase)436}437438// handleApiBase implements a simple yes-man for doing overall checks against the439// api. This can support auth roundtrips to support docker login.440func (reg *Registry) handleAPIBase(w http.ResponseWriter, r *http.Request) {441const emptyJSON = "{}"442// Provide a simple /v2/ 200 OK response with empty json response.443w.Header().Set("Content-Type", "application/json")444w.Header().Set("Content-Length", fmt.Sprint(len(emptyJSON)))445446fmt.Fprint(w, emptyJSON)447}448449type dispatchFunc func(ctx context.Context, r *http.Request) http.Handler450451// dispatcher wraps a dispatchFunc and provides context452func dispatcher(d dispatchFunc) http.Handler {453return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {454//fc, _ := httputil.DumpRequest(r, false)455//log.WithField("req", string(fc)).Debug("dispatching request")456457// Get context from request, add vars and other info and sync back458ctx := r.Context()459ctx = &muxVarsContext{460Context: ctx,461vars: mux.Vars(r),462}463r = r.WithContext(ctx)464465if nameRequired(r) {466nameRef, err := reference.WithName(getName(ctx))467if err != nil {468log.WithError(err).WithField("nameRef", nameRef).Errorf("error parsing reference from context")469respondWithError(w, distribution.ErrRepositoryNameInvalid{470Name: nameRef.Name(),471Reason: err,472})473return474}475}476477d(ctx, r).ServeHTTP(w, r)478})479}480481func respondWithError(w http.ResponseWriter, terr error) {482err := errcode.ServeJSON(w, terr)483if err != nil {484log.WithError(err).WithField("orignalErr", terr).Errorf("error serving error json")485}486}487488// nameRequired returns true if the route requires a name.489func nameRequired(r *http.Request) bool {490route := mux.CurrentRoute(r)491if route == nil {492return true493}494routeName := route.GetName()495return routeName != distv2.RouteNameBase && routeName != distv2.RouteNameCatalog496}497498type muxVarsContext struct {499context.Context500vars map[string]string501}502503func (ctx *muxVarsContext) Value(key interface{}) interface{} {504if keyStr, ok := key.(string); ok {505if keyStr == "vars" {506return ctx.vars507}508509keyStr = strings.TrimPrefix(keyStr, "vars.")510511if v, ok := ctx.vars[keyStr]; ok {512return v513}514}515516return ctx.Context.Value(key)517}518519// getName extracts the name var from the context which was passed in through the mux route520func getName(ctx context.Context) string {521val := ctx.Value("vars.name")522sval, ok := val.(string)523if !ok {524return ""525}526return sval527}528529func getSpecProviderName(ctx context.Context) (specProviderName string, remainder string) {530name := getName(ctx)531segs := strings.Split(name, "/")532if len(segs) > 1 {533specProviderName = segs[0]534remainder = strings.Join(segs[1:], "/")535}536return537}538539// getReference extracts the referece var from the context which was passed in through the mux route540func getReference(ctx context.Context) string {541val := ctx.Value("vars.reference")542sval, ok := val.(string)543if !ok {544return ""545}546return sval547}548549// getDigest extracts the digest var from the context which was passed in through the mux route550func getDigest(ctx context.Context) string {551val := ctx.Value("vars.digest")552sval, ok := val.(string)553if !ok {554return ""555}556557return sval558}559560var tlsHandshakeErrorPrefix = []byte("http: TLS handshake error")561562type logrusErrorWriter struct{}563564func (w logrusErrorWriter) Write(p []byte) (int, error) {565if bytes.Contains(p, tlsHandshakeErrorPrefix) {566return len(p), nil567}568569log.Errorf("%s", string(p))570return len(p), nil571}572573574