GitHub Repository: freebsd/freebsd-src
Path: blob/main/sys/contrib/openzfs/module/zfs/dmu_recv.c
1
// SPDX-License-Identifier: CDDL-1.0
2
/*
3
* CDDL HEADER START
4
*
5
* The contents of this file are subject to the terms of the
6
* Common Development and Distribution License (the "License").
7
* You may not use this file except in compliance with the License.
8
*
9
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10
* or https://opensource.org/licenses/CDDL-1.0.
11
* See the License for the specific language governing permissions
12
* and limitations under the License.
13
*
14
* When distributing Covered Code, include this CDDL HEADER in each
15
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16
* If applicable, add the following below this CDDL HEADER, with the
17
* fields enclosed by brackets "[]" replaced with your own identifying
18
* information: Portions Copyright [yyyy] [name of copyright owner]
19
*
20
* CDDL HEADER END
21
*/
22
/*
23
* Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
24
* Copyright 2011 Nexenta Systems, Inc. All rights reserved.
25
* Copyright (c) 2011, 2020 by Delphix. All rights reserved.
26
* Copyright (c) 2014, Joyent, Inc. All rights reserved.
27
* Copyright 2014 HybridCluster. All rights reserved.
28
* Copyright (c) 2018, loli10K <[email protected]>. All rights reserved.
29
* Copyright (c) 2019, 2024, Klara, Inc.
30
* Copyright (c) 2019, Allan Jude
31
* Copyright (c) 2019 Datto Inc.
32
* Copyright (c) 2022 Axcient.
33
* Copyright (c) 2025, Rob Norris <[email protected]>
34
*/
35
36
#include <sys/arc.h>
37
#include <sys/spa_impl.h>
38
#include <sys/dmu.h>
39
#include <sys/dmu_impl.h>
40
#include <sys/dmu_send.h>
41
#include <sys/dmu_recv.h>
42
#include <sys/dmu_tx.h>
43
#include <sys/dbuf.h>
44
#include <sys/dnode.h>
45
#include <sys/zfs_context.h>
46
#include <sys/dmu_objset.h>
47
#include <sys/dmu_traverse.h>
48
#include <sys/dsl_dataset.h>
49
#include <sys/dsl_dir.h>
50
#include <sys/dsl_prop.h>
51
#include <sys/dsl_pool.h>
52
#include <sys/dsl_synctask.h>
53
#include <sys/zfs_ioctl.h>
54
#include <sys/zap.h>
55
#include <sys/zvol.h>
56
#include <sys/zio_checksum.h>
57
#include <sys/zfs_znode.h>
58
#include <zfs_fletcher.h>
59
#include <sys/avl.h>
60
#include <sys/ddt.h>
61
#include <sys/zfs_onexit.h>
62
#include <sys/dsl_destroy.h>
63
#include <sys/blkptr.h>
64
#include <sys/dsl_bookmark.h>
65
#include <sys/zfeature.h>
66
#include <sys/bqueue.h>
67
#include <sys/objlist.h>
68
#ifdef _KERNEL
69
#include <sys/zfs_vfsops.h>
70
#endif
71
#include <sys/zfs_file.h>
72
#include <sys/cred.h>
73
74
static uint_t zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
75
static uint_t zfs_recv_queue_ff = 20;
76
static uint_t zfs_recv_write_batch_size = 1024 * 1024;
77
static int zfs_recv_best_effort_corrective = 0;
78
79
static const void *const dmu_recv_tag = "dmu_recv_tag";
80
const char *const recv_clone_name = "%recv";
81
82
typedef enum {
83
ORNS_NO,
84
ORNS_YES,
85
ORNS_MAYBE
86
} or_need_sync_t;
87
88
static int receive_read_payload_and_next_header(dmu_recv_cookie_t *ra, int len,
89
void *buf);
90
91
struct receive_record_arg {
92
dmu_replay_record_t header;
93
void *payload; /* Pointer to a buffer containing the payload */
94
/*
95
* If the record is a WRITE or SPILL, pointer to the abd containing the
96
* payload.
97
*/
98
abd_t *abd;
99
int payload_size;
100
uint64_t bytes_read; /* bytes read from stream when record created */
101
boolean_t eos_marker; /* Marks the end of the stream */
102
bqueue_node_t node;
103
};
104
105
struct receive_writer_arg {
106
objset_t *os;
107
boolean_t byteswap;
108
bqueue_t q;
109
110
/*
111
* These three members are used to signal to the main thread when
112
* we're done.
113
*/
114
kmutex_t mutex;
115
kcondvar_t cv;
116
boolean_t done;
117
118
int err;
119
const char *tofs;
120
boolean_t heal;
121
boolean_t resumable;
122
boolean_t raw; /* DMU_BACKUP_FEATURE_RAW set */
123
boolean_t spill; /* DRR_FLAG_SPILL_BLOCK set */
124
boolean_t full; /* this is a full send stream */
125
uint64_t last_object;
126
uint64_t last_offset;
127
uint64_t max_object; /* highest object ID referenced in stream */
128
uint64_t bytes_read; /* bytes read when current record created */
129
130
list_t write_batch;
131
132
/* Encryption parameters for the last received DRR_OBJECT_RANGE */
133
boolean_t or_crypt_params_present;
134
uint64_t or_firstobj;
135
uint64_t or_numslots;
136
uint8_t or_salt[ZIO_DATA_SALT_LEN];
137
uint8_t or_iv[ZIO_DATA_IV_LEN];
138
uint8_t or_mac[ZIO_DATA_MAC_LEN];
139
boolean_t or_byteorder;
140
zio_t *heal_pio;
141
142
/* Keep track of DRR_FREEOBJECTS right after DRR_OBJECT_RANGE */
143
or_need_sync_t or_need_sync;
144
};
145
146
typedef struct dmu_recv_begin_arg {
147
const char *drba_origin;
148
dmu_recv_cookie_t *drba_cookie;
149
cred_t *drba_cred;
150
dsl_crypto_params_t *drba_dcp;
151
} dmu_recv_begin_arg_t;
152
153
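/*
 * Byteswap the fields of a replay record that arrived from a sender of
 * the opposite endianness, based on the record type.
 */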
static void
154
byteswap_record(dmu_replay_record_t *drr)
155
{
156
#define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
157
#define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
158
drr->drr_type = BSWAP_32(drr->drr_type);
159
drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
160
161
switch (drr->drr_type) {
162
case DRR_BEGIN:
163
DO64(drr_begin.drr_magic);
164
DO64(drr_begin.drr_versioninfo);
165
DO64(drr_begin.drr_creation_time);
166
DO32(drr_begin.drr_type);
167
DO32(drr_begin.drr_flags);
168
DO64(drr_begin.drr_toguid);
169
DO64(drr_begin.drr_fromguid);
170
break;
171
case DRR_OBJECT:
172
DO64(drr_object.drr_object);
173
DO32(drr_object.drr_type);
174
DO32(drr_object.drr_bonustype);
175
DO32(drr_object.drr_blksz);
176
DO32(drr_object.drr_bonuslen);
177
DO32(drr_object.drr_raw_bonuslen);
178
DO64(drr_object.drr_toguid);
179
DO64(drr_object.drr_maxblkid);
180
break;
181
case DRR_FREEOBJECTS:
182
DO64(drr_freeobjects.drr_firstobj);
183
DO64(drr_freeobjects.drr_numobjs);
184
DO64(drr_freeobjects.drr_toguid);
185
break;
186
case DRR_WRITE:
187
DO64(drr_write.drr_object);
188
DO32(drr_write.drr_type);
189
DO64(drr_write.drr_offset);
190
DO64(drr_write.drr_logical_size);
191
DO64(drr_write.drr_toguid);
192
ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
193
DO64(drr_write.drr_key.ddk_prop);
194
DO64(drr_write.drr_compressed_size);
195
break;
196
case DRR_WRITE_EMBEDDED:
197
DO64(drr_write_embedded.drr_object);
198
DO64(drr_write_embedded.drr_offset);
199
DO64(drr_write_embedded.drr_length);
200
DO64(drr_write_embedded.drr_toguid);
201
DO32(drr_write_embedded.drr_lsize);
202
DO32(drr_write_embedded.drr_psize);
203
break;
204
case DRR_FREE:
205
DO64(drr_free.drr_object);
206
DO64(drr_free.drr_offset);
207
DO64(drr_free.drr_length);
208
DO64(drr_free.drr_toguid);
209
break;
210
case DRR_SPILL:
211
DO64(drr_spill.drr_object);
212
DO64(drr_spill.drr_length);
213
DO64(drr_spill.drr_toguid);
214
DO64(drr_spill.drr_compressed_size);
215
DO32(drr_spill.drr_type);
216
break;
217
case DRR_OBJECT_RANGE:
218
DO64(drr_object_range.drr_firstobj);
219
DO64(drr_object_range.drr_numslots);
220
DO64(drr_object_range.drr_toguid);
221
break;
222
case DRR_REDACT:
223
DO64(drr_redact.drr_object);
224
DO64(drr_redact.drr_offset);
225
DO64(drr_redact.drr_length);
226
DO64(drr_redact.drr_toguid);
227
break;
228
case DRR_END:
229
DO64(drr_end.drr_toguid);
230
ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
231
break;
232
default:
233
break;
234
}
235
236
if (drr->drr_type != DRR_BEGIN) {
237
ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
238
}
239
240
#undef DO64
241
#undef DO32
242
}
243
244
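/*
 * Return B_TRUE if the given guid appears in the snaps array.
 */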
static boolean_t
245
redact_snaps_contains(uint64_t *snaps, uint64_t num_snaps, uint64_t guid)
246
{
247
for (int i = 0; i < num_snaps; i++) {
248
if (snaps[i] == guid)
249
return (B_TRUE);
250
}
251
return (B_FALSE);
252
}
253
254
/*
255
* Check that the new stream we're trying to receive is redacted with respect to
256
* a subset of the snapshots that the origin was redacted with respect to. For
257
* the reasons behind this, see the man page on redacted zfs sends and receives.
258
*/
259
static boolean_t
260
compatible_redact_snaps(uint64_t *origin_snaps, uint64_t origin_num_snaps,
261
uint64_t *redact_snaps, uint64_t num_redact_snaps)
262
{
263
/*
264
* Short circuit the comparison; if we are redacted with respect to
265
* more snapshots than the origin, we can't be redacted with respect
266
* to a subset.
267
*/
268
if (num_redact_snaps > origin_num_snaps) {
269
return (B_FALSE);
270
}
271
272
for (int i = 0; i < num_redact_snaps; i++) {
273
if (!redact_snaps_contains(origin_snaps, origin_num_snaps,
274
redact_snaps[i])) {
275
return (B_FALSE);
276
}
277
}
278
return (B_TRUE);
279
}
280
281
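/*
 * Check whether the incoming stream may safely be received on top of the
 * given redacted origin dataset; full send streams are always allowed.
 */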
static boolean_t
282
redact_check(dmu_recv_begin_arg_t *drba, dsl_dataset_t *origin)
283
{
284
uint64_t *origin_snaps;
285
uint64_t origin_num_snaps;
286
dmu_recv_cookie_t *drc = drba->drba_cookie;
287
struct drr_begin *drrb = drc->drc_drrb;
288
int featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
289
int err = 0;
290
boolean_t ret = B_TRUE;
291
uint64_t *redact_snaps;
292
uint_t numredactsnaps;
293
294
/*
295
* If this is a full send stream, we're safe no matter what.
296
*/
297
if (drrb->drr_fromguid == 0)
298
return (ret);
299
300
VERIFY(dsl_dataset_get_uint64_array_feature(origin,
301
SPA_FEATURE_REDACTED_DATASETS, &origin_num_snaps, &origin_snaps));
302
303
if (nvlist_lookup_uint64_array(drc->drc_begin_nvl,
304
BEGINNV_REDACT_FROM_SNAPS, &redact_snaps, &numredactsnaps) ==
305
0) {
306
/*
307
* If the send stream was sent from the redaction bookmark or
308
* the redacted version of the dataset, then we're safe. Verify
309
* that this is from a compatible redaction bookmark or
310
* redacted dataset.
311
*/
312
if (!compatible_redact_snaps(origin_snaps, origin_num_snaps,
313
redact_snaps, numredactsnaps)) {
314
err = EINVAL;
315
}
316
} else if (featureflags & DMU_BACKUP_FEATURE_REDACTED) {
317
/*
318
* If the stream is redacted, it must be redacted with respect
319
* to a subset of what the origin is redacted with respect to.
320
* See case number 2 in the zfs man page section on redacted zfs
321
* send.
322
*/
323
err = nvlist_lookup_uint64_array(drc->drc_begin_nvl,
324
BEGINNV_REDACT_SNAPS, &redact_snaps, &numredactsnaps);
325
326
if (err != 0 || !compatible_redact_snaps(origin_snaps,
327
origin_num_snaps, redact_snaps, numredactsnaps)) {
328
err = EINVAL;
329
}
330
} else if (!redact_snaps_contains(origin_snaps, origin_num_snaps,
331
drrb->drr_toguid)) {
332
/*
333
* If the stream isn't redacted but the origin is, this must be
334
* one of the snapshots the origin is redacted with respect to.
335
* See case number 1 in the zfs man page section on redacted zfs
336
* send.
337
*/
338
err = EINVAL;
339
}
340
341
if (err != 0)
342
ret = B_FALSE;
343
return (ret);
344
}
345
346
/*
347
* If we previously received a stream with --large-block, we don't support
348
* receiving an incremental on top of it without --large-block. This avoids
349
* forcing a read-modify-write or trying to re-aggregate a string of WRITE
350
* records.
351
*/
352
static int
353
recv_check_large_blocks(dsl_dataset_t *ds, uint64_t featureflags)
354
{
355
if (dsl_dataset_feature_is_active(ds, SPA_FEATURE_LARGE_BLOCKS) &&
356
!(featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS))
357
return (SET_ERROR(ZFS_ERR_STREAM_LARGE_BLOCK_MISMATCH));
358
return (0);
359
}
360
361
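/*
 * Validate a receive into an existing filesystem: the temporary %recv clone
 * and resume state must not exist, limits must not be exceeded, and the
 * healing, incremental, and forced-full cases must each satisfy their own
 * constraints.
 */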
static int
362
recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
363
uint64_t fromguid, uint64_t featureflags)
364
{
365
uint64_t obj;
366
uint64_t children;
367
int error;
368
dsl_dataset_t *snap;
369
dsl_pool_t *dp = ds->ds_dir->dd_pool;
370
boolean_t encrypted = ds->ds_dir->dd_crypto_obj != 0;
371
boolean_t raw = (featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
372
boolean_t embed = (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) != 0;
373
374
/* Temporary clone name must not exist. */
375
error = zap_lookup(dp->dp_meta_objset,
376
dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
377
8, 1, &obj);
378
if (error != ENOENT)
379
return (error == 0 ? SET_ERROR(EBUSY) : error);
380
381
/* Resume state must not be set. */
382
if (dsl_dataset_has_resume_receive_state(ds))
383
return (SET_ERROR(EBUSY));
384
385
/* New snapshot name must not exist if we're not healing it. */
386
error = zap_lookup(dp->dp_meta_objset,
387
dsl_dataset_phys(ds)->ds_snapnames_zapobj,
388
drba->drba_cookie->drc_tosnap, 8, 1, &obj);
389
if (drba->drba_cookie->drc_heal) {
390
if (error != 0)
391
return (error);
392
} else if (error != ENOENT) {
393
return (error == 0 ? SET_ERROR(EEXIST) : error);
394
}
395
396
/* Must not have children if receiving a ZVOL. */
397
error = zap_count(dp->dp_meta_objset,
398
dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, &children);
399
if (error != 0)
400
return (error);
401
if (drba->drba_cookie->drc_drrb->drr_type != DMU_OST_ZFS &&
402
children > 0)
403
return (SET_ERROR(ZFS_ERR_WRONG_PARENT));
404
405
/*
406
* Check snapshot limit before receiving. We'll recheck again at the
407
* end, but might as well abort before receiving if we're already over
408
* the limit.
409
*
410
* Note that we do not check the file system limit with
411
* dsl_dir_fscount_check because the temporary %clones don't count
412
* against that limit.
413
*/
414
error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
415
NULL, drba->drba_cred);
416
if (error != 0)
417
return (error);
418
419
if (drba->drba_cookie->drc_heal) {
420
/* Encryption is incompatible with embedded data. */
421
if (encrypted && embed)
422
return (SET_ERROR(EINVAL));
423
424
/* Healing is not supported when in 'force' mode. */
425
if (drba->drba_cookie->drc_force)
426
return (SET_ERROR(EINVAL));
427
428
/* Must have keys loaded if doing encrypted non-raw recv. */
429
if (encrypted && !raw) {
430
if (spa_keystore_lookup_key(dp->dp_spa, ds->ds_object,
431
NULL, NULL) != 0)
432
return (SET_ERROR(EACCES));
433
}
434
435
error = dsl_dataset_hold_obj(dp, obj, FTAG, &snap);
436
if (error != 0)
437
return (error);
438
439
/*
440
* When not doing best-effort corrective recv, healing can only
441
* be done if the send stream is for the same snapshot as the
442
* one we are trying to heal.
443
*/
444
if (zfs_recv_best_effort_corrective == 0 &&
445
drba->drba_cookie->drc_drrb->drr_toguid !=
446
dsl_dataset_phys(snap)->ds_guid) {
447
dsl_dataset_rele(snap, FTAG);
448
return (SET_ERROR(ENOTSUP));
449
}
450
dsl_dataset_rele(snap, FTAG);
451
} else if (fromguid != 0) {
452
/* Sanity check the incremental recv */
453
uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
454
455
/* Can't perform a raw receive on top of a non-raw receive */
456
if (!encrypted && raw)
457
return (SET_ERROR(EINVAL));
458
459
/* Encryption is incompatible with embedded data */
460
if (encrypted && embed)
461
return (SET_ERROR(EINVAL));
462
463
/* Find snapshot in this dir that matches fromguid. */
464
while (obj != 0) {
465
error = dsl_dataset_hold_obj(dp, obj, FTAG,
466
&snap);
467
if (error != 0)
468
return (SET_ERROR(ENODEV));
469
if (snap->ds_dir != ds->ds_dir) {
470
dsl_dataset_rele(snap, FTAG);
471
return (SET_ERROR(ENODEV));
472
}
473
if (dsl_dataset_phys(snap)->ds_guid == fromguid)
474
break;
475
obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
476
dsl_dataset_rele(snap, FTAG);
477
}
478
if (obj == 0)
479
return (SET_ERROR(ENODEV));
480
481
if (drba->drba_cookie->drc_force) {
482
drba->drba_cookie->drc_fromsnapobj = obj;
483
} else {
484
/*
485
* If we are not forcing, there must be no
486
* changes since fromsnap. Raw sends have an
487
* additional constraint that requires that
488
* no "noop" snapshots exist between fromsnap
489
* and tosnap for the IVset checking code to
490
* work properly.
491
*/
492
if (dsl_dataset_modified_since_snap(ds, snap) ||
493
(raw &&
494
dsl_dataset_phys(ds)->ds_prev_snap_obj !=
495
snap->ds_object)) {
496
dsl_dataset_rele(snap, FTAG);
497
return (SET_ERROR(ETXTBSY));
498
}
499
drba->drba_cookie->drc_fromsnapobj =
500
ds->ds_prev->ds_object;
501
}
502
503
if (dsl_dataset_feature_is_active(snap,
504
SPA_FEATURE_REDACTED_DATASETS) && !redact_check(drba,
505
snap)) {
506
dsl_dataset_rele(snap, FTAG);
507
return (SET_ERROR(EINVAL));
508
}
509
510
error = recv_check_large_blocks(snap, featureflags);
511
if (error != 0) {
512
dsl_dataset_rele(snap, FTAG);
513
return (error);
514
}
515
516
dsl_dataset_rele(snap, FTAG);
517
} else {
518
/* If full and not healing then must be forced. */
519
if (!drba->drba_cookie->drc_force)
520
return (SET_ERROR(EEXIST));
521
522
/*
523
* We don't support using zfs recv -F to blow away
524
* encrypted filesystems. This would require the
525
* dsl dir to point to the old encryption key and
526
* the new one at the same time during the receive.
527
*/
528
if ((!encrypted && raw) || encrypted)
529
return (SET_ERROR(EINVAL));
530
531
/*
532
* Perform the same encryption checks we would if
533
* we were creating a new dataset from scratch.
534
*/
535
if (!raw) {
536
boolean_t will_encrypt;
537
538
error = dmu_objset_create_crypt_check(
539
ds->ds_dir->dd_parent, drba->drba_dcp,
540
&will_encrypt);
541
if (error != 0)
542
return (error);
543
544
if (will_encrypt && embed)
545
return (SET_ERROR(EINVAL));
546
}
547
}
548
549
return (0);
550
}
551
552
/*
553
* Check that any feature flags used in the data stream we're receiving are
554
* supported by the pool we are receiving into.
555
*
556
* Note that some of the features we explicitly check here have additional
557
* (implicit) features they depend on, but those dependencies are enforced
558
* through the zfeature_register() calls declaring the features that we
559
* explicitly check.
560
*/
561
static int
562
recv_begin_check_feature_flags_impl(uint64_t featureflags, spa_t *spa)
563
{
564
/*
565
* Check if there are any unsupported feature flags.
566
*/
567
if (!DMU_STREAM_SUPPORTED(featureflags)) {
568
return (SET_ERROR(ZFS_ERR_UNKNOWN_SEND_STREAM_FEATURE));
569
}
570
571
/* Verify pool version supports SA if SA_SPILL feature set */
572
if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
573
spa_version(spa) < SPA_VERSION_SA)
574
return (SET_ERROR(ENOTSUP));
575
576
/*
577
* LZ4 compressed, ZSTD compressed, embedded, mooched, large blocks,
578
* and large_dnodes in the stream can only be used if those pool
579
* features are enabled because we don't attempt to decompress /
580
* un-embed / un-mooch / split up the blocks / dnodes during the
581
* receive process.
582
*/
583
if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
584
!spa_feature_is_enabled(spa, SPA_FEATURE_LZ4_COMPRESS))
585
return (SET_ERROR(ENOTSUP));
586
if ((featureflags & DMU_BACKUP_FEATURE_ZSTD) &&
587
!spa_feature_is_enabled(spa, SPA_FEATURE_ZSTD_COMPRESS))
588
return (SET_ERROR(ENOTSUP));
589
if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
590
!spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA))
591
return (SET_ERROR(ENOTSUP));
592
if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
593
!spa_feature_is_enabled(spa, SPA_FEATURE_LARGE_BLOCKS))
594
return (SET_ERROR(ENOTSUP));
595
if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
596
!spa_feature_is_enabled(spa, SPA_FEATURE_LARGE_DNODE))
597
return (SET_ERROR(ENOTSUP));
598
if ((featureflags & DMU_BACKUP_FEATURE_LARGE_MICROZAP) &&
599
!spa_feature_is_enabled(spa, SPA_FEATURE_LARGE_MICROZAP))
600
return (SET_ERROR(ENOTSUP));
601
602
/*
603
* Receiving redacted streams requires that redacted datasets are
604
* enabled.
605
*/
606
if ((featureflags & DMU_BACKUP_FEATURE_REDACTED) &&
607
!spa_feature_is_enabled(spa, SPA_FEATURE_REDACTED_DATASETS))
608
return (SET_ERROR(ENOTSUP));
609
610
/*
611
* If the LONGNAME is not enabled on the target, fail that request.
612
*/
613
if ((featureflags & DMU_BACKUP_FEATURE_LONGNAME) &&
614
!spa_feature_is_enabled(spa, SPA_FEATURE_LONGNAME))
615
return (SET_ERROR(ENOTSUP));
616
617
return (0);
618
}
619
620
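/*
 * Check phase of the dmu_recv_begin() sync task: validate the DRR_BEGIN
 * record and stream feature flags against the destination pool and dataset.
 */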
static int
621
dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
622
{
623
dmu_recv_begin_arg_t *drba = arg;
624
dsl_pool_t *dp = dmu_tx_pool(tx);
625
struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
626
uint64_t fromguid = drrb->drr_fromguid;
627
int flags = drrb->drr_flags;
628
ds_hold_flags_t dsflags = DS_HOLD_FLAG_NONE;
629
int error;
630
uint64_t featureflags = drba->drba_cookie->drc_featureflags;
631
dsl_dataset_t *ds;
632
const char *tofs = drba->drba_cookie->drc_tofs;
633
634
/* already checked */
635
ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
636
ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));
637
638
if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
639
DMU_COMPOUNDSTREAM ||
640
drrb->drr_type >= DMU_OST_NUMTYPES ||
641
((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
642
return (SET_ERROR(EINVAL));
643
644
error = recv_begin_check_feature_flags_impl(featureflags, dp->dp_spa);
645
if (error != 0)
646
return (error);
647
648
/* Resumable receives require extensible datasets */
649
if (drba->drba_cookie->drc_resumable &&
650
!spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
651
return (SET_ERROR(ENOTSUP));
652
653
if (featureflags & DMU_BACKUP_FEATURE_RAW) {
654
/* raw receives require the encryption feature */
655
if (!spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_ENCRYPTION))
656
return (SET_ERROR(ENOTSUP));
657
658
/* embedded data is incompatible with encryption and raw recv */
659
if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
660
return (SET_ERROR(EINVAL));
661
662
/* raw receives require spill block allocation flag */
663
if (!(flags & DRR_FLAG_SPILL_BLOCK))
664
return (SET_ERROR(ZFS_ERR_SPILL_BLOCK_FLAG_MISSING));
665
} else {
666
/*
667
* We support unencrypted datasets below encrypted ones now,
668
* so add the DS_HOLD_FLAG_DECRYPT flag only if we are dealing
669
* with a dataset we may encrypt.
670
*/
671
if (drba->drba_dcp == NULL ||
672
drba->drba_dcp->cp_crypt != ZIO_CRYPT_OFF) {
673
dsflags |= DS_HOLD_FLAG_DECRYPT;
674
}
675
}
676
677
error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
678
if (error == 0) {
679
/* target fs already exists; recv into temp clone */
680
681
/* Can't recv a clone into an existing fs */
682
if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
683
dsl_dataset_rele_flags(ds, dsflags, FTAG);
684
return (SET_ERROR(EINVAL));
685
}
686
687
error = recv_begin_check_existing_impl(drba, ds, fromguid,
688
featureflags);
689
dsl_dataset_rele_flags(ds, dsflags, FTAG);
690
} else if (error == ENOENT) {
691
/* target fs does not exist; must be a full backup or clone */
692
char buf[ZFS_MAX_DATASET_NAME_LEN];
693
objset_t *os;
694
695
/* healing recv must be done "into" an existing snapshot */
696
if (drba->drba_cookie->drc_heal == B_TRUE)
697
return (SET_ERROR(ENOTSUP));
698
699
/*
700
* If it's a non-clone incremental, we are missing the
701
* target fs, so fail the recv.
702
*/
703
if (fromguid != 0 && !((flags & DRR_FLAG_CLONE) ||
704
drba->drba_origin))
705
return (SET_ERROR(ENOENT));
706
707
/*
708
* If we're receiving a full send as a clone, and it doesn't
709
* contain all the necessary free records and freeobject
710
* records, reject it.
711
*/
712
if (fromguid == 0 && drba->drba_origin != NULL &&
713
!(flags & DRR_FLAG_FREERECORDS))
714
return (SET_ERROR(EINVAL));
715
716
/* Open the parent of tofs */
717
ASSERT3U(strlen(tofs), <, sizeof (buf));
718
(void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
719
error = dsl_dataset_hold(dp, buf, FTAG, &ds);
720
if (error != 0)
721
return (error);
722
723
if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0 &&
724
drba->drba_origin == NULL) {
725
boolean_t will_encrypt;
726
727
/*
728
* Check that we aren't breaking any encryption rules
729
* and that we have all the parameters we need to
730
* create an encrypted dataset if necessary. If we are
731
* making an encrypted dataset the stream can't have
732
* embedded data.
733
*/
734
error = dmu_objset_create_crypt_check(ds->ds_dir,
735
drba->drba_dcp, &will_encrypt);
736
if (error != 0) {
737
dsl_dataset_rele(ds, FTAG);
738
return (error);
739
}
740
741
if (will_encrypt &&
742
(featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)) {
743
dsl_dataset_rele(ds, FTAG);
744
return (SET_ERROR(EINVAL));
745
}
746
}
747
748
/*
749
* Check filesystem and snapshot limits before receiving. We'll
750
* recheck snapshot limits again at the end (we create the
751
* filesystems and increment those counts during begin_sync).
752
*/
753
error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
754
ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
755
if (error != 0) {
756
dsl_dataset_rele(ds, FTAG);
757
return (error);
758
}
759
760
error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
761
ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
762
if (error != 0) {
763
dsl_dataset_rele(ds, FTAG);
764
return (error);
765
}
766
767
/* can't recv below anything but filesystems (e.g. no ZVOLs) */
768
error = dmu_objset_from_ds(ds, &os);
769
if (error != 0) {
770
dsl_dataset_rele(ds, FTAG);
771
return (error);
772
}
773
if (dmu_objset_type(os) != DMU_OST_ZFS) {
774
dsl_dataset_rele(ds, FTAG);
775
return (SET_ERROR(ZFS_ERR_WRONG_PARENT));
776
}
777
778
if (drba->drba_origin != NULL) {
779
dsl_dataset_t *origin;
780
error = dsl_dataset_hold_flags(dp, drba->drba_origin,
781
dsflags, FTAG, &origin);
782
if (error != 0) {
783
dsl_dataset_rele(ds, FTAG);
784
return (error);
785
}
786
if (!origin->ds_is_snapshot) {
787
dsl_dataset_rele_flags(origin, dsflags, FTAG);
788
dsl_dataset_rele(ds, FTAG);
789
return (SET_ERROR(EINVAL));
790
}
791
if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
792
fromguid != 0) {
793
dsl_dataset_rele_flags(origin, dsflags, FTAG);
794
dsl_dataset_rele(ds, FTAG);
795
return (SET_ERROR(ENODEV));
796
}
797
798
if (origin->ds_dir->dd_crypto_obj != 0 &&
799
(featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)) {
800
dsl_dataset_rele_flags(origin, dsflags, FTAG);
801
dsl_dataset_rele(ds, FTAG);
802
return (SET_ERROR(EINVAL));
803
}
804
805
/*
806
* If the origin is redacted we need to verify that this
807
* send stream can safely be received on top of the
808
* origin.
809
*/
810
if (dsl_dataset_feature_is_active(origin,
811
SPA_FEATURE_REDACTED_DATASETS)) {
812
if (!redact_check(drba, origin)) {
813
dsl_dataset_rele_flags(origin, dsflags,
814
FTAG);
815
dsl_dataset_rele_flags(ds, dsflags,
816
FTAG);
817
return (SET_ERROR(EINVAL));
818
}
819
}
820
821
error = recv_check_large_blocks(ds, featureflags);
822
if (error != 0) {
823
dsl_dataset_rele_flags(origin, dsflags, FTAG);
824
dsl_dataset_rele_flags(ds, dsflags, FTAG);
825
return (error);
826
}
827
828
dsl_dataset_rele_flags(origin, dsflags, FTAG);
829
}
830
831
dsl_dataset_rele(ds, FTAG);
832
error = 0;
833
}
834
return (error);
835
}
836
837
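/*
 * Sync phase of dmu_recv_begin(): create the destination dataset (or a
 * temporary clone of an existing one), mark it inconsistent, and record
 * resume state if the receive is resumable.
 */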
static void
838
dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
839
{
840
dmu_recv_begin_arg_t *drba = arg;
841
dsl_pool_t *dp = dmu_tx_pool(tx);
842
objset_t *mos = dp->dp_meta_objset;
843
dmu_recv_cookie_t *drc = drba->drba_cookie;
844
struct drr_begin *drrb = drc->drc_drrb;
845
const char *tofs = drc->drc_tofs;
846
uint64_t featureflags = drc->drc_featureflags;
847
dsl_dataset_t *ds, *newds;
848
objset_t *os;
849
uint64_t dsobj;
850
ds_hold_flags_t dsflags = DS_HOLD_FLAG_NONE;
851
int error;
852
uint64_t crflags = 0;
853
dsl_crypto_params_t dummy_dcp = { 0 };
854
dsl_crypto_params_t *dcp = drba->drba_dcp;
855
856
if (drrb->drr_flags & DRR_FLAG_CI_DATA)
857
crflags |= DS_FLAG_CI_DATASET;
858
859
if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0)
860
dsflags |= DS_HOLD_FLAG_DECRYPT;
861
862
/*
863
* Raw, non-incremental recvs always use a dummy dcp with
864
* the raw cmd set. Raw incremental recvs do not use a dcp
865
* since the encryption parameters are already set in stone.
866
*/
867
if (dcp == NULL && drrb->drr_fromguid == 0 &&
868
drba->drba_origin == NULL) {
869
ASSERT0P(dcp);
870
dcp = &dummy_dcp;
871
872
if (featureflags & DMU_BACKUP_FEATURE_RAW)
873
dcp->cp_cmd = DCP_CMD_RAW_RECV;
874
}
875
876
error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
877
if (error == 0) {
878
/* Create temporary clone unless we're doing corrective recv */
879
dsl_dataset_t *snap = NULL;
880
881
if (drba->drba_cookie->drc_fromsnapobj != 0) {
882
VERIFY0(dsl_dataset_hold_obj(dp,
883
drba->drba_cookie->drc_fromsnapobj, FTAG, &snap));
884
ASSERT0P(dcp);
885
}
886
if (drc->drc_heal) {
887
/* When healing we want to use the provided snapshot */
888
VERIFY0(dsl_dataset_snap_lookup(ds, drc->drc_tosnap,
889
&dsobj));
890
} else {
891
dsobj = dsl_dataset_create_sync(ds->ds_dir,
892
recv_clone_name, snap, crflags, drba->drba_cred,
893
dcp, tx);
894
}
895
if (drba->drba_cookie->drc_fromsnapobj != 0)
896
dsl_dataset_rele(snap, FTAG);
897
dsl_dataset_rele_flags(ds, dsflags, FTAG);
898
} else {
899
dsl_dir_t *dd;
900
const char *tail;
901
dsl_dataset_t *origin = NULL;
902
903
VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));
904
905
if (drba->drba_origin != NULL) {
906
VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
907
FTAG, &origin));
908
ASSERT0P(dcp);
909
}
910
911
/* Create new dataset. */
912
dsobj = dsl_dataset_create_sync(dd, strrchr(tofs, '/') + 1,
913
origin, crflags, drba->drba_cred, dcp, tx);
914
if (origin != NULL)
915
dsl_dataset_rele(origin, FTAG);
916
dsl_dir_rele(dd, FTAG);
917
drc->drc_newfs = B_TRUE;
918
}
919
VERIFY0(dsl_dataset_own_obj_force(dp, dsobj, dsflags, dmu_recv_tag,
920
&newds));
921
if (dsl_dataset_feature_is_active(newds,
922
SPA_FEATURE_REDACTED_DATASETS)) {
923
/*
924
* If the origin dataset is redacted, the child will be redacted
925
* when we create it. We clear the new dataset's
926
* redaction info; if it should be redacted, we'll fill
927
* in its information later.
928
*/
929
dsl_dataset_deactivate_feature(newds,
930
SPA_FEATURE_REDACTED_DATASETS, tx);
931
}
932
VERIFY0(dmu_objset_from_ds(newds, &os));
933
934
if (drc->drc_resumable) {
935
dsl_dataset_zapify(newds, tx);
936
if (drrb->drr_fromguid != 0) {
937
VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
938
8, 1, &drrb->drr_fromguid, tx));
939
}
940
VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
941
8, 1, &drrb->drr_toguid, tx));
942
VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
943
1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
944
uint64_t one = 1;
945
uint64_t zero = 0;
946
VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
947
8, 1, &one, tx));
948
VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
949
8, 1, &zero, tx));
950
VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
951
8, 1, &zero, tx));
952
if (featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) {
953
VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_LARGEBLOCK,
954
8, 1, &one, tx));
955
}
956
if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) {
957
VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
958
8, 1, &one, tx));
959
}
960
if (featureflags & DMU_BACKUP_FEATURE_COMPRESSED) {
961
VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_COMPRESSOK,
962
8, 1, &one, tx));
963
}
964
if (featureflags & DMU_BACKUP_FEATURE_RAW) {
965
VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_RAWOK,
966
8, 1, &one, tx));
967
}
968
969
uint64_t *redact_snaps;
970
uint_t numredactsnaps;
971
if (nvlist_lookup_uint64_array(drc->drc_begin_nvl,
972
BEGINNV_REDACT_FROM_SNAPS, &redact_snaps,
973
&numredactsnaps) == 0) {
974
VERIFY0(zap_add(mos, dsobj,
975
DS_FIELD_RESUME_REDACT_BOOKMARK_SNAPS,
976
sizeof (*redact_snaps), numredactsnaps,
977
redact_snaps, tx));
978
}
979
}
980
981
/*
982
* Usually the os->os_encrypted value is tied to the presence of a
983
* DSL Crypto Key object in the dd. However, that will not be received
984
* until dmu_recv_stream(), so we set the value manually for now.
985
*/
986
if (featureflags & DMU_BACKUP_FEATURE_RAW) {
987
os->os_encrypted = B_TRUE;
988
drba->drba_cookie->drc_raw = B_TRUE;
989
}
990
991
if (featureflags & DMU_BACKUP_FEATURE_REDACTED) {
992
uint64_t *redact_snaps;
993
uint_t numredactsnaps;
994
VERIFY0(nvlist_lookup_uint64_array(drc->drc_begin_nvl,
995
BEGINNV_REDACT_SNAPS, &redact_snaps, &numredactsnaps));
996
dsl_dataset_activate_redaction(newds, redact_snaps,
997
numredactsnaps, tx);
998
}
999
1000
if (featureflags & DMU_BACKUP_FEATURE_LARGE_MICROZAP) {
1001
/*
1002
* The source has seen a large microzap at least once in its
1003
* life, so we activate the feature here to match. It's not
1004
* strictly necessary since a large microzap is usable without
1005
* the feature active, but if that object is sent on from here,
1006
* we need this info so we know to add the stream feature.
1007
*
1008
* There may be no large microzap in the incoming stream, or
1009
* ever again, but this is a very niche feature and it's very
1010
* difficult to spot a large microzap in the stream, so it's
1011
* not worth the effort of trying harder to activate the
1012
* feature at first use.
1013
*/
1014
dsl_dataset_activate_feature(dsobj, SPA_FEATURE_LARGE_MICROZAP,
1015
(void *)B_TRUE, tx);
1016
}
1017
1018
dmu_buf_will_dirty(newds->ds_dbuf, tx);
1019
dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;
1020
1021
/*
1022
* Activate longname feature if received
1023
*/
1024
if (featureflags & DMU_BACKUP_FEATURE_LONGNAME &&
1025
!dsl_dataset_feature_is_active(newds, SPA_FEATURE_LONGNAME)) {
1026
dsl_dataset_activate_feature(newds->ds_object,
1027
SPA_FEATURE_LONGNAME, (void *)B_TRUE, tx);
1028
newds->ds_feature[SPA_FEATURE_LONGNAME] = (void *)B_TRUE;
1029
}
1030
1031
/*
1032
* If we actually created a non-clone, we need to create the objset
1033
* in our new dataset. If this is a raw send we postpone this until
1034
* dmu_recv_stream() so that we can allocate the metadnode with the
1035
* properties from the DRR_BEGIN payload.
1036
*/
1037
rrw_enter(&newds->ds_bp_rwlock, RW_READER, FTAG);
1038
if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds)) &&
1039
(featureflags & DMU_BACKUP_FEATURE_RAW) == 0 &&
1040
!drc->drc_heal) {
1041
(void) dmu_objset_create_impl(dp->dp_spa,
1042
newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
1043
}
1044
rrw_exit(&newds->ds_bp_rwlock, FTAG);
1045
1046
drba->drba_cookie->drc_ds = newds;
1047
drba->drba_cookie->drc_os = os;
1048
1049
spa_history_log_internal_ds(newds, "receive", tx, " ");
1050
}
1051
1052
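/*
 * Check phase for resuming an interrupted receive: the target must be an
 * inconsistent, unowned dataset whose saved resume state matches the
 * toguid/fromguid and redaction snapshots of the stream.
 */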
static int
1053
dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
1054
{
1055
dmu_recv_begin_arg_t *drba = arg;
1056
dmu_recv_cookie_t *drc = drba->drba_cookie;
1057
dsl_pool_t *dp = dmu_tx_pool(tx);
1058
struct drr_begin *drrb = drc->drc_drrb;
1059
int error;
1060
ds_hold_flags_t dsflags = DS_HOLD_FLAG_NONE;
1061
dsl_dataset_t *ds;
1062
const char *tofs = drc->drc_tofs;
1063
1064
/* already checked */
1065
ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1066
ASSERT(drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING);
1067
1068
if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
1069
DMU_COMPOUNDSTREAM ||
1070
drrb->drr_type >= DMU_OST_NUMTYPES)
1071
return (SET_ERROR(EINVAL));
1072
1073
/*
1074
* This is mostly a sanity check since we should have already done these
1075
* checks during a previous attempt to receive the data.
1076
*/
1077
error = recv_begin_check_feature_flags_impl(drc->drc_featureflags,
1078
dp->dp_spa);
1079
if (error != 0)
1080
return (error);
1081
1082
/* 6 extra bytes for /%recv */
1083
char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
1084
1085
(void) snprintf(recvname, sizeof (recvname), "%s/%s",
1086
tofs, recv_clone_name);
1087
1088
if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) {
1089
/* raw receives require spill block allocation flag */
1090
if (!(drrb->drr_flags & DRR_FLAG_SPILL_BLOCK))
1091
return (SET_ERROR(ZFS_ERR_SPILL_BLOCK_FLAG_MISSING));
1092
} else {
1093
dsflags |= DS_HOLD_FLAG_DECRYPT;
1094
}
1095
1096
boolean_t recvexist = B_TRUE;
1097
if (dsl_dataset_hold_flags(dp, recvname, dsflags, FTAG, &ds) != 0) {
1098
/* %recv does not exist; continue in tofs */
1099
recvexist = B_FALSE;
1100
error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
1101
if (error != 0)
1102
return (error);
1103
}
1104
1105
/*
1106
* Resuming a full/newfs recv on an existing dataset requires the
1107
* force flag
1108
*/
1109
if (recvexist && drrb->drr_fromguid == 0 && !drc->drc_force) {
1110
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1111
return (SET_ERROR(ZFS_ERR_RESUME_EXISTS));
1112
}
1113
1114
/* check that ds is marked inconsistent */
1115
if (!DS_IS_INCONSISTENT(ds)) {
1116
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1117
return (SET_ERROR(EINVAL));
1118
}
1119
1120
/* check that there is resuming data, and that the toguid matches */
1121
if (!dsl_dataset_is_zapified(ds)) {
1122
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1123
return (SET_ERROR(EINVAL));
1124
}
1125
uint64_t val;
1126
error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
1127
DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
1128
if (error != 0 || drrb->drr_toguid != val) {
1129
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1130
return (SET_ERROR(EINVAL));
1131
}
1132
1133
/*
1134
* Check if the receive is still running. If so, it will be owned.
1135
* Note that nothing else can own the dataset (e.g. after the receive
1136
* fails) because it will be marked inconsistent.
1137
*/
1138
if (dsl_dataset_has_owner(ds)) {
1139
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1140
return (SET_ERROR(EBUSY));
1141
}
1142
1143
/* There should not be any snapshots of this fs yet. */
1144
if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
1145
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1146
return (SET_ERROR(EINVAL));
1147
}
1148
1149
/*
1150
* Note: resume point will be checked when we process the first WRITE
1151
* record.
1152
*/
1153
1154
/* check that the origin matches */
1155
val = 0;
1156
(void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
1157
DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
1158
if (drrb->drr_fromguid != val) {
1159
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1160
return (SET_ERROR(EINVAL));
1161
}
1162
1163
if (ds->ds_prev != NULL && drrb->drr_fromguid != 0)
1164
drc->drc_fromsnapobj = ds->ds_prev->ds_object;
1165
1166
/*
1167
* If we're resuming, and the send is redacted, then the original send
1168
* must have been redacted, and must have been redacted with respect to
1169
* the same snapshots.
1170
*/
1171
if (drc->drc_featureflags & DMU_BACKUP_FEATURE_REDACTED) {
1172
uint64_t num_ds_redact_snaps;
1173
uint64_t *ds_redact_snaps;
1174
1175
uint_t num_stream_redact_snaps;
1176
uint64_t *stream_redact_snaps;
1177
1178
if (nvlist_lookup_uint64_array(drc->drc_begin_nvl,
1179
BEGINNV_REDACT_SNAPS, &stream_redact_snaps,
1180
&num_stream_redact_snaps) != 0) {
1181
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1182
return (SET_ERROR(EINVAL));
1183
}
1184
1185
if (!dsl_dataset_get_uint64_array_feature(ds,
1186
SPA_FEATURE_REDACTED_DATASETS, &num_ds_redact_snaps,
1187
&ds_redact_snaps)) {
1188
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1189
return (SET_ERROR(EINVAL));
1190
}
1191
1192
for (int i = 0; i < num_ds_redact_snaps; i++) {
1193
if (!redact_snaps_contains(ds_redact_snaps,
1194
num_ds_redact_snaps, stream_redact_snaps[i])) {
1195
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1196
return (SET_ERROR(EINVAL));
1197
}
1198
}
1199
}
1200
1201
error = recv_check_large_blocks(ds, drc->drc_featureflags);
1202
if (error != 0) {
1203
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1204
return (error);
1205
}
1206
1207
dsl_dataset_rele_flags(ds, dsflags, FTAG);
1208
return (0);
1209
}
1210
1211
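/*
 * Sync phase for a resumed receive: take ownership of the %recv clone (or
 * of tofs itself if no clone exists) and stash it in the receive cookie.
 */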
static void
1212
dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
1213
{
1214
dmu_recv_begin_arg_t *drba = arg;
1215
dsl_pool_t *dp = dmu_tx_pool(tx);
1216
const char *tofs = drba->drba_cookie->drc_tofs;
1217
uint64_t featureflags = drba->drba_cookie->drc_featureflags;
1218
dsl_dataset_t *ds;
1219
ds_hold_flags_t dsflags = DS_HOLD_FLAG_NONE;
1220
/* 6 extra bytes for /%recv */
1221
char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
1222
1223
(void) snprintf(recvname, sizeof (recvname), "%s/%s", tofs,
1224
recv_clone_name);
1225
1226
if (featureflags & DMU_BACKUP_FEATURE_RAW) {
1227
drba->drba_cookie->drc_raw = B_TRUE;
1228
} else {
1229
dsflags |= DS_HOLD_FLAG_DECRYPT;
1230
}
1231
1232
if (dsl_dataset_own_force(dp, recvname, dsflags, dmu_recv_tag, &ds)
1233
!= 0) {
1234
/* %recv does not exist; continue in tofs */
1235
VERIFY0(dsl_dataset_own_force(dp, tofs, dsflags, dmu_recv_tag,
1236
&ds));
1237
drba->drba_cookie->drc_newfs = B_TRUE;
1238
}
1239
1240
ASSERT(DS_IS_INCONSISTENT(ds));
1241
rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
1242
ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)) ||
1243
drba->drba_cookie->drc_raw);
1244
rrw_exit(&ds->ds_bp_rwlock, FTAG);
1245
1246
drba->drba_cookie->drc_ds = ds;
1247
VERIFY0(dmu_objset_from_ds(ds, &drba->drba_cookie->drc_os));
1248
drba->drba_cookie->drc_should_save = B_TRUE;
1249
1250
spa_history_log_internal_ds(ds, "resume receive", tx, " ");
1251
}
1252
1253
/*
1254
* NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
1255
* succeeds; otherwise we will leak the holds on the datasets.
1256
*/
1257
int
1258
dmu_recv_begin(const char *tofs, const char *tosnap,
1259
dmu_replay_record_t *drr_begin, boolean_t force, boolean_t heal,
1260
boolean_t resumable, nvlist_t *localprops, nvlist_t *hidden_args,
1261
const char *origin, dmu_recv_cookie_t *drc, zfs_file_t *fp,
1262
offset_t *voffp)
1263
{
1264
dmu_recv_begin_arg_t drba = { 0 };
1265
int err = 0;
1266
1267
cred_t *cr = CRED();
1268
crhold(cr);
1269
1270
memset(drc, 0, sizeof (dmu_recv_cookie_t));
1271
drc->drc_drr_begin = drr_begin;
1272
drc->drc_drrb = &drr_begin->drr_u.drr_begin;
1273
drc->drc_tosnap = tosnap;
1274
drc->drc_tofs = tofs;
1275
drc->drc_force = force;
1276
drc->drc_heal = heal;
1277
drc->drc_resumable = resumable;
1278
drc->drc_cred = cr;
1279
drc->drc_clone = (origin != NULL);
1280
1281
if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
1282
drc->drc_byteswap = B_TRUE;
1283
(void) fletcher_4_incremental_byteswap(drr_begin,
1284
sizeof (dmu_replay_record_t), &drc->drc_cksum);
1285
byteswap_record(drr_begin);
1286
} else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
1287
(void) fletcher_4_incremental_native(drr_begin,
1288
sizeof (dmu_replay_record_t), &drc->drc_cksum);
1289
} else {
1290
crfree(cr);
1291
drc->drc_cred = NULL;
1292
return (SET_ERROR(EINVAL));
1293
}
1294
1295
drc->drc_fp = fp;
1296
drc->drc_voff = *voffp;
1297
drc->drc_featureflags =
1298
DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
1299
1300
uint32_t payloadlen = drc->drc_drr_begin->drr_payloadlen;
1301
1302
/*
1303
* Since OpenZFS 2.0.0, we have enforced a 64MB limit in userspace
1304
* configurable via ZFS_SENDRECV_MAX_NVLIST. We enforce 256MB as a hard
1305
* upper limit. Systems with less than 1GB of RAM will see a lower
1306
* limit from `arc_all_memory() / 4`.
1307
*/
1308
if (payloadlen > (MIN((1U << 28), arc_all_memory() / 4))) {
1309
crfree(cr);
1310
drc->drc_cred = NULL;
1311
return (SET_ERROR(E2BIG));
1312
}
1313
1314
if (payloadlen != 0) {
1315
void *payload = vmem_alloc(payloadlen, KM_SLEEP);
1316
/*
1317
* For compatibility with recursive send streams, we don't do
1318
* this here if the stream could be part of a package. Instead,
1319
* we'll do it in dmu_recv_stream. If we pull the next header
1320
* too early, and it's the END record, we break the `recv_skip`
1321
* logic.
1322
*/
1323
1324
err = receive_read_payload_and_next_header(drc, payloadlen,
1325
payload);
1326
if (err != 0) {
1327
vmem_free(payload, payloadlen);
1328
crfree(cr);
1329
drc->drc_cred = NULL;
1330
return (err);
1331
}
1332
err = nvlist_unpack(payload, payloadlen, &drc->drc_begin_nvl,
1333
KM_SLEEP);
1334
vmem_free(payload, payloadlen);
1335
if (err != 0) {
1336
kmem_free(drc->drc_next_rrd,
1337
sizeof (*drc->drc_next_rrd));
1338
crfree(cr);
1339
drc->drc_cred = NULL;
1340
return (err);
1341
}
1342
}
1343
1344
if (drc->drc_drrb->drr_flags & DRR_FLAG_SPILL_BLOCK)
1345
drc->drc_spill = B_TRUE;
1346
1347
drba.drba_origin = origin;
1348
drba.drba_cookie = drc;
1349
drba.drba_cred = drc->drc_cred;
1350
1351
if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING) {
1352
err = dsl_sync_task(tofs,
1353
dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
1354
&drba, 5, ZFS_SPACE_CHECK_NORMAL);
1355
} else {
1356
/*
1357
* For non-raw, non-incremental, non-resuming receives the
1358
* user can specify encryption parameters on the command line
1359
* with "zfs recv -o". For these receives we create a dcp and
1360
* pass it to the sync task. Creating the dcp will implicitly
1361
* remove the encryption params from the localprops nvlist,
1362
* which avoids errors when trying to set these normally
1363
* read-only properties. Any other kind of receive that
1364
* attempts to set these properties will fail as a result.
1365
*/
1366
if ((DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
1367
DMU_BACKUP_FEATURE_RAW) == 0 &&
1368
origin == NULL && drc->drc_drrb->drr_fromguid == 0) {
1369
err = dsl_crypto_params_create_nvlist(DCP_CMD_NONE,
1370
localprops, hidden_args, &drba.drba_dcp);
1371
}
1372
1373
if (err == 0) {
1374
err = dsl_sync_task(tofs,
1375
dmu_recv_begin_check, dmu_recv_begin_sync,
1376
&drba, 5, ZFS_SPACE_CHECK_NORMAL);
1377
dsl_crypto_params_free(drba.drba_dcp, !!err);
1378
}
1379
}
1380
1381
if (err != 0) {
1382
kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
1383
nvlist_free(drc->drc_begin_nvl);
1384
crfree(cr);
1385
drc->drc_cred = NULL;
1386
}
1387
return (err);
1388
}
1389
1390
/*
1391
* Holds data needed for the corrective recv callback
1392
*/
1393
typedef struct cr_cb_data {
1394
uint64_t size;
1395
zbookmark_phys_t zb;
1396
spa_t *spa;
1397
} cr_cb_data_t;
1398
1399
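/*
 * Completion callback for the verification read issued after a corrective
 * write; on success the block is removed from the persistent error log.
 */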
static void
1400
corrective_read_done(zio_t *zio)
1401
{
1402
cr_cb_data_t *data = zio->io_private;
1403
/* Corruption corrected; update error log if needed */
1404
if (zio->io_error == 0) {
1405
spa_remove_error(data->spa, &data->zb,
1406
BP_GET_PHYSICAL_BIRTH(zio->io_bp));
1407
}
1408
kmem_free(data, sizeof (cr_cb_data_t));
1409
abd_free(zio->io_abd);
1410
}
1411
1412
/*
1413
* zio_rewrite the data pointed to by bp with the data from the rrd's abd.
1414
*/
1415
static int
1416
do_corrective_recv(struct receive_writer_arg *rwa, struct drr_write *drrw,
1417
struct receive_record_arg *rrd, blkptr_t *bp)
1418
{
1419
int err;
1420
zio_t *io;
1421
zbookmark_phys_t zb;
1422
dnode_t *dn;
1423
abd_t *abd = rrd->abd;
1424
zio_cksum_t bp_cksum = bp->blk_cksum;
1425
zio_flag_t flags = ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_RETRY |
1426
ZIO_FLAG_CANFAIL;
1427
1428
if (rwa->raw)
1429
flags |= ZIO_FLAG_RAW;
1430
1431
err = dnode_hold(rwa->os, drrw->drr_object, FTAG, &dn);
1432
if (err != 0)
1433
return (err);
1434
SET_BOOKMARK(&zb, dmu_objset_id(rwa->os), drrw->drr_object, 0,
1435
dbuf_whichblock(dn, 0, drrw->drr_offset));
1436
dnode_rele(dn, FTAG);
1437
1438
if (!rwa->raw && DRR_WRITE_COMPRESSED(drrw)) {
1439
/* Decompress the stream data */
1440
abd_t *dabd = abd_alloc_linear(
1441
drrw->drr_logical_size, B_FALSE);
1442
err = zio_decompress_data(drrw->drr_compressiontype,
1443
abd, dabd, abd_get_size(abd),
1444
abd_get_size(dabd), NULL);
1445
1446
if (err != 0) {
1447
abd_free(dabd);
1448
return (err);
1449
}
1450
/* Swap in the newly decompressed data into the abd */
1451
abd_free(abd);
1452
abd = dabd;
1453
}
1454
1455
if (!rwa->raw && BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF) {
1456
/* Recompress the data */
1457
abd_t *cabd = abd_alloc_linear(BP_GET_PSIZE(bp),
1458
B_FALSE);
1459
uint64_t csize = zio_compress_data(BP_GET_COMPRESS(bp),
1460
abd, &cabd, abd_get_size(abd), BP_GET_PSIZE(bp),
1461
rwa->os->os_complevel);
1462
abd_zero_off(cabd, csize, BP_GET_PSIZE(bp) - csize);
1463
/* Swap in newly compressed data into the abd */
1464
abd_free(abd);
1465
abd = cabd;
1466
flags |= ZIO_FLAG_RAW_COMPRESS;
1467
}
1468
1469
/*
1470
* The stream is not encrypted but the data on-disk is.
1471
* We need to re-encrypt the buf using the same
1472
* encryption type, salt, iv, and mac that was used to encrypt
1473
* the block previously.
1474
*/
1475
if (!rwa->raw && BP_USES_CRYPT(bp)) {
1476
dsl_dataset_t *ds;
1477
dsl_crypto_key_t *dck = NULL;
1478
uint8_t salt[ZIO_DATA_SALT_LEN];
1479
uint8_t iv[ZIO_DATA_IV_LEN];
1480
uint8_t mac[ZIO_DATA_MAC_LEN];
1481
boolean_t no_crypt = B_FALSE;
1482
dsl_pool_t *dp = dmu_objset_pool(rwa->os);
1483
abd_t *eabd = abd_alloc_linear(BP_GET_PSIZE(bp), B_FALSE);
1484
1485
zio_crypt_decode_params_bp(bp, salt, iv);
1486
zio_crypt_decode_mac_bp(bp, mac);
1487
1488
dsl_pool_config_enter(dp, FTAG);
1489
err = dsl_dataset_hold_flags(dp, rwa->tofs,
1490
DS_HOLD_FLAG_DECRYPT, FTAG, &ds);
1491
if (err != 0) {
1492
dsl_pool_config_exit(dp, FTAG);
1493
abd_free(eabd);
1494
return (SET_ERROR(EACCES));
1495
}
1496
1497
/* Look up the key from the spa's keystore */
1498
err = spa_keystore_lookup_key(rwa->os->os_spa,
1499
zb.zb_objset, FTAG, &dck);
1500
if (err != 0) {
1501
dsl_dataset_rele_flags(ds, DS_HOLD_FLAG_DECRYPT,
1502
FTAG);
1503
dsl_pool_config_exit(dp, FTAG);
1504
abd_free(eabd);
1505
return (SET_ERROR(EACCES));
1506
}
1507
1508
err = zio_do_crypt_abd(B_TRUE, &dck->dck_key,
1509
BP_GET_TYPE(bp), BP_SHOULD_BYTESWAP(bp), salt, iv,
1510
mac, abd_get_size(abd), abd, eabd, &no_crypt);
1511
1512
spa_keystore_dsl_key_rele(rwa->os->os_spa, dck, FTAG);
1513
dsl_dataset_rele_flags(ds, DS_HOLD_FLAG_DECRYPT, FTAG);
1514
dsl_pool_config_exit(dp, FTAG);
1515
1516
ASSERT0(no_crypt);
1517
if (err != 0) {
1518
abd_free(eabd);
1519
return (err);
1520
}
1521
/* Swap in the newly encrypted data into the abd */
1522
abd_free(abd);
1523
abd = eabd;
1524
1525
/*
1526
* We want to prevent zio_rewrite() from trying to
1527
* encrypt the data again
1528
*/
1529
flags |= ZIO_FLAG_RAW_ENCRYPT;
1530
}
1531
rrd->abd = abd;
1532
1533
io = zio_rewrite(NULL, rwa->os->os_spa, BP_GET_BIRTH(bp), bp,
1534
abd, BP_GET_PSIZE(bp), NULL, NULL, ZIO_PRIORITY_SYNC_WRITE, flags,
1535
&zb);
1536
1537
ASSERT(abd_get_size(abd) == BP_GET_LSIZE(bp) ||
1538
abd_get_size(abd) == BP_GET_PSIZE(bp));
1539
1540
/* compute new bp checksum value and make sure it matches the old one */
1541
zio_checksum_compute(io, BP_GET_CHECKSUM(bp), abd, abd_get_size(abd));
1542
if (!ZIO_CHECKSUM_EQUAL(bp_cksum, io->io_bp->blk_cksum)) {
1543
zio_destroy(io);
1544
if (zfs_recv_best_effort_corrective != 0)
1545
return (0);
1546
return (SET_ERROR(ECKSUM));
1547
}
1548
1549
/* Correct the corruption in place */
1550
err = zio_wait(io);
1551
if (err == 0) {
1552
cr_cb_data_t *cb_data =
1553
kmem_alloc(sizeof (cr_cb_data_t), KM_SLEEP);
1554
cb_data->spa = rwa->os->os_spa;
1555
cb_data->size = drrw->drr_logical_size;
1556
cb_data->zb = zb;
1557
/* Test if healing worked by re-reading the bp */
1558
err = zio_wait(zio_read(rwa->heal_pio, rwa->os->os_spa, bp,
1559
abd_alloc_for_io(drrw->drr_logical_size, B_FALSE),
1560
drrw->drr_logical_size, corrective_read_done,
1561
cb_data, ZIO_PRIORITY_ASYNC_READ, flags, NULL));
1562
}
1563
if (err != 0 && zfs_recv_best_effort_corrective != 0)
1564
err = 0;
1565
1566
return (err);
1567
}
1568
1569
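/*
 * Read exactly len bytes of stream data into buf, updating the stream
 * offset and byte counters; a short read means the stream was truncated.
 */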
static int
1570
receive_read(dmu_recv_cookie_t *drc, int len, void *buf)
1571
{
1572
int done = 0;
1573
1574
/*
1575
* The code doesn't rely on this (lengths being multiples of 8). See
1576
* comment in dump_bytes.
1577
*/
1578
ASSERT(len % 8 == 0 ||
1579
(drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
1580
1581
while (done < len) {
1582
ssize_t resid = len - done;
1583
zfs_file_t *fp = drc->drc_fp;
1584
int err = zfs_file_read(fp, (char *)buf + done,
1585
len - done, &resid);
1586
if (err == 0 && resid == len - done) {
1587
/*
1588
* Note: ECKSUM or ZFS_ERR_STREAM_TRUNCATED indicates
1589
* that the receive was interrupted and can
1590
* potentially be resumed.
1591
*/
1592
err = SET_ERROR(ZFS_ERR_STREAM_TRUNCATED);
1593
}
1594
drc->drc_voff += len - done - resid;
1595
done = len - resid;
1596
if (err != 0)
1597
return (err);
1598
}
1599
1600
drc->drc_bytes_read += len;
1601
1602
ASSERT3U(done, ==, len);
1603
return (0);
1604
}
1605
1606
static inline uint8_t
1607
deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
1608
{
1609
if (bonus_type == DMU_OT_SA) {
1610
return (1);
1611
} else {
1612
return (1 +
1613
((DN_OLD_MAX_BONUSLEN -
1614
MIN(DN_OLD_MAX_BONUSLEN, bonus_size)) >> SPA_BLKPTRSHIFT));
1615
}
1616
}
1617
1618
static void
1619
save_resume_state(struct receive_writer_arg *rwa,
1620
uint64_t object, uint64_t offset, dmu_tx_t *tx)
1621
{
1622
int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;
1623
1624
if (!rwa->resumable)
1625
return;
1626
1627
/*
1628
* We use ds_resume_bytes[] != 0 to indicate that we need to
1629
* update this on disk, so it must not be 0.
1630
*/
1631
ASSERT(rwa->bytes_read != 0);
1632
1633
/*
1634
* We only resume from write records, which have a valid
1635
* (non-meta-dnode) object number.
1636
*/
1637
ASSERT(object != 0);
1638
1639
/*
1640
* For resuming to work correctly, we must receive records in order,
1641
* sorted by object,offset. This is checked by the callers, but
1642
* assert it here for good measure.
1643
*/
1644
ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
1645
ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
1646
offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
1647
ASSERT3U(rwa->bytes_read, >=,
1648
rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);
1649
1650
rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
1651
rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
1652
rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
1653
}
1654
1655
static int
1656
receive_object_is_same_generation(objset_t *os, uint64_t object,
1657
dmu_object_type_t old_bonus_type, dmu_object_type_t new_bonus_type,
1658
const void *new_bonus, boolean_t *samegenp)
1659
{
1660
zfs_file_info_t zoi;
1661
int err;
1662
1663
dmu_buf_t *old_bonus_dbuf;
1664
err = dmu_bonus_hold(os, object, FTAG, &old_bonus_dbuf);
1665
if (err != 0)
1666
return (err);
1667
err = dmu_get_file_info(os, old_bonus_type, old_bonus_dbuf->db_data,
1668
&zoi);
1669
dmu_buf_rele(old_bonus_dbuf, FTAG);
1670
if (err != 0)
1671
return (err);
1672
uint64_t old_gen = zoi.zfi_generation;
1673
1674
err = dmu_get_file_info(os, new_bonus_type, new_bonus, &zoi);
1675
if (err != 0)
1676
return (err);
1677
uint64_t new_gen = zoi.zfi_generation;
1678
1679
*samegenp = (old_gen == new_gen);
1680
return (0);
1681
}
1682
1683
static int
1684
receive_handle_existing_object(const struct receive_writer_arg *rwa,
1685
const struct drr_object *drro, const dmu_object_info_t *doi,
1686
const void *bonus_data,
1687
uint64_t *object_to_hold, uint32_t *new_blksz)
1688
{
1689
uint32_t indblksz = drro->drr_indblkshift ?
1690
1ULL << drro->drr_indblkshift : 0;
1691
int nblkptr = deduce_nblkptr(drro->drr_bonustype,
1692
drro->drr_bonuslen);
1693
uint8_t dn_slots = drro->drr_dn_slots != 0 ?
1694
drro->drr_dn_slots : DNODE_MIN_SLOTS;
1695
boolean_t do_free_range = B_FALSE;
1696
int err;
1697
1698
*object_to_hold = drro->drr_object;
1699
1700
/* nblkptr should be bounded by the bonus size and type */
1701
if (rwa->raw && nblkptr != drro->drr_nblkptr)
1702
return (SET_ERROR(EINVAL));
1703
1704
/*
1705
* After the previous send stream, the sending system may
1706
* have freed this object, and then happened to re-allocate
1707
* this object number in a later txg. In this case, we are
1708
* receiving a different logical file, and the block size may
1709
* appear to be different. i.e. we may have a different
1710
* block size for this object than what the send stream says.
1711
* In this case we need to remove the object's contents,
1712
* so that its structure can be changed and then its contents
1713
* entirely replaced by subsequent WRITE records.
1714
*
1715
* If this is a -L (--large-block) incremental stream, and
1716
* the previous stream was not -L, the block size may appear
1717
* to increase. i.e. we may have a smaller block size for
1718
* this object than what the send stream says. In this case
1719
* we need to keep the object's contents and block size
1720
* intact, so that we don't lose parts of the object's
1721
* contents that are not changed by this incremental send
1722
* stream.
1723
*
1724
* We can distinguish between the two above cases by using
1725
* the ZPL's generation number (see
1726
* receive_object_is_same_generation()). However, we only
1727
* want to rely on the generation number when absolutely
1728
* necessary, because with raw receives, the generation is
1729
* encrypted. We also want to minimize dependence on the
1730
* ZPL, so that other types of datasets can also be received
1731
* (e.g. ZVOLs, although note that ZVOLS currently do not
1732
* reallocate their objects or change their structure).
1733
* Therefore, we check a number of different cases where we
1734
* know it is safe to discard the object's contents, before
1735
* using the ZPL's generation number to make the above
1736
* distinction.
1737
*/
1738
if (drro->drr_blksz != doi->doi_data_block_size) {
1739
if (rwa->raw) {
1740
/*
1741
* RAW streams always have large blocks, so
1742
* we are sure that the data is not needed
1743
* due to changing --large-block to be on.
1744
* Which is fortunate since the bonus buffer
1745
* (which contains the ZPL generation) is
1746
* encrypted, and the key might not be
1747
* loaded.
1748
*/
1749
do_free_range = B_TRUE;
1750
} else if (rwa->full) {
1751
/*
1752
* This is a full send stream, so it always
1753
* replaces what we have. Even if the
1754
* generation numbers happen to match, this
1755
* can not actually be the same logical file.
1756
* This is relevant when receiving a full
1757
* send as a clone.
1758
*/
1759
do_free_range = B_TRUE;
1760
} else if (drro->drr_type !=
1761
DMU_OT_PLAIN_FILE_CONTENTS ||
1762
doi->doi_type != DMU_OT_PLAIN_FILE_CONTENTS) {
1763
/*
1764
* PLAIN_FILE_CONTENTS are the only type of
1765
* objects that have ever been stored with
1766
* large blocks, so we don't need the special
1767
* logic below. ZAP blocks can shrink (when
1768
* there's only one block), so we don't want
1769
* to hit the error below about block size
1770
* only increasing.
1771
*/
1772
do_free_range = B_TRUE;
1773
} else if (doi->doi_max_offset <=
1774
doi->doi_data_block_size) {
1775
/*
1776
* There is only one block. We can free it,
1777
* because its contents will be replaced by a
1778
* WRITE record. This can not be the no-L ->
1779
* -L case, because the no-L case would have
1780
* resulted in multiple blocks. If we
1781
* supported -L -> no-L, it would not be safe
1782
* to free the file's contents. Fortunately,
1783
* that is not allowed (see
1784
* recv_check_large_blocks()).
1785
*/
1786
do_free_range = B_TRUE;
1787
} else {
1788
boolean_t is_same_gen;
1789
err = receive_object_is_same_generation(rwa->os,
1790
drro->drr_object, doi->doi_bonus_type,
1791
drro->drr_bonustype, bonus_data, &is_same_gen);
1792
if (err != 0)
1793
return (SET_ERROR(EINVAL));
1794
1795
if (is_same_gen) {
1796
/*
1797
* This is the same logical file, and
1798
* the block size must be increasing.
1799
* It could only decrease if
1800
* --large-block was changed to be
1801
* off, which is checked in
1802
* recv_check_large_blocks().
1803
*/
1804
if (drro->drr_blksz <=
1805
doi->doi_data_block_size)
1806
return (SET_ERROR(EINVAL));
1807
/*
1808
* We keep the existing blocksize and
1809
* contents.
1810
*/
1811
*new_blksz =
1812
doi->doi_data_block_size;
1813
} else {
1814
do_free_range = B_TRUE;
1815
}
1816
}
1817
}
1818
1819
/* nblkptr can only decrease if the object was reallocated */
1820
if (nblkptr < doi->doi_nblkptr)
1821
do_free_range = B_TRUE;
1822
1823
/* number of slots can only change on reallocation */
1824
if (dn_slots != doi->doi_dnodesize >> DNODE_SHIFT)
1825
do_free_range = B_TRUE;
1826
1827
/*
1828
* For raw sends we also check a few other fields to
1829
* ensure we are preserving the objset structure exactly
1830
* as it was on the send side:
1831
* - A changed indirect block size
1832
* - A smaller nlevels
1833
*/
1834
if (rwa->raw) {
1835
if (indblksz != doi->doi_metadata_block_size)
1836
do_free_range = B_TRUE;
1837
if (drro->drr_nlevels < doi->doi_indirection)
1838
do_free_range = B_TRUE;
1839
}
1840
1841
if (do_free_range) {
1842
err = dmu_free_long_range(rwa->os, drro->drr_object,
1843
0, DMU_OBJECT_END);
1844
if (err != 0)
1845
return (SET_ERROR(EINVAL));
1846
}
1847
1848
/*
1849
* The dmu does not currently support decreasing nlevels or changing
1850
* indirect block size if there is already one, same as changing the
1851
* number of dnode slots on an object. For non-raw sends this
1852
* does not matter and the new object can just use the previous one's
1853
* parameters. For raw sends, however, the structure of the received
1854
* dnode (including indirects and dnode slots) must match that of the
1855
* send side. Therefore, instead of using dmu_object_reclaim(), we
1856
* must free the object completely and call dmu_object_claim_dnsize()
1857
* instead.
1858
*/
1859
if ((rwa->raw && ((doi->doi_indirection > 1 &&
1860
indblksz != doi->doi_metadata_block_size) ||
1861
drro->drr_nlevels < doi->doi_indirection)) ||
1862
dn_slots != doi->doi_dnodesize >> DNODE_SHIFT) {
1863
err = dmu_free_long_object(rwa->os, drro->drr_object);
1864
if (err != 0)
1865
return (SET_ERROR(EINVAL));
1866
1867
txg_wait_synced(dmu_objset_pool(rwa->os), 0);
1868
*object_to_hold = DMU_NEW_OBJECT;
1869
}
1870
1871
/*
1872
* For raw receives, free everything beyond the new incoming
1873
* maxblkid. Normally this would be done with a DRR_FREE
1874
* record that would come after this DRR_OBJECT record is
1875
* processed. However, for raw receives we manually set the
1876
* maxblkid from the drr_maxblkid and so we must first free
1877
* everything above that blkid to ensure the DMU is always
1878
* consistent with itself. We will never free the first block
1879
* of the object here because a maxblkid of 0 could indicate
1880
* an object with a single block or one with no blocks. This
1881
* free may be skipped when dmu_free_long_range() was called
1882
* above since it covers the entire object's contents.
1883
*/
1884
if (rwa->raw && *object_to_hold != DMU_NEW_OBJECT && !do_free_range) {
1885
err = dmu_free_long_range(rwa->os, drro->drr_object,
1886
(drro->drr_maxblkid + 1) * doi->doi_data_block_size,
1887
DMU_OBJECT_END);
1888
if (err != 0)
1889
return (SET_ERROR(EINVAL));
1890
}
1891
return (0);
1892
}
1893
1894
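/*
 * Apply a DRR_OBJECT record: validate its fields, reconcile it with any
 * existing object of the same number, claim or reclaim the dnode, and
 * copy the bonus payload (if any) into place.
 */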
noinline static int
1895
receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
1896
void *data)
1897
{
1898
dmu_object_info_t doi;
1899
dmu_tx_t *tx;
1900
int err;
1901
uint32_t new_blksz = drro->drr_blksz;
1902
uint8_t dn_slots = drro->drr_dn_slots != 0 ?
1903
drro->drr_dn_slots : DNODE_MIN_SLOTS;
1904
1905
if (drro->drr_type == DMU_OT_NONE ||
1906
!DMU_OT_IS_VALID(drro->drr_type) ||
1907
!DMU_OT_IS_VALID(drro->drr_bonustype) ||
1908
drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
1909
drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
1910
P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
1911
drro->drr_blksz < SPA_MINBLOCKSIZE ||
1912
drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
1913
drro->drr_bonuslen >
1914
DN_BONUS_SIZE(spa_maxdnodesize(dmu_objset_spa(rwa->os))) ||
1915
dn_slots >
1916
(spa_maxdnodesize(dmu_objset_spa(rwa->os)) >> DNODE_SHIFT)) {
1917
return (SET_ERROR(EINVAL));
1918
}
1919
1920
if (rwa->raw) {
1921
/*
1922
* We should have received a DRR_OBJECT_RANGE record
1923
* containing this block and stored it in rwa.
1924
*/
1925
if (drro->drr_object < rwa->or_firstobj ||
1926
drro->drr_object >= rwa->or_firstobj + rwa->or_numslots ||
1927
drro->drr_raw_bonuslen < drro->drr_bonuslen ||
1928
drro->drr_indblkshift > SPA_MAXBLOCKSHIFT ||
1929
drro->drr_nlevels > DN_MAX_LEVELS ||
1930
drro->drr_nblkptr > DN_MAX_NBLKPTR ||
1931
DN_SLOTS_TO_BONUSLEN(dn_slots) <
1932
drro->drr_raw_bonuslen)
1933
return (SET_ERROR(EINVAL));
1934
} else {
1935
/*
1936
* The DRR_OBJECT_SPILL flag is valid when the DRR_BEGIN
1937
* record indicates this by setting DRR_FLAG_SPILL_BLOCK.
1938
*/
1939
if (((drro->drr_flags & ~(DRR_OBJECT_SPILL))) ||
1940
(!rwa->spill && DRR_OBJECT_HAS_SPILL(drro->drr_flags))) {
1941
return (SET_ERROR(EINVAL));
1942
}
1943
1944
if (drro->drr_raw_bonuslen != 0 || drro->drr_nblkptr != 0 ||
1945
drro->drr_indblkshift != 0 || drro->drr_nlevels != 0) {
1946
return (SET_ERROR(EINVAL));
1947
}
1948
}
1949
1950
err = dmu_object_info(rwa->os, drro->drr_object, &doi);
1951
1952
if (err != 0 && err != ENOENT && err != EEXIST)
1953
return (SET_ERROR(EINVAL));
1954
1955
if (drro->drr_object > rwa->max_object)
1956
rwa->max_object = drro->drr_object;
1957
1958
/*
1959
* If we are losing blkptrs or changing the block size this must
1960
* be a new file instance. We must clear out the previous file
1961
* contents before we can change this type of metadata in the dnode.
1962
* Raw receives will also check that the indirect structure of the
1963
* dnode hasn't changed.
1964
*/
1965
uint64_t object_to_hold;
1966
if (err == 0) {
1967
err = receive_handle_existing_object(rwa, drro, &doi, data,
1968
&object_to_hold, &new_blksz);
1969
if (err != 0)
1970
return (err);
1971
} else if (err == EEXIST) {
1972
/*
1973
* The object requested is currently an interior slot of a
1974
* multi-slot dnode. This will be resolved when the next txg
1975
* is synced out, since the send stream will have told us
1976
* to free this slot when we freed the associated dnode
1977
* earlier in the stream.
1978
*/
1979
txg_wait_synced(dmu_objset_pool(rwa->os), 0);
1980
1981
if (dmu_object_info(rwa->os, drro->drr_object, NULL) != ENOENT)
1982
return (SET_ERROR(EINVAL));
1983
1984
/* object was freed and we are about to allocate a new one */
1985
object_to_hold = DMU_NEW_OBJECT;
1986
} else {
1987
/*
1988
* If the only record in this range so far was DRR_FREEOBJECTS
1989
* with at least one actually freed object, it's possible that
1990
* the block will now be converted to a hole. We need to wait
1991
* for the txg to sync to prevent races.
1992
*/
1993
if (rwa->or_need_sync == ORNS_YES)
1994
txg_wait_synced(dmu_objset_pool(rwa->os), 0);
1995
1996
/* object is free and we are about to allocate a new one */
1997
object_to_hold = DMU_NEW_OBJECT;
1998
}
1999
2000
/* Only relevant for the first object in the range */
2001
rwa->or_need_sync = ORNS_NO;
2002
2003
/*
2004
* If this is a multi-slot dnode there is a chance that this
2005
* object will expand into a slot that is already used by
2006
* another object from the previous snapshot. We must free
2007
* these objects before we attempt to allocate the new dnode.
2008
*/
2009
if (dn_slots > 1) {
2010
boolean_t need_sync = B_FALSE;
2011
2012
for (uint64_t slot = drro->drr_object + 1;
2013
slot < drro->drr_object + dn_slots;
2014
slot++) {
2015
dmu_object_info_t slot_doi;
2016
2017
err = dmu_object_info(rwa->os, slot, &slot_doi);
2018
if (err == ENOENT || err == EEXIST)
2019
continue;
2020
else if (err != 0)
2021
return (err);
2022
2023
err = dmu_free_long_object(rwa->os, slot);
2024
if (err != 0)
2025
return (err);
2026
2027
need_sync = B_TRUE;
2028
}
2029
2030
if (need_sync)
2031
txg_wait_synced(dmu_objset_pool(rwa->os), 0);
2032
}
2033
2034
tx = dmu_tx_create(rwa->os);
2035
dmu_tx_hold_bonus(tx, object_to_hold);
2036
dmu_tx_hold_write(tx, object_to_hold, 0, 0);
2037
err = dmu_tx_assign(tx, DMU_TX_WAIT);
2038
if (err != 0) {
2039
dmu_tx_abort(tx);
2040
return (err);
2041
}
2042
2043
if (object_to_hold == DMU_NEW_OBJECT) {
2044
/* Currently free, wants to be allocated */
2045
err = dmu_object_claim_dnsize(rwa->os, drro->drr_object,
2046
drro->drr_type, new_blksz,
2047
drro->drr_bonustype, drro->drr_bonuslen,
2048
dn_slots << DNODE_SHIFT, tx);
2049
} else if (drro->drr_type != doi.doi_type ||
2050
new_blksz != doi.doi_data_block_size ||
2051
drro->drr_bonustype != doi.doi_bonus_type ||
2052
drro->drr_bonuslen != doi.doi_bonus_size) {
2053
/* Currently allocated, but with different properties */
2054
err = dmu_object_reclaim_dnsize(rwa->os, drro->drr_object,
2055
drro->drr_type, new_blksz,
2056
drro->drr_bonustype, drro->drr_bonuslen,
2057
dn_slots << DNODE_SHIFT, rwa->spill ?
2058
DRR_OBJECT_HAS_SPILL(drro->drr_flags) : B_FALSE, tx);
2059
} else if (rwa->spill && !DRR_OBJECT_HAS_SPILL(drro->drr_flags)) {
2060
/*
2061
* Currently allocated, the existing version of this object
2062
* may reference a spill block that is no longer allocated
2063
* at the source and needs to be freed.
2064
*/
2065
err = dmu_object_rm_spill(rwa->os, drro->drr_object, tx);
2066
}
2067
2068
if (err != 0) {
2069
dmu_tx_commit(tx);
2070
return (SET_ERROR(EINVAL));
2071
}
2072
2073
if (rwa->or_crypt_params_present) {
2074
/*
2075
* Set the crypt params for the buffer associated with this
2076
* range of dnodes. This causes the blkptr_t to have the
2077
* same crypt params (byteorder, salt, iv, mac) as on the
2078
* sending side.
2079
*
2080
* Since we are committing this tx now, it is possible for
2081
* the dnode block to end up on-disk with the incorrect MAC,
2082
* if subsequent objects in this block are received in a
2083
* different txg. However, since the dataset is marked as
2084
* inconsistent, no code paths will do a non-raw read (or
2085
* decrypt the block / verify the MAC). The receive code and
2086
* scrub code can safely do raw reads and verify the
2087
* checksum. They don't need to verify the MAC.
2088
*/
2089
dmu_buf_t *db = NULL;
2090
uint64_t offset = rwa->or_firstobj * DNODE_MIN_SIZE;
2091
2092
err = dmu_buf_hold_by_dnode(DMU_META_DNODE(rwa->os),
2093
offset, FTAG, &db, DMU_READ_PREFETCH | DMU_READ_NO_DECRYPT);
2094
if (err != 0) {
2095
dmu_tx_commit(tx);
2096
return (SET_ERROR(EINVAL));
2097
}
2098
2099
dmu_buf_set_crypt_params(db, rwa->or_byteorder,
2100
rwa->or_salt, rwa->or_iv, rwa->or_mac, tx);
2101
2102
dmu_buf_rele(db, FTAG);
2103
2104
rwa->or_crypt_params_present = B_FALSE;
2105
}
2106
2107
dmu_object_set_checksum(rwa->os, drro->drr_object,
2108
drro->drr_checksumtype, tx);
2109
dmu_object_set_compress(rwa->os, drro->drr_object,
2110
drro->drr_compress, tx);
2111
2112
/* handle more restrictive dnode structuring for raw recvs */
2113
if (rwa->raw) {
2114
/*
2115
* Set the indirect block size, block shift, nlevels.
2116
* This will not fail because we ensured all of the
2117
* blocks were freed earlier if this is a new object.
2118
* For non-new objects block size and indirect block
2119
* shift cannot change and nlevels can only increase.
2120
*/
2121
ASSERT3U(new_blksz, ==, drro->drr_blksz);
2122
VERIFY0(dmu_object_set_blocksize(rwa->os, drro->drr_object,
2123
drro->drr_blksz, drro->drr_indblkshift, tx));
2124
VERIFY0(dmu_object_set_nlevels(rwa->os, drro->drr_object,
2125
drro->drr_nlevels, tx));
2126
2127
/*
2128
* Set the maxblkid. This will always succeed because
2129
* we freed all blocks beyond the new maxblkid above.
2130
*/
2131
VERIFY0(dmu_object_set_maxblkid(rwa->os, drro->drr_object,
2132
drro->drr_maxblkid, tx));
2133
}
2134
2135
if (data != NULL) {
2136
dmu_buf_t *db;
2137
dnode_t *dn;
2138
dmu_flags_t flags = DMU_READ_NO_PREFETCH;
2139
2140
if (rwa->raw)
2141
flags |= DMU_READ_NO_DECRYPT;
2142
2143
VERIFY0(dnode_hold(rwa->os, drro->drr_object, FTAG, &dn));
2144
VERIFY0(dmu_bonus_hold_by_dnode(dn, FTAG, &db, flags));
2145
2146
dmu_buf_will_dirty(db, tx);
2147
2148
ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
2149
memcpy(db->db_data, data, DRR_OBJECT_PAYLOAD_SIZE(drro));
2150
2151
/*
2152
* Raw bonus buffers have their byteorder determined by the
2153
* DRR_OBJECT_RANGE record.
2154
*/
2155
if (rwa->byteswap && !rwa->raw) {
2156
dmu_object_byteswap_t byteswap =
2157
DMU_OT_BYTESWAP(drro->drr_bonustype);
2158
dmu_ot_byteswap[byteswap].ob_func(db->db_data,
2159
DRR_OBJECT_PAYLOAD_SIZE(drro));
2160
}
2161
dmu_buf_rele(db, FTAG);
2162
dnode_rele(dn, FTAG);
2163
}
2164
2165
/*
2166
* If the receive fails, we want the resume stream to start with the
2167
* same record that we last successfully received. There is no way to
2168
* request resume from the object record, but we can benefit from the
2169
* fact that the sender always sends the object record before anything else,
2170
* after which it will "resend" data at offset 0 and resume normally.
2171
*/
2172
save_resume_state(rwa, drro->drr_object, 0, tx);
2173
2174
dmu_tx_commit(tx);
2175
2176
return (0);
2177
}
2178
2179
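/*
 * Apply a DRR_FREEOBJECTS record by freeing every allocated object in
 * the range [drr_firstobj, drr_firstobj + drr_numobjs).
 */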
noinline static int
2180
receive_freeobjects(struct receive_writer_arg *rwa,
2181
struct drr_freeobjects *drrfo)
2182
{
2183
uint64_t obj;
2184
int next_err = 0;
2185
2186
if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
2187
return (SET_ERROR(EINVAL));
2188
2189
for (obj = drrfo->drr_firstobj == 0 ? 1 : drrfo->drr_firstobj;
2190
obj < drrfo->drr_firstobj + drrfo->drr_numobjs &&
2191
obj < DN_MAX_OBJECT && next_err == 0;
2192
next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
2193
dmu_object_info_t doi;
2194
int err;
2195
2196
err = dmu_object_info(rwa->os, obj, &doi);
2197
if (err == ENOENT)
2198
continue;
2199
else if (err != 0)
2200
return (err);
2201
2202
err = dmu_free_long_object(rwa->os, obj);
2203
2204
if (err != 0)
2205
return (err);
2206
2207
if (rwa->or_need_sync == ORNS_MAYBE)
2208
rwa->or_need_sync = ORNS_YES;
2209
}
2210
if (next_err != ESRCH)
2211
return (next_err);
2212
return (0);
2213
}
2214
2215
/*
2216
* Note: if this fails, the caller will clean up any records left on the
2217
* rwa->write_batch list.
2218
*/
2219
static int
2220
flush_write_batch_impl(struct receive_writer_arg *rwa)
2221
{
2222
dnode_t *dn;
2223
int err;
2224
2225
if (dnode_hold(rwa->os, rwa->last_object, FTAG, &dn) != 0)
2226
return (SET_ERROR(EINVAL));
2227
2228
struct receive_record_arg *last_rrd = list_tail(&rwa->write_batch);
2229
struct drr_write *last_drrw = &last_rrd->header.drr_u.drr_write;
2230
2231
struct receive_record_arg *first_rrd = list_head(&rwa->write_batch);
2232
struct drr_write *first_drrw = &first_rrd->header.drr_u.drr_write;
2233
2234
ASSERT3U(rwa->last_object, ==, last_drrw->drr_object);
2235
ASSERT3U(rwa->last_offset, ==, last_drrw->drr_offset);
2236
2237
dmu_tx_t *tx = dmu_tx_create(rwa->os);
2238
dmu_tx_hold_write_by_dnode(tx, dn, first_drrw->drr_offset,
2239
last_drrw->drr_offset - first_drrw->drr_offset +
2240
last_drrw->drr_logical_size);
2241
err = dmu_tx_assign(tx, DMU_TX_WAIT);
2242
if (err != 0) {
2243
dmu_tx_abort(tx);
2244
dnode_rele(dn, FTAG);
2245
return (err);
2246
}
2247
2248
struct receive_record_arg *rrd;
2249
while ((rrd = list_head(&rwa->write_batch)) != NULL) {
2250
struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2251
abd_t *abd = rrd->abd;
2252
2253
ASSERT3U(drrw->drr_object, ==, rwa->last_object);
2254
2255
if (drrw->drr_logical_size != dn->dn_datablksz) {
2256
/*
2257
* The WRITE record is larger than the object's block
2258
* size. We must be receiving an incremental
2259
* large-block stream into a dataset that previously did
2260
* a non-large-block receive. Lightweight writes must
2261
* be exactly one block, so we need to decompress the
2262
* data (if compressed) and do a normal dmu_write().
2263
*/
2264
ASSERT3U(drrw->drr_logical_size, >, dn->dn_datablksz);
2265
if (DRR_WRITE_COMPRESSED(drrw)) {
2266
abd_t *decomp_abd =
2267
abd_alloc_linear(drrw->drr_logical_size,
2268
B_FALSE);
2269
2270
err = zio_decompress_data(
2271
drrw->drr_compressiontype,
2272
abd, decomp_abd,
2273
abd_get_size(abd),
2274
abd_get_size(decomp_abd), NULL);
2275
2276
if (err == 0) {
2277
dmu_write_by_dnode(dn,
2278
drrw->drr_offset,
2279
drrw->drr_logical_size,
2280
abd_to_buf(decomp_abd), tx,
2281
DMU_READ_NO_PREFETCH |
2282
DMU_UNCACHEDIO);
2283
}
2284
abd_free(decomp_abd);
2285
} else {
2286
dmu_write_by_dnode(dn,
2287
drrw->drr_offset,
2288
drrw->drr_logical_size,
2289
abd_to_buf(abd), tx,
2290
DMU_READ_NO_PREFETCH |
2291
DMU_UNCACHEDIO);
2292
}
2293
if (err == 0)
2294
abd_free(abd);
2295
} else {
2296
zio_prop_t zp = {0};
2297
dmu_write_policy(rwa->os, dn, 0, 0, &zp);
2298
2299
zio_flag_t zio_flags = 0;
2300
2301
if (rwa->raw) {
2302
zp.zp_encrypt = B_TRUE;
2303
zp.zp_compress = drrw->drr_compressiontype;
2304
zp.zp_byteorder = ZFS_HOST_BYTEORDER ^
2305
!!DRR_IS_RAW_BYTESWAPPED(drrw->drr_flags) ^
2306
rwa->byteswap;
2307
memcpy(zp.zp_salt, drrw->drr_salt,
2308
ZIO_DATA_SALT_LEN);
2309
memcpy(zp.zp_iv, drrw->drr_iv,
2310
ZIO_DATA_IV_LEN);
2311
memcpy(zp.zp_mac, drrw->drr_mac,
2312
ZIO_DATA_MAC_LEN);
2313
if (DMU_OT_IS_ENCRYPTED(zp.zp_type)) {
2314
zp.zp_nopwrite = B_FALSE;
2315
zp.zp_copies = MIN(zp.zp_copies,
2316
SPA_DVAS_PER_BP - 1);
2317
zp.zp_gang_copies =
2318
MIN(zp.zp_gang_copies,
2319
SPA_DVAS_PER_BP - 1);
2320
}
2321
zio_flags |= ZIO_FLAG_RAW;
2322
} else if (DRR_WRITE_COMPRESSED(drrw)) {
2323
ASSERT3U(drrw->drr_compressed_size, >, 0);
2324
ASSERT3U(drrw->drr_logical_size, >=,
2325
drrw->drr_compressed_size);
2326
zp.zp_compress = drrw->drr_compressiontype;
2327
zio_flags |= ZIO_FLAG_RAW_COMPRESS;
2328
} else if (rwa->byteswap) {
2329
/*
2330
* Note: compressed blocks never need to be
2331
* byteswapped, because WRITE records for
2332
* metadata blocks are never compressed. The
2333
* exception is raw streams, which are written
2334
* in the original byteorder, and the byteorder
2335
* bit is preserved in the BP by setting
2336
* zp_byteorder above.
2337
*/
2338
dmu_object_byteswap_t byteswap =
2339
DMU_OT_BYTESWAP(drrw->drr_type);
2340
dmu_ot_byteswap[byteswap].ob_func(
2341
abd_to_buf(abd),
2342
DRR_WRITE_PAYLOAD_SIZE(drrw));
2343
}
2344
2345
/*
2346
* Since this data can't be read until the receive
2347
* completes, we can do a "lightweight" write for
2348
* improved performance.
2349
*/
2350
err = dmu_lightweight_write_by_dnode(dn,
2351
drrw->drr_offset, abd, &zp, zio_flags, tx);
2352
}
2353
2354
if (err != 0) {
2355
/*
2356
* This rrd is left on the list, so the caller will
2357
* free it (and the abd).
2358
*/
2359
break;
2360
}
2361
2362
/*
2363
* Note: If the receive fails, we want the resume stream to
2364
* start with the same record that we last successfully
2365
* received (as opposed to the next record), so that we can
2366
* verify that we are resuming from the correct location.
2367
*/
2368
save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
2369
2370
list_remove(&rwa->write_batch, rrd);
2371
kmem_free(rrd, sizeof (*rrd));
2372
}
2373
2374
dmu_tx_commit(tx);
2375
dnode_rele(dn, FTAG);
2376
return (err);
2377
}
2378
2379
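/*
 * Flush the pending batch of WRITE records, or free them if the receive
 * has already failed. Returns the first error encountered, if any.
 */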
noinline static int
2380
flush_write_batch(struct receive_writer_arg *rwa)
2381
{
2382
if (list_is_empty(&rwa->write_batch))
2383
return (0);
2384
int err = rwa->err;
2385
if (err == 0)
2386
err = flush_write_batch_impl(rwa);
2387
if (err != 0) {
2388
struct receive_record_arg *rrd;
2389
while ((rrd = list_remove_head(&rwa->write_batch)) != NULL) {
2390
abd_free(rrd->abd);
2391
kmem_free(rrd, sizeof (*rrd));
2392
}
2393
}
2394
ASSERT(list_is_empty(&rwa->write_batch));
2395
return (err);
2396
}
2397
2398
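/*
 * Handle a DRR_WRITE record. For healing receives the existing block is
 * read and, if it fails its checksum, repaired from the stream payload
 * via do_corrective_recv(). Otherwise the record is appended to the
 * current write batch, flushing the batch first if the record does not
 * belong to it, and EAGAIN is returned to indicate that the caller must
 * not free the record.
 */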
noinline static int
2399
receive_process_write_record(struct receive_writer_arg *rwa,
2400
struct receive_record_arg *rrd)
2401
{
2402
int err = 0;
2403
2404
ASSERT3U(rrd->header.drr_type, ==, DRR_WRITE);
2405
struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2406
2407
if (drrw->drr_offset + drrw->drr_logical_size < drrw->drr_offset ||
2408
!DMU_OT_IS_VALID(drrw->drr_type))
2409
return (SET_ERROR(EINVAL));
2410
2411
if (rwa->heal) {
2412
blkptr_t *bp;
2413
dmu_buf_t *dbp;
2414
dmu_flags_t flags = DB_RF_CANFAIL;
2415
2416
if (rwa->raw)
2417
flags |= DMU_READ_NO_DECRYPT;
2418
2419
if (rwa->byteswap) {
2420
dmu_object_byteswap_t byteswap =
2421
DMU_OT_BYTESWAP(drrw->drr_type);
2422
dmu_ot_byteswap[byteswap].ob_func(abd_to_buf(rrd->abd),
2423
DRR_WRITE_PAYLOAD_SIZE(drrw));
2424
}
2425
2426
err = dmu_buf_hold_noread(rwa->os, drrw->drr_object,
2427
drrw->drr_offset, FTAG, &dbp);
2428
if (err != 0)
2429
return (err);
2430
2431
/* Try to read the object to see if it needs healing */
2432
err = dbuf_read((dmu_buf_impl_t *)dbp, NULL, flags);
2433
/*
2434
* We only try to heal when dbuf_read() returns ECKSUM.
2435
* Other errors (even EIO) get returned to the caller.
2436
* EIO indicates that the device is not present/accessible,
2437
* so writing to it will likely fail.
2438
* If the block is healthy, we don't want to overwrite it
2439
* unnecessarily.
2440
*/
2441
if (err != ECKSUM) {
2442
dmu_buf_rele(dbp, FTAG);
2443
return (err);
2444
}
2445
/* Make sure the on-disk block and recv record sizes match */
2446
if (drrw->drr_logical_size != dbp->db_size) {
2447
err = ENOTSUP;
2448
dmu_buf_rele(dbp, FTAG);
2449
return (err);
2450
}
2451
/* Get the block pointer for the corrupted block */
2452
bp = dmu_buf_get_blkptr(dbp);
2453
err = do_corrective_recv(rwa, drrw, rrd, bp);
2454
dmu_buf_rele(dbp, FTAG);
2455
return (err);
2456
}
2457
2458
/*
2459
* For resuming to work, records must be in increasing order
2460
* by (object, offset).
2461
*/
2462
if (drrw->drr_object < rwa->last_object ||
2463
(drrw->drr_object == rwa->last_object &&
2464
drrw->drr_offset < rwa->last_offset)) {
2465
return (SET_ERROR(EINVAL));
2466
}
2467
2468
struct receive_record_arg *first_rrd = list_head(&rwa->write_batch);
2469
struct drr_write *first_drrw = &first_rrd->header.drr_u.drr_write;
2470
uint64_t batch_size =
2471
MIN(zfs_recv_write_batch_size, DMU_MAX_ACCESS / 2);
2472
if (first_rrd != NULL &&
2473
(drrw->drr_object != first_drrw->drr_object ||
2474
drrw->drr_offset >= first_drrw->drr_offset + batch_size)) {
2475
err = flush_write_batch(rwa);
2476
if (err != 0)
2477
return (err);
2478
}
2479
2480
rwa->last_object = drrw->drr_object;
2481
rwa->last_offset = drrw->drr_offset;
2482
2483
if (rwa->last_object > rwa->max_object)
2484
rwa->max_object = rwa->last_object;
2485
2486
list_insert_tail(&rwa->write_batch, rrd);
2487
/*
2488
* Return EAGAIN to indicate that we will use this rrd again,
2489
* so the caller should not free it
2490
*/
2491
return (EAGAIN);
2492
}
2493
2494
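/*
 * Apply a DRR_WRITE_EMBEDDED record by writing its embedded-BP payload
 * at the given object and offset.
 */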
static int
2495
receive_write_embedded(struct receive_writer_arg *rwa,
2496
struct drr_write_embedded *drrwe, void *data)
2497
{
2498
dmu_tx_t *tx;
2499
int err;
2500
2501
if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
2502
return (SET_ERROR(EINVAL));
2503
2504
if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
2505
return (SET_ERROR(EINVAL));
2506
2507
if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
2508
return (SET_ERROR(EINVAL));
2509
if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
2510
return (SET_ERROR(EINVAL));
2511
if (rwa->raw)
2512
return (SET_ERROR(EINVAL));
2513
2514
if (drrwe->drr_object > rwa->max_object)
2515
rwa->max_object = drrwe->drr_object;
2516
2517
tx = dmu_tx_create(rwa->os);
2518
2519
dmu_tx_hold_write(tx, drrwe->drr_object,
2520
drrwe->drr_offset, drrwe->drr_length);
2521
err = dmu_tx_assign(tx, DMU_TX_WAIT);
2522
if (err != 0) {
2523
dmu_tx_abort(tx);
2524
return (err);
2525
}
2526
2527
dmu_write_embedded(rwa->os, drrwe->drr_object,
2528
drrwe->drr_offset, data, drrwe->drr_etype,
2529
drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
2530
rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
2531
2532
/* See comment in restore_write. */
2533
save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
2534
dmu_tx_commit(tx);
2535
return (0);
2536
}
2537
2538
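/*
 * Apply a DRR_SPILL record: hold the object's spill dbuf, resize it if
 * the spill block's length has changed, and assign the received payload
 * to it.
 */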
static int
2539
receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
2540
abd_t *abd)
2541
{
2542
dmu_buf_t *db, *db_spill;
2543
int err;
2544
2545
if (drrs->drr_length < SPA_MINBLOCKSIZE ||
2546
drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
2547
return (SET_ERROR(EINVAL));
2548
2549
/*
2550
* This is an unmodified spill block which was added to the stream
2551
* to resolve an issue with incorrectly removing spill blocks. It
2552
* should be ignored by current versions of the code which support
2553
* the DRR_FLAG_SPILL_BLOCK flag.
2554
*/
2555
if (rwa->spill && DRR_SPILL_IS_UNMODIFIED(drrs->drr_flags)) {
2556
abd_free(abd);
2557
return (0);
2558
}
2559
2560
if (rwa->raw) {
2561
if (!DMU_OT_IS_VALID(drrs->drr_type) ||
2562
drrs->drr_compressiontype >= ZIO_COMPRESS_FUNCTIONS ||
2563
drrs->drr_compressed_size == 0)
2564
return (SET_ERROR(EINVAL));
2565
}
2566
2567
if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
2568
return (SET_ERROR(EINVAL));
2569
2570
if (drrs->drr_object > rwa->max_object)
2571
rwa->max_object = drrs->drr_object;
2572
2573
VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
2574
if ((err = dmu_spill_hold_by_bonus(db, DMU_READ_NO_DECRYPT |
2575
DB_RF_CANFAIL, FTAG, &db_spill)) != 0) {
2576
dmu_buf_rele(db, FTAG);
2577
return (err);
2578
}
2579
2580
dmu_tx_t *tx = dmu_tx_create(rwa->os);
2581
2582
dmu_tx_hold_spill(tx, db->db_object);
2583
2584
err = dmu_tx_assign(tx, DMU_TX_WAIT);
2585
if (err != 0) {
2586
dmu_buf_rele(db, FTAG);
2587
dmu_buf_rele(db_spill, FTAG);
2588
dmu_tx_abort(tx);
2589
return (err);
2590
}
2591
2592
/*
2593
* Spill blocks may both grow and shrink. When a change in size
2594
* occurs any existing dbuf must be updated to match the logical
2595
* size of the provided arc_buf_t.
2596
*/
2597
if (db_spill->db_size != drrs->drr_length) {
2598
dmu_buf_will_fill(db_spill, tx, B_FALSE);
2599
VERIFY0(dbuf_spill_set_blksz(db_spill,
2600
drrs->drr_length, tx));
2601
}
2602
2603
arc_buf_t *abuf;
2604
if (rwa->raw) {
2605
boolean_t byteorder = ZFS_HOST_BYTEORDER ^
2606
!!DRR_IS_RAW_BYTESWAPPED(drrs->drr_flags) ^
2607
rwa->byteswap;
2608
2609
abuf = arc_loan_raw_buf(dmu_objset_spa(rwa->os),
2610
drrs->drr_object, byteorder, drrs->drr_salt,
2611
drrs->drr_iv, drrs->drr_mac, drrs->drr_type,
2612
drrs->drr_compressed_size, drrs->drr_length,
2613
drrs->drr_compressiontype, 0);
2614
} else {
2615
abuf = arc_loan_buf(dmu_objset_spa(rwa->os),
2616
DMU_OT_IS_METADATA(drrs->drr_type),
2617
drrs->drr_length);
2618
if (rwa->byteswap) {
2619
dmu_object_byteswap_t byteswap =
2620
DMU_OT_BYTESWAP(drrs->drr_type);
2621
dmu_ot_byteswap[byteswap].ob_func(abd_to_buf(abd),
2622
DRR_SPILL_PAYLOAD_SIZE(drrs));
2623
}
2624
}
2625
2626
memcpy(abuf->b_data, abd_to_buf(abd), DRR_SPILL_PAYLOAD_SIZE(drrs));
2627
abd_free(abd);
2628
dbuf_assign_arcbuf((dmu_buf_impl_t *)db_spill, abuf, tx,
2629
DMU_UNCACHEDIO);
2630
2631
dmu_buf_rele(db, FTAG);
2632
dmu_buf_rele(db_spill, FTAG);
2633
2634
dmu_tx_commit(tx);
2635
return (0);
2636
}
2637
2638
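/*
 * Apply a DRR_FREE record by freeing the given range of the object.
 */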
noinline static int
2639
receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
2640
{
2641
int err;
2642
2643
if (drrf->drr_length != -1ULL &&
2644
drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
2645
return (SET_ERROR(EINVAL));
2646
2647
if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
2648
return (SET_ERROR(EINVAL));
2649
2650
if (drrf->drr_object > rwa->max_object)
2651
rwa->max_object = drrf->drr_object;
2652
2653
err = dmu_free_long_range(rwa->os, drrf->drr_object,
2654
drrf->drr_offset, drrf->drr_length);
2655
2656
return (err);
2657
}
2658
2659
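/*
 * Apply a DRR_OBJECT_RANGE record (raw sends only) by stashing the
 * encryption parameters for the upcoming block of dnodes; they are
 * applied later, in receive_object().
 */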
static int
2660
receive_object_range(struct receive_writer_arg *rwa,
2661
struct drr_object_range *drror)
2662
{
2663
/*
2664
* By default, we assume this block is in our native format
2665
* (ZFS_HOST_BYTEORDER). We then take into account whether
2666
* the send stream is byteswapped (rwa->byteswap). Finally,
2667
* we need to byteswap again if this particular block was
2668
* in non-native format on the send side.
2669
*/
2670
boolean_t byteorder = ZFS_HOST_BYTEORDER ^ rwa->byteswap ^
2671
!!DRR_IS_RAW_BYTESWAPPED(drror->drr_flags);
2672
2673
/*
2674
* Since dnode block sizes are constant, we should not need to worry
2675
* about making sure that the dnode block size is the same on the
2676
* sending and receiving sides for the time being. For non-raw sends,
2677
* this does not matter (and in fact we do not send a DRR_OBJECT_RANGE
2678
* record at all). Raw sends require this record type because the
2679
* encryption parameters are used to protect an entire block of bonus
2680
* buffers. If the size of dnode blocks ever becomes variable,
2681
* handling will need to be added to ensure that dnode block sizes
2682
* match on the sending and receiving side.
2683
*/
2684
if (drror->drr_numslots != DNODES_PER_BLOCK ||
2685
P2PHASE(drror->drr_firstobj, DNODES_PER_BLOCK) != 0 ||
2686
!rwa->raw)
2687
return (SET_ERROR(EINVAL));
2688
2689
if (drror->drr_firstobj > rwa->max_object)
2690
rwa->max_object = drror->drr_firstobj;
2691
2692
/*
2693
* The DRR_OBJECT_RANGE handling must be deferred to receive_object()
2694
* so that the block of dnodes is not written out when it's empty,
2695
* and converted to a HOLE BP.
2696
*/
2697
rwa->or_crypt_params_present = B_TRUE;
2698
rwa->or_firstobj = drror->drr_firstobj;
2699
rwa->or_numslots = drror->drr_numslots;
2700
memcpy(rwa->or_salt, drror->drr_salt, ZIO_DATA_SALT_LEN);
2701
memcpy(rwa->or_iv, drror->drr_iv, ZIO_DATA_IV_LEN);
2702
memcpy(rwa->or_mac, drror->drr_mac, ZIO_DATA_MAC_LEN);
2703
rwa->or_byteorder = byteorder;
2704
2705
rwa->or_need_sync = ORNS_MAYBE;
2706
2707
return (0);
2708
}
2709
2710
/*
2711
* Until we have the ability to redact large ranges of data efficiently, we
2712
* process these records as frees.
2713
*/
2714
noinline static int
2715
receive_redact(struct receive_writer_arg *rwa, struct drr_redact *drrr)
2716
{
2717
struct drr_free drrf = {0};
2718
drrf.drr_length = drrr->drr_length;
2719
drrf.drr_object = drrr->drr_object;
2720
drrf.drr_offset = drrr->drr_offset;
2721
drrf.drr_toguid = drrr->drr_toguid;
2722
return (receive_free(rwa, &drrf));
2723
}
2724
2725
/* used to destroy the drc_ds on error */
2726
static void
2727
dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
2728
{
2729
dsl_dataset_t *ds = drc->drc_ds;
2730
ds_hold_flags_t dsflags;
2731
2732
dsflags = (drc->drc_raw) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;
2733
/*
2734
* Wait for the txg sync before cleaning up the receive. For
2735
* resumable receives, this ensures that our resume state has
2736
* been written out to disk. For raw receives, this ensures
2737
* that the user accounting code will not attempt to do anything
2738
* after we stopped receiving the dataset.
2739
*/
2740
txg_wait_synced(ds->ds_dir->dd_pool, 0);
2741
ds->ds_objset->os_raw_receive = B_FALSE;
2742
2743
rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
2744
if (drc->drc_resumable && drc->drc_should_save &&
2745
!BP_IS_HOLE(dsl_dataset_get_blkptr(ds))) {
2746
rrw_exit(&ds->ds_bp_rwlock, FTAG);
2747
dsl_dataset_disown(ds, dsflags, dmu_recv_tag);
2748
} else {
2749
char name[ZFS_MAX_DATASET_NAME_LEN];
2750
rrw_exit(&ds->ds_bp_rwlock, FTAG);
2751
dsl_dataset_name(ds, name);
2752
dsl_dataset_disown(ds, dsflags, dmu_recv_tag);
2753
if (!drc->drc_heal)
2754
(void) dsl_destroy_head(name);
2755
}
2756
}
2757
2758
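/*
 * Fold a just-read buffer into the stream's running fletcher-4 checksum,
 * using the byteswapping variant when the stream's byte order differs
 * from the host's.
 */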
static void
2759
receive_cksum(dmu_recv_cookie_t *drc, int len, void *buf)
2760
{
2761
if (drc->drc_byteswap) {
2762
(void) fletcher_4_incremental_byteswap(buf, len,
2763
&drc->drc_cksum);
2764
} else {
2765
(void) fletcher_4_incremental_native(buf, len, &drc->drc_cksum);
2766
}
2767
}
2768
2769
/*
2770
* Read the payload into a buffer of size len, and update the current record's
2771
* payload field.
2772
* Allocate drc->drc_next_rrd and read the next record's header into
2773
* drc->drc_next_rrd->header.
2774
* Verify checksum of payload and next record.
2775
*/
2776
static int
2777
receive_read_payload_and_next_header(dmu_recv_cookie_t *drc, int len, void *buf)
2778
{
2779
int err;
2780
2781
if (len != 0) {
2782
ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
2783
err = receive_read(drc, len, buf);
2784
if (err != 0)
2785
return (err);
2786
receive_cksum(drc, len, buf);
2787
2788
/* note: rrd is NULL when reading the begin record's payload */
2789
if (drc->drc_rrd != NULL) {
2790
drc->drc_rrd->payload = buf;
2791
drc->drc_rrd->payload_size = len;
2792
drc->drc_rrd->bytes_read = drc->drc_bytes_read;
2793
}
2794
} else {
2795
ASSERT0P(buf);
2796
}
2797
2798
drc->drc_prev_cksum = drc->drc_cksum;
2799
2800
drc->drc_next_rrd = kmem_zalloc(sizeof (*drc->drc_next_rrd), KM_SLEEP);
2801
err = receive_read(drc, sizeof (drc->drc_next_rrd->header),
2802
&drc->drc_next_rrd->header);
2803
drc->drc_next_rrd->bytes_read = drc->drc_bytes_read;
2804
2805
if (err != 0) {
2806
kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
2807
drc->drc_next_rrd = NULL;
2808
return (err);
2809
}
2810
if (drc->drc_next_rrd->header.drr_type == DRR_BEGIN) {
2811
kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
2812
drc->drc_next_rrd = NULL;
2813
return (SET_ERROR(EINVAL));
2814
}
2815
2816
/*
2817
* Note: checksum is of everything up to but not including the
2818
* checksum itself.
2819
*/
2820
ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2821
==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
2822
receive_cksum(drc,
2823
offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2824
&drc->drc_next_rrd->header);
2825
2826
zio_cksum_t cksum_orig =
2827
drc->drc_next_rrd->header.drr_u.drr_checksum.drr_checksum;
2828
zio_cksum_t *cksump =
2829
&drc->drc_next_rrd->header.drr_u.drr_checksum.drr_checksum;
2830
2831
if (drc->drc_byteswap)
2832
byteswap_record(&drc->drc_next_rrd->header);
2833
2834
if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
2835
!ZIO_CHECKSUM_EQUAL(drc->drc_cksum, *cksump)) {
2836
kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));
2837
drc->drc_next_rrd = NULL;
2838
return (SET_ERROR(ECKSUM));
2839
}
2840
2841
receive_cksum(drc, sizeof (cksum_orig), &cksum_orig);
2842
2843
return (0);
2844
}
2845
2846
/*
2847
* Issue the prefetch reads for any necessary indirect blocks.
2848
*
2849
* We use the object ignore list to tell us whether or not to issue prefetches
2850
* for a given object. We do this for both correctness (in case the blocksize
2851
* of an object has changed) and performance (if the object doesn't exist, don't
2852
* needlessly try to issue prefetches). We also trim the list as we go through
2853
* the stream to prevent it from growing to an unbounded size.
2854
*
2855
* The object numbers within will always be in sorted order, and any write
2856
* records we see will also be in sorted order, but they're not sorted with
2857
* respect to each other (i.e. we can get several object records before
2858
* receiving each object's write records). As a result, once we've reached a
2859
* given object number, we can safely remove any reference to lower object
2860
* numbers in the ignore list. In practice, we receive up to 32 object records
2861
* before receiving write records, so the list can have up to 32 nodes in it.
2862
*/
2863
static void
2864
receive_read_prefetch(dmu_recv_cookie_t *drc, uint64_t object, uint64_t offset,
2865
uint64_t length)
2866
{
2867
if (!objlist_exists(drc->drc_ignore_objlist, object)) {
2868
dmu_prefetch(drc->drc_os, object, 1, offset, length,
2869
ZIO_PRIORITY_SYNC_READ);
2870
}
2871
}
2872
2873
/*
2874
* Read records off the stream, issuing any necessary prefetches.
2875
*/
2876
static int
2877
receive_read_record(dmu_recv_cookie_t *drc)
2878
{
2879
int err;
2880
2881
switch (drc->drc_rrd->header.drr_type) {
2882
case DRR_OBJECT:
2883
{
2884
struct drr_object *drro =
2885
&drc->drc_rrd->header.drr_u.drr_object;
2886
uint32_t size = DRR_OBJECT_PAYLOAD_SIZE(drro);
2887
void *buf = NULL;
2888
dmu_object_info_t doi;
2889
2890
if (size != 0)
2891
buf = kmem_zalloc(size, KM_SLEEP);
2892
2893
err = receive_read_payload_and_next_header(drc, size, buf);
2894
if (err != 0) {
2895
kmem_free(buf, size);
2896
return (err);
2897
}
2898
err = dmu_object_info(drc->drc_os, drro->drr_object, &doi);
2899
/*
2900
* See receive_read_prefetch() for an explanation of why we're
2901
* storing this object in the ignore_obj_list.
2902
*/
2903
if (err == ENOENT || err == EEXIST ||
2904
(err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
2905
objlist_insert(drc->drc_ignore_objlist,
2906
drro->drr_object);
2907
err = 0;
2908
}
2909
return (err);
2910
}
2911
case DRR_FREEOBJECTS:
2912
{
2913
err = receive_read_payload_and_next_header(drc, 0, NULL);
2914
return (err);
2915
}
2916
case DRR_WRITE:
2917
{
2918
struct drr_write *drrw = &drc->drc_rrd->header.drr_u.drr_write;
2919
int size = DRR_WRITE_PAYLOAD_SIZE(drrw);
2920
abd_t *abd = abd_alloc_linear(size, B_FALSE);
2921
err = receive_read_payload_and_next_header(drc, size,
2922
abd_to_buf(abd));
2923
if (err != 0) {
2924
abd_free(abd);
2925
return (err);
2926
}
2927
drc->drc_rrd->abd = abd;
2928
receive_read_prefetch(drc, drrw->drr_object, drrw->drr_offset,
2929
drrw->drr_logical_size);
2930
return (err);
2931
}
2932
case DRR_WRITE_EMBEDDED:
2933
{
2934
struct drr_write_embedded *drrwe =
2935
&drc->drc_rrd->header.drr_u.drr_write_embedded;
2936
uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
2937
void *buf = kmem_zalloc(size, KM_SLEEP);
2938
2939
err = receive_read_payload_and_next_header(drc, size, buf);
2940
if (err != 0) {
2941
kmem_free(buf, size);
2942
return (err);
2943
}
2944
2945
receive_read_prefetch(drc, drrwe->drr_object, drrwe->drr_offset,
2946
drrwe->drr_length);
2947
return (err);
2948
}
2949
case DRR_FREE:
2950
case DRR_REDACT:
2951
{
2952
/*
2953
* It might be beneficial to prefetch indirect blocks here, but
2954
* we don't really have the data to decide for sure.
2955
*/
2956
err = receive_read_payload_and_next_header(drc, 0, NULL);
2957
return (err);
2958
}
2959
case DRR_END:
2960
{
2961
struct drr_end *drre = &drc->drc_rrd->header.drr_u.drr_end;
2962
if (!ZIO_CHECKSUM_EQUAL(drc->drc_prev_cksum,
2963
drre->drr_checksum))
2964
return (SET_ERROR(ECKSUM));
2965
return (0);
2966
}
2967
case DRR_SPILL:
2968
{
2969
struct drr_spill *drrs = &drc->drc_rrd->header.drr_u.drr_spill;
2970
int size = DRR_SPILL_PAYLOAD_SIZE(drrs);
2971
abd_t *abd = abd_alloc_linear(size, B_FALSE);
2972
err = receive_read_payload_and_next_header(drc, size,
2973
abd_to_buf(abd));
2974
if (err != 0)
2975
abd_free(abd);
2976
else
2977
drc->drc_rrd->abd = abd;
2978
return (err);
2979
}
2980
case DRR_OBJECT_RANGE:
2981
{
2982
err = receive_read_payload_and_next_header(drc, 0, NULL);
2983
return (err);
2984
2985
}
2986
default:
2987
return (SET_ERROR(EINVAL));
2988
}
2989
}
2990
2991
2992
2993
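/*
 * Debug-only helper: log the interesting fields of a record along with
 * the error returned while processing it.
 */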
static void
2994
dprintf_drr(struct receive_record_arg *rrd, int err)
2995
{
2996
#ifdef ZFS_DEBUG
2997
switch (rrd->header.drr_type) {
2998
case DRR_OBJECT:
2999
{
3000
struct drr_object *drro = &rrd->header.drr_u.drr_object;
3001
dprintf("drr_type = OBJECT obj = %llu type = %u "
3002
"bonustype = %u blksz = %u bonuslen = %u cksumtype = %u "
3003
"compress = %u dn_slots = %u err = %d\n",
3004
(u_longlong_t)drro->drr_object, drro->drr_type,
3005
drro->drr_bonustype, drro->drr_blksz, drro->drr_bonuslen,
3006
drro->drr_checksumtype, drro->drr_compress,
3007
drro->drr_dn_slots, err);
3008
break;
3009
}
3010
case DRR_FREEOBJECTS:
3011
{
3012
struct drr_freeobjects *drrfo =
3013
&rrd->header.drr_u.drr_freeobjects;
3014
dprintf("drr_type = FREEOBJECTS firstobj = %llu "
3015
"numobjs = %llu err = %d\n",
3016
(u_longlong_t)drrfo->drr_firstobj,
3017
(u_longlong_t)drrfo->drr_numobjs, err);
3018
break;
3019
}
3020
case DRR_WRITE:
3021
{
3022
struct drr_write *drrw = &rrd->header.drr_u.drr_write;
3023
dprintf("drr_type = WRITE obj = %llu type = %u offset = %llu "
3024
"lsize = %llu cksumtype = %u flags = %u "
3025
"compress = %u psize = %llu err = %d\n",
3026
(u_longlong_t)drrw->drr_object, drrw->drr_type,
3027
(u_longlong_t)drrw->drr_offset,
3028
(u_longlong_t)drrw->drr_logical_size,
3029
drrw->drr_checksumtype, drrw->drr_flags,
3030
drrw->drr_compressiontype,
3031
(u_longlong_t)drrw->drr_compressed_size, err);
3032
break;
3033
}
3034
case DRR_WRITE_BYREF:
3035
{
3036
struct drr_write_byref *drrwbr =
3037
&rrd->header.drr_u.drr_write_byref;
3038
dprintf("drr_type = WRITE_BYREF obj = %llu offset = %llu "
3039
"length = %llu toguid = %llx refguid = %llx "
3040
"refobject = %llu refoffset = %llu cksumtype = %u "
3041
"flags = %u err = %d\n",
3042
(u_longlong_t)drrwbr->drr_object,
3043
(u_longlong_t)drrwbr->drr_offset,
3044
(u_longlong_t)drrwbr->drr_length,
3045
(u_longlong_t)drrwbr->drr_toguid,
3046
(u_longlong_t)drrwbr->drr_refguid,
3047
(u_longlong_t)drrwbr->drr_refobject,
3048
(u_longlong_t)drrwbr->drr_refoffset,
3049
drrwbr->drr_checksumtype, drrwbr->drr_flags, err);
3050
break;
3051
}
3052
case DRR_WRITE_EMBEDDED:
3053
{
3054
struct drr_write_embedded *drrwe =
3055
&rrd->header.drr_u.drr_write_embedded;
3056
dprintf("drr_type = WRITE_EMBEDDED obj = %llu offset = %llu "
3057
"length = %llu compress = %u etype = %u lsize = %u "
3058
"psize = %u err = %d\n",
3059
(u_longlong_t)drrwe->drr_object,
3060
(u_longlong_t)drrwe->drr_offset,
3061
(u_longlong_t)drrwe->drr_length,
3062
drrwe->drr_compression, drrwe->drr_etype,
3063
drrwe->drr_lsize, drrwe->drr_psize, err);
3064
break;
3065
}
3066
case DRR_FREE:
3067
{
3068
struct drr_free *drrf = &rrd->header.drr_u.drr_free;
3069
dprintf("drr_type = FREE obj = %llu offset = %llu "
3070
"length = %lld err = %d\n",
3071
(u_longlong_t)drrf->drr_object,
3072
(u_longlong_t)drrf->drr_offset,
3073
(longlong_t)drrf->drr_length,
3074
err);
3075
break;
3076
}
3077
case DRR_SPILL:
3078
{
3079
struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
3080
dprintf("drr_type = SPILL obj = %llu length = %llu "
3081
"err = %d\n", (u_longlong_t)drrs->drr_object,
3082
(u_longlong_t)drrs->drr_length, err);
3083
break;
3084
}
3085
case DRR_OBJECT_RANGE:
3086
{
3087
struct drr_object_range *drror =
3088
&rrd->header.drr_u.drr_object_range;
3089
dprintf("drr_type = OBJECT_RANGE firstobj = %llu "
3090
"numslots = %llu flags = %u err = %d\n",
3091
(u_longlong_t)drror->drr_firstobj,
3092
(u_longlong_t)drror->drr_numslots,
3093
drror->drr_flags, err);
3094
break;
3095
}
3096
default:
3097
return;
3098
}
3099
#endif
3100
}
3101
3102
/*
3103
* Commit the records to the pool.
3104
*/
3105
static int
3106
receive_process_record(struct receive_writer_arg *rwa,
3107
struct receive_record_arg *rrd)
3108
{
3109
int err;
3110
3111
/* Processing in order, therefore bytes_read should be increasing. */
3112
ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
3113
rwa->bytes_read = rrd->bytes_read;
3114
3115
/* We can only heal write records; other ones get ignored */
3116
if (rwa->heal && rrd->header.drr_type != DRR_WRITE) {
3117
if (rrd->abd != NULL) {
3118
abd_free(rrd->abd);
3119
rrd->abd = NULL;
3120
} else if (rrd->payload != NULL) {
3121
kmem_free(rrd->payload, rrd->payload_size);
3122
rrd->payload = NULL;
3123
}
3124
return (0);
3125
}
3126
3127
if (!rwa->heal && rrd->header.drr_type != DRR_WRITE) {
3128
err = flush_write_batch(rwa);
3129
if (err != 0) {
3130
if (rrd->abd != NULL) {
3131
abd_free(rrd->abd);
3132
rrd->abd = NULL;
3133
rrd->payload = NULL;
3134
} else if (rrd->payload != NULL) {
3135
kmem_free(rrd->payload, rrd->payload_size);
3136
rrd->payload = NULL;
3137
}
3138
3139
return (err);
3140
}
3141
}
3142
3143
switch (rrd->header.drr_type) {
3144
case DRR_OBJECT:
3145
{
3146
struct drr_object *drro = &rrd->header.drr_u.drr_object;
3147
err = receive_object(rwa, drro, rrd->payload);
3148
kmem_free(rrd->payload, rrd->payload_size);
3149
rrd->payload = NULL;
3150
break;
3151
}
3152
case DRR_FREEOBJECTS:
3153
{
3154
struct drr_freeobjects *drrfo =
3155
&rrd->header.drr_u.drr_freeobjects;
3156
err = receive_freeobjects(rwa, drrfo);
3157
break;
3158
}
3159
case DRR_WRITE:
3160
{
3161
err = receive_process_write_record(rwa, rrd);
3162
if (rwa->heal) {
3163
/*
3164
* If healing - always free the abd after processing
3165
*/
3166
abd_free(rrd->abd);
3167
rrd->abd = NULL;
3168
} else if (err != EAGAIN) {
3169
/*
3170
* On success, a non-healing
3171
* receive_process_write_record() returns
3172
* EAGAIN to indicate that we do not want to free
3173
* the rrd or arc_buf.
3174
*/
3175
ASSERT(err != 0);
3176
abd_free(rrd->abd);
3177
rrd->abd = NULL;
3178
}
3179
break;
3180
}
3181
case DRR_WRITE_EMBEDDED:
3182
{
3183
struct drr_write_embedded *drrwe =
3184
&rrd->header.drr_u.drr_write_embedded;
3185
err = receive_write_embedded(rwa, drrwe, rrd->payload);
3186
kmem_free(rrd->payload, rrd->payload_size);
3187
rrd->payload = NULL;
3188
break;
3189
}
3190
case DRR_FREE:
3191
{
3192
struct drr_free *drrf = &rrd->header.drr_u.drr_free;
3193
err = receive_free(rwa, drrf);
3194
break;
3195
}
3196
case DRR_SPILL:
3197
{
3198
struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
3199
err = receive_spill(rwa, drrs, rrd->abd);
3200
if (err != 0)
3201
abd_free(rrd->abd);
3202
rrd->abd = NULL;
3203
rrd->payload = NULL;
3204
break;
3205
}
3206
case DRR_OBJECT_RANGE:
3207
{
3208
struct drr_object_range *drror =
3209
&rrd->header.drr_u.drr_object_range;
3210
err = receive_object_range(rwa, drror);
3211
break;
3212
}
3213
case DRR_REDACT:
3214
{
3215
struct drr_redact *drrr = &rrd->header.drr_u.drr_redact;
3216
err = receive_redact(rwa, drrr);
3217
break;
3218
}
3219
default:
3220
err = (SET_ERROR(EINVAL));
3221
}
3222
3223
if (err != 0)
3224
dprintf_drr(rrd, err);
3225
3226
return (err);
3227
}
3228
3229
/*
3230
* dmu_recv_stream's worker thread; pull records off the queue, and then call
3231
* receive_process_record(). When we're done, signal the main thread and exit.
3232
*/
3233
static __attribute__((noreturn)) void
3234
receive_writer_thread(void *arg)
3235
{
3236
struct receive_writer_arg *rwa = arg;
3237
struct receive_record_arg *rrd;
3238
fstrans_cookie_t cookie = spl_fstrans_mark();
3239
3240
for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
3241
rrd = bqueue_dequeue(&rwa->q)) {
3242
/*
3243
* If there's an error, the main thread will stop putting things
3244
* on the queue, but we need to clear everything in it before we
3245
* can exit.
3246
*/
3247
int err = 0;
3248
if (rwa->err == 0) {
3249
err = receive_process_record(rwa, rrd);
3250
} else if (rrd->abd != NULL) {
3251
abd_free(rrd->abd);
3252
rrd->abd = NULL;
3253
rrd->payload = NULL;
3254
} else if (rrd->payload != NULL) {
3255
kmem_free(rrd->payload, rrd->payload_size);
3256
rrd->payload = NULL;
3257
}
3258
/*
3259
* EAGAIN indicates that this record has been saved (on
3260
* rwa->write_batch), and will be used again, so we don't
3261
* free it.
3262
* When healing data we always need to free the record.
3263
*/
3264
if (err != EAGAIN || rwa->heal) {
3265
if (rwa->err == 0)
3266
rwa->err = err;
3267
kmem_free(rrd, sizeof (*rrd));
3268
}
3269
}
3270
kmem_free(rrd, sizeof (*rrd));
3271
3272
if (rwa->heal) {
3273
zio_wait(rwa->heal_pio);
3274
} else {
3275
int err = flush_write_batch(rwa);
3276
if (rwa->err == 0)
3277
rwa->err = err;
3278
}
3279
mutex_enter(&rwa->mutex);
3280
rwa->done = B_TRUE;
3281
cv_signal(&rwa->cv);
3282
mutex_exit(&rwa->mutex);
3283
spl_fstrans_unmark(cookie);
3284
thread_exit();
3285
}
3286
3287
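/*
 * Verify that the resume point recorded in the dataset's ZAP matches the
 * resume_object/resume_offset pair carried in the resuming stream's
 * BEGIN payload.
 */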
static int
3288
resume_check(dmu_recv_cookie_t *drc, nvlist_t *begin_nvl)
3289
{
3290
uint64_t val;
3291
objset_t *mos = dmu_objset_pool(drc->drc_os)->dp_meta_objset;
3292
uint64_t dsobj = dmu_objset_id(drc->drc_os);
3293
uint64_t resume_obj, resume_off;
3294
3295
if (nvlist_lookup_uint64(begin_nvl,
3296
"resume_object", &resume_obj) != 0 ||
3297
nvlist_lookup_uint64(begin_nvl,
3298
"resume_offset", &resume_off) != 0) {
3299
return (SET_ERROR(EINVAL));
3300
}
3301
VERIFY0(zap_lookup(mos, dsobj,
3302
DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
3303
if (resume_obj != val)
3304
return (SET_ERROR(EINVAL));
3305
VERIFY0(zap_lookup(mos, dsobj,
3306
DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
3307
if (resume_off != val)
3308
return (SET_ERROR(EINVAL));
3309
3310
return (0);
3311
}
3312
3313
/*
3314
* Read in the stream's records, one by one, and apply them to the pool. There
3315
* are two threads involved; the thread that calls this function will spin up a
3316
* worker thread, read the records off the stream one by one, and issue
3317
* prefetches for any necessary indirect blocks. It will then push the records
3318
* onto an internal blocking queue. The worker thread will pull the records off
3319
* the queue, and actually write the data into the DMU. This way, the worker
3320
* thread doesn't have to wait for reads to complete, since everything it needs
3321
* (the indirect blocks) will be prefetched.
3322
*
3323
* NB: callers *must* call dmu_recv_end() if this succeeds.
3324
*/
3325
int
3326
dmu_recv_stream(dmu_recv_cookie_t *drc, offset_t *voffp)
3327
{
3328
int err = 0;
3329
struct receive_writer_arg *rwa = kmem_zalloc(sizeof (*rwa), KM_SLEEP);
3330
3331
if (dsl_dataset_has_resume_receive_state(drc->drc_ds)) {
3332
uint64_t bytes = 0;
3333
(void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
3334
drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
3335
sizeof (bytes), 1, &bytes);
3336
drc->drc_bytes_read += bytes;
3337
}
3338
3339
drc->drc_ignore_objlist = objlist_create();
3340
3341
/* these were verified in dmu_recv_begin */
3342
ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
3343
DMU_SUBSTREAM);
3344
ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
3345
3346
ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
3347
ASSERT0(drc->drc_os->os_encrypted &&
3348
(drc->drc_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA));
3349
3350
/* handle DSL encryption key payload */
3351
if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RAW) {
3352
nvlist_t *keynvl = NULL;
3353
3354
ASSERT(drc->drc_os->os_encrypted);
3355
ASSERT(drc->drc_raw);
3356
3357
err = nvlist_lookup_nvlist(drc->drc_begin_nvl, "crypt_keydata",
3358
&keynvl);
3359
if (err != 0)
3360
goto out;
3361
3362
if (!drc->drc_heal) {
3363
/*
3364
* If this is a new dataset we set the key immediately.
3365
* Otherwise we don't want to change the key until we
3366
* are sure the rest of the receive succeeded so we
3367
* stash the keynvl away until then.
3368
*/
3369
err = dsl_crypto_recv_raw(spa_name(drc->drc_os->os_spa),
3370
drc->drc_ds->ds_object, drc->drc_fromsnapobj,
3371
drc->drc_drrb->drr_type, keynvl, drc->drc_newfs);
3372
if (err != 0)
3373
goto out;
3374
}
3375
3376
/* see comment in dmu_recv_end_sync() */
3377
drc->drc_ivset_guid = 0;
3378
(void) nvlist_lookup_uint64(keynvl, "to_ivset_guid",
3379
&drc->drc_ivset_guid);
3380
3381
if (!drc->drc_newfs)
3382
drc->drc_keynvl = fnvlist_dup(keynvl);
3383
}
3384
3385
if (drc->drc_featureflags & DMU_BACKUP_FEATURE_RESUMING) {
3386
err = resume_check(drc, drc->drc_begin_nvl);
3387
if (err != 0)
3388
goto out;
3389
}
3390
3391
/*
3392
* For compatibility with recursive send streams, we do this here,
3393
* rather than in dmu_recv_begin. If we pull the next header too
3394
* early, and it's the END record, we break the `recv_skip` logic.
3395
*/
3396
if (drc->drc_drr_begin->drr_payloadlen == 0) {
3397
err = receive_read_payload_and_next_header(drc, 0, NULL);
3398
if (err != 0)
3399
goto out;
3400
}
3401
3402
/*
3403
* If we failed before this point we will clean up any new resume
3404
* state that was created. Now that we've gotten past the initial
3405
* checks we are ok to retain that resume state.
3406
*/
3407
drc->drc_should_save = B_TRUE;
3408
3409
(void) bqueue_init(&rwa->q, zfs_recv_queue_ff,
3410
MAX(zfs_recv_queue_length, 2 * zfs_max_recordsize),
3411
offsetof(struct receive_record_arg, node));
3412
cv_init(&rwa->cv, NULL, CV_DEFAULT, NULL);
3413
mutex_init(&rwa->mutex, NULL, MUTEX_DEFAULT, NULL);
3414
rwa->os = drc->drc_os;
3415
rwa->byteswap = drc->drc_byteswap;
3416
rwa->heal = drc->drc_heal;
3417
rwa->tofs = drc->drc_tofs;
3418
rwa->resumable = drc->drc_resumable;
3419
rwa->raw = drc->drc_raw;
3420
rwa->spill = drc->drc_spill;
3421
rwa->full = (drc->drc_drr_begin->drr_u.drr_begin.drr_fromguid == 0);
3422
rwa->os->os_raw_receive = drc->drc_raw;
3423
if (drc->drc_heal) {
3424
rwa->heal_pio = zio_root(drc->drc_os->os_spa, NULL, NULL,
3425
ZIO_FLAG_GODFATHER);
3426
}
3427
list_create(&rwa->write_batch, sizeof (struct receive_record_arg),
3428
offsetof(struct receive_record_arg, node.bqn_node));
3429
3430
(void) thread_create(NULL, 0, receive_writer_thread, rwa, 0, curproc,
3431
TS_RUN, minclsyspri);
3432
/*
3433
* We're reading rwa->err without locks, which is safe since we are the
3434
* only reader, and the worker thread is the only writer. It's ok if we
3435
* miss a write for an iteration or two of the loop, since the writer
3436
* thread will keep freeing records we send it until we send it an eos
3437
* marker.
3438
*
3439
* We can leave this loop in 3 ways: First, if rwa->err is
3440
* non-zero. In that case, the writer thread will free the rrd we just
3441
* pushed. Second, if we're interrupted; in that case, either it's the
3442
* first loop and drc->drc_rrd was never allocated, or it's later, and
3443
* drc->drc_rrd has been handed off to the writer thread who will free
3444
* it. Finally, if receive_read_record fails or we're at the end of the
3445
* stream, then we free drc->drc_rrd and exit.
3446
*/
3447
while (rwa->err == 0) {
3448
if (issig()) {
3449
err = SET_ERROR(EINTR);
3450
break;
3451
}
3452
3453
ASSERT0P(drc->drc_rrd);
3454
drc->drc_rrd = drc->drc_next_rrd;
3455
drc->drc_next_rrd = NULL;
3456
/* Allocates and loads header into drc->drc_next_rrd */
3457
err = receive_read_record(drc);
3458
3459
if (drc->drc_rrd->header.drr_type == DRR_END || err != 0) {
3460
kmem_free(drc->drc_rrd, sizeof (*drc->drc_rrd));
3461
drc->drc_rrd = NULL;
3462
break;
3463
}
3464
3465
bqueue_enqueue(&rwa->q, drc->drc_rrd,
3466
sizeof (struct receive_record_arg) +
3467
drc->drc_rrd->payload_size);
3468
drc->drc_rrd = NULL;
3469
}
3470
3471
	ASSERT0P(drc->drc_rrd);
	drc->drc_rrd = kmem_zalloc(sizeof (*drc->drc_rrd), KM_SLEEP);
	drc->drc_rrd->eos_marker = B_TRUE;
	bqueue_enqueue_flush(&rwa->q, drc->drc_rrd, 1);

	mutex_enter(&rwa->mutex);
	while (!rwa->done) {
		/*
		 * We need to use cv_wait_sig() so that any process that may
		 * be sleeping here can still fork.
		 */
		(void) cv_wait_sig(&rwa->cv, &rwa->mutex);
	}
	mutex_exit(&rwa->mutex);

	/*
	 * If we are receiving a full stream as a clone, all object IDs which
	 * are greater than the maximum ID referenced in the stream are
	 * by definition unused and must be freed.
	 */
	if (drc->drc_clone && drc->drc_drrb->drr_fromguid == 0) {
		uint64_t obj = rwa->max_object + 1;
		int free_err = 0;
		int next_err = 0;

		while (next_err == 0) {
			free_err = dmu_free_long_object(rwa->os, obj);
			if (free_err != 0 && free_err != ENOENT)
				break;

			next_err = dmu_object_next(rwa->os, &obj, FALSE, 0);
		}

		if (err == 0) {
			if (free_err != 0 && free_err != ENOENT)
				err = free_err;
			else if (next_err != ESRCH)
				err = next_err;
		}
	}

	cv_destroy(&rwa->cv);
	mutex_destroy(&rwa->mutex);
	bqueue_destroy(&rwa->q);
	list_destroy(&rwa->write_batch);
	if (err == 0)
		err = rwa->err;

out:
	/*
	 * If we hit an error before we started the receive_writer_thread,
	 * we need to clean up the next_rrd we created by processing the
	 * DRR_BEGIN record.
	 */
	if (drc->drc_next_rrd != NULL)
		kmem_free(drc->drc_next_rrd, sizeof (*drc->drc_next_rrd));

	/*
	 * The objset will be invalidated by dmu_recv_end() when we do
	 * dsl_dataset_clone_swap_sync_impl().
	 */
	drc->drc_os = NULL;

	kmem_free(rwa, sizeof (*rwa));
	nvlist_free(drc->drc_begin_nvl);

	if (err != 0) {
		/*
		 * Clean up references. If the receive is not resumable,
		 * destroy what we created, so we don't leave it in
		 * an inconsistent state.
		 */
		dmu_recv_cleanup_ds(drc);
		nvlist_free(drc->drc_keynvl);
		crfree(drc->drc_cred);
		drc->drc_cred = NULL;
	}

	objlist_destroy(drc->drc_ignore_objlist);
	drc->drc_ignore_objlist = NULL;
	*voffp = drc->drc_voff;
	return (err);
}

static int
dmu_recv_end_check(void *arg, dmu_tx_t *tx)
{
	dmu_recv_cookie_t *drc = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	int error;

	ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);

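	/*
	 * Three cases: a healing receive has nothing extra to check; a
	 * receive into an existing filesystem must validate the clone swap
	 * (and any forced snapshot destruction) before it can be committed;
	 * a receive into a new filesystem only needs the snapshot check.
	 */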
	if (drc->drc_heal) {
		error = 0;
	} else if (!drc->drc_newfs) {
		dsl_dataset_t *origin_head;

		error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
		if (error != 0)
			return (error);
		if (drc->drc_force) {
			/*
			 * We will destroy any snapshots in tofs (i.e. before
			 * origin_head) that are after the origin (which is
			 * the snap before drc_ds, because drc_ds cannot
			 * have any snaps of its own).
			 */
			uint64_t obj;

			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
			while (obj !=
			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
				dsl_dataset_t *snap;
				error = dsl_dataset_hold_obj(dp, obj, FTAG,
				    &snap);
				if (error != 0)
					break;
				if (snap->ds_dir != origin_head->ds_dir)
					error = SET_ERROR(EINVAL);
				if (error == 0) {
					error = dsl_destroy_snapshot_check_impl(
					    snap, B_FALSE);
				}
				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
				dsl_dataset_rele(snap, FTAG);
				if (error != 0)
					break;
			}
			if (error != 0) {
				dsl_dataset_rele(origin_head, FTAG);
				return (error);
			}
		}
		if (drc->drc_keynvl != NULL) {
			error = dsl_crypto_recv_raw_key_check(drc->drc_ds,
			    drc->drc_keynvl, tx);
			if (error != 0) {
				dsl_dataset_rele(origin_head, FTAG);
				return (error);
			}
		}

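		/*
		 * Verify that the receive clone can be swapped with
		 * origin_head and that the new snapshot name is valid, then
		 * make sure the temporary clone itself can be destroyed.
		 */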
		error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
		    origin_head, drc->drc_force, drc->drc_owner, tx);
		if (error != 0) {
			dsl_dataset_rele(origin_head, FTAG);
			return (error);
		}
		error = dsl_dataset_snapshot_check_impl(origin_head,
		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
		dsl_dataset_rele(origin_head, FTAG);
		if (error != 0)
			return (error);

		error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
	} else {
		error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
		    drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
	}
	return (error);
}

static void
dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
{
	dmu_recv_cookie_t *drc = arg;
	dsl_pool_t *dp = dmu_tx_pool(tx);
	boolean_t encrypted = drc->drc_ds->ds_dir->dd_crypto_obj != 0;
	uint64_t newsnapobj = 0;

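	/*
	 * This is the syncing-context half of dmu_recv_end(); it runs only
	 * after dmu_recv_end_check() has succeeded for this txg.
	 */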
	spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
	    tx, "snap=%s", drc->drc_tosnap);
	drc->drc_ds->ds_objset->os_raw_receive = B_FALSE;

	if (drc->drc_heal) {
		if (drc->drc_keynvl != NULL) {
			nvlist_free(drc->drc_keynvl);
			drc->drc_keynvl = NULL;
		}
	} else if (!drc->drc_newfs) {
		dsl_dataset_t *origin_head;

		VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
		    &origin_head));

		if (drc->drc_force) {
			/*
			 * Destroy any snapshots of drc_tofs (origin_head)
			 * after the origin (the snap before drc_ds).
			 */
			uint64_t obj;

			obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
			while (obj !=
			    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
				dsl_dataset_t *snap;
				VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
				    &snap));
				ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
				obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
				dsl_destroy_snapshot_sync_impl(snap,
				    B_FALSE, tx);
				dsl_dataset_rele(snap, FTAG);
			}
		}
		if (drc->drc_keynvl != NULL) {
			dsl_crypto_recv_raw_key_sync(drc->drc_ds,
			    drc->drc_keynvl, tx);
			nvlist_free(drc->drc_keynvl);
			drc->drc_keynvl = NULL;
		}

		VERIFY3P(drc->drc_ds->ds_prev, ==,
		    origin_head->ds_prev);

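		/*
		 * Swap the contents of the temporary receive clone into
		 * origin_head, then snapshot origin_head as the new tosnap.
		 */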
		dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
		    origin_head, tx);
		/*
		 * The objset was evicted by dsl_dataset_clone_swap_sync_impl,
		 * so drc_os is no longer valid.
		 */
		drc->drc_os = NULL;

		dsl_dataset_snapshot_sync_impl(origin_head,
		    drc->drc_tosnap, tx);

		/* set snapshot's creation time and guid */
		dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
		dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
		    drc->drc_drrb->drr_creation_time;
		dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
		    drc->drc_drrb->drr_toguid;
		dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
		    ~DS_FLAG_INCONSISTENT;

		dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
		dsl_dataset_phys(origin_head)->ds_flags &=
		    ~DS_FLAG_INCONSISTENT;

		newsnapobj =
		    dsl_dataset_phys(origin_head)->ds_prev_snap_obj;

		dsl_dataset_rele(origin_head, FTAG);
		dsl_destroy_head_sync_impl(drc->drc_ds, tx);

		if (drc->drc_owner != NULL)
			VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
	} else {
		dsl_dataset_t *ds = drc->drc_ds;

		dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);

		/* set snapshot's creation time and guid */
		dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
		dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
		    drc->drc_drrb->drr_creation_time;
		dsl_dataset_phys(ds->ds_prev)->ds_guid =
		    drc->drc_drrb->drr_toguid;
		dsl_dataset_phys(ds->ds_prev)->ds_flags &=
		    ~DS_FLAG_INCONSISTENT;

		dmu_buf_will_dirty(ds->ds_dbuf, tx);
		dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
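		/*
		 * The receive completed, so any partially-received resume
		 * state recorded on the dataset is no longer needed.
		 */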
		if (dsl_dataset_has_resume_receive_state(ds)) {
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_FROMGUID, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_OBJECT, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_OFFSET, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_BYTES, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_TOGUID, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_TONAME, tx);
			(void) zap_remove(dp->dp_meta_objset, ds->ds_object,
			    DS_FIELD_RESUME_REDACT_BOOKMARK_SNAPS, tx);
		}
		newsnapobj =
		    dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
	}

	/*
	 * If this is a raw receive, the crypt_keydata nvlist will include
	 * a to_ivset_guid for us to set on the new snapshot. This value
	 * will override the value generated by the snapshot code. However,
	 * this value may not be present, because older implementations of
	 * the raw send code did not include this value, and we are still
	 * allowed to receive them if the zfs_disable_ivset_guid_check
	 * tunable is set, in which case we will leave the newly-generated
	 * value.
	 */
	if (!drc->drc_heal && drc->drc_raw && drc->drc_ivset_guid != 0) {
		dmu_object_zapify(dp->dp_meta_objset, newsnapobj,
		    DMU_OT_DSL_DATASET, tx);
		VERIFY0(zap_update(dp->dp_meta_objset, newsnapobj,
		    DS_FIELD_IVSET_GUID, sizeof (uint64_t), 1,
		    &drc->drc_ivset_guid, tx));
	}

	/*
	 * Release the hold from dmu_recv_begin. This must be done before
	 * we return to open context, so that when we free the dataset's dnode
	 * we can evict its bonus buffer. Since the dataset may be destroyed
	 * at this point (and therefore won't have a valid pointer to the spa)
	 * we release the key mapping manually here while we do have a valid
	 * pointer, if it exists.
	 */
	if (!drc->drc_raw && encrypted) {
		(void) spa_keystore_remove_mapping(dmu_tx_pool(tx)->dp_spa,
		    drc->drc_ds->ds_object, drc->drc_ds);
	}
	dsl_dataset_disown(drc->drc_ds, 0, dmu_recv_tag);
	drc->drc_ds = NULL;
}

static int dmu_recv_end_modified_blocks = 3;

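/*
 * Finish a receive into an existing filesystem by running the end
 * check/sync functions as a single sync task against the target filesystem.
 */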
static int
dmu_recv_existing_end(dmu_recv_cookie_t *drc)
{
#ifdef _KERNEL
	/*
	 * We will be destroying the ds; make sure its origin is unmounted if
	 * necessary.
	 */
	char name[ZFS_MAX_DATASET_NAME_LEN];
	dsl_dataset_name(drc->drc_ds, name);
	zfs_destroy_unmount_origin(name);
#endif

	return (dsl_sync_task(drc->drc_tofs,
	    dmu_recv_end_check, dmu_recv_end_sync, drc,
	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
}

static int
dmu_recv_new_end(dmu_recv_cookie_t *drc)
{
	return (dsl_sync_task(drc->drc_tofs,
	    dmu_recv_end_check, dmu_recv_end_sync, drc,
	    dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
}

int
dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
{
	int error;

	drc->drc_owner = owner;

	if (drc->drc_newfs)
		error = dmu_recv_new_end(drc);
	else
		error = dmu_recv_existing_end(drc);

	if (error != 0) {
		dmu_recv_cleanup_ds(drc);
		nvlist_free(drc->drc_keynvl);
	} else if (!drc->drc_heal) {
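		/*
		 * On success, create any zvol minor device nodes for the
		 * newly received filesystem and snapshot.
		 */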
		if (drc->drc_newfs) {
			zvol_create_minors(drc->drc_tofs);
		}
		char *snapname = kmem_asprintf("%s@%s",
		    drc->drc_tofs, drc->drc_tosnap);
		zvol_create_minors(snapname);
		kmem_strfree(snapname);
	}

	crfree(drc->drc_cred);
	drc->drc_cred = NULL;

	return (error);
}

/*
 * Return TRUE if this objset is currently being received into.
 */
boolean_t
dmu_objset_is_receiving(objset_t *os)
{
	return (os->os_dsl_dataset != NULL &&
	    os->os_dsl_dataset->ds_owner == dmu_recv_tag);
}

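/*
 * Tunables controlling the receive queue, write batching, and corrective
 * (healing) receive behavior.
 */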
ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_length, UINT, ZMOD_RW,
	"Maximum receive queue length");

ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, queue_ff, UINT, ZMOD_RW,
	"Receive queue fill fraction");

ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, write_batch_size, UINT, ZMOD_RW,
	"Maximum amount of writes to batch into one transaction");

ZFS_MODULE_PARAM(zfs_recv, zfs_recv_, best_effort_corrective, INT, ZMOD_RW,
	"Ignore errors during corrective receive");