Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
gitpod-io
GitHub Repository: gitpod-io/gitpod
Path: blob/main/components/usage/pkg/apiv1/usage.go
2499 views
1
// Copyright (c) 2022 Gitpod GmbH. All rights reserved.
2
// Licensed under the GNU Affero General Public License (AGPL).
3
// See License.AGPL.txt in the project root for license information.
4
5
package apiv1
6
7
import (
8
"context"
9
"fmt"
10
"math"
11
"strings"
12
"time"
13
14
"github.com/google/uuid"
15
16
"github.com/gitpod-io/gitpod/common-go/log"
17
db "github.com/gitpod-io/gitpod/components/gitpod-db/go"
18
v1 "github.com/gitpod-io/gitpod/usage-api/v1"
19
"google.golang.org/grpc/codes"
20
"google.golang.org/grpc/status"
21
"google.golang.org/protobuf/types/known/durationpb"
22
"google.golang.org/protobuf/types/known/timestamppb"
23
"gorm.io/gorm"
24
)
25
26
// Compile-time assertion that *UsageService satisfies v1.UsageServiceServer.
var _ v1.UsageServiceServer = (*UsageService)(nil)

// UsageService is the server-side implementation of v1.UsageServiceServer.
// Instances are created with NewUsageService.
type UsageService struct {
	// conn is the database handle handed to the db.* query helpers.
	conn *gorm.DB
	// nowFunc supplies the reference time used by ReconcileUsage.
	nowFunc func() time.Time
	// pricer computes the credits consumed by a workspace instance.
	pricer *WorkspacePricer
	// costCenterManager backs the cost-center RPCs and ResetUsage.
	costCenterManager *db.CostCenterManager
	// ledgerInterval is echoed to clients in ListUsageResponse.LedgerInterval.
	ledgerInterval time.Duration

	v1.UnimplementedUsageServiceServer
}

