Path: blob/main/pkg/operator/resources_integrations.go
4094 views
package operator12import (3"fmt"45gragent "github.com/grafana/agent/pkg/operator/apis/monitoring/v1alpha1"6"github.com/grafana/agent/pkg/operator/assets"7"github.com/grafana/agent/pkg/operator/config"8apps_v1 "k8s.io/api/apps/v1"9core_v1 "k8s.io/api/core/v1"10)1112func newIntegrationsDaemonSet(cfg *Config, name string, d gragent.Deployment) (*apps_v1.DaemonSet, error) {13opts := integrationsPodTemplateOptions(name, d, true)14tmpl, selector, err := generatePodTemplate(cfg, name, d, opts)15if err != nil {16return nil, err17}1819spec := apps_v1.DaemonSetSpec{20UpdateStrategy: apps_v1.DaemonSetUpdateStrategy{21Type: apps_v1.RollingUpdateDaemonSetStrategyType,22},23Selector: selector,24Template: tmpl,25}2627return &apps_v1.DaemonSet{28ObjectMeta: metadataFromPodTemplate(name, d, tmpl),29Spec: spec,30}, nil31}3233func newIntegrationsDeployment(cfg *Config, name string, d gragent.Deployment) (*apps_v1.Deployment, error) {34opts := integrationsPodTemplateOptions(name, d, false)35tmpl, selector, err := generatePodTemplate(cfg, name, d, opts)36if err != nil {37return nil, err38}3940spec := apps_v1.DeploymentSpec{41Strategy: apps_v1.DeploymentStrategy{42Type: apps_v1.RollingUpdateDeploymentStrategyType,43},44Selector: selector,45Template: tmpl,46}4748return &apps_v1.Deployment{49ObjectMeta: metadataFromPodTemplate(name, d, tmpl),50Spec: spec,51}, nil52}5354func integrationsPodTemplateOptions(name string, d gragent.Deployment, daemonset bool) podTemplateOptions {55// Integrations expect that the metrics and logs instances exist. This means56// that we have to merge the podTemplateOptions used for metrics and logs57// with the options used for integrations.5859// Since integrations may be running as a DaemonSet, it's not possible for us60// to rely on a PVC template that metrics might be using. We'll force the WAL61// to use an empty volume.62d.Agent.Spec.Storage = nil6364integrationOpts := podTemplateOptions{65ExtraSelectorLabels: map[string]string{66agentTypeLabel: "integrations",67},68Privileged: daemonset,69}7071// We need to iterate over all of our integrations to append extra Volumes,72// VolumesMounts, and references to Secrets or ConfigMaps from the resource73// hierarchy.74var (75secretsPaths []core_v1.KeyToPath76mountedKeys = map[assets.Key]struct{}{}77)7879for _, i := range d.Integrations {80inst := i.Instance81volumePrefix := fmt.Sprintf("%s-%s-", inst.Namespace, inst.Name)8283for _, v := range inst.Spec.Volumes {84// Prefix the key of the Integration CR so it doesn't potentially collide85// with other loaded Integration CRs.86v = *v.DeepCopy()87v.Name = volumePrefix + v.Name8889integrationOpts.ExtraVolumes = append(integrationOpts.ExtraVolumes, v)90}91for _, vm := range inst.Spec.VolumeMounts {92// Prefix the key of the Integration CR so it doesn't potentially collide93// with other loaded Integration CRs.94vm = *vm.DeepCopy()95vm.Name = volumePrefix + vm.Name9697integrationOpts.ExtraVolumeMounts = append(integrationOpts.ExtraVolumeMounts, vm)98}99100for _, s := range inst.Spec.Secrets {101// We need to determine what the value for this Secret was in the shared102// Secret resource.103key := assets.KeyForSecret(inst.Namespace, &s)104if _, mounted := mountedKeys[key]; mounted {105continue106}107mountedKeys[key] = struct{}{}108109secretsPaths = append(secretsPaths, core_v1.KeyToPath{110Key: config.SanitizeLabelName(string(key)),111Path: fmt.Sprintf("secrets/%s/%s/%s", inst.Namespace, s.Name, s.Key),112})113}114115for _, cm := range inst.Spec.ConfigMaps {116// We need to determine what the value for this ConfigMap was in the shared117// Secret resource.118key := assets.KeyForConfigMap(inst.Namespace, &cm)119if _, mounted := mountedKeys[key]; mounted {120continue121}122mountedKeys[key] = struct{}{}123124secretsPaths = append(secretsPaths, core_v1.KeyToPath{125Key: config.SanitizeLabelName(string(key)),126Path: fmt.Sprintf("configMaps/%s/%s/%s", inst.Namespace, cm.Name, cm.Key),127})128}129}130131if len(secretsPaths) > 0 {132// Load in references to Secrets and ConfigMaps.133integrationSecretsName := fmt.Sprintf("%s-integrations-secrets", d.Agent.Name)134135integrationOpts.ExtraVolumes = append(integrationOpts.ExtraVolumes, core_v1.Volume{136Name: integrationSecretsName,137VolumeSource: core_v1.VolumeSource{138Secret: &core_v1.SecretVolumeSource{139// The reconcile-wide Secret holds all secrets and config maps140// integrations may have used.141SecretName: fmt.Sprintf("%s-secrets", d.Agent.Name),142Items: secretsPaths,143},144},145})146147integrationOpts.ExtraVolumeMounts = append(integrationOpts.ExtraVolumeMounts, core_v1.VolumeMount{148Name: integrationSecretsName,149MountPath: "/etc/grafana-agent/integrations",150ReadOnly: true,151})152}153154// Extra options to merge in.155//156// NOTE(rfratto): Merge order is important, as subsequent podTemplateOptions157// have placeholders necessary to generate configs.158var (159metricsOpts = metricsPodTemplateOptions(name, d, 0)160logsOpts = logsPodTemplateOptions()161)162return mergePodTemplateOptions(&integrationOpts, &metricsOpts, &logsOpts)163}164165// mergePodTemplateOptions merges the provided inputs into a single166// podTemplateOptions. Precedence for existing values is taken in input order;167// if an environment variable is defined in both inputs[0] and inputs[1], the168// value from inputs[0] is used.169func mergePodTemplateOptions(inputs ...*podTemplateOptions) podTemplateOptions {170res := podTemplateOptions{171ExtraSelectorLabels: make(map[string]string),172}173174// Volumes are unique by both mount path or name. If a mount path already175// exists, we want to ignore that volume and the respective volume mount176// that uses it.177178var (179mountNames = map[string]struct{}{} // Consumed mount names180mountPaths = map[string]struct{}{} // Consumed mount paths181volumeNames = map[string]struct{}{} // Consumed volume names182varNames = map[string]struct{}{} // Consumed variable names183)184185for _, input := range inputs {186for k, v := range input.ExtraSelectorLabels {187if _, exist := res.ExtraSelectorLabels[k]; exist {188continue189}190res.ExtraSelectorLabels[k] = v191}192193// Merge in VolumeMounts before Volumes, allowing us to detect what volume194// names specific to this input should be ignored.195ignoreVolumes := map[string]struct{}{}196197for _, vm := range input.ExtraVolumeMounts {198// Ignore a volume if the mount path or volume name already exists.199var (200_, exists = mountNames[vm.Name]201_, mounted = mountPaths[vm.MountPath]202)203if exists || mounted {204ignoreVolumes[vm.Name] = struct{}{}205continue206}207208res.ExtraVolumeMounts = append(res.ExtraVolumeMounts, vm)209mountNames[vm.Name] = struct{}{}210mountPaths[vm.MountPath] = struct{}{}211}212213// Merge in volumes that haven't been ignored or have a unique name.214for _, v := range input.ExtraVolumes {215if _, ignored := ignoreVolumes[v.Name]; ignored {216continue217} else if _, exists := volumeNames[v.Name]; exists {218continue219}220221res.ExtraVolumes = append(res.ExtraVolumes, v)222volumeNames[v.Name] = struct{}{}223}224225for _, ev := range input.ExtraEnvVars {226if _, exists := varNames[ev.Name]; exists {227continue228}229230res.ExtraEnvVars = append(res.ExtraEnvVars, ev)231varNames[ev.Name] = struct{}{}232}233}234235return res236}237238239