Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/registry-facade/cmd/run.go
2500 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package cmd
6
7
import (
8
"fmt"
9
"net"
10
"net/http"
11
"os"
12
"os/signal"
13
"path/filepath"
14
"sync"
15
"syscall"
16
"time"
17
18
"github.com/containerd/containerd/remotes"
19
"github.com/containerd/containerd/remotes/docker"
20
"github.com/distribution/reference"
21
"github.com/docker/cli/cli/config/configfile"
22
"github.com/heptiolabs/healthcheck"
23
"github.com/prometheus/client_golang/prometheus"
24
"github.com/prometheus/client_golang/prometheus/collectors"
25
"github.com/prometheus/client_golang/prometheus/promhttp"
26
"github.com/spf13/cobra"
27
"golang.org/x/net/context"
28
29
common_grpc "github.com/gitpod-io/gitpod/common-go/grpc"
30
"github.com/gitpod-io/gitpod/common-go/kubernetes"
31
"github.com/gitpod-io/gitpod/common-go/log"
32
"github.com/gitpod-io/gitpod/common-go/pprof"
33
"github.com/gitpod-io/gitpod/common-go/watch"
34
"github.com/gitpod-io/gitpod/registry-facade/api/config"
35
"github.com/gitpod-io/gitpod/registry-facade/pkg/registry"
36
)
37
38
// Package-level switches; neither is read in this file.
// NOTE(review): presumably bound to command-line flags elsewhere in the
// package — confirm against the root command.
var (
	jsonLog bool
	verbose bool
)
40
41
// runCmd represents the run command
42
var runCmd = &cobra.Command{
43
Use: "run <config.json>",
44
Short: "Starts the registry facade",
45
Args: cobra.ExactArgs(1),
46
Run: func(cmd *cobra.Command, args []string) {
47
configPath := args[0]
48
cfg, err := config.GetConfig(configPath)
49
if err != nil {
50
log.WithError(err).WithField("filename", configPath).Fatal("cannot load config")
51
}
52
53
promreg := prometheus.NewRegistry()
54
gpreg := prometheus.WrapRegistererWithPrefix("gitpod_registry_facade_", promreg)
55
rtt, err := registry.NewMeasuringRegistryRoundTripper(newDefaultTransport(), prometheus.WrapRegistererWithPrefix("downstream_", gpreg))
56
if err != nil {
57
log.WithError(err).Fatal("cannot register metrics")
58
}
59
if cfg.PrometheusAddr != "" {
60
promreg.MustRegister(
61
collectors.NewGoCollector(),
62
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
63
common_grpc.ClientMetrics(),
64
)
65
66
handler := http.NewServeMux()
67
handler.Handle("/metrics", promhttp.HandlerFor(promreg, promhttp.HandlerOpts{}))
68
69
go func() {
70
err := http.ListenAndServe(cfg.PrometheusAddr, handler)
71
if err != nil {
72
log.WithError(err).Error("Prometheus metrics server failed")
73
}
74
}()
75
log.WithField("addr", cfg.PrometheusAddr).Info("started Prometheus metrics server")
76
}
77
if cfg.PProfAddr != "" {
78
go pprof.Serve(cfg.PProfAddr)
79
}
80
81
var (
82
dockerCfg *configfile.ConfigFile
83
dockerCfgMu sync.RWMutex
84
)
85
if cfg.AuthCfg != "" {
86
dockerCfg = loadDockerCfg(cfg.AuthCfg)
87
}
88
89
resolverProvider := func() remotes.Resolver {
90
client := registry.NewRetryableHTTPClient()
91
client.Transport = rtt
92
93
resolverOpts := docker.ResolverOptions{
94
Client: client,
95
}
96
97
dockerCfgMu.RLock()
98
defer dockerCfgMu.RUnlock()
99
if dockerCfg != nil {
100
resolverOpts.Hosts = docker.ConfigureDefaultRegistries(
101
docker.WithAuthorizer(authorizerFromDockerConfig(dockerCfg)),
102
docker.WithClient(client),
103
)
104
}
105
106
return docker.NewResolver(resolverOpts)
107
}
108
109
if cfg.ReadinessProbeAddr != "" {
110
// use the first layer as source for the tests
111
if len(cfg.Registry.StaticLayer) < 1 {
112
log.Fatal("To use the readiness probe you need to specify at least one blobserve repo")
113
}
114
115
staticLayerRef := cfg.Registry.StaticLayer[0].Ref
116
117
named, err := reference.ParseNamed(staticLayerRef)
118
if err != nil {
119
log.WithError(err).WithField("repo", staticLayerRef).Fatal("cannot parse repository reference")
120
}
121
122
staticLayerHost := reference.Domain(named)
123
124
// Ensure we can resolve DNS queries, and can access the registry host
125
health := healthcheck.NewHandler()
126
health.AddReadinessCheck("dns", kubernetes.DNSCanResolveProbe(staticLayerHost, 1*time.Second))
127
health.AddReadinessCheck("registry", kubernetes.NetworkIsReachableProbe(fmt.Sprintf("https://%v", staticLayerRef)))
128
health.AddReadinessCheck("registry-facade", kubernetes.NetworkIsReachableProbe(fmt.Sprintf("https://127.0.0.1:%v/%v/base/", cfg.Registry.Port, cfg.Registry.Prefix)))
129
130
health.AddLivenessCheck("dns", kubernetes.DNSCanResolveProbe(staticLayerHost, 1*time.Second))
131
health.AddLivenessCheck("registry", kubernetes.NetworkIsReachableProbe(fmt.Sprintf("https://%v", staticLayerRef)))
132
133
go func() {
134
if err := http.ListenAndServe(cfg.ReadinessProbeAddr, health); err != nil && err != http.ErrServerClosed {
135
log.WithError(err).Panic("error starting HTTP server")
136
}
137
}()
138
}
139
140
registryDoneChan := make(chan struct{})
141
reg, err := registry.NewRegistry(cfg.Registry, resolverProvider, prometheus.WrapRegistererWithPrefix("registry_", gpreg))
142
if err != nil {
143
log.WithError(err).Fatal("cannot create registry")
144
}
145
146
ctx, cancel := context.WithCancel(context.Background())
147
defer cancel()
148
149
err = watch.File(ctx, configPath, func() {
150
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
151
defer cancel()
152
153
cfg, err := config.GetConfig(configPath)
154
if err != nil {
155
log.WithError(err).Warn("cannot reload configuration")
156
return
157
}
158
159
err = reg.UpdateStaticLayer(ctx, cfg.Registry.StaticLayer)
160
if err != nil {
161
log.WithError(err).Warn("cannot reload configuration")
162
}
163
})
164
if err != nil {
165
log.WithError(err).Fatal("cannot start watch of configuration file")
166
}
167
168
err = watch.File(ctx, cfg.AuthCfg, func() {
169
dockerCfgMu.Lock()
170
defer dockerCfgMu.Unlock()
171
172
dockerCfg = loadDockerCfg(cfg.AuthCfg)
173
})
174
if err != nil {
175
log.WithError(err).Fatal("cannot start watch of Docker auth configuration file")
176
}
177
178
go func() {
179
defer close(registryDoneChan)
180
reg.MustServe()
181
}()
182
183
log.Info("🏪 registry facade is up and running")
184
sigChan := make(chan os.Signal, 1)
185
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
186
select {
187
case <-sigChan:
188
case <-registryDoneChan:
189
}
190
},
191
}
192
193
// loadDockerCfg reads the Docker client configuration file fn and returns
// its parsed form.
//
// If the TELEPRESENCE_ROOT environment variable is set, fn is resolved
// relative to that directory. Failure to open or parse the file is reported
// through log.Fatal rather than returned to the caller.
func loadDockerCfg(fn string) *configfile.ConfigFile {
	path := fn
	if root := os.Getenv("TELEPRESENCE_ROOT"); root != "" {
		path = filepath.Join(root, path)
	}

	file, openErr := os.Open(path)
	if openErr != nil {
		log.WithError(openErr).Fatal("cannot read docker auth config")
	}

	parsed := configfile.New(path)
	loadErr := parsed.LoadFromReader(file)
	// Release the file handle before looking at the load result so it is
	// closed on both the success and the failure path.
	file.Close()
	if loadErr != nil {
		log.WithError(loadErr).Fatal("cannot read docker config")
	}

	log.WithField("fn", path).Info("using authentication for backing registries")
	return parsed
}
212
213
// newDefaultTransport returns the HTTP transport used for requests to the
// backing registries. It honours the proxy settings from the environment,
// applies explicit dial/TLS/idle timeouts and disables HTTP keep-alives.
func newDefaultTransport() *http.Transport {
	const (
		dialTimeout           = 30 * time.Second
		tcpKeepAlive          = 30 * time.Second
		idleConnTimeout       = 30 * time.Second
		tlsHandshakeTimeout   = 10 * time.Second
		expectContinueTimeout = 5 * time.Second
	)

	dialer := &net.Dialer{
		Timeout:   dialTimeout,
		KeepAlive: tcpKeepAlive,
		DualStack: false,
	}

	transport := &http.Transport{
		Proxy:       http.ProxyFromEnvironment,
		DialContext: dialer.DialContext,

		// Zero leaves the total number of idle connections unlimited;
		// only the per-host count is capped.
		MaxIdleConns:        0,
		MaxIdleConnsPerHost: 32,

		IdleConnTimeout:       idleConnTimeout,
		TLSHandshakeTimeout:   tlsHandshakeTimeout,
		ExpectContinueTimeout: expectContinueTimeout,

		DisableKeepAlives: true,
	}
	return transport
}
229
230
func init() {
231
rootCmd.AddCommand(runCmd)
232
}
233
234
// FromDockerConfig turns docker client config into docker registry hosts
235
func authorizerFromDockerConfig(cfg *configfile.ConfigFile) docker.Authorizer {
236
return docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (user, pass string, err error) {
237
auth, err := cfg.GetAuthConfig(host)
238
if err != nil {
239
return
240
}
241
user = auth.Username
242
pass = auth.Password
243
return
244
}))
245
}
246
247