// maxQuerySize is the widest From..To window ListUsage accepts; it is also the
// look-back applied when a request does not specify From.
const maxQuerySize = 300 * 24 * time.Hour
39
40
func (s *UsageService) ListUsage(ctx context.Context, in *v1.ListUsageRequest) (*v1.ListUsageResponse, error) {
41
to := time.Now()
42
if in.To != nil {
43
to = in.To.AsTime()
44
}
45
from := to.Add(-maxQuerySize)
46
if in.From != nil {
47
from = in.From.AsTime()
48
}
49
50
if from.After(to) {
51
return nil, status.Errorf(codes.InvalidArgument, "Specified From timestamp is after To. Please ensure From is always before To")
52
}
53
54
if to.Sub(from) > maxQuerySize {
55
return nil, status.Errorf(codes.InvalidArgument, "Maximum range exceeded. Range specified can be at most %s", maxQuerySize.String())
56
}
57
58
if in.GetPagination().GetPerPage() < 0 {
59
return nil, status.Errorf(codes.InvalidArgument, "Number of items perPage needs to be positive (was %d).", in.GetPagination().GetPerPage())
60
}
61
62
if in.GetPagination().GetPerPage() > 1000 {
63
return nil, status.Errorf(codes.InvalidArgument, "Number of items perPage needs to be no more than 1000 (was %d).", in.GetPagination().GetPerPage())
64
}
65
66
if in.GetPagination().GetPage() < 0 {
67
return nil, status.Errorf(codes.InvalidArgument, "Page number needs to be 0 or greater (was %d).", in.GetPagination().GetPage())
68
}
69
70
attributionId, err := db.ParseAttributionID(in.AttributionId)
71
if err != nil {
72
return nil, status.Errorf(codes.InvalidArgument, "AttributionID '%s' couldn't be parsed (error: %s).", in.AttributionId, err)
73
}
74
75
order := db.DescendingOrder
76
if in.Order == v1.ListUsageRequest_ORDERING_ASCENDING {
77
order = db.AscendingOrder
78
}
79
80
var perPage int64 = 50
81
if in.GetPagination().GetPerPage() > 0 {
82
perPage = in.GetPagination().GetPerPage()
83
}
84
var page int64 = 1
85
if in.GetPagination().GetPage() > 1 {
86
page = in.GetPagination().GetPage()
87
}
88
var offset = perPage * (page - 1)
89
90
var userID uuid.UUID
91
if in.UserId != "" {
92
userID, err = uuid.Parse(in.UserId)
93
if err != nil {
94
return nil, status.Errorf(codes.InvalidArgument, "UserID '%s' couldn't be parsed (error: %s).", in.UserId, err)
95
}
96
}
97
98
excludeDrafts := false
99
listUsageResult, err := db.FindUsage(ctx, s.conn, &db.FindUsageParams{
100
AttributionId: db.AttributionID(in.GetAttributionId()),
101
UserID: userID,
102
From: from,
103
To: to,
104
Order: order,
105
Offset: offset,
106
Limit: perPage,
107
ExcludeDrafts: excludeDrafts,
108
})
109
logger := log.Log.
110
WithField("attribution_id", in.AttributionId).
111
WithField("userID", userID).
112
WithField("perPage", perPage).
113
WithField("page", page).
114
WithField("from", from).
115
WithField("to", to)
116
logger.Debug("Fetching usage data")
117
if err != nil {
118
logger.WithError(err).Error("Failed to fetch usage.")
119
return nil, status.Error(codes.Internal, "unable to retrieve usage")
120
}
121
122
var usageData []*v1.Usage
123
for _, usageRecord := range listUsageResult {
124
kind := v1.Usage_KIND_WORKSPACE_INSTANCE
125
if usageRecord.Kind == db.InvoiceUsageKind {
126
kind = v1.Usage_KIND_INVOICE
127
}
128
129
var workspaceInstanceID string
130
if usageRecord.WorkspaceInstanceID != nil {
131
workspaceInstanceID = (*usageRecord.WorkspaceInstanceID).String()
132
}
133
134
usageDataEntry := &v1.Usage{
135
Id: usageRecord.ID.String(),
136
AttributionId: string(usageRecord.AttributionID),
137
EffectiveTime: timestamppb.New(usageRecord.EffectiveTime.Time()),
138
// convert cents back to full credits
139
Credits: usageRecord.CreditCents.ToCredits(),
140
Kind: kind,
141
WorkspaceInstanceId: workspaceInstanceID,
142
Draft: usageRecord.Draft,
143
Metadata: string(usageRecord.Metadata),
144
}
145
usageData = append(usageData, usageDataEntry)
146
}
147
148
usageSummary, err := db.GetUsageSummary(ctx, s.conn,
149
db.GetUsageSummaryParams{
150
AttributionId: attributionId,
151
UserID: userID,
152
From: from,
153
To: to,
154
ExcludeDrafts: excludeDrafts,
155
},
156
)
157
158
if err != nil {
159
logger.WithError(err).Error("Failed to fetch usage metadata.")
160
return nil, status.Error(codes.Internal, "unable to retrieve usage")
161
}
162
totalPages := int64(math.Ceil(float64(usageSummary.NumberOfRecords) / float64(perPage)))
163
164
pagination := v1.PaginatedResponse{
165
PerPage: perPage,
166
Page: page,
167
TotalPages: totalPages,
168
Total: int64(usageSummary.NumberOfRecords),
169
}
170
171
return &v1.ListUsageResponse{
172
UsageEntries: usageData,
173
CreditsUsed: usageSummary.CreditCentsUsed.ToCredits(),
174
Pagination: &pagination,
175
LedgerInterval: durationpb.New(s.ledgerInterval),
176
}, nil
177
}
178
179
func (s *UsageService) GetBalance(ctx context.Context, in *v1.GetBalanceRequest) (*v1.GetBalanceResponse, error) {
180
attrId, err := db.ParseAttributionID(in.AttributionId)
181
if err != nil {
182
return nil, err
183
}
184
credits, err := db.GetBalance(ctx, s.conn, attrId)
185
if err != nil {
186
return nil, err
187
}
188
return &v1.GetBalanceResponse{
189
Credits: credits.ToCredits(),
190
}, nil
191
}
192
193
func (s *UsageService) GetCostCenter(ctx context.Context, in *v1.GetCostCenterRequest) (*v1.GetCostCenterResponse, error) {
194
if in.AttributionId == "" {
195
return nil, status.Errorf(codes.InvalidArgument, "Empty attributionId")
196
}
197
attributionId, err := db.ParseAttributionID(in.AttributionId)
198
if err != nil {
199
return nil, status.Errorf(codes.InvalidArgument, "Bad attributionId %s", in.AttributionId)
200
}
201
202
result, err := s.costCenterManager.GetOrCreateCostCenter(ctx, attributionId)
203
if err != nil {
204
return nil, err
205
}
206
return &v1.GetCostCenterResponse{
207
CostCenter: dbCostCenterToAPI(result),
208
}, nil
209
}
210
211
func dbCostCenterToAPI(c db.CostCenter) *v1.CostCenter {
212
return &v1.CostCenter{
213
AttributionId: string(c.ID),
214
SpendingLimit: c.SpendingLimit,
215
BillingStrategy: convertBillingStrategyToAPI(c.BillingStrategy),
216
NextBillingTime: db.VarcharTimeToTimestamppb(c.NextBillingTime),
217
BillingCycleStart: db.VarcharTimeToTimestamppb(c.BillingCycleStart),
218
}
219
}
220
221
// convertBillingStrategyToDB translates an API billing strategy into the
// database representation. Anything other than Stripe maps to Other.
func convertBillingStrategyToDB(in v1.CostCenter_BillingStrategy) db.BillingStrategy {
	switch in {
	case v1.CostCenter_BILLING_STRATEGY_STRIPE:
		return db.CostCenter_Stripe
	default:
		return db.CostCenter_Other
	}
}
227
228
// convertBillingStrategyToAPI translates a database billing strategy into the
// API enum. Anything other than Stripe maps to Other.
func convertBillingStrategyToAPI(in db.BillingStrategy) v1.CostCenter_BillingStrategy {
	switch in {
	case db.CostCenter_Stripe:
		return v1.CostCenter_BILLING_STRATEGY_STRIPE
	default:
		return v1.CostCenter_BILLING_STRATEGY_OTHER
	}
}
234
235
func (s *UsageService) SetCostCenter(ctx context.Context, in *v1.SetCostCenterRequest) (*v1.SetCostCenterResponse, error) {
236
if in.CostCenter == nil {
237
return nil, status.Errorf(codes.InvalidArgument, "Empty CostCenter")
238
}
239
240
attrID, err := db.ParseAttributionID(in.CostCenter.AttributionId)
241
if err != nil {
242
return nil, err
243
}
244
245
costCenter := db.CostCenter{
246
ID: attrID,
247
SpendingLimit: in.CostCenter.SpendingLimit,
248
BillingStrategy: convertBillingStrategyToDB(in.CostCenter.BillingStrategy),
249
}
250
result, err := s.costCenterManager.UpdateCostCenter(ctx, costCenter)
251
if err != nil {
252
return nil, err
253
}
254
return &v1.SetCostCenterResponse{
255
CostCenter: dbCostCenterToAPI(result),
256
}, nil
257
}
258
259
// ResetUsage resets usage for every managed cost center whose billing time
// lies before the current time.
//
// Failing to list the cost centers yields codes.Internal. Failures to reset an
// individual cost center are collected and logged, and the call still
// succeeds.
func (s *UsageService) ResetUsage(ctx context.Context, req *v1.ResetUsageRequest) (*v1.ResetUsageResponse, error) {
	cutoff := time.Now()

	expired, err := s.costCenterManager.ListManagedCostCentersWithBillingTimeBefore(ctx, cutoff)
	if err != nil {
		log.WithError(err).Error("Failed to list cost centers to update.")
		return nil, status.Errorf(codes.Internal, "Failed to identify expired cost centers for Other billing strategy")
	}

	log.Infof("Identified %d expired cost centers at relative to %s", len(expired), cutoff.Format(time.RFC3339))

	// Keep going after a failure so one bad cost center does not block the rest.
	var failures []error
	for _, costCenter := range expired {
		if _, resetErr := s.costCenterManager.ResetUsage(ctx, costCenter.ID); resetErr != nil {
			failures = append(failures, resetErr)
		}
	}

	if len(failures) > 0 {
		log.WithField("errors", failures).Error("Failed to reset usage.")
	}

	return &v1.ResetUsageResponse{}, nil
}
282
283
func (s *UsageService) ReconcileUsage(ctx context.Context, req *v1.ReconcileUsageRequest) (*v1.ReconcileUsageResponse, error) {
284
from := req.GetFrom().AsTime()
285
to := req.GetTo().AsTime()
286
287
logger := log.
288
WithField("from", from).
289
WithField("to", to)
290
291
if to.Before(from) {
292
return nil, status.Errorf(codes.InvalidArgument, "To must not be before From")
293
}
294
295
var instances []db.WorkspaceInstanceForUsage
296
stopped, err := db.FindStoppedWorkspaceInstancesInRange(ctx, s.conn, from, to)
297
if err != nil {
298
logger.WithError(err).Errorf("Failed to find stopped workspace instances.")
299
return nil, status.Errorf(codes.Internal, "failed to query for stopped instances")
300
}
301
logger.Infof("Found %d stopped workspace instances in range.", len(stopped))
302
instances = append(instances, stopped...)
303
304
running, err := db.FindRunningWorkspaceInstances(ctx, s.conn)
305
if err != nil {
306
logger.WithError(err).Errorf("Failed to find running workspace instances.")
307
return nil, status.Errorf(codes.Internal, "failed to query for running instances")
308
}
309
logger.Infof("Found %d running workspaces since the beginning of time.", len(running))
310
instances = append(instances, running...)
311
312
usageDrafts, err := db.FindAllDraftUsage(ctx, s.conn)
313
if err != nil {
314
logger.WithError(err).Errorf("Failed to find all draft usage records.")
315
return nil, status.Errorf(codes.Internal, "failed to find all draft usage records")
316
}
317
logger.Infof("Found %d draft usage records.", len(usageDrafts))
318
319
instancesWithUsageInDraft, err := db.FindWorkspaceInstancesByIds(ctx, s.conn, collectWorkspaceInstanceIDs(usageDrafts))
320
if err != nil {
321
logger.WithError(err).Errorf("Failed to find workspace instances for usage records in draft.")
322
return nil, status.Errorf(codes.Internal, "failed to find workspace instances for usage records in draft state")
323
}
324
logger.Infof("Found %d workspaces instances for usage records in draft.", len(instancesWithUsageInDraft))
325
instances = append(instances, instancesWithUsageInDraft...)
326
327
// now has to be computed after we've collected all data, to ensure that it's always greater than any of the records we fetch
328
now := s.nowFunc()
329
inserts, updates, err := reconcileUsage(instances, usageDrafts, s.pricer, now)
330
if err != nil {
331
logger.WithError(err).Errorf("Failed to reconcile usage with ledger.")
332
return nil, status.Errorf(codes.Internal, "Failed to reconcile usage with ledger.")
333
}
334
logger.Infof("Identified %d inserts and %d updates against usage records.", len(inserts), len(updates))
335
336
if len(inserts) > 0 {
337
err = db.InsertUsage(ctx, s.conn, inserts...)
338
if err != nil {
339
logger.WithError(err).Errorf("Failed to insert %d usage records into the database.", len(inserts))
340
return nil, status.Errorf(codes.Internal, "Failed to insert usage records into the database.")
341
}
342
logger.Infof("Inserted %d new Usage records into the database.", len(inserts))
343
}
344
345
if len(updates) > 0 {
346
err = db.UpdateUsage(ctx, s.conn, updates...)
347
if err != nil {
348
logger.WithError(err).Error("Failed to update usage records in the database.")
349
return nil, status.Errorf(codes.Internal, "Failed to update usage records in the database.")
350
}
351
logger.Infof("Updated %d Usage records in the database.", len(updates))
352
}
353
354
return &v1.ReconcileUsageResponse{}, nil
355
}
356
357
// reconcileUsage computes the ledger changes needed to reflect the given
// workspace instances.
//
// Instances are de-duplicated by ID first. An instance that already has a draft
// usage record produces an entry in updates (keeping the draft's ID); every
// other instance produces an entry in inserts. Because the instances are
// iterated from a map, the order of both result slices is unspecified.
//
// Drafts without a workspace instance ID cannot be matched to an instance and
// are ignored. An error is returned as soon as one usage record cannot be
// constructed; inserts and updates are nil in that case.
func reconcileUsage(instances []db.WorkspaceInstanceForUsage, drafts []db.Usage, pricer *WorkspacePricer, now time.Time) (inserts []db.Usage, updates []db.Usage, err error) {
	instancesByID := dedupeWorkspaceInstancesForUsage(instances)

	draftsByInstanceID := make(map[uuid.UUID]db.Usage, len(drafts))
	for _, draft := range drafts {
		// WorkspaceInstanceID is a pointer and may be nil; dereferencing it
		// unguarded would panic.
		if draft.WorkspaceInstanceID == nil {
			continue
		}
		draftsByInstanceID[*draft.WorkspaceInstanceID] = draft
	}

	for instanceID, instance := range instancesByID {
		if draft, exists := draftsByInstanceID[instanceID]; exists {
			updatedUsage, updateErr := updateUsageFromInstance(instance, draft, pricer, now)
			if updateErr != nil {
				return nil, nil, fmt.Errorf("failed to construct updated usage record: %w", updateErr)
			}
			updates = append(updates, updatedUsage)
			continue
		}

		newUsage, newErr := newUsageFromInstance(instance, pricer, now)
		if newErr != nil {
			return nil, nil, fmt.Errorf("failed to construct usage record: %w", newErr)
		}
		inserts = append(inserts, newUsage)
	}

	return inserts, updates, nil
}
385
386
const usageDescriptionFromController = "Usage collected by automated system."
387
388
func newUsageFromInstance(instance db.WorkspaceInstanceForUsage, pricer *WorkspacePricer, now time.Time) (db.Usage, error) {
389
stopTime := instance.StoppingTime
390
if !stopTime.IsSet() {
391
stopTime = instance.StoppedTime
392
}
393
394
draft := true
395
if instance.StoppedTime.IsSet() {
396
draft = false
397
}
398
399
effectiveTime := now
400
if stopTime.IsSet() {
401
effectiveTime = stopTime.Time()
402
}
403
404
usage := db.Usage{
405
ID: uuid.New(),
406
AttributionID: instance.UsageAttributionID,
407
Description: usageDescriptionFromController,
408
CreditCents: db.NewCreditCents(pricer.CreditsUsedByInstance(&instance, now)),
409
EffectiveTime: db.NewVarCharTime(effectiveTime),
410
Kind: db.WorkspaceInstanceUsageKind,
411
WorkspaceInstanceID: &instance.ID,
412
Draft: draft,
413
}
414
415
creationTime := ""
416
if instance.CreationTime.IsSet() {
417
creationTime = db.TimeToISO8601(instance.CreationTime.Time())
418
}
419
startedTime := ""
420
if instance.StartedTime.IsSet() {
421
startedTime = db.TimeToISO8601(instance.StartedTime.Time())
422
}
423
endTime := ""
424
if stopTime.IsSet() {
425
endTime = db.TimeToISO8601(stopTime.Time())
426
}
427
stoppedTime := ""
428
if instance.StoppedTime.IsSet() {
429
stoppedTime = db.TimeToISO8601(instance.StoppedTime.Time())
430
}
431
err := usage.SetMetadataWithWorkspaceInstance(db.WorkspaceInstanceUsageData{
432
WorkspaceId: instance.WorkspaceID,
433
WorkspaceType: instance.Type,
434
WorkspaceClass: instance.WorkspaceClass,
435
ContextURL: instance.ContextURL,
436
CreationTime: creationTime,
437
StartTime: startedTime,
438
EndTime: endTime,
439
StoppedTime: stoppedTime,
440
UserID: instance.UserID,
441
UserName: instance.UserName,
442
UserAvatarURL: instance.UserAvatarURL,
443
})
444
if err != nil {
445
return db.Usage{}, fmt.Errorf("failed to serialize workspace instance metadata: %w", err)
446
}
447
448
return usage, nil
449
}
450
451
// updateUsageFromInstance produces the replacement for an existing usage
// record. All fields are rebuilt from the workspace instance, which is treated
// as the source of truth; only the ID of the existing record is carried over.
func updateUsageFromInstance(instance db.WorkspaceInstanceForUsage, usage db.Usage, pricer *WorkspacePricer, now time.Time) (db.Usage, error) {
	rebuilt, buildErr := newUsageFromInstance(instance, pricer, now)
	if buildErr != nil {
		return db.Usage{}, fmt.Errorf("failed to construct updated usage record: %w", buildErr)
	}

	// Keep the identity of the record that is being replaced.
	rebuilt.ID = usage.ID
	return rebuilt, nil
}
462
463
// collectWorkspaceInstanceIDs returns the workspace instance IDs referenced by
// the given usage records, in input order. Records without a workspace
// instance ID are skipped. The result is nil when no ID was collected.
func collectWorkspaceInstanceIDs(usage []db.Usage) []uuid.UUID {
	var ids []uuid.UUID
	for _, u := range usage {
		// WorkspaceInstanceID is a pointer and may be nil; dereferencing it
		// unguarded would panic.
		if u.WorkspaceInstanceID == nil {
			continue
		}
		ids = append(ids, *u.WorkspaceInstanceID)
	}
	return ids
}
470
471
// dedupeWorkspaceInstancesForUsage indexes the instances by ID. When the same
// ID occurs more than once, the last occurrence in the slice wins.
func dedupeWorkspaceInstancesForUsage(instances []db.WorkspaceInstanceForUsage) map[uuid.UUID]db.WorkspaceInstanceForUsage {
	byID := make(map[uuid.UUID]db.WorkspaceInstanceForUsage, len(instances))
	for i := range instances {
		byID[instances[i].ID] = instances[i]
	}
	return byID
}
478
479
func NewUsageService(conn *gorm.DB, pricer *WorkspacePricer, costCenterManager *db.CostCenterManager, ledgerIntervalStr string) (*UsageService, error) {
480
481
ledgerInterval, err := time.ParseDuration(ledgerIntervalStr)
482
if err != nil {
483
return nil, fmt.Errorf("failed to parse schedule duration: %w", err)
484
}
485
486
return &UsageService{
487
conn: conn,
488
costCenterManager: costCenterManager,
489
nowFunc: func() time.Time {
490
return time.Now().UTC()
491
},
492
pricer: pricer,
493
ledgerInterval: ledgerInterval,
494
}, nil
495
}
496
497
func (s *UsageService) AddUsageCreditNote(ctx context.Context, req *v1.AddUsageCreditNoteRequest) (*v1.AddUsageCreditNoteResponse, error) {
498
log.Log.
499
WithField("attribution_id", req.AttributionId).
500
WithField("credits", req.Credits).
501
WithField("user", req.UserId).
502
WithField("note", req.Description).
503
Info("Adding usage credit note.")
504
505
attributionId, err := db.ParseAttributionID(req.AttributionId)
506
if err != nil {
507
return nil, status.Errorf(codes.InvalidArgument, "AttributionID '%s' couldn't be parsed (error: %s).", req.AttributionId, err)
508
}
509
510
description := strings.TrimSpace(req.Description)
511
if description == "" {
512
return nil, status.Error(codes.InvalidArgument, "The description must not be empty.")
513
}
514
515
usage := db.Usage{
516
ID: uuid.New(),
517
AttributionID: attributionId,
518
Description: description,
519
CreditCents: db.NewCreditCents(float64(req.Credits * -1)),
520
EffectiveTime: db.NewVarCharTime(time.Now()),
521
Kind: db.CreditNoteKind,
522
Draft: false,
523
}
524
525
if req.UserId != "" {
526
userId, err := uuid.Parse(req.UserId)
527
if err != nil {
528
return nil, fmt.Errorf("The user id is not a valid UUID. %w", err)
529
}
530
err = usage.SetCreditNoteMetaData(db.CreditNoteMetaData{UserID: userId.String()})
531
if err != nil {
532
return nil, err
533
}
534
}
535
536
err = db.InsertUsage(ctx, s.conn, usage)
537
if err != nil {
538
return nil, err
539
}
540
return &v1.AddUsageCreditNoteResponse{}, nil
541
}
542
543