Path: blob/main/component/loki/source/internal/kafkatarget/oauth_provider.go
4096 views
package kafkatarget12import (3"context"4"fmt"5"time"67"github.com/Azure/azure-sdk-for-go/sdk/azcore"8"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"9"github.com/Azure/azure-sdk-for-go/sdk/azidentity"10"github.com/Shopify/sarama"11)1213func NewOAuthProvider(opts OAuthConfig) (sarama.AccessTokenProvider, error) {14switch opts.TokenProvider {15case TokenProviderTypeAzure:16cred, err := azidentity.NewDefaultAzureCredential(nil)17if err != nil {18return nil, err19}20return &TokenProviderAzure{tokenProvider: cred, scopes: opts.Scopes}, nil21default:22return nil, fmt.Errorf("token provider '%s' is not supported", opts.TokenProvider)23}24}2526type azureTokenProvider interface {27GetToken(ctx context.Context, opts policy.TokenRequestOptions) (azcore.AccessToken, error)28}2930// TokenProviderAzure implements sarama.AccessTokenProvider31type TokenProviderAzure struct {32tokenProvider azureTokenProvider33scopes []string34}3536// Token returns a new *sarama.AccessToken or an error37func (t *TokenProviderAzure) Token() (*sarama.AccessToken, error) {38ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)39defer cancel()40token, err := t.tokenProvider.GetToken(ctx, policy.TokenRequestOptions{Scopes: t.scopes})41if err != nil {42return nil, fmt.Errorf("failed to acquire token: %w", err)43}44return &sarama.AccessToken{Token: token.Token}, nil45}464748