Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
aos
GitHub Repository: aos/grafana-agent
Path: blob/main/component/loki/source/internal/kafkatarget/oauth_provider.go
4096 views
1
package kafkatarget
2
3
import (
4
"context"
5
"fmt"
6
"time"
7
8
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
9
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
10
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
11
"github.com/Shopify/sarama"
12
)
13
14
func NewOAuthProvider(opts OAuthConfig) (sarama.AccessTokenProvider, error) {
15
switch opts.TokenProvider {
16
case TokenProviderTypeAzure:
17
cred, err := azidentity.NewDefaultAzureCredential(nil)
18
if err != nil {
19
return nil, err
20
}
21
return &TokenProviderAzure{tokenProvider: cred, scopes: opts.Scopes}, nil
22
default:
23
return nil, fmt.Errorf("token provider '%s' is not supported", opts.TokenProvider)
24
}
25
}
26
27
type azureTokenProvider interface {
28
GetToken(ctx context.Context, opts policy.TokenRequestOptions) (azcore.AccessToken, error)
29
}
30
31
// TokenProviderAzure implements sarama.AccessTokenProvider
32
type TokenProviderAzure struct {
33
tokenProvider azureTokenProvider
34
scopes []string
35
}
36
37
// Token returns a new *sarama.AccessToken or an error
38
func (t *TokenProviderAzure) Token() (*sarama.AccessToken, error) {
39
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
40
defer cancel()
41
token, err := t.tokenProvider.GetToken(ctx, policy.TokenRequestOptions{Scopes: t.scopes})
42
if err != nil {
43
return nil, fmt.Errorf("failed to acquire token: %w", err)
44
}
45
return &sarama.AccessToken{Token: token.Token}, nil
46
}
47
48