Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
alist-org
GitHub Repository: alist-org/alist
Path: blob/main/internal/fs/s3_transition.go
1987 views
1
package fs
2
3
import (
4
"encoding/json"
5
"fmt"
6
"strings"
7
"time"
8
9
"github.com/alist-org/alist/v3/drivers/s3"
10
"github.com/alist-org/alist/v3/internal/driver"
11
"github.com/alist-org/alist/v3/internal/model"
12
"github.com/alist-org/alist/v3/internal/op"
13
"github.com/alist-org/alist/v3/internal/task"
14
"github.com/pkg/errors"
15
"github.com/xhofe/tache"
16
)
17
18
const s3TransitionPollInterval = 15 * time.Second
19
20
// S3TransitionTask represents an asynchronous S3 archive/thaw request that is
21
// tracked via the task manager so that clients can monitor the progress of the
22
// operation.
23
type S3TransitionTask struct {
24
task.TaskExtension
25
status string
26
27
StorageMountPath string `json:"storage_mount_path"`
28
ObjectPath string `json:"object_path"`
29
DisplayPath string `json:"display_path"`
30
ObjectName string `json:"object_name"`
31
Transition string `json:"transition"`
32
Payload json.RawMessage `json:"payload,omitempty"`
33
34
TargetStorageClass string `json:"target_storage_class,omitempty"`
35
RequestID string `json:"request_id,omitempty"`
36
VersionID string `json:"version_id,omitempty"`
37
38
storage driver.Driver `json:"-"`
39
}
40
41
// S3TransitionTaskManager holds asynchronous S3 archive/thaw tasks.
42
var S3TransitionTaskManager *tache.Manager[*S3TransitionTask]
43
44
var _ task.TaskExtensionInfo = (*S3TransitionTask)(nil)
45
46
func (t *S3TransitionTask) GetName() string {
47
action := strings.ToLower(t.Transition)
48
if action == "" {
49
action = "transition"
50
}
51
display := t.DisplayPath
52
if display == "" {
53
display = t.ObjectPath
54
}
55
if display == "" {
56
display = t.ObjectName
57
}
58
return fmt.Sprintf("s3 %s %s", action, display)
59
}
60
61
func (t *S3TransitionTask) GetStatus() string {
62
return t.status
63
}
64
65
func (t *S3TransitionTask) Run() error {
66
t.ReinitCtx()
67
t.ClearEndTime()
68
start := time.Now()
69
t.SetStartTime(start)
70
defer func() { t.SetEndTime(time.Now()) }()
71
72
if err := t.ensureStorage(); err != nil {
73
t.status = fmt.Sprintf("locate storage failed: %v", err)
74
return err
75
}
76
77
payload, err := t.decodePayload()
78
if err != nil {
79
t.status = fmt.Sprintf("decode payload failed: %v", err)
80
return err
81
}
82
83
method := strings.ToLower(strings.TrimSpace(t.Transition))
84
switch method {
85
case s3.OtherMethodArchive:
86
t.status = "submitting archive request"
87
t.SetProgress(0)
88
resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{
89
Path: t.ObjectPath,
90
Method: s3.OtherMethodArchive,
91
Data: payload,
92
})
93
if err != nil {
94
t.status = fmt.Sprintf("archive request failed: %v", err)
95
return err
96
}
97
archiveResp, ok := toArchiveResponse(resp)
98
if ok {
99
if t.TargetStorageClass == "" {
100
t.TargetStorageClass = archiveResp.StorageClass
101
}
102
t.RequestID = archiveResp.RequestID
103
t.VersionID = archiveResp.VersionID
104
if archiveResp.StorageClass != "" {
105
t.status = fmt.Sprintf("archive requested, waiting for %s", archiveResp.StorageClass)
106
} else {
107
t.status = "archive requested"
108
}
109
} else if sc := t.extractTargetStorageClass(); sc != "" {
110
t.TargetStorageClass = sc
111
t.status = fmt.Sprintf("archive requested, waiting for %s", sc)
112
} else {
113
t.status = "archive requested"
114
}
115
if t.TargetStorageClass != "" {
116
t.TargetStorageClass = s3.NormalizeStorageClass(t.TargetStorageClass)
117
}
118
t.SetProgress(25)
119
return t.waitForArchive()
120
case s3.OtherMethodThaw:
121
t.status = "submitting thaw request"
122
t.SetProgress(0)
123
resp, err := op.Other(t.Ctx(), t.storage, model.FsOtherArgs{
124
Path: t.ObjectPath,
125
Method: s3.OtherMethodThaw,
126
Data: payload,
127
})
128
if err != nil {
129
t.status = fmt.Sprintf("thaw request failed: %v", err)
130
return err
131
}
132
thawResp, ok := toThawResponse(resp)
133
if ok {
134
t.RequestID = thawResp.RequestID
135
if thawResp.Status != nil && !thawResp.Status.Ongoing {
136
t.SetProgress(100)
137
t.status = thawCompletionMessage(thawResp.Status)
138
return nil
139
}
140
}
141
t.status = "thaw requested"
142
t.SetProgress(25)
143
return t.waitForThaw()
144
default:
145
return errors.Errorf("unsupported transition method: %s", t.Transition)
146
}
147
}
148
149
func (t *S3TransitionTask) ensureStorage() error {
150
if t.storage != nil {
151
return nil
152
}
153
storage, err := op.GetStorageByMountPath(t.StorageMountPath)
154
if err != nil {
155
return err
156
}
157
t.storage = storage
158
return nil
159
}
160
161
func (t *S3TransitionTask) decodePayload() (interface{}, error) {
162
if len(t.Payload) == 0 {
163
return nil, nil
164
}
165
var payload interface{}
166
if err := json.Unmarshal(t.Payload, &payload); err != nil {
167
return nil, err
168
}
169
return payload, nil
170
}
171
172
func (t *S3TransitionTask) extractTargetStorageClass() string {
173
if len(t.Payload) == 0 {
174
return ""
175
}
176
var req s3.ArchiveRequest
177
if err := json.Unmarshal(t.Payload, &req); err != nil {
178
return ""
179
}
180
return s3.NormalizeStorageClass(req.StorageClass)
181
}
182
183
func (t *S3TransitionTask) waitForArchive() error {
184
ticker := time.NewTicker(s3TransitionPollInterval)
185
defer ticker.Stop()
186
187
ctx := t.Ctx()
188
for {
189
select {
190
case <-ctx.Done():
191
t.status = "archive canceled"
192
return ctx.Err()
193
case <-ticker.C:
194
resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{
195
Path: t.ObjectPath,
196
Method: s3.OtherMethodArchiveStatus,
197
})
198
if err != nil {
199
t.status = fmt.Sprintf("archive status error: %v", err)
200
return err
201
}
202
archiveResp, ok := toArchiveResponse(resp)
203
if !ok {
204
t.status = fmt.Sprintf("unexpected archive status response: %T", resp)
205
return errors.Errorf("unexpected archive status response: %T", resp)
206
}
207
currentClass := strings.TrimSpace(archiveResp.StorageClass)
208
target := strings.TrimSpace(t.TargetStorageClass)
209
if target == "" {
210
target = currentClass
211
t.TargetStorageClass = currentClass
212
}
213
if currentClass == "" {
214
t.status = "waiting for storage class update"
215
t.SetProgress(50)
216
continue
217
}
218
if strings.EqualFold(currentClass, target) {
219
t.SetProgress(100)
220
t.status = fmt.Sprintf("archive complete (%s)", currentClass)
221
return nil
222
}
223
t.status = fmt.Sprintf("storage class %s (target %s)", currentClass, target)
224
t.SetProgress(75)
225
}
226
}
227
}
228
229
func (t *S3TransitionTask) waitForThaw() error {
230
ticker := time.NewTicker(s3TransitionPollInterval)
231
defer ticker.Stop()
232
233
ctx := t.Ctx()
234
for {
235
select {
236
case <-ctx.Done():
237
t.status = "thaw canceled"
238
return ctx.Err()
239
case <-ticker.C:
240
resp, err := op.Other(ctx, t.storage, model.FsOtherArgs{
241
Path: t.ObjectPath,
242
Method: s3.OtherMethodThawStatus,
243
})
244
if err != nil {
245
t.status = fmt.Sprintf("thaw status error: %v", err)
246
return err
247
}
248
thawResp, ok := toThawResponse(resp)
249
if !ok {
250
t.status = fmt.Sprintf("unexpected thaw status response: %T", resp)
251
return errors.Errorf("unexpected thaw status response: %T", resp)
252
}
253
status := thawResp.Status
254
if status == nil {
255
t.status = "waiting for thaw status"
256
t.SetProgress(50)
257
continue
258
}
259
if status.Ongoing {
260
t.status = fmt.Sprintf("thaw in progress (%s)", status.Raw)
261
t.SetProgress(75)
262
continue
263
}
264
t.SetProgress(100)
265
t.status = thawCompletionMessage(status)
266
return nil
267
}
268
}
269
}
270
271
func thawCompletionMessage(status *s3.RestoreStatus) string {
272
if status == nil {
273
return "thaw complete"
274
}
275
if status.Expiry != "" {
276
return fmt.Sprintf("thaw complete, expires %s", status.Expiry)
277
}
278
return "thaw complete"
279
}
280
281
func toArchiveResponse(v interface{}) (s3.ArchiveResponse, bool) {
282
switch resp := v.(type) {
283
case s3.ArchiveResponse:
284
return resp, true
285
case *s3.ArchiveResponse:
286
if resp != nil {
287
return *resp, true
288
}
289
}
290
return s3.ArchiveResponse{}, false
291
}
292
293
func toThawResponse(v interface{}) (s3.ThawResponse, bool) {
294
switch resp := v.(type) {
295
case s3.ThawResponse:
296
return resp, true
297
case *s3.ThawResponse:
298
if resp != nil {
299
return *resp, true
300
}
301
}
302
return s3.ThawResponse{}, false
303
}
304
305
// Ensure compatibility with persistence when tasks are restored.
306
func (t *S3TransitionTask) OnRestore() {
307
// The storage handle is not persisted intentionally; it will be lazily
308
// re-fetched on the next Run invocation.
309
t.storage = nil
310
}
311
312