Path: blob/main/components/ws-manager-mk2/grpcpool/pool.go
2498 views
// Copyright (c) 2020 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License-AGPL.txt in the project root for license information.34package grpcpool56import (7"strings"8"sync"9"time"1011"golang.org/x/xerrors"12"google.golang.org/grpc"13"google.golang.org/grpc/connectivity"14)1516var (17// ErrPoolClosed is returned if Get is called after Close18ErrPoolClosed = xerrors.Errorf("pool is closed")19)2021// Factory is a function which creates new grpc connections22type Factory func(host string) (*grpc.ClientConn, error)2324// Pool is the gRPC client pool25type Pool struct {26connections map[string]*grpc.ClientConn27factory Factory28closed bool29mu sync.RWMutex3031isValidConnection ConnectionValidationFunc32}3334type ConnectionValidationFunc func(hostIP string) (valid bool)3536// New creates a new connection pool37func New(factory Factory, callback ConnectionValidationFunc) *Pool {38pool := &Pool{39connections: make(map[string]*grpc.ClientConn),40factory: factory,4142isValidConnection: callback,43}4445go func() {46for range time.Tick(5 * time.Minute) {47pool.ValidateConnections()48}49}()5051return pool52}5354// Get will return a client connection to the host. If no connection exists yet, the factory55// is used to create one.56func (p *Pool) Get(host string) (*grpc.ClientConn, error) {57p.mu.RLock()58if p.closed {59p.mu.RUnlock()60return nil, ErrPoolClosed61}62conn, exists := p.connections[host]63p.mu.RUnlock()6465if !exists || conn.GetState() == connectivity.Shutdown {66return p.add(host)67}6869return conn, nil70}7172// add adds a new connection to the host if one doesn't exist already in a state that is not Shutdown.73// Compared to Get, this function holds a write lock on mu. Get uses this function if it cannot find74// an existing connection.75func (p *Pool) add(host string) (*grpc.ClientConn, error) {76p.mu.Lock()77defer p.mu.Unlock()7879conn, exists := p.connections[host]80if exists && conn.GetState() != connectivity.Shutdown {81return conn, nil82}8384conn, err := p.factory(host)85if err != nil {86return nil, err87}8889p.connections[host] = conn90return conn, nil91}9293// Close empties the pool after closing all connections it held.94// It waits for all connections to close.95// Once the pool is closed, calling Get will result in ErrPoolClosed96func (p *Pool) Close() error {97p.mu.Lock()98defer p.mu.Unlock()99100p.closed = true101errs := make([]string, 0)102for _, c := range p.connections {103err := c.Close()104if err != nil {105errs = append(errs, err.Error())106}107}108109if len(errs) != 0 {110return xerrors.Errorf("pool close: %s", strings.Join(errs, "; "))111}112113return nil114}115116// ValidateConnections check if existing connections in the pool117// are using valid addresses and remove them from the pool if not.118func (p *Pool) ValidateConnections() {119p.mu.RLock()120addresses := make([]string, 0, len(p.connections))121for address := range p.connections {122addresses = append(addresses, address)123}124p.mu.RUnlock()125126for _, address := range addresses {127if p.isValidConnection(address) {128continue129}130131p.mu.Lock()132conn := p.connections[address]133conn.Close()134delete(p.connections, address)135p.mu.Unlock()136}137}138139140