Path: blob/main/components/ws-daemon/pkg/netlimit/netlimit.go
2499 views
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.1// Licensed under the GNU Affero General Public License (AGPL).2// See License.AGPL.txt in the project root for license information.34package netlimit56import (7"context"8"errors"9"fmt"10"os"11"os/exec"1213"runtime"14"strconv"15"sync"16"time"1718"github.com/gitpod-io/gitpod/common-go/kubernetes"19"github.com/gitpod-io/gitpod/common-go/log"20"github.com/gitpod-io/gitpod/ws-daemon/pkg/dispatch"21"github.com/gitpod-io/gitpod/ws-daemon/pkg/nsinsider"22"github.com/google/nftables"23"github.com/prometheus/client_golang/prometheus"24"github.com/vishvananda/netns"25)2627type ConnLimiter struct {28mu sync.RWMutex29limited map[string]struct{}30droppedBytes *prometheus.GaugeVec31droppedPackets *prometheus.GaugeVec32config Config33}3435func NewConnLimiter(config Config, prom prometheus.Registerer) *ConnLimiter {36s := &ConnLimiter{37droppedBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{38Name: "netlimit_connections_dropped_bytes",39Help: "Number of bytes dropped due to connection limiting",40}, []string{"node", "workspace"}),4142droppedPackets: prometheus.NewGaugeVec(prometheus.GaugeOpts{43Name: "netlimit_connections_dropped_packets",44Help: "Number of packets dropped due to connection limiting",45}, []string{"node", "workspace"}),46limited: map[string]struct{}{},47}4849s.config = config5051if config.Enabled {52prom.MustRegister(53s.droppedBytes,54s.droppedPackets,55)56}5758return s59}6061func (c *ConnLimiter) WorkspaceAdded(ctx context.Context, ws *dispatch.Workspace) error {62c.mu.Lock()63defer c.mu.Unlock()6465_, hasAnnotation := ws.Pod.Annotations[kubernetes.WorkspaceNetConnLimitAnnotation]66if !hasAnnotation {67return nil68}6970return c.limitWorkspace(ctx, ws)71}7273func (c *ConnLimiter) WorkspaceUpdated(ctx context.Context, ws *dispatch.Workspace) error {74c.mu.Lock()75defer c.mu.Unlock()7677_, hasAnnotation := ws.Pod.Annotations[kubernetes.WorkspaceNetConnLimitAnnotation]78if !hasAnnotation {79return nil80}8182if _, ok := c.limited[ws.InstanceID]; ok {83return nil84}8586return c.limitWorkspace(ctx, ws)87}8889func (n *ConnLimiter) GetConnectionDropCounter(pid uint64) (*nftables.CounterObj, error) {90runtime.LockOSThread()91defer runtime.UnlockOSThread()9293netns, err := netns.GetFromPid(int(pid))94if err != nil {95return nil, fmt.Errorf("could not get handle for network namespace: %w", err)96}9798nftconn, err := nftables.New(nftables.WithNetNSFd(int(netns)))99if err != nil {100return nil, fmt.Errorf("could not establish netlink connection for nft: %w", err)101}102103gitpodTable := &nftables.Table{104Name: "gitpod",105Family: nftables.TableFamilyIPv4,106}107108counterObject, err := nftconn.GetObject(&nftables.CounterObj{109Table: gitpodTable,110Name: "ws-connection-drop-stats",111})112113if err != nil {114return nil, fmt.Errorf("could not get connection drop stats: %w", err)115}116117dropCounter, ok := counterObject.(*nftables.CounterObj)118if !ok {119return nil, fmt.Errorf("could not cast counter object")120}121122return dropCounter, nil123}124125func (c *ConnLimiter) limitWorkspace(ctx context.Context, ws *dispatch.Workspace) error {126disp := dispatch.GetFromContext(ctx)127if disp == nil {128return fmt.Errorf("no dispatch available")129}130131pid, err := disp.Runtime.ContainerPID(ctx, ws.ContainerID)132if err != nil {133if errors.Is(err, context.Canceled) {134return nil135}136return fmt.Errorf("could not get pid for container %s of workspace %s", ws.ContainerID, ws.WorkspaceID)137}138139err = nsinsider.Nsinsider(ws.InstanceID, int(pid), func(cmd *exec.Cmd) {140cmd.Args = append(cmd.Args, "setup-connection-limit", "--limit", strconv.Itoa(int(c.config.ConnectionsPerMinute)),141"--bucketsize", strconv.Itoa(int(c.config.BucketSize)))142if c.config.Enforce {143cmd.Args = append(cmd.Args, "--enforce")144}145}, nsinsider.EnterMountNS(false), nsinsider.EnterNetNS(true))146if err != nil {147if errors.Is(context.Cause(ctx), context.Canceled) {148return nil149}150log.WithError(err).WithFields(ws.OWI()).Error("cannot enable connection limiting")151return err152}153c.limited[ws.InstanceID] = struct{}{}154155dispatch.GetDispatchWaitGroup(ctx).Add(1)156go func(*dispatch.Workspace) {157defer dispatch.GetDispatchWaitGroup(ctx).Done()158159ticker := time.NewTicker(30 * time.Second)160defer ticker.Stop()161162for {163select {164case <-ticker.C:165counter, err := c.GetConnectionDropCounter(pid)166if err != nil {167log.WithFields(ws.OWI()).WithError(err).Warnf("could not get connection drop stats")168continue169}170171nodeName := os.Getenv("NODENAME")172c.droppedBytes.WithLabelValues(nodeName, ws.Pod.Name).Set(float64(counter.Bytes))173c.droppedPackets.WithLabelValues(nodeName, ws.Pod.Name).Set(float64(counter.Packets))174175case <-ctx.Done():176c.mu.Lock()177delete(c.limited, ws.InstanceID)178c.mu.Unlock()179return180}181}182}(ws)183184return nil185}186187func (c *ConnLimiter) Update(config Config) {188c.mu.Lock()189defer c.mu.Unlock()190191c.config = config192log.WithField("config", config).Info("updating network connection limits")193}194195196