Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/blobserve/cmd/run.go
2498 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package cmd
6
7
import (
8
"context"
9
"fmt"
10
"net/http"
11
"os"
12
"os/signal"
13
"path/filepath"
14
"sync"
15
"syscall"
16
"time"
17
18
"github.com/containerd/containerd/remotes"
19
"github.com/containerd/containerd/remotes/docker"
20
"github.com/distribution/reference"
21
"github.com/docker/cli/cli/config/configfile"
22
blobserve_config "github.com/gitpod-io/gitpod/blobserve/pkg/config"
23
"github.com/heptiolabs/healthcheck"
24
"github.com/prometheus/client_golang/prometheus"
25
"github.com/prometheus/client_golang/prometheus/collectors"
26
"github.com/prometheus/client_golang/prometheus/promhttp"
27
"github.com/spf13/cobra"
28
29
"github.com/gitpod-io/gitpod/blobserve/pkg/blobserve"
30
"github.com/gitpod-io/gitpod/common-go/kubernetes"
31
"github.com/gitpod-io/gitpod/common-go/log"
32
"github.com/gitpod-io/gitpod/common-go/pprof"
33
"github.com/gitpod-io/gitpod/common-go/watch"
34
)
35
36
// Package-level switches for the command line tool. They are not referenced
// in this part of the file — presumably bound to CLI flags elsewhere
// (TODO confirm).
var (
	jsonLog bool
	verbose bool
)
38
39
// runCmd represents the run command
40
var runCmd = &cobra.Command{
41
Use: "run <config.json>",
42
Short: "Starts the blobserve",
43
Args: cobra.ExactArgs(1),
44
Run: func(cmd *cobra.Command, args []string) {
45
cfg, err := blobserve_config.GetConfig(args[0])
46
if err != nil {
47
log.WithError(err).WithField("filename", args[0]).Fatal("cannot load config")
48
}
49
50
var (
51
dockerCfg *configfile.ConfigFile
52
dockerCfgMu sync.RWMutex
53
)
54
if cfg.AuthCfg != "" {
55
dockerCfg = loadDockerCfg(cfg.AuthCfg)
56
}
57
58
reg := prometheus.NewRegistry()
59
60
var (
61
clientRequestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
62
Name: "http_client_requests_total",
63
Help: "Counter of outgoing HTTP requests",
64
}, []string{"method", "code"})
65
clientRequestsDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
66
Name: "http_client_requests_duration_seconds",
67
Help: "Histogram of outgoing HTTP request durations",
68
Buckets: prometheus.DefBuckets,
69
}, []string{"method", "code"})
70
serverRequestsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
71
Name: "http_server_requests_total",
72
Help: "Counter of incoming HTTP requests",
73
}, []string{"method", "code"})
74
serverRequestsDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
75
Name: "http_server_requests_duration_seconds",
76
Help: "Histogram of incoming HTTP request durations",
77
Buckets: prometheus.DefBuckets,
78
}, []string{"method", "code"})
79
)
80
81
resolverProvider := func() remotes.Resolver {
82
var resolverOpts docker.ResolverOptions
83
84
dockerCfgMu.RLock()
85
defer dockerCfgMu.RUnlock()
86
if dockerCfg != nil {
87
resolverOpts.Hosts = docker.ConfigureDefaultRegistries(
88
docker.WithAuthorizer(authorizerFromDockerConfig(dockerCfg)),
89
docker.WithClient(&http.Client{
90
Transport: promhttp.InstrumentRoundTripperCounter(clientRequestsTotal,
91
promhttp.InstrumentRoundTripperDuration(clientRequestsDuration,
92
http.DefaultTransport)),
93
}),
94
)
95
}
96
97
return docker.NewResolver(resolverOpts)
98
}
99
100
srv, err := blobserve.NewServer(cfg.BlobServe, resolverProvider,
101
func(h http.Handler) http.Handler {
102
return promhttp.InstrumentHandlerCounter(serverRequestsTotal,
103
promhttp.InstrumentHandlerDuration(serverRequestsDuration,
104
h),
105
)
106
},
107
)
108
if err != nil {
109
log.WithError(err).Fatal("cannot create blob server")
110
}
111
go srv.MustServe()
112
113
if cfg.PProfAddr != "" {
114
go pprof.Serve(cfg.PProfAddr)
115
}
116
if cfg.PrometheusAddr != "" {
117
reg.MustRegister(
118
collectors.NewGoCollector(),
119
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
120
clientRequestsTotal,
121
clientRequestsDuration,
122
serverRequestsTotal,
123
serverRequestsDuration,
124
)
125
126
handler := http.NewServeMux()
127
handler.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
128
129
go func() {
130
err := http.ListenAndServe(cfg.PrometheusAddr, handler)
131
if err != nil {
132
log.WithError(err).Error("Prometheus metrics server failed")
133
}
134
}()
135
log.WithField("addr", cfg.PrometheusAddr).Info("started Prometheus metrics server")
136
}
137
138
if cfg.ReadinessProbeAddr != "" {
139
// use the first layer as source for the tests
140
if len(cfg.BlobServe.Repos) < 1 {
141
log.Fatal("To use the readiness probe you need to specify at least one blobserve repo")
142
}
143
144
var repository string
145
// find first key of the blobserve repos
146
for k := range cfg.BlobServe.Repos {
147
repository = k
148
break
149
}
150
151
named, err := reference.ParseNamed(repository)
152
if err != nil {
153
log.WithError(err).WithField("repo", repository).Fatal("cannot parse repository reference")
154
}
155
156
staticLayerHost := reference.Domain(named)
157
158
// Ensure we can resolve DNS queries, and can access the registry host
159
health := healthcheck.NewHandler()
160
health.AddReadinessCheck("dns", kubernetes.DNSCanResolveProbe(staticLayerHost, 1*time.Second))
161
health.AddReadinessCheck("registry", kubernetes.NetworkIsReachableProbe(fmt.Sprintf("https://%v", repository)))
162
163
go func() {
164
if err := http.ListenAndServe(cfg.ReadinessProbeAddr, health); err != nil && err != http.ErrServerClosed {
165
log.WithError(err).Panic("error starting HTTP server")
166
}
167
}()
168
}
169
170
ctx, cancel := context.WithCancel(context.Background())
171
defer cancel()
172
173
err = watch.File(ctx, cfg.AuthCfg, func() {
174
dockerCfgMu.Lock()
175
defer dockerCfgMu.Unlock()
176
177
dockerCfg = loadDockerCfg(cfg.AuthCfg)
178
})
179
if err != nil {
180
log.WithError(err).Fatal("cannot start watch of Docker auth configuration file")
181
}
182
183
log.Info("🏪 blobserve is up and running")
184
sigChan := make(chan os.Signal, 1)
185
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
186
<-sigChan
187
},
188
}
189
190
func init() {
191
rootCmd.AddCommand(runCmd)
192
}
193
194
// loadDockerCfg reads the Docker client configuration stored at fn and returns
// it. When the TELEPRESENCE_ROOT environment variable is set, fn is resolved
// relative to that directory. Failures to open or parse the file are reported
// through the logger's Fatal level.
func loadDockerCfg(fn string) *configfile.ConfigFile {
	if root := os.Getenv("TELEPRESENCE_ROOT"); root != "" {
		fn = filepath.Join(root, fn)
	}

	f, err := os.Open(fn)
	if err != nil {
		log.WithError(err).Fatal("cannot read docker auth config")
	}

	cfg := configfile.New(fn)
	loadErr := cfg.LoadFromReader(f)
	// Close the file before looking at the load result so the handle is
	// released regardless of the outcome.
	f.Close()
	if loadErr != nil {
		log.WithError(loadErr).Fatal("cannot read docker config")
	}

	log.WithField("fn", fn).Info("using authentication for backing registries")
	return cfg
}
213
214
// FromDockerConfig turns docker client config into docker registry hosts
215
func authorizerFromDockerConfig(cfg *configfile.ConfigFile) docker.Authorizer {
216
return docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (user, pass string, err error) {
217
auth, err := cfg.GetAuthConfig(host)
218
if err != nil {
219
return
220
}
221
user = auth.Username
222
pass = auth.Password
223
return
224
}))
225
}
226
227