Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/ws-manager-mk2/grpcpool/pool.go
2498 views
1
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License-AGPL.txt in the project root for license information.
4
5
package grpcpool
6
7
import (
8
"strings"
9
"sync"
10
"time"
11
12
"golang.org/x/xerrors"
13
"google.golang.org/grpc"
14
"google.golang.org/grpc/connectivity"
15
)
16
17
var (
18
// ErrPoolClosed is returned if Get is called after Close
19
ErrPoolClosed = xerrors.Errorf("pool is closed")
20
)
21
22
// Factory is a function which creates new grpc connections
23
type Factory func(host string) (*grpc.ClientConn, error)
24
25
// Pool is the gRPC client pool
26
type Pool struct {
27
connections map[string]*grpc.ClientConn
28
factory Factory
29
closed bool
30
mu sync.RWMutex
31
32
isValidConnection ConnectionValidationFunc
33
}
34
35
type ConnectionValidationFunc func(hostIP string) (valid bool)
36
37
// New creates a new connection pool
38
func New(factory Factory, callback ConnectionValidationFunc) *Pool {
39
pool := &Pool{
40
connections: make(map[string]*grpc.ClientConn),
41
factory: factory,
42
43
isValidConnection: callback,
44
}
45
46
go func() {
47
for range time.Tick(5 * time.Minute) {
48
pool.ValidateConnections()
49
}
50
}()
51
52
return pool
53
}
54
55
// Get will return a client connection to the host. If no connection exists yet, the factory
56
// is used to create one.
57
func (p *Pool) Get(host string) (*grpc.ClientConn, error) {
58
p.mu.RLock()
59
if p.closed {
60
p.mu.RUnlock()
61
return nil, ErrPoolClosed
62
}
63
conn, exists := p.connections[host]
64
p.mu.RUnlock()
65
66
if !exists || conn.GetState() == connectivity.Shutdown {
67
return p.add(host)
68
}
69
70
return conn, nil
71
}
72
73
// add adds a new connection to the host if one doesn't exist already in a state that is not Shutdown.
74
// Compared to Get, this function holds a write lock on mu. Get uses this function if it cannot find
75
// an existing connection.
76
func (p *Pool) add(host string) (*grpc.ClientConn, error) {
77
p.mu.Lock()
78
defer p.mu.Unlock()
79
80
conn, exists := p.connections[host]
81
if exists && conn.GetState() != connectivity.Shutdown {
82
return conn, nil
83
}
84
85
conn, err := p.factory(host)
86
if err != nil {
87
return nil, err
88
}
89
90
p.connections[host] = conn
91
return conn, nil
92
}
93
94
// Close empties the pool after closing all connections it held.
95
// It waits for all connections to close.
96
// Once the pool is closed, calling Get will result in ErrPoolClosed
97
func (p *Pool) Close() error {
98
p.mu.Lock()
99
defer p.mu.Unlock()
100
101
p.closed = true
102
errs := make([]string, 0)
103
for _, c := range p.connections {
104
err := c.Close()
105
if err != nil {
106
errs = append(errs, err.Error())
107
}
108
}
109
110
if len(errs) != 0 {
111
return xerrors.Errorf("pool close: %s", strings.Join(errs, "; "))
112
}
113
114
return nil
115
}
116
117
// ValidateConnections check if existing connections in the pool
118
// are using valid addresses and remove them from the pool if not.
119
func (p *Pool) ValidateConnections() {
120
p.mu.RLock()
121
addresses := make([]string, 0, len(p.connections))
122
for address := range p.connections {
123
addresses = append(addresses, address)
124
}
125
p.mu.RUnlock()
126
127
for _, address := range addresses {
128
if p.isValidConnection(address) {
129
continue
130
}
131
132
p.mu.Lock()
133
conn := p.connections[address]
134
conn.Close()
135
delete(p.connections, address)
136
p.mu.Unlock()
137
}
138
}
139
140