Path: blob/main/sys/contrib/openzfs/module/zfs/dmu_send.c
// SPDX-License-Identifier: CDDL-1.0
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or https://opensource.org/licenses/CDDL-1.0.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
 * Copyright (c) 2011, 2018 by Delphix. All rights reserved.
 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
 * Copyright 2014 HybridCluster. All rights reserved.
 * Copyright 2016 RackTop Systems.
 * Copyright (c) 2016 Actifio, Inc. All rights reserved.
 * Copyright (c) 2019, 2024, Klara, Inc.
 * Copyright (c) 2019, Allan Jude
 */

#include <sys/dmu.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dbuf.h>
#include <sys/dnode.h>
#include <sys/zfs_context.h>
#include <sys/dmu_objset.h>
#include <sys/dmu_traverse.h>
#include <sys/dsl_dataset.h>
#include <sys/dsl_dir.h>
#include <sys/dsl_prop.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_synctask.h>
#include <sys/spa_impl.h>
#include <sys/zfs_ioctl.h>
#include <sys/zap.h>
#include <sys/zio_checksum.h>
#include <sys/zfs_znode.h>
#include <zfs_fletcher.h>
#include <sys/avl.h>
#include <sys/ddt.h>
#include <sys/zfs_onexit.h>
#include <sys/dmu_send.h>
#include <sys/dmu_recv.h>
#include <sys/dsl_destroy.h>
#include <sys/blkptr.h>
#include <sys/dsl_bookmark.h>
#include <sys/zfeature.h>
#include <sys/bqueue.h>
#include <sys/zvol.h>
#include <sys/policy.h>
#include <sys/objlist.h>
#ifdef _KERNEL
#include <sys/zfs_vfsops.h>
#endif

/* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
static int zfs_send_corrupt_data = B_FALSE;
/*
 * This tunable controls the amount of data (measured in bytes) that will be
 * prefetched by zfs send. If the main thread is blocking on reads that haven't
 * completed, this variable might need to be increased. If instead the main
 * thread is issuing new reads because the prefetches have fallen out of the
 * cache, this may need to be decreased.
 */
static uint_t zfs_send_queue_length = SPA_MAXBLOCKSIZE;
/*
 * This tunable controls the length of the queues that zfs send worker threads
 * use to communicate. If the send_main_thread is blocking on these queues,
 * this variable may need to be increased. If there is a significant slowdown
 * at the start of a send as these threads consume all the available IO
 * resources, this variable may need to be decreased.
 */
static uint_t zfs_send_no_prefetch_queue_length = 1024 * 1024;
/*
 * These tunables control the fill fraction of the queues by zfs send. The fill
 * fraction controls the frequency with which threads have to be cv_signaled.
 * If a lot of cpu time is being spent on cv_signal, then these should be tuned
 * down. If the queues empty before the signalled thread can catch up, then
 * these should be tuned up.
 */
static uint_t zfs_send_queue_ff = 20;
static uint_t zfs_send_no_prefetch_queue_ff = 20;

/*
 * Use this to override the recordsize calculation for fast zfs send estimates.
 */
static uint_t zfs_override_estimate_recordsize = 0;

/* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
static const boolean_t zfs_send_set_freerecords_bit = B_TRUE;

/* Set this tunable to FALSE to disable sending unmodified spill blocks. */
static int zfs_send_unmodified_spill_blocks = B_TRUE;

static inline boolean_t
overflow_multiply(uint64_t a, uint64_t b, uint64_t *c)
{
	uint64_t temp = a * b;
	if (b != 0 && temp / b != a)
		return (B_FALSE);
	*c = temp;
	return (B_TRUE);
}
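/*
 * overflow_multiply() is used below wherever a block id is converted into a
 * byte offset or length, so that a product that wraps past 64 bits is
 * detected instead of silently truncated. A minimal sketch of the caller
 * pattern (cf. send_cb() and the HOLE case of do_dump() below):
 *
 *	uint64_t offset;
 *	if (!overflow_multiply(range->start_blkid, srhp->datablksz, &offset))
 *		return (0);
 */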
struct send_thread_arg {
	bqueue_t q;
	objset_t *os;		/* Objset to traverse */
	uint64_t fromtxg;	/* Traverse from this txg */
	int flags;		/* flags to pass to traverse_dataset */
	int error_code;
	boolean_t cancel;
	zbookmark_phys_t resume;
	uint64_t *num_blocks_visited;
};

struct redact_list_thread_arg {
	boolean_t cancel;
	bqueue_t q;
	zbookmark_phys_t resume;
	redaction_list_t *rl;
	boolean_t mark_redact;
	int error_code;
	uint64_t *num_blocks_visited;
};

struct send_merge_thread_arg {
	bqueue_t q;
	objset_t *os;
	struct redact_list_thread_arg *from_arg;
	struct send_thread_arg *to_arg;
	struct redact_list_thread_arg *redact_arg;
	int error;
	boolean_t cancel;
};

struct send_range {
	boolean_t eos_marker; /* Marks the end of the stream */
	uint64_t object;
	uint64_t start_blkid;
	uint64_t end_blkid;
	bqueue_node_t ln;
	enum type {DATA, HOLE, OBJECT, OBJECT_RANGE, REDACT,
	    PREVIOUSLY_REDACTED} type;
	union {
		struct srd {
			dmu_object_type_t obj_type;
			uint32_t datablksz; // logical size
			uint32_t datasz; // payload size
			blkptr_t bp;
			arc_buf_t *abuf;
			abd_t *abd;
			kmutex_t lock;
			kcondvar_t cv;
			boolean_t io_outstanding;
			boolean_t io_compressed;
			int io_err;
		} data;
		struct srh {
			uint32_t datablksz;
		} hole;
		struct sro {
			/*
			 * This is a pointer because embedding it in the
			 * struct causes these structures to be massively larger
			 * for all range types; this makes the code much less
			 * memory efficient.
			 */
			dnode_phys_t *dnp;
			blkptr_t bp;
			/* Piggyback unmodified spill block */
			struct send_range *spill_range;
		} object;
		struct srr {
			uint32_t datablksz;
		} redact;
		struct sror {
			blkptr_t bp;
		} object_range;
	} sru;
};

/*
 * The list of data whose inclusion in a send stream can be pending from
 * one call to backup_cb to another.
Multiple calls to dump_free(),198* dump_freeobjects(), and dump_redact() can be aggregated into a single199* DRR_FREE, DRR_FREEOBJECTS, or DRR_REDACT replay record.200*/201typedef enum {202PENDING_NONE,203PENDING_FREE,204PENDING_FREEOBJECTS,205PENDING_REDACT206} dmu_pendop_t;207208typedef struct dmu_send_cookie {209dmu_replay_record_t *dsc_drr;210dmu_send_outparams_t *dsc_dso;211offset_t *dsc_off;212objset_t *dsc_os;213zio_cksum_t dsc_zc;214uint64_t dsc_toguid;215uint64_t dsc_fromtxg;216int dsc_err;217dmu_pendop_t dsc_pending_op;218uint64_t dsc_featureflags;219uint64_t dsc_last_data_object;220uint64_t dsc_last_data_offset;221uint64_t dsc_resume_object;222uint64_t dsc_resume_offset;223boolean_t dsc_sent_begin;224boolean_t dsc_sent_end;225} dmu_send_cookie_t;226227static int do_dump(dmu_send_cookie_t *dscp, struct send_range *range);228229static void230range_free(struct send_range *range)231{232if (range->type == OBJECT) {233size_t size = sizeof (dnode_phys_t) *234(range->sru.object.dnp->dn_extra_slots + 1);235kmem_free(range->sru.object.dnp, size);236if (range->sru.object.spill_range)237range_free(range->sru.object.spill_range);238} else if (range->type == DATA) {239mutex_enter(&range->sru.data.lock);240while (range->sru.data.io_outstanding)241cv_wait(&range->sru.data.cv, &range->sru.data.lock);242if (range->sru.data.abd != NULL)243abd_free(range->sru.data.abd);244if (range->sru.data.abuf != NULL) {245arc_buf_destroy(range->sru.data.abuf,246&range->sru.data.abuf);247}248mutex_exit(&range->sru.data.lock);249250cv_destroy(&range->sru.data.cv);251mutex_destroy(&range->sru.data.lock);252}253kmem_free(range, sizeof (*range));254}255256/*257* For all record types except BEGIN, fill in the checksum (overlaid in258* drr_u.drr_checksum.drr_checksum). The checksum verifies everything259* up to the start of the checksum itself.260*/261static int262dump_record(dmu_send_cookie_t *dscp, void *payload, int payload_len)263{264dmu_send_outparams_t *dso = dscp->dsc_dso;265ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),266==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));267(void) fletcher_4_incremental_native(dscp->dsc_drr,268offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),269&dscp->dsc_zc);270if (dscp->dsc_drr->drr_type == DRR_BEGIN) {271dscp->dsc_sent_begin = B_TRUE;272} else {273ASSERT(ZIO_CHECKSUM_IS_ZERO(&dscp->dsc_drr->drr_u.274drr_checksum.drr_checksum));275dscp->dsc_drr->drr_u.drr_checksum.drr_checksum = dscp->dsc_zc;276}277if (dscp->dsc_drr->drr_type == DRR_END) {278dscp->dsc_sent_end = B_TRUE;279}280(void) fletcher_4_incremental_native(&dscp->dsc_drr->281drr_u.drr_checksum.drr_checksum,282sizeof (zio_cksum_t), &dscp->dsc_zc);283*dscp->dsc_off += sizeof (dmu_replay_record_t);284dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, dscp->dsc_drr,285sizeof (dmu_replay_record_t), dso->dso_arg);286if (dscp->dsc_err != 0)287return (SET_ERROR(EINTR));288if (payload_len != 0) {289*dscp->dsc_off += payload_len;290/*291* payload is null when dso_dryrun == B_TRUE (i.e. when we're292* doing a send size calculation)293*/294if (payload != NULL) {295(void) fletcher_4_incremental_native(296payload, payload_len, &dscp->dsc_zc);297}298299/*300* The code does not rely on this (len being a multiple of 8).301* We keep this assertion because of the corresponding assertion302* in receive_read(). 
Keeping this assertion ensures that we do303* not inadvertently break backwards compatibility (causing the304* assertion in receive_read() to trigger on old software).305*306* Raw sends cannot be received on old software, and so can307* bypass this assertion.308*/309310ASSERT((payload_len % 8 == 0) ||311(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW));312313dscp->dsc_err = dso->dso_outfunc(dscp->dsc_os, payload,314payload_len, dso->dso_arg);315if (dscp->dsc_err != 0)316return (SET_ERROR(EINTR));317}318return (0);319}320321/*322* Fill in the drr_free struct, or perform aggregation if the previous record is323* also a free record, and the two are adjacent.324*325* Note that we send free records even for a full send, because we want to be326* able to receive a full send as a clone, which requires a list of all the free327* and freeobject records that were generated on the source.328*/329static int330dump_free(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,331uint64_t length)332{333struct drr_free *drrf = &(dscp->dsc_drr->drr_u.drr_free);334335/*336* When we receive a free record, dbuf_free_range() assumes337* that the receiving system doesn't have any dbufs in the range338* being freed. This is always true because there is a one-record339* constraint: we only send one WRITE record for any given340* object,offset. We know that the one-record constraint is341* true because we always send data in increasing order by342* object,offset.343*344* If the increasing-order constraint ever changes, we should find345* another way to assert that the one-record constraint is still346* satisfied.347*/348ASSERT(object > dscp->dsc_last_data_object ||349(object == dscp->dsc_last_data_object &&350offset > dscp->dsc_last_data_offset));351352/*353* If there is a pending op, but it's not PENDING_FREE, push it out,354* since free block aggregation can only be done for blocks of the355* same type (i.e., DRR_FREE records can only be aggregated with356* other DRR_FREE records. DRR_FREEOBJECTS records can only be357* aggregated with other DRR_FREEOBJECTS records).358*/359if (dscp->dsc_pending_op != PENDING_NONE &&360dscp->dsc_pending_op != PENDING_FREE) {361if (dump_record(dscp, NULL, 0) != 0)362return (SET_ERROR(EINTR));363dscp->dsc_pending_op = PENDING_NONE;364}365366if (dscp->dsc_pending_op == PENDING_FREE) {367/*368* Check to see whether this free block can be aggregated369* with pending one.370*/371if (drrf->drr_object == object && drrf->drr_offset +372drrf->drr_length == offset) {373if (offset + length < offset || length == UINT64_MAX)374drrf->drr_length = UINT64_MAX;375else376drrf->drr_length += length;377return (0);378} else {379/* not a continuation. 
Push out pending record */380if (dump_record(dscp, NULL, 0) != 0)381return (SET_ERROR(EINTR));382dscp->dsc_pending_op = PENDING_NONE;383}384}385/* create a FREE record and make it pending */386memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));387dscp->dsc_drr->drr_type = DRR_FREE;388drrf->drr_object = object;389drrf->drr_offset = offset;390if (offset + length < offset)391drrf->drr_length = DMU_OBJECT_END;392else393drrf->drr_length = length;394drrf->drr_toguid = dscp->dsc_toguid;395if (length == DMU_OBJECT_END) {396if (dump_record(dscp, NULL, 0) != 0)397return (SET_ERROR(EINTR));398} else {399dscp->dsc_pending_op = PENDING_FREE;400}401402return (0);403}404405/*406* Fill in the drr_redact struct, or perform aggregation if the previous record407* is also a redaction record, and the two are adjacent.408*/409static int410dump_redact(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,411uint64_t length)412{413struct drr_redact *drrr = &dscp->dsc_drr->drr_u.drr_redact;414415/*416* If there is a pending op, but it's not PENDING_REDACT, push it out,417* since free block aggregation can only be done for blocks of the418* same type (i.e., DRR_REDACT records can only be aggregated with419* other DRR_REDACT records).420*/421if (dscp->dsc_pending_op != PENDING_NONE &&422dscp->dsc_pending_op != PENDING_REDACT) {423if (dump_record(dscp, NULL, 0) != 0)424return (SET_ERROR(EINTR));425dscp->dsc_pending_op = PENDING_NONE;426}427428if (dscp->dsc_pending_op == PENDING_REDACT) {429/*430* Check to see whether this redacted block can be aggregated431* with pending one.432*/433if (drrr->drr_object == object && drrr->drr_offset +434drrr->drr_length == offset) {435drrr->drr_length += length;436return (0);437} else {438/* not a continuation. Push out pending record */439if (dump_record(dscp, NULL, 0) != 0)440return (SET_ERROR(EINTR));441dscp->dsc_pending_op = PENDING_NONE;442}443}444/* create a REDACT record and make it pending */445memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));446dscp->dsc_drr->drr_type = DRR_REDACT;447drrr->drr_object = object;448drrr->drr_offset = offset;449drrr->drr_length = length;450drrr->drr_toguid = dscp->dsc_toguid;451dscp->dsc_pending_op = PENDING_REDACT;452453return (0);454}455456static int457dmu_dump_write(dmu_send_cookie_t *dscp, dmu_object_type_t type, uint64_t object,458uint64_t offset, int lsize, int psize, const blkptr_t *bp,459boolean_t io_compressed, void *data)460{461uint64_t payload_size;462boolean_t raw = (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);463struct drr_write *drrw = &(dscp->dsc_drr->drr_u.drr_write);464465/*466* We send data in increasing object, offset order.467* See comment in dump_free() for details.468*/469ASSERT(object > dscp->dsc_last_data_object ||470(object == dscp->dsc_last_data_object &&471offset > dscp->dsc_last_data_offset));472dscp->dsc_last_data_object = object;473dscp->dsc_last_data_offset = offset + lsize - 1;474475/*476* If there is any kind of pending aggregation (currently either477* a grouping of free objects or free blocks), push it out to478* the stream, since aggregation can't be done across operations479* of different types.480*/481if (dscp->dsc_pending_op != PENDING_NONE) {482if (dump_record(dscp, NULL, 0) != 0)483return (SET_ERROR(EINTR));484dscp->dsc_pending_op = PENDING_NONE;485}486/* write a WRITE record */487memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));488dscp->dsc_drr->drr_type = DRR_WRITE;489drrw->drr_object = object;490drrw->drr_type = type;491drrw->drr_offset = offset;492drrw->drr_toguid = 
dscp->dsc_toguid;493drrw->drr_logical_size = lsize;494495/* only set the compression fields if the buf is compressed or raw */496boolean_t compressed =497(bp != NULL ? BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&498io_compressed : lsize != psize);499if (raw || compressed) {500ASSERT(bp != NULL);501ASSERT(raw || dscp->dsc_featureflags &502DMU_BACKUP_FEATURE_COMPRESSED);503ASSERT(!BP_IS_EMBEDDED(bp));504ASSERT3S(psize, >, 0);505506if (raw) {507ASSERT(BP_IS_PROTECTED(bp));508509/*510* This is a raw protected block so we need to pass511* along everything the receiving side will need to512* interpret this block, including the byteswap, salt,513* IV, and MAC.514*/515if (BP_SHOULD_BYTESWAP(bp))516drrw->drr_flags |= DRR_RAW_BYTESWAP;517zio_crypt_decode_params_bp(bp, drrw->drr_salt,518drrw->drr_iv);519zio_crypt_decode_mac_bp(bp, drrw->drr_mac);520} else {521/* this is a compressed block */522ASSERT(dscp->dsc_featureflags &523DMU_BACKUP_FEATURE_COMPRESSED);524ASSERT(!BP_SHOULD_BYTESWAP(bp));525ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));526ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);527ASSERT3S(lsize, >=, psize);528}529530/* set fields common to compressed and raw sends */531drrw->drr_compressiontype = BP_GET_COMPRESS(bp);532drrw->drr_compressed_size = psize;533payload_size = drrw->drr_compressed_size;534} else {535payload_size = drrw->drr_logical_size;536}537538if (bp == NULL || BP_IS_EMBEDDED(bp) || (BP_IS_PROTECTED(bp) && !raw)) {539/*540* There's no pre-computed checksum for partial-block writes,541* embedded BP's, or encrypted BP's that are being sent as542* plaintext, so (like fletcher4-checksummed blocks) userland543* will have to compute a dedup-capable checksum itself.544*/545drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;546} else {547drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);548if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &549ZCHECKSUM_FLAG_DEDUP)550drrw->drr_flags |= DRR_CHECKSUM_DEDUP;551DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));552DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));553DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));554DDK_SET_CRYPT(&drrw->drr_key, BP_IS_PROTECTED(bp));555drrw->drr_key.ddk_cksum = bp->blk_cksum;556}557558if (dump_record(dscp, data, payload_size) != 0)559return (SET_ERROR(EINTR));560return (0);561}562563static int564dump_write_embedded(dmu_send_cookie_t *dscp, uint64_t object, uint64_t offset,565int blksz, const blkptr_t *bp)566{567char buf[BPE_PAYLOAD_SIZE];568struct drr_write_embedded *drrw =569&(dscp->dsc_drr->drr_u.drr_write_embedded);570571if (dscp->dsc_pending_op != PENDING_NONE) {572if (dump_record(dscp, NULL, 0) != 0)573return (SET_ERROR(EINTR));574dscp->dsc_pending_op = PENDING_NONE;575}576577ASSERT(BP_IS_EMBEDDED(bp));578579memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));580dscp->dsc_drr->drr_type = DRR_WRITE_EMBEDDED;581drrw->drr_object = object;582drrw->drr_offset = offset;583drrw->drr_length = blksz;584drrw->drr_toguid = dscp->dsc_toguid;585drrw->drr_compression = BP_GET_COMPRESS(bp);586drrw->drr_etype = BPE_GET_ETYPE(bp);587drrw->drr_lsize = BPE_GET_LSIZE(bp);588drrw->drr_psize = BPE_GET_PSIZE(bp);589590decode_embedded_bp_compressed(bp, buf);591592uint32_t psize = drrw->drr_psize;593uint32_t rsize = P2ROUNDUP(psize, 8);594595if (psize != rsize)596memset(buf + psize, 0, rsize - psize);597598if (dump_record(dscp, buf, rsize) != 0)599return (SET_ERROR(EINTR));600return (0);601}602603static int604dump_spill(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,605void *data)606{607struct drr_spill *drrs = 
&(dscp->dsc_drr->drr_u.drr_spill);608uint64_t blksz = BP_GET_LSIZE(bp);609uint64_t payload_size = blksz;610611if (dscp->dsc_pending_op != PENDING_NONE) {612if (dump_record(dscp, NULL, 0) != 0)613return (SET_ERROR(EINTR));614dscp->dsc_pending_op = PENDING_NONE;615}616617/* write a SPILL record */618memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));619dscp->dsc_drr->drr_type = DRR_SPILL;620drrs->drr_object = object;621drrs->drr_length = blksz;622drrs->drr_toguid = dscp->dsc_toguid;623624/* See comment in piggyback_unmodified_spill() for full details */625if (zfs_send_unmodified_spill_blocks &&626(BP_GET_LOGICAL_BIRTH(bp) <= dscp->dsc_fromtxg)) {627drrs->drr_flags |= DRR_SPILL_UNMODIFIED;628}629630/* handle raw send fields */631if (dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW) {632ASSERT(BP_IS_PROTECTED(bp));633634if (BP_SHOULD_BYTESWAP(bp))635drrs->drr_flags |= DRR_RAW_BYTESWAP;636drrs->drr_compressiontype = BP_GET_COMPRESS(bp);637drrs->drr_compressed_size = BP_GET_PSIZE(bp);638zio_crypt_decode_params_bp(bp, drrs->drr_salt, drrs->drr_iv);639zio_crypt_decode_mac_bp(bp, drrs->drr_mac);640payload_size = drrs->drr_compressed_size;641}642643if (dump_record(dscp, data, payload_size) != 0)644return (SET_ERROR(EINTR));645return (0);646}647648static int649dump_freeobjects(dmu_send_cookie_t *dscp, uint64_t firstobj, uint64_t numobjs)650{651struct drr_freeobjects *drrfo = &(dscp->dsc_drr->drr_u.drr_freeobjects);652uint64_t maxobj = DNODES_PER_BLOCK *653(DMU_META_DNODE(dscp->dsc_os)->dn_maxblkid + 1);654655/*656* ZoL < 0.7 does not handle large FREEOBJECTS records correctly,657* leading to zfs recv never completing. to avoid this issue, don't658* send FREEOBJECTS records for object IDs which cannot exist on the659* receiving side.660*/661if (maxobj > 0) {662if (maxobj <= firstobj)663return (0);664665if (maxobj < firstobj + numobjs)666numobjs = maxobj - firstobj;667}668669/*670* If there is a pending op, but it's not PENDING_FREEOBJECTS,671* push it out, since free block aggregation can only be done for672* blocks of the same type (i.e., DRR_FREE records can only be673* aggregated with other DRR_FREE records. DRR_FREEOBJECTS records674* can only be aggregated with other DRR_FREEOBJECTS records).675*/676if (dscp->dsc_pending_op != PENDING_NONE &&677dscp->dsc_pending_op != PENDING_FREEOBJECTS) {678if (dump_record(dscp, NULL, 0) != 0)679return (SET_ERROR(EINTR));680dscp->dsc_pending_op = PENDING_NONE;681}682683if (dscp->dsc_pending_op == PENDING_FREEOBJECTS) {684/*685* See whether this free object array can be aggregated686* with pending one687*/688if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {689drrfo->drr_numobjs += numobjs;690return (0);691} else {692/* can't be aggregated. Push out pending record */693if (dump_record(dscp, NULL, 0) != 0)694return (SET_ERROR(EINTR));695dscp->dsc_pending_op = PENDING_NONE;696}697}698699/* write a FREEOBJECTS record */700memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));701dscp->dsc_drr->drr_type = DRR_FREEOBJECTS;702drrfo->drr_firstobj = firstobj;703drrfo->drr_numobjs = numobjs;704drrfo->drr_toguid = dscp->dsc_toguid;705706dscp->dsc_pending_op = PENDING_FREEOBJECTS;707708return (0);709}710711static int712dump_dnode(dmu_send_cookie_t *dscp, const blkptr_t *bp, uint64_t object,713dnode_phys_t *dnp)714{715struct drr_object *drro = &(dscp->dsc_drr->drr_u.drr_object);716int bonuslen;717718if (object < dscp->dsc_resume_object) {719/*720* Note: when resuming, we will visit all the dnodes in721* the block of dnodes that we are resuming from. 
In722* this case it's unnecessary to send the dnodes prior to723* the one we are resuming from. We should be at most one724* block's worth of dnodes behind the resume point.725*/726ASSERT3U(dscp->dsc_resume_object - object, <,7271 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));728return (0);729}730731if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)732return (dump_freeobjects(dscp, object, 1));733734if (dscp->dsc_pending_op != PENDING_NONE) {735if (dump_record(dscp, NULL, 0) != 0)736return (SET_ERROR(EINTR));737dscp->dsc_pending_op = PENDING_NONE;738}739740/* write an OBJECT record */741memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));742dscp->dsc_drr->drr_type = DRR_OBJECT;743drro->drr_object = object;744drro->drr_type = dnp->dn_type;745drro->drr_bonustype = dnp->dn_bonustype;746drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;747drro->drr_bonuslen = dnp->dn_bonuslen;748drro->drr_dn_slots = dnp->dn_extra_slots + 1;749drro->drr_checksumtype = dnp->dn_checksum;750drro->drr_compress = dnp->dn_compress;751drro->drr_toguid = dscp->dsc_toguid;752753if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&754drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)755drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;756757bonuslen = P2ROUNDUP(dnp->dn_bonuslen, 8);758759if ((dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {760ASSERT(BP_IS_ENCRYPTED(bp));761762if (BP_SHOULD_BYTESWAP(bp))763drro->drr_flags |= DRR_RAW_BYTESWAP;764765/* needed for reconstructing dnp on recv side */766drro->drr_maxblkid = dnp->dn_maxblkid;767drro->drr_indblkshift = dnp->dn_indblkshift;768drro->drr_nlevels = dnp->dn_nlevels;769drro->drr_nblkptr = dnp->dn_nblkptr;770771/*772* Since we encrypt the entire bonus area, the (raw) part773* beyond the bonuslen is actually nonzero, so we need774* to send it.775*/776if (bonuslen != 0) {777if (drro->drr_bonuslen > DN_MAX_BONUS_LEN(dnp))778return (SET_ERROR(EINVAL));779drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp);780bonuslen = drro->drr_raw_bonuslen;781}782}783784/*785* DRR_OBJECT_SPILL is set for every dnode which references a786* spill block. This allows the receiving pool to definitively787* determine when a spill block should be kept or freed.788*/789if (dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR)790drro->drr_flags |= DRR_OBJECT_SPILL;791792if (dump_record(dscp, DN_BONUS(dnp), bonuslen) != 0)793return (SET_ERROR(EINTR));794795/* Free anything past the end of the file. 
*/796if (dump_free(dscp, object, (dnp->dn_maxblkid + 1) *797(dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), DMU_OBJECT_END) != 0)798return (SET_ERROR(EINTR));799800if (dscp->dsc_err != 0)801return (SET_ERROR(EINTR));802803return (0);804}805806static int807dump_object_range(dmu_send_cookie_t *dscp, const blkptr_t *bp,808uint64_t firstobj, uint64_t numslots)809{810struct drr_object_range *drror =811&(dscp->dsc_drr->drr_u.drr_object_range);812813/* we only use this record type for raw sends */814ASSERT(BP_IS_PROTECTED(bp));815ASSERT(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW);816ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);817ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_DNODE);818ASSERT0(BP_GET_LEVEL(bp));819820if (dscp->dsc_pending_op != PENDING_NONE) {821if (dump_record(dscp, NULL, 0) != 0)822return (SET_ERROR(EINTR));823dscp->dsc_pending_op = PENDING_NONE;824}825826memset(dscp->dsc_drr, 0, sizeof (dmu_replay_record_t));827dscp->dsc_drr->drr_type = DRR_OBJECT_RANGE;828drror->drr_firstobj = firstobj;829drror->drr_numslots = numslots;830drror->drr_toguid = dscp->dsc_toguid;831if (BP_SHOULD_BYTESWAP(bp))832drror->drr_flags |= DRR_RAW_BYTESWAP;833zio_crypt_decode_params_bp(bp, drror->drr_salt, drror->drr_iv);834zio_crypt_decode_mac_bp(bp, drror->drr_mac);835836if (dump_record(dscp, NULL, 0) != 0)837return (SET_ERROR(EINTR));838return (0);839}840841static boolean_t842send_do_embed(const blkptr_t *bp, uint64_t featureflags)843{844if (!BP_IS_EMBEDDED(bp))845return (B_FALSE);846847/*848* Compression function must be legacy, or explicitly enabled.849*/850if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&851!(featureflags & DMU_BACKUP_FEATURE_LZ4)))852return (B_FALSE);853854/*855* If we have not set the ZSTD feature flag, we can't send ZSTD856* compressed embedded blocks, as the receiver may not support them.857*/858if ((BP_GET_COMPRESS(bp) == ZIO_COMPRESS_ZSTD &&859!(featureflags & DMU_BACKUP_FEATURE_ZSTD)))860return (B_FALSE);861862/*863* Embed type must be explicitly enabled.864*/865switch (BPE_GET_ETYPE(bp)) {866case BP_EMBEDDED_TYPE_DATA:867if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)868return (B_TRUE);869break;870default:871return (B_FALSE);872}873return (B_FALSE);874}875876/*877* This function actually handles figuring out what kind of record needs to be878* dumped, and calling the appropriate helper function. 
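 * OBJECT ranges are dumped via dump_dnode(), OBJECT_RANGE via
 * dump_object_range(), REDACT via dump_redact(), HOLE via dump_free() or
 * dump_freeobjects(), and DATA via dump_spill(), dump_write_embedded(), or
 * dmu_dump_write().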
In most cases,879* the data has already been read by send_reader_thread().880*/881static int882do_dump(dmu_send_cookie_t *dscp, struct send_range *range)883{884int err = 0;885switch (range->type) {886case OBJECT:887err = dump_dnode(dscp, &range->sru.object.bp, range->object,888range->sru.object.dnp);889/* Dump piggybacked unmodified spill block */890if (!err && range->sru.object.spill_range)891err = do_dump(dscp, range->sru.object.spill_range);892return (err);893case OBJECT_RANGE: {894ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);895if (!(dscp->dsc_featureflags & DMU_BACKUP_FEATURE_RAW)) {896return (0);897}898uint64_t epb = BP_GET_LSIZE(&range->sru.object_range.bp) >>899DNODE_SHIFT;900uint64_t firstobj = range->start_blkid * epb;901err = dump_object_range(dscp, &range->sru.object_range.bp,902firstobj, epb);903break;904}905case REDACT: {906struct srr *srrp = &range->sru.redact;907err = dump_redact(dscp, range->object, range->start_blkid *908srrp->datablksz, (range->end_blkid - range->start_blkid) *909srrp->datablksz);910return (err);911}912case DATA: {913struct srd *srdp = &range->sru.data;914blkptr_t *bp = &srdp->bp;915spa_t *spa =916dmu_objset_spa(dscp->dsc_os);917918ASSERT3U(srdp->datablksz, ==, BP_GET_LSIZE(bp));919ASSERT3U(range->start_blkid + 1, ==, range->end_blkid);920921if (send_do_embed(bp, dscp->dsc_featureflags)) {922err = dump_write_embedded(dscp, range->object,923range->start_blkid * srdp->datablksz,924srdp->datablksz, bp);925return (err);926}927ASSERT(range->object > dscp->dsc_resume_object ||928(range->object == dscp->dsc_resume_object &&929(range->start_blkid == DMU_SPILL_BLKID ||930range->start_blkid * srdp->datablksz >=931dscp->dsc_resume_offset)));932/* it's a level-0 block of a regular object */933934mutex_enter(&srdp->lock);935while (srdp->io_outstanding)936cv_wait(&srdp->cv, &srdp->lock);937err = srdp->io_err;938mutex_exit(&srdp->lock);939940if (err != 0) {941if (zfs_send_corrupt_data &&942!dscp->dsc_dso->dso_dryrun) {943/*944* Send a block filled with 0x"zfs badd bloc"945*/946srdp->abuf = arc_alloc_buf(spa, &srdp->abuf,947ARC_BUFC_DATA, srdp->datablksz);948uint64_t *ptr;949for (ptr = srdp->abuf->b_data;950(char *)ptr < (char *)srdp->abuf->b_data +951srdp->datablksz; ptr++)952*ptr = 0x2f5baddb10cULL;953} else {954return (SET_ERROR(EIO));955}956}957958ASSERT(dscp->dsc_dso->dso_dryrun ||959srdp->abuf != NULL || srdp->abd != NULL);960961char *data = NULL;962if (srdp->abd != NULL) {963data = abd_to_buf(srdp->abd);964ASSERT0P(srdp->abuf);965} else if (srdp->abuf != NULL) {966data = srdp->abuf->b_data;967}968969if (BP_GET_TYPE(bp) == DMU_OT_SA) {970ASSERT3U(range->start_blkid, ==, DMU_SPILL_BLKID);971err = dump_spill(dscp, bp, range->object, data);972return (err);973}974975uint64_t offset = range->start_blkid * srdp->datablksz;976977/*978* If we have large blocks stored on disk but the send flags979* don't allow us to send large blocks, we split the data from980* the arc buf into chunks.981*/982if (srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&983!(dscp->dsc_featureflags &984DMU_BACKUP_FEATURE_LARGE_BLOCKS)) {985while (srdp->datablksz > 0 && err == 0) {986int n = MIN(srdp->datablksz,987SPA_OLD_MAXBLOCKSIZE);988err = dmu_dump_write(dscp, srdp->obj_type,989range->object, offset, n, n, NULL, B_FALSE,990data);991offset += n;992/*993* When doing dry run, data==NULL is used as a994* sentinel value by995* dmu_dump_write()->dump_record().996*/997if (data != NULL)998data += n;999srdp->datablksz -= n;1000}1001} else {1002err = dmu_dump_write(dscp, 
srdp->obj_type,1003range->object, offset,1004srdp->datablksz, srdp->datasz, bp,1005srdp->io_compressed, data);1006}1007return (err);1008}1009case HOLE: {1010struct srh *srhp = &range->sru.hole;1011if (range->object == DMU_META_DNODE_OBJECT) {1012uint32_t span = srhp->datablksz >> DNODE_SHIFT;1013uint64_t first_obj = range->start_blkid * span;1014uint64_t numobj = range->end_blkid * span - first_obj;1015return (dump_freeobjects(dscp, first_obj, numobj));1016}1017uint64_t offset = 0;10181019/*1020* If this multiply overflows, we don't need to send this block.1021* Even if it has a birth time, it can never not be a hole, so1022* we don't need to send records for it.1023*/1024if (!overflow_multiply(range->start_blkid, srhp->datablksz,1025&offset)) {1026return (0);1027}1028uint64_t len = 0;10291030if (!overflow_multiply(range->end_blkid, srhp->datablksz, &len))1031len = UINT64_MAX;1032len = len - offset;1033return (dump_free(dscp, range->object, offset, len));1034}1035default:1036panic("Invalid range type in do_dump: %d", range->type);1037}1038return (err);1039}10401041static struct send_range *1042range_alloc(enum type type, uint64_t object, uint64_t start_blkid,1043uint64_t end_blkid, boolean_t eos)1044{1045struct send_range *range = kmem_alloc(sizeof (*range), KM_SLEEP);1046range->type = type;1047range->object = object;1048range->start_blkid = start_blkid;1049range->end_blkid = end_blkid;1050range->eos_marker = eos;1051if (type == DATA) {1052range->sru.data.abd = NULL;1053range->sru.data.abuf = NULL;1054mutex_init(&range->sru.data.lock, NULL, MUTEX_DEFAULT, NULL);1055cv_init(&range->sru.data.cv, NULL, CV_DEFAULT, NULL);1056range->sru.data.io_outstanding = 0;1057range->sru.data.io_err = 0;1058range->sru.data.io_compressed = B_FALSE;1059} else if (type == OBJECT) {1060range->sru.object.spill_range = NULL;1061}1062return (range);1063}10641065/*1066* This is the callback function to traverse_dataset that acts as a worker1067* thread for dmu_send_impl.1068*/1069static int1070send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,1071const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)1072{1073(void) zilog;1074struct send_thread_arg *sta = arg;1075struct send_range *record;10761077ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||1078zb->zb_object >= sta->resume.zb_object);10791080/*1081* All bps of an encrypted os should have the encryption bit set.1082* If this is not true it indicates tampering and we report an error.1083*/1084if (sta->os->os_encrypted &&1085!BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {1086spa_log_error(spa, zb, BP_GET_PHYSICAL_BIRTH(bp));1087return (SET_ERROR(EIO));1088}10891090if (sta->cancel)1091return (SET_ERROR(EINTR));1092if (zb->zb_object != DMU_META_DNODE_OBJECT &&1093DMU_OBJECT_IS_SPECIAL(zb->zb_object))1094return (0);1095atomic_inc_64(sta->num_blocks_visited);10961097if (zb->zb_level == ZB_DNODE_LEVEL) {1098if (zb->zb_object == DMU_META_DNODE_OBJECT)1099return (0);1100record = range_alloc(OBJECT, zb->zb_object, 0, 0, B_FALSE);1101record->sru.object.bp = *bp;1102size_t size = sizeof (*dnp) * (dnp->dn_extra_slots + 1);1103record->sru.object.dnp = kmem_alloc(size, KM_SLEEP);1104memcpy(record->sru.object.dnp, dnp, size);1105bqueue_enqueue(&sta->q, record, sizeof (*record));1106return (0);1107}1108if (zb->zb_level == 0 && zb->zb_object == DMU_META_DNODE_OBJECT &&1109!BP_IS_HOLE(bp)) {1110record = range_alloc(OBJECT_RANGE, 0, zb->zb_blkid,1111zb->zb_blkid + 1, B_FALSE);1112record->sru.object_range.bp = *bp;1113bqueue_enqueue(&sta->q, record, sizeof 
(*record));1114return (0);1115}1116if (zb->zb_level < 0 || (zb->zb_level > 0 && !BP_IS_HOLE(bp)))1117return (0);1118if (zb->zb_object == DMU_META_DNODE_OBJECT && !BP_IS_HOLE(bp))1119return (0);11201121uint64_t span = bp_span_in_blocks(dnp->dn_indblkshift, zb->zb_level);1122uint64_t start;11231124/*1125* If this multiply overflows, we don't need to send this block.1126* Even if it has a birth time, it can never not be a hole, so1127* we don't need to send records for it.1128*/1129if (!overflow_multiply(span, zb->zb_blkid, &start) || (!(zb->zb_blkid ==1130DMU_SPILL_BLKID || DMU_OT_IS_METADATA(dnp->dn_type)) &&1131span * zb->zb_blkid > dnp->dn_maxblkid)) {1132ASSERT(BP_IS_HOLE(bp));1133return (0);1134}11351136if (zb->zb_blkid == DMU_SPILL_BLKID)1137ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);11381139enum type record_type = DATA;1140if (BP_IS_HOLE(bp))1141record_type = HOLE;1142else if (BP_IS_REDACTED(bp))1143record_type = REDACT;1144else1145record_type = DATA;11461147record = range_alloc(record_type, zb->zb_object, start,1148(start + span < start ? 0 : start + span), B_FALSE);11491150uint64_t datablksz = (zb->zb_blkid == DMU_SPILL_BLKID ?1151BP_GET_LSIZE(bp) : dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT);11521153if (BP_IS_HOLE(bp)) {1154record->sru.hole.datablksz = datablksz;1155} else if (BP_IS_REDACTED(bp)) {1156record->sru.redact.datablksz = datablksz;1157} else {1158record->sru.data.datablksz = datablksz;1159record->sru.data.obj_type = dnp->dn_type;1160record->sru.data.bp = *bp;1161}11621163bqueue_enqueue(&sta->q, record, sizeof (*record));1164return (0);1165}11661167struct redact_list_cb_arg {1168uint64_t *num_blocks_visited;1169bqueue_t *q;1170boolean_t *cancel;1171boolean_t mark_redact;1172};11731174static int1175redact_list_cb(redact_block_phys_t *rb, void *arg)1176{1177struct redact_list_cb_arg *rlcap = arg;11781179atomic_inc_64(rlcap->num_blocks_visited);1180if (*rlcap->cancel)1181return (-1);11821183struct send_range *data = range_alloc(REDACT, rb->rbp_object,1184rb->rbp_blkid, rb->rbp_blkid + redact_block_get_count(rb), B_FALSE);1185ASSERT3U(data->end_blkid, >, rb->rbp_blkid);1186if (rlcap->mark_redact) {1187data->type = REDACT;1188data->sru.redact.datablksz = redact_block_get_size(rb);1189} else {1190data->type = PREVIOUSLY_REDACTED;1191}1192bqueue_enqueue(rlcap->q, data, sizeof (*data));11931194return (0);1195}11961197/*1198* This function kicks off the traverse_dataset. 
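 * Each block pointer visited is handed to send_cb(), which packages it as a
 * struct send_range and enqueues it for the merge thread.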
It also handles setting the1199* error code of the thread in case something goes wrong, and pushes the End of1200* Stream record when the traverse_dataset call has finished.1201*/1202static __attribute__((noreturn)) void1203send_traverse_thread(void *arg)1204{1205struct send_thread_arg *st_arg = arg;1206int err = 0;1207struct send_range *data;1208fstrans_cookie_t cookie = spl_fstrans_mark();12091210err = traverse_dataset_resume(st_arg->os->os_dsl_dataset,1211st_arg->fromtxg, &st_arg->resume,1212st_arg->flags | TRAVERSE_LOGICAL, send_cb, st_arg);12131214if (err != EINTR)1215st_arg->error_code = err;1216data = range_alloc(DATA, 0, 0, 0, B_TRUE);1217bqueue_enqueue_flush(&st_arg->q, data, sizeof (*data));1218spl_fstrans_unmark(cookie);1219thread_exit();1220}12211222/*1223* Utility function that causes End of Stream records to compare after of all1224* others, so that other threads' comparison logic can stay simple.1225*/1226static int __attribute__((unused))1227send_range_after(const struct send_range *from, const struct send_range *to)1228{1229if (from->eos_marker == B_TRUE)1230return (1);1231if (to->eos_marker == B_TRUE)1232return (-1);12331234uint64_t from_obj = from->object;1235uint64_t from_end_obj = from->object + 1;1236uint64_t to_obj = to->object;1237uint64_t to_end_obj = to->object + 1;1238if (from_obj == 0) {1239ASSERT(from->type == HOLE || from->type == OBJECT_RANGE);1240from_obj = from->start_blkid << DNODES_PER_BLOCK_SHIFT;1241from_end_obj = from->end_blkid << DNODES_PER_BLOCK_SHIFT;1242}1243if (to_obj == 0) {1244ASSERT(to->type == HOLE || to->type == OBJECT_RANGE);1245to_obj = to->start_blkid << DNODES_PER_BLOCK_SHIFT;1246to_end_obj = to->end_blkid << DNODES_PER_BLOCK_SHIFT;1247}12481249if (from_end_obj <= to_obj)1250return (-1);1251if (from_obj >= to_end_obj)1252return (1);1253int64_t cmp = TREE_CMP(to->type == OBJECT_RANGE, from->type ==1254OBJECT_RANGE);1255if (unlikely(cmp))1256return (cmp);1257cmp = TREE_CMP(to->type == OBJECT, from->type == OBJECT);1258if (unlikely(cmp))1259return (cmp);1260if (from->end_blkid <= to->start_blkid)1261return (-1);1262if (from->start_blkid >= to->end_blkid)1263return (1);1264return (0);1265}12661267/*1268* Pop the new data off the queue, check that the records we receive are in1269* the right order, but do not free the old data. 
This is used so that the1270* records can be sent on to the main thread without copying the data.1271*/1272static struct send_range *1273get_next_range_nofree(bqueue_t *bq, struct send_range *prev)1274{1275struct send_range *next = bqueue_dequeue(bq);1276ASSERT3S(send_range_after(prev, next), ==, -1);1277return (next);1278}12791280/*1281* Pop the new data off the queue, check that the records we receive are in1282* the right order, and free the old data.1283*/1284static struct send_range *1285get_next_range(bqueue_t *bq, struct send_range *prev)1286{1287struct send_range *next = get_next_range_nofree(bq, prev);1288range_free(prev);1289return (next);1290}12911292static __attribute__((noreturn)) void1293redact_list_thread(void *arg)1294{1295struct redact_list_thread_arg *rlt_arg = arg;1296struct send_range *record;1297fstrans_cookie_t cookie = spl_fstrans_mark();1298if (rlt_arg->rl != NULL) {1299struct redact_list_cb_arg rlcba = {0};1300rlcba.cancel = &rlt_arg->cancel;1301rlcba.q = &rlt_arg->q;1302rlcba.num_blocks_visited = rlt_arg->num_blocks_visited;1303rlcba.mark_redact = rlt_arg->mark_redact;1304int err = dsl_redaction_list_traverse(rlt_arg->rl,1305&rlt_arg->resume, redact_list_cb, &rlcba);1306if (err != EINTR)1307rlt_arg->error_code = err;1308}1309record = range_alloc(DATA, 0, 0, 0, B_TRUE);1310bqueue_enqueue_flush(&rlt_arg->q, record, sizeof (*record));1311spl_fstrans_unmark(cookie);13121313thread_exit();1314}13151316/*1317* Compare the start point of the two provided ranges. End of stream ranges1318* compare last, objects compare before any data or hole inside that object and1319* multi-object holes that start at the same object.1320*/1321static int1322send_range_start_compare(struct send_range *r1, struct send_range *r2)1323{1324uint64_t r1_objequiv = r1->object;1325uint64_t r1_l0equiv = r1->start_blkid;1326uint64_t r2_objequiv = r2->object;1327uint64_t r2_l0equiv = r2->start_blkid;1328int64_t cmp = TREE_CMP(r1->eos_marker, r2->eos_marker);1329if (unlikely(cmp))1330return (cmp);1331if (r1->object == 0) {1332r1_objequiv = r1->start_blkid * DNODES_PER_BLOCK;1333r1_l0equiv = 0;1334}1335if (r2->object == 0) {1336r2_objequiv = r2->start_blkid * DNODES_PER_BLOCK;1337r2_l0equiv = 0;1338}13391340cmp = TREE_CMP(r1_objequiv, r2_objequiv);1341if (likely(cmp))1342return (cmp);1343cmp = TREE_CMP(r2->type == OBJECT_RANGE, r1->type == OBJECT_RANGE);1344if (unlikely(cmp))1345return (cmp);1346cmp = TREE_CMP(r2->type == OBJECT, r1->type == OBJECT);1347if (unlikely(cmp))1348return (cmp);13491350return (TREE_CMP(r1_l0equiv, r2_l0equiv));1351}13521353enum q_idx {1354REDACT_IDX = 0,1355TO_IDX,1356FROM_IDX,1357NUM_THREADS1358};13591360/*1361* This function returns the next range the send_merge_thread should operate on.1362* The inputs are two arrays; the first one stores the range at the front of the1363* queues stored in the second one. The ranges are sorted in descending1364* priority order; the metadata from earlier ranges overrules metadata from1365* later ranges. out_mask is used to return which threads the ranges came from;1366* bit i is set if ranges[i] started at the same place as the returned range.1367*1368* This code is not hardcoded to compare a specific number of threads; it could1369* be used with any number, just by changing the q_idx enum.1370*1371* The "next range" is the one with the earliest start; if two starts are equal,1372* the highest-priority range is the next to operate on. 
If a higher-priority1373* range starts in the middle of the first range, then the first range will be1374* truncated to end where the higher-priority range starts, and we will operate1375* on that one next time. In this way, we make sure that each block covered by1376* some range gets covered by a returned range, and each block covered is1377* returned using the metadata of the highest-priority range it appears in.1378*1379* For example, if the three ranges at the front of the queues were [2,4),1380* [3,5), and [1,3), then the ranges returned would be [1,2) with the metadata1381* from the third range, [2,4) with the metadata from the first range, and then1382* [4,5) with the metadata from the second.1383*/1384static struct send_range *1385find_next_range(struct send_range **ranges, bqueue_t **qs, uint64_t *out_mask)1386{1387int idx = 0; // index of the range with the earliest start1388int i;1389uint64_t bmask = 0;1390for (i = 1; i < NUM_THREADS; i++) {1391if (send_range_start_compare(ranges[i], ranges[idx]) < 0)1392idx = i;1393}1394if (ranges[idx]->eos_marker) {1395struct send_range *ret = range_alloc(DATA, 0, 0, 0, B_TRUE);1396*out_mask = 0;1397return (ret);1398}1399/*1400* Find all the ranges that start at that same point.1401*/1402for (i = 0; i < NUM_THREADS; i++) {1403if (send_range_start_compare(ranges[i], ranges[idx]) == 0)1404bmask |= 1 << i;1405}1406*out_mask = bmask;1407/*1408* OBJECT_RANGE records only come from the TO thread, and should always1409* be treated as overlapping with nothing and sent on immediately. They1410* are only used in raw sends, and are never redacted.1411*/1412if (ranges[idx]->type == OBJECT_RANGE) {1413ASSERT3U(idx, ==, TO_IDX);1414ASSERT3U(*out_mask, ==, 1 << TO_IDX);1415struct send_range *ret = ranges[idx];1416ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);1417return (ret);1418}1419/*1420* Find the first start or end point after the start of the first range.1421*/1422uint64_t first_change = ranges[idx]->end_blkid;1423for (i = 0; i < NUM_THREADS; i++) {1424if (i == idx || ranges[i]->eos_marker ||1425ranges[i]->object > ranges[idx]->object ||1426ranges[i]->object == DMU_META_DNODE_OBJECT)1427continue;1428ASSERT3U(ranges[i]->object, ==, ranges[idx]->object);1429if (first_change > ranges[i]->start_blkid &&1430(bmask & (1 << i)) == 0)1431first_change = ranges[i]->start_blkid;1432else if (first_change > ranges[i]->end_blkid)1433first_change = ranges[i]->end_blkid;1434}1435/*1436* Update all ranges to no longer overlap with the range we're1437* returning. All such ranges must start at the same place as the range1438* being returned, and end at or after first_change. Thus we update1439* their start to first_change. 
If that makes them size 0, then free1440* them and pull a new range from that thread.1441*/1442for (i = 0; i < NUM_THREADS; i++) {1443if (i == idx || (bmask & (1 << i)) == 0)1444continue;1445ASSERT3U(first_change, >, ranges[i]->start_blkid);1446ranges[i]->start_blkid = first_change;1447ASSERT3U(ranges[i]->start_blkid, <=, ranges[i]->end_blkid);1448if (ranges[i]->start_blkid == ranges[i]->end_blkid)1449ranges[i] = get_next_range(qs[i], ranges[i]);1450}1451/*1452* Short-circuit the simple case; if the range doesn't overlap with1453* anything else, or it only overlaps with things that start at the same1454* place and are longer, send it on.1455*/1456if (first_change == ranges[idx]->end_blkid) {1457struct send_range *ret = ranges[idx];1458ranges[idx] = get_next_range_nofree(qs[idx], ranges[idx]);1459return (ret);1460}14611462/*1463* Otherwise, return a truncated copy of ranges[idx] and move the start1464* of ranges[idx] back to first_change.1465*/1466struct send_range *ret = kmem_alloc(sizeof (*ret), KM_SLEEP);1467*ret = *ranges[idx];1468ret->end_blkid = first_change;1469ranges[idx]->start_blkid = first_change;1470return (ret);1471}14721473#define FROM_AND_REDACT_BITS ((1 << REDACT_IDX) | (1 << FROM_IDX))14741475/*1476* Merge the results from the from thread and the to thread, and then hand the1477* records off to send_prefetch_thread to prefetch them. If this is not a1478* send from a redaction bookmark, the from thread will push an end of stream1479* record and stop, and we'll just send everything that was changed in the1480* to_ds since the ancestor's creation txg. If it is, then since1481* traverse_dataset has a canonical order, we can compare each change as1482* they're pulled off the queues. That will give us a stream that is1483* appropriately sorted, and covers all records. 
In addition, we pull the1484* data from the redact_list_thread and use that to determine which blocks1485* should be redacted.1486*/1487static __attribute__((noreturn)) void1488send_merge_thread(void *arg)1489{1490struct send_merge_thread_arg *smt_arg = arg;1491struct send_range *front_ranges[NUM_THREADS];1492bqueue_t *queues[NUM_THREADS];1493int err = 0;1494fstrans_cookie_t cookie = spl_fstrans_mark();14951496if (smt_arg->redact_arg == NULL) {1497front_ranges[REDACT_IDX] =1498kmem_zalloc(sizeof (struct send_range), KM_SLEEP);1499front_ranges[REDACT_IDX]->eos_marker = B_TRUE;1500front_ranges[REDACT_IDX]->type = REDACT;1501queues[REDACT_IDX] = NULL;1502} else {1503front_ranges[REDACT_IDX] =1504bqueue_dequeue(&smt_arg->redact_arg->q);1505queues[REDACT_IDX] = &smt_arg->redact_arg->q;1506}1507front_ranges[TO_IDX] = bqueue_dequeue(&smt_arg->to_arg->q);1508queues[TO_IDX] = &smt_arg->to_arg->q;1509front_ranges[FROM_IDX] = bqueue_dequeue(&smt_arg->from_arg->q);1510queues[FROM_IDX] = &smt_arg->from_arg->q;1511uint64_t mask = 0;1512struct send_range *range;1513for (range = find_next_range(front_ranges, queues, &mask);1514!range->eos_marker && err == 0 && !smt_arg->cancel;1515range = find_next_range(front_ranges, queues, &mask)) {1516/*1517* If the range in question was in both the from redact bookmark1518* and the bookmark we're using to redact, then don't send it.1519* It's already redacted on the receiving system, so a redaction1520* record would be redundant.1521*/1522if ((mask & FROM_AND_REDACT_BITS) == FROM_AND_REDACT_BITS) {1523ASSERT3U(range->type, ==, REDACT);1524range_free(range);1525continue;1526}1527bqueue_enqueue(&smt_arg->q, range, sizeof (*range));15281529if (smt_arg->to_arg->error_code != 0) {1530err = smt_arg->to_arg->error_code;1531} else if (smt_arg->from_arg->error_code != 0) {1532err = smt_arg->from_arg->error_code;1533} else if (smt_arg->redact_arg != NULL &&1534smt_arg->redact_arg->error_code != 0) {1535err = smt_arg->redact_arg->error_code;1536}1537}1538if (smt_arg->cancel && err == 0)1539err = SET_ERROR(EINTR);1540smt_arg->error = err;1541if (smt_arg->error != 0) {1542smt_arg->to_arg->cancel = B_TRUE;1543smt_arg->from_arg->cancel = B_TRUE;1544if (smt_arg->redact_arg != NULL)1545smt_arg->redact_arg->cancel = B_TRUE;1546}1547for (int i = 0; i < NUM_THREADS; i++) {1548while (!front_ranges[i]->eos_marker) {1549front_ranges[i] = get_next_range(queues[i],1550front_ranges[i]);1551}1552range_free(front_ranges[i]);1553}1554range->eos_marker = B_TRUE;1555bqueue_enqueue_flush(&smt_arg->q, range, 1);1556spl_fstrans_unmark(cookie);1557thread_exit();1558}15591560struct send_reader_thread_arg {1561struct send_merge_thread_arg *smta;1562bqueue_t q;1563boolean_t cancel;1564boolean_t issue_reads;1565uint64_t featureflags;1566int error;1567};15681569static void1570dmu_send_read_done(zio_t *zio)1571{1572struct send_range *range = zio->io_private;15731574mutex_enter(&range->sru.data.lock);1575if (zio->io_error != 0) {1576abd_free(range->sru.data.abd);1577range->sru.data.abd = NULL;1578range->sru.data.io_err = zio->io_error;1579}15801581ASSERT(range->sru.data.io_outstanding);1582range->sru.data.io_outstanding = B_FALSE;1583cv_broadcast(&range->sru.data.cv);1584mutex_exit(&range->sru.data.lock);1585}15861587static void1588issue_data_read(struct send_reader_thread_arg *srta, struct send_range *range)1589{1590struct srd *srdp = &range->sru.data;1591blkptr_t *bp = &srdp->bp;1592objset_t *os = srta->smta->os;15931594ASSERT3U(range->type, ==, DATA);1595ASSERT3U(range->start_blkid + 1, ==, 
range->end_blkid);1596/*1597* If we have large blocks stored on disk but1598* the send flags don't allow us to send large1599* blocks, we split the data from the arc buf1600* into chunks.1601*/1602boolean_t split_large_blocks =1603srdp->datablksz > SPA_OLD_MAXBLOCKSIZE &&1604!(srta->featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);1605/*1606* We should only request compressed data from the ARC if all1607* the following are true:1608* - stream compression was requested1609* - we aren't splitting large blocks into smaller chunks1610* - the data won't need to be byteswapped before sending1611* - this isn't an embedded block1612* - this isn't metadata (if receiving on a different endian1613* system it can be byteswapped more easily)1614*/1615boolean_t request_compressed =1616(srta->featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&1617!split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&1618!BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));16191620zio_flag_t zioflags = ZIO_FLAG_CANFAIL;16211622if (srta->featureflags & DMU_BACKUP_FEATURE_RAW) {1623zioflags |= ZIO_FLAG_RAW;1624srdp->io_compressed = B_TRUE;1625} else if (request_compressed) {1626zioflags |= ZIO_FLAG_RAW_COMPRESS;1627srdp->io_compressed = B_TRUE;1628}16291630srdp->datasz = (zioflags & ZIO_FLAG_RAW_COMPRESS) ?1631BP_GET_PSIZE(bp) : BP_GET_LSIZE(bp);16321633if (!srta->issue_reads)1634return;1635if (BP_IS_REDACTED(bp))1636return;1637if (send_do_embed(bp, srta->featureflags))1638return;16391640zbookmark_phys_t zb = {1641.zb_objset = dmu_objset_id(os),1642.zb_object = range->object,1643.zb_level = 0,1644.zb_blkid = range->start_blkid,1645};16461647arc_flags_t aflags = ARC_FLAG_CACHED_ONLY;16481649int arc_err = arc_read(NULL, os->os_spa, bp,1650arc_getbuf_func, &srdp->abuf, ZIO_PRIORITY_ASYNC_READ,1651zioflags, &aflags, &zb);1652/*1653* If the data is not already cached in the ARC, we read directly1654* from zio. This avoids the performance overhead of adding a new1655* entry to the ARC, and we also avoid polluting the ARC cache with1656* data that is not likely to be used in the future.1657*/1658if (arc_err != 0) {1659srdp->abd = abd_alloc_linear(srdp->datasz, B_FALSE);1660srdp->io_outstanding = B_TRUE;1661zio_nowait(zio_read(NULL, os->os_spa, bp, srdp->abd,1662srdp->datasz, dmu_send_read_done, range,1663ZIO_PRIORITY_ASYNC_READ, zioflags, &zb));1664}1665}16661667/*1668* Create a new record with the given values.1669*/1670static void1671enqueue_range(struct send_reader_thread_arg *srta, bqueue_t *q, dnode_t *dn,1672uint64_t blkid, uint64_t count, const blkptr_t *bp, uint32_t datablksz)1673{1674enum type range_type = (bp == NULL || BP_IS_HOLE(bp) ? HOLE :1675(BP_IS_REDACTED(bp) ? REDACT : DATA));16761677struct send_range *range = range_alloc(range_type, dn->dn_object,1678blkid, blkid + count, B_FALSE);16791680if (blkid == DMU_SPILL_BLKID) {1681ASSERT3P(bp, !=, NULL);1682ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_SA);1683}16841685switch (range_type) {1686case HOLE:1687range->sru.hole.datablksz = datablksz;1688break;1689case DATA:1690ASSERT3U(count, ==, 1);1691range->sru.data.datablksz = datablksz;1692range->sru.data.obj_type = dn->dn_type;1693range->sru.data.bp = *bp;1694issue_data_read(srta, range);1695break;1696case REDACT:1697range->sru.redact.datablksz = datablksz;1698break;1699default:1700break;1701}1702bqueue_enqueue(q, range, datablksz);1703}17041705/*1706* Send DRR_SPILL records for unmodified spill blocks. This is useful1707* because changing certain attributes of the object (e.g. 
blocksize)1708* can cause old versions of ZFS to incorrectly remove a spill block.1709* Including these records in the stream forces an up to date version1710* to always be written ensuring they're never lost. Current versions1711* of the code which understand the DRR_FLAG_SPILL_BLOCK feature can1712* ignore these unmodified spill blocks.1713*1714* We piggyback the spill_range to dnode range instead of enqueueing it1715* so send_range_after won't complain.1716*/1717static uint64_t1718piggyback_unmodified_spill(struct send_reader_thread_arg *srta,1719struct send_range *range)1720{1721ASSERT3U(range->type, ==, OBJECT);17221723dnode_phys_t *dnp = range->sru.object.dnp;1724uint64_t fromtxg = srta->smta->to_arg->fromtxg;17251726if (!zfs_send_unmodified_spill_blocks ||1727!(dnp->dn_flags & DNODE_FLAG_SPILL_BLKPTR) ||1728!(BP_GET_LOGICAL_BIRTH(DN_SPILL_BLKPTR(dnp)) <= fromtxg))1729return (0);17301731blkptr_t *bp = DN_SPILL_BLKPTR(dnp);1732struct send_range *spill_range = range_alloc(DATA, range->object,1733DMU_SPILL_BLKID, DMU_SPILL_BLKID+1, B_FALSE);1734spill_range->sru.data.bp = *bp;1735spill_range->sru.data.obj_type = dnp->dn_type;1736spill_range->sru.data.datablksz = BP_GET_LSIZE(bp);17371738issue_data_read(srta, spill_range);1739range->sru.object.spill_range = spill_range;17401741return (BP_GET_LSIZE(bp));1742}17431744/*1745* This thread is responsible for two things: First, it retrieves the correct1746* blkptr in the to ds if we need to send the data because of something from1747* the from thread. As a result of this, we're the first ones to discover that1748* some indirect blocks can be discarded because they're not holes. Second,1749* it issues prefetches for the data we need to send.1750*/1751static __attribute__((noreturn)) void1752send_reader_thread(void *arg)1753{1754struct send_reader_thread_arg *srta = arg;1755struct send_merge_thread_arg *smta = srta->smta;1756bqueue_t *inq = &smta->q;1757bqueue_t *outq = &srta->q;1758objset_t *os = smta->os;1759fstrans_cookie_t cookie = spl_fstrans_mark();1760struct send_range *range = bqueue_dequeue(inq);1761int err = 0;17621763/*1764* If the record we're analyzing is from a redaction bookmark from the1765* fromds, then we need to know whether or not it exists in the tods so1766* we know whether to create records for it or not. If it does, we need1767* the datablksz so we can generate an appropriate record for it.1768* Finally, if it isn't redacted, we need the blkptr so that we can send1769* a WRITE record containing the actual data.1770*/1771uint64_t last_obj = UINT64_MAX;1772uint64_t last_obj_exists = B_TRUE;1773while (!range->eos_marker && !srta->cancel && smta->error == 0 &&1774err == 0) {1775uint64_t spill = 0;1776switch (range->type) {1777case DATA:1778issue_data_read(srta, range);1779bqueue_enqueue(outq, range, range->sru.data.datablksz);1780range = get_next_range_nofree(inq, range);1781break;1782case OBJECT:1783spill = piggyback_unmodified_spill(srta, range);1784zfs_fallthrough;1785case HOLE:1786case OBJECT_RANGE:1787case REDACT: // Redacted blocks must exist1788bqueue_enqueue(outq, range, sizeof (*range) + spill);1789range = get_next_range_nofree(inq, range);1790break;1791case PREVIOUSLY_REDACTED: {1792/*1793* This entry came from the "from bookmark" when1794* sending from a bookmark that has a redaction1795* list. We need to check if this object/blkid1796* exists in the target ("to") dataset, and if1797* not then we drop this entry. 
We also need1798* to fill in the block pointer so that we know1799* what to prefetch.1800*1801* To accomplish the above, we first cache whether or1802* not the last object we examined exists. If it1803* doesn't, we can drop this record. If it does, we hold1804* the dnode and use it to call dbuf_dnode_findbp. We do1805* this instead of dbuf_bookmark_findbp because we will1806* often operate on large ranges, and holding the dnode1807* once is more efficient.1808*/1809boolean_t object_exists = B_TRUE;1810/*1811* If the data is redacted, we only care if it exists,1812* so that we don't send records for objects that have1813* been deleted.1814*/1815dnode_t *dn;1816if (range->object == last_obj && !last_obj_exists) {1817/*1818* If we're still examining the same object as1819* previously, and it doesn't exist, we don't1820* need to call dbuf_bookmark_findbp.1821*/1822object_exists = B_FALSE;1823} else {1824err = dnode_hold(os, range->object, FTAG, &dn);1825if (err == ENOENT) {1826object_exists = B_FALSE;1827err = 0;1828}1829last_obj = range->object;1830last_obj_exists = object_exists;1831}18321833if (err != 0) {1834break;1835} else if (!object_exists) {1836/*1837* The block was modified, but doesn't1838* exist in the to dataset; if it was1839* deleted in the to dataset, then we'll1840* visit the hole bp for it at some point.1841*/1842range = get_next_range(inq, range);1843continue;1844}1845uint64_t file_max =1846MIN(dn->dn_maxblkid, range->end_blkid);1847/*1848* The object exists, so we need to try to find the1849* blkptr for each block in the range we're processing.1850*/1851rw_enter(&dn->dn_struct_rwlock, RW_READER);1852for (uint64_t blkid = range->start_blkid;1853blkid < file_max; blkid++) {1854blkptr_t bp;1855uint32_t datablksz =1856dn->dn_phys->dn_datablkszsec <<1857SPA_MINBLOCKSHIFT;1858uint64_t offset = blkid * datablksz;1859/*1860* This call finds the next non-hole block in1861* the object. This is to prevent a1862* performance problem where we're unredacting1863* a large hole. 
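/*
 * Illustrative sketch (not part of this file): the hole-skipping arithmetic
 * used in the PREVIOUSLY_REDACTED loop above. Given the next non-hole byte
 * offset reported for an object, it converts back to block ids and shows how
 * many hole blocks a single enqueue_range() call can cover instead of one per
 * block. SPA_MINBLOCKSHIFT is 9 (512-byte sectors); the other values are
 * arbitrary examples, and the clamp against the end of the range (file_max)
 * is omitted for brevity.
 */
#include <stdint.h>
#include <stdio.h>

#define	SKETCH_MINBLOCKSHIFT	9	/* mirrors SPA_MINBLOCKSHIFT */

int
main(void)
{
	uint64_t datablkszsec = 256;	/* 256 x 512-byte sectors = 128K block */
	uint64_t datablksz = datablkszsec << SKETCH_MINBLOCKSHIFT;
	uint64_t blkid = 10;		/* current block in the range */
	uint64_t offset = 50 * datablksz; /* next non-hole offset found */

	if (offset != blkid * datablksz) {
		/* There is a hole from blkid up to offset: emit one range. */
		uint64_t nblks = (offset / datablksz) - blkid;
		printf("HOLE range of %llu blocks (blkid %llu-%llu)\n",
		    (unsigned long long)nblks, (unsigned long long)blkid,
		    (unsigned long long)(blkid + nblks - 1));
		blkid += nblks;
	}
	printf("continue block pointer lookups at blkid %llu\n",
	    (unsigned long long)blkid);
	return (0);
}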
Using dnode_next_offset to1864* skip over the large hole avoids iterating1865* over every block in it.1866*/1867err = dnode_next_offset(dn, DNODE_FIND_HAVELOCK,1868&offset, 1, 1, 0);1869if (err == ESRCH) {1870offset = UINT64_MAX;1871err = 0;1872} else if (err != 0) {1873break;1874}1875if (offset != blkid * datablksz) {1876/*1877* if there is a hole from here1878* (blkid) to offset1879*/1880offset = MIN(offset, file_max *1881datablksz);1882uint64_t nblks = (offset / datablksz) -1883blkid;1884enqueue_range(srta, outq, dn, blkid,1885nblks, NULL, datablksz);1886blkid += nblks;1887}1888if (blkid >= file_max)1889break;1890err = dbuf_dnode_findbp(dn, 0, blkid, &bp,1891NULL, NULL);1892if (err != 0)1893break;1894ASSERT(!BP_IS_HOLE(&bp));1895enqueue_range(srta, outq, dn, blkid, 1, &bp,1896datablksz);1897}1898rw_exit(&dn->dn_struct_rwlock);1899dnode_rele(dn, FTAG);1900range = get_next_range(inq, range);1901}1902}1903}1904if (srta->cancel || err != 0) {1905smta->cancel = B_TRUE;1906srta->error = err;1907} else if (smta->error != 0) {1908srta->error = smta->error;1909}1910while (!range->eos_marker)1911range = get_next_range(inq, range);19121913bqueue_enqueue_flush(outq, range, 1);1914spl_fstrans_unmark(cookie);1915thread_exit();1916}19171918#define NUM_SNAPS_NOT_REDACTED UINT64_MAX19191920struct dmu_send_params {1921/* Pool args */1922const void *tag; // Tag dp was held with, will be used to release dp.1923dsl_pool_t *dp;1924/* To snapshot args */1925const char *tosnap;1926dsl_dataset_t *to_ds;1927/* From snapshot args */1928zfs_bookmark_phys_t ancestor_zb;1929uint64_t *fromredactsnaps;1930/* NUM_SNAPS_NOT_REDACTED if not sending from redaction bookmark */1931uint64_t numfromredactsnaps;1932/* Stream params */1933boolean_t is_clone;1934boolean_t embedok;1935boolean_t large_block_ok;1936boolean_t compressok;1937boolean_t rawok;1938boolean_t savedok;1939uint64_t resumeobj;1940uint64_t resumeoff;1941uint64_t saved_guid;1942zfs_bookmark_phys_t *redactbook;1943/* Stream output params */1944dmu_send_outparams_t *dso;19451946/* Stream progress params */1947offset_t *off;1948int outfd;1949char saved_toname[MAXNAMELEN];1950};19511952static int1953setup_featureflags(struct dmu_send_params *dspp, objset_t *os,1954uint64_t *featureflags)1955{1956dsl_dataset_t *to_ds = dspp->to_ds;1957dsl_pool_t *dp = dspp->dp;19581959if (dmu_objset_type(os) == DMU_OST_ZFS) {1960uint64_t version;1961if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0)1962return (SET_ERROR(EINVAL));19631964if (version >= ZPL_VERSION_SA)1965*featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;1966}19671968/* raw sends imply large_block_ok */1969if ((dspp->rawok || dspp->large_block_ok) &&1970dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_BLOCKS)) {1971*featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;1972}19731974/* encrypted datasets will not have embedded blocks */1975if ((dspp->embedok || dspp->rawok) && !os->os_encrypted &&1976spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {1977*featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;1978}19791980/* raw send implies compressok */1981if (dspp->compressok || dspp->rawok)1982*featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;19831984if (dspp->rawok && os->os_encrypted)1985*featureflags |= DMU_BACKUP_FEATURE_RAW;19861987if ((*featureflags &1988(DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |1989DMU_BACKUP_FEATURE_RAW)) != 0 &&1990spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {1991*featureflags |= DMU_BACKUP_FEATURE_LZ4;1992}19931994/*1995* We specifically do not 
include DMU_BACKUP_FEATURE_EMBED_DATA here to1996* allow sending ZSTD compressed datasets to a receiver that does not1997* support ZSTD1998*/1999if ((*featureflags &2000(DMU_BACKUP_FEATURE_COMPRESSED | DMU_BACKUP_FEATURE_RAW)) != 0 &&2001dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_ZSTD_COMPRESS)) {2002*featureflags |= DMU_BACKUP_FEATURE_ZSTD;2003}20042005if (dspp->resumeobj != 0 || dspp->resumeoff != 0) {2006*featureflags |= DMU_BACKUP_FEATURE_RESUMING;2007}20082009if (dspp->redactbook != NULL) {2010*featureflags |= DMU_BACKUP_FEATURE_REDACTED;2011}20122013if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_DNODE)) {2014*featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;2015}20162017if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LONGNAME)) {2018*featureflags |= DMU_BACKUP_FEATURE_LONGNAME;2019}20202021if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_MICROZAP)) {2022/*2023* We must never split a large microzap block, so we can only2024* send large microzaps if LARGE_BLOCKS is already enabled.2025*/2026if (!(*featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS))2027return (SET_ERROR(ZFS_ERR_STREAM_LARGE_MICROZAP));2028*featureflags |= DMU_BACKUP_FEATURE_LARGE_MICROZAP;2029}20302031return (0);2032}20332034static dmu_replay_record_t *2035create_begin_record(struct dmu_send_params *dspp, objset_t *os,2036uint64_t featureflags)2037{2038dmu_replay_record_t *drr = kmem_zalloc(sizeof (dmu_replay_record_t),2039KM_SLEEP);2040drr->drr_type = DRR_BEGIN;20412042struct drr_begin *drrb = &drr->drr_u.drr_begin;2043dsl_dataset_t *to_ds = dspp->to_ds;20442045drrb->drr_magic = DMU_BACKUP_MAGIC;2046drrb->drr_creation_time = dsl_dataset_phys(to_ds)->ds_creation_time;2047drrb->drr_type = dmu_objset_type(os);2048drrb->drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;2049drrb->drr_fromguid = dspp->ancestor_zb.zbm_guid;20502051DMU_SET_STREAM_HDRTYPE(drrb->drr_versioninfo, DMU_SUBSTREAM);2052DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, featureflags);20532054if (dspp->is_clone)2055drrb->drr_flags |= DRR_FLAG_CLONE;2056if (dsl_dataset_phys(dspp->to_ds)->ds_flags & DS_FLAG_CI_DATASET)2057drrb->drr_flags |= DRR_FLAG_CI_DATA;2058if (zfs_send_set_freerecords_bit)2059drrb->drr_flags |= DRR_FLAG_FREERECORDS;2060drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_SPILL_BLOCK;20612062if (dspp->savedok) {2063drrb->drr_toguid = dspp->saved_guid;2064strlcpy(drrb->drr_toname, dspp->saved_toname,2065sizeof (drrb->drr_toname));2066} else {2067dsl_dataset_name(to_ds, drrb->drr_toname);2068if (!to_ds->ds_is_snapshot) {2069(void) strlcat(drrb->drr_toname, "@--head--",2070sizeof (drrb->drr_toname));2071}2072}2073return (drr);2074}20752076static void2077setup_to_thread(struct send_thread_arg *to_arg, objset_t *to_os,2078dmu_sendstatus_t *dssp, uint64_t fromtxg, boolean_t rawok)2079{2080VERIFY0(bqueue_init(&to_arg->q, zfs_send_no_prefetch_queue_ff,2081MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),2082offsetof(struct send_range, ln)));2083to_arg->error_code = 0;2084to_arg->cancel = B_FALSE;2085to_arg->os = to_os;2086to_arg->fromtxg = fromtxg;2087to_arg->flags = TRAVERSE_PRE | TRAVERSE_PREFETCH_METADATA;2088if (rawok)2089to_arg->flags |= TRAVERSE_NO_DECRYPT;2090if (zfs_send_corrupt_data)2091to_arg->flags |= TRAVERSE_HARD;2092to_arg->num_blocks_visited = &dssp->dss_blocks;2093(void) thread_create(NULL, 0, send_traverse_thread, to_arg, 0,2094curproc, TS_RUN, minclsyspri);2095}20962097static void2098setup_from_thread(struct redact_list_thread_arg *from_arg,2099redaction_list_t *from_rl, dmu_sendstatus_t 
*dssp)2100{2101VERIFY0(bqueue_init(&from_arg->q, zfs_send_no_prefetch_queue_ff,2102MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),2103offsetof(struct send_range, ln)));2104from_arg->error_code = 0;2105from_arg->cancel = B_FALSE;2106from_arg->rl = from_rl;2107from_arg->mark_redact = B_FALSE;2108from_arg->num_blocks_visited = &dssp->dss_blocks;2109/*2110* If from_ds is null, send_traverse_thread just returns success and2111* enqueues an eos marker.2112*/2113(void) thread_create(NULL, 0, redact_list_thread, from_arg, 0,2114curproc, TS_RUN, minclsyspri);2115}21162117static void2118setup_redact_list_thread(struct redact_list_thread_arg *rlt_arg,2119struct dmu_send_params *dspp, redaction_list_t *rl, dmu_sendstatus_t *dssp)2120{2121if (dspp->redactbook == NULL)2122return;21232124rlt_arg->cancel = B_FALSE;2125VERIFY0(bqueue_init(&rlt_arg->q, zfs_send_no_prefetch_queue_ff,2126MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),2127offsetof(struct send_range, ln)));2128rlt_arg->error_code = 0;2129rlt_arg->mark_redact = B_TRUE;2130rlt_arg->rl = rl;2131rlt_arg->num_blocks_visited = &dssp->dss_blocks;21322133(void) thread_create(NULL, 0, redact_list_thread, rlt_arg, 0,2134curproc, TS_RUN, minclsyspri);2135}21362137static void2138setup_merge_thread(struct send_merge_thread_arg *smt_arg,2139struct dmu_send_params *dspp, struct redact_list_thread_arg *from_arg,2140struct send_thread_arg *to_arg, struct redact_list_thread_arg *rlt_arg,2141objset_t *os)2142{2143VERIFY0(bqueue_init(&smt_arg->q, zfs_send_no_prefetch_queue_ff,2144MAX(zfs_send_no_prefetch_queue_length, 2 * zfs_max_recordsize),2145offsetof(struct send_range, ln)));2146smt_arg->cancel = B_FALSE;2147smt_arg->error = 0;2148smt_arg->from_arg = from_arg;2149smt_arg->to_arg = to_arg;2150if (dspp->redactbook != NULL)2151smt_arg->redact_arg = rlt_arg;21522153smt_arg->os = os;2154(void) thread_create(NULL, 0, send_merge_thread, smt_arg, 0, curproc,2155TS_RUN, minclsyspri);2156}21572158static void2159setup_reader_thread(struct send_reader_thread_arg *srt_arg,2160struct dmu_send_params *dspp, struct send_merge_thread_arg *smt_arg,2161uint64_t featureflags)2162{2163VERIFY0(bqueue_init(&srt_arg->q, zfs_send_queue_ff,2164MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),2165offsetof(struct send_range, ln)));2166srt_arg->smta = smt_arg;2167srt_arg->issue_reads = !dspp->dso->dso_dryrun;2168srt_arg->featureflags = featureflags;2169(void) thread_create(NULL, 0, send_reader_thread, srt_arg, 0,2170curproc, TS_RUN, minclsyspri);2171}21722173static int2174setup_resume_points(struct dmu_send_params *dspp,2175struct send_thread_arg *to_arg, struct redact_list_thread_arg *from_arg,2176struct redact_list_thread_arg *rlt_arg,2177struct send_merge_thread_arg *smt_arg, boolean_t resuming, objset_t *os,2178redaction_list_t *redact_rl, nvlist_t *nvl)2179{2180(void) smt_arg;2181dsl_dataset_t *to_ds = dspp->to_ds;2182int err = 0;21832184uint64_t obj = 0;2185uint64_t blkid = 0;2186if (resuming) {2187obj = dspp->resumeobj;2188dmu_object_info_t to_doi;2189err = dmu_object_info(os, obj, &to_doi);2190if (err != 0)2191return (err);21922193blkid = dspp->resumeoff / to_doi.doi_data_block_size;2194}2195/*2196* If we're resuming a redacted send, we can skip to the appropriate2197* point in the redaction bookmark by binary searching through it.2198*/2199if (redact_rl != NULL) {2200SET_BOOKMARK(&rlt_arg->resume, to_ds->ds_object, obj, 0, blkid);2201}22022203SET_BOOKMARK(&to_arg->resume, to_ds->ds_object, obj, 0, blkid);2204if (nvlist_exists(nvl, 
BEGINNV_REDACT_FROM_SNAPS)) {2205uint64_t objset = dspp->ancestor_zb.zbm_redaction_obj;2206/*2207* Note: If the resume point is in an object whose2208* blocksize is different in the from vs to snapshots,2209* we will have divided by the "wrong" blocksize.2210* However, in this case fromsnap's send_cb() will2211* detect that the blocksize has changed and therefore2212* ignore this object.2213*2214* If we're resuming a send from a redaction bookmark,2215* we still cannot accidentally suggest blocks behind2216* the to_ds. In addition, we know that any blocks in2217* the object in the to_ds will have to be sent, since2218* the size changed. Therefore, we can't cause any harm2219* this way either.2220*/2221SET_BOOKMARK(&from_arg->resume, objset, obj, 0, blkid);2222}2223if (resuming) {2224fnvlist_add_uint64(nvl, BEGINNV_RESUME_OBJECT, dspp->resumeobj);2225fnvlist_add_uint64(nvl, BEGINNV_RESUME_OFFSET, dspp->resumeoff);2226}2227return (0);2228}22292230static dmu_sendstatus_t *2231setup_send_progress(struct dmu_send_params *dspp)2232{2233dmu_sendstatus_t *dssp = kmem_zalloc(sizeof (*dssp), KM_SLEEP);2234dssp->dss_outfd = dspp->outfd;2235dssp->dss_off = dspp->off;2236dssp->dss_proc = curproc;2237mutex_enter(&dspp->to_ds->ds_sendstream_lock);2238list_insert_head(&dspp->to_ds->ds_sendstreams, dssp);2239mutex_exit(&dspp->to_ds->ds_sendstream_lock);2240return (dssp);2241}22422243/*2244* Actually do the bulk of the work in a zfs send.2245*2246* The idea is that we want to do a send from ancestor_zb to to_ds. We also2247* want to not send any data that has been modified by all the datasets in2248* redactsnaparr, and store the list of blocks that are redacted in this way in2249* a bookmark named redactbook, created on the to_ds. We do this by creating2250* several worker threads, whose function is described below.2251*2252* There are three cases.2253* The first case is a redacted zfs send. In this case there are 5 threads.2254* The first thread is the to_ds traversal thread: it calls dataset_traverse on2255* the to_ds and finds all the blocks that have changed since ancestor_zb (if2256* it's a full send, that's all blocks in the dataset). It then sends those2257* blocks on to the send merge thread. The redact list thread takes the data2258* from the redaction bookmark and sends those blocks on to the send merge2259* thread. The send merge thread takes the data from the to_ds traversal2260* thread, and combines it with the redaction records from the redact list2261* thread. If a block appears in both the to_ds's data and the redaction data,2262* the send merge thread will mark it as redacted and send it on to the prefetch2263* thread. Otherwise, the send merge thread will send the block on to the2264* prefetch thread unchanged. The prefetch thread will issue prefetch reads for2265* any data that isn't redacted, and then send the data on to the main thread.2266* The main thread behaves the same as in a normal send case, issuing demand2267* reads for data blocks and sending out records over the network2268*2269* The graphic below diagrams the flow of data in the case of a redacted zfs2270* send. 
Each box represents a thread, and each line represents the flow of2271* data.2272*2273* Records from the |2274* redaction bookmark |2275* +--------------------+ | +---------------------------+2276* | | v | Send Merge Thread |2277* | Redact List Thread +----------> Apply redaction marks to |2278* | | | records as specified by |2279* +--------------------+ | redaction ranges |2280* +----^---------------+------+2281* | | Merged data2282* | |2283* | +------------v--------+2284* | | Prefetch Thread |2285* +--------------------+ | | Issues prefetch |2286* | to_ds Traversal | | | reads of data blocks|2287* | Thread (finds +---------------+ +------------+--------+2288* | candidate blocks) | Blocks modified | Prefetched data2289* +--------------------+ by to_ds since |2290* ancestor_zb +------------v----+2291* | Main Thread | File Descriptor2292* | Sends data over +->(to zfs receive)2293* | wire |2294* +-----------------+2295*2296* The second case is an incremental send from a redaction bookmark. The to_ds2297* traversal thread and the main thread behave the same as in the redacted2298* send case. The new thread is the from bookmark traversal thread. It2299* iterates over the redaction list in the redaction bookmark, and enqueues2300* records for each block that was redacted in the original send. The send2301* merge thread now has to merge the data from the two threads. For details2302* about that process, see the header comment of send_merge_thread(). Any data2303* it decides to send on will be prefetched by the prefetch thread. Note that2304* you can perform a redacted send from a redaction bookmark; in that case,2305* the data flow behaves very similarly to the flow in the redacted send case,2306* except with the addition of the bookmark traversal thread iterating over the2307* redaction bookmark. The send_merge_thread also has to take on the2308* responsibility of merging the redact list thread's records, the bookmark2309* traversal thread's records, and the to_ds records.2310*2311* +---------------------+2312* | |2313* | Redact List Thread +--------------+2314* | | |2315* +---------------------+ |2316* Blocks in redaction list | Ranges modified by every secure snap2317* of from bookmark | (or EOS if not readcted)2318* |2319* +---------------------+ | +----v----------------------+2320* | bookmark Traversal | v | Send Merge Thread |2321* | Thread (finds +---------> Merges bookmark, rlt, and |2322* | candidate blocks) | | to_ds send records |2323* +---------------------+ +----^---------------+------+2324* | | Merged data2325* | +------------v--------+2326* | | Prefetch Thread |2327* +--------------------+ | | Issues prefetch |2328* | to_ds Traversal | | | reads of data blocks|2329* | Thread (finds +---------------+ +------------+--------+2330* | candidate blocks) | Blocks modified | Prefetched data2331* +--------------------+ by to_ds since +------------v----+2332* ancestor_zb | Main Thread | File Descriptor2333* | Sends data over +->(to zfs receive)2334* | wire |2335* +-----------------+2336*2337* The final case is a simple zfs full or incremental send. The to_ds traversal2338* thread behaves the same as always. The redact list thread is never started.2339* The send merge thread takes all the blocks that the to_ds traversal thread2340* sends it, prefetches the data, and sends the blocks on to the main thread.2341* The main thread sends the data over the wire.2342*2343* To keep performance acceptable, we want to prefetch the data in the worker2344* threads. 
While the to_ds thread could simply use the TRAVERSE_PREFETCH2345* feature built into traverse_dataset, the combining and deletion of records2346* due to redaction and sends from redaction bookmarks mean that we could2347* issue many unnecessary prefetches. As a result, we only prefetch data2348* after we've determined that the record is not going to be redacted. To2349* prevent the prefetching from getting too far ahead of the main thread, the2350* blocking queues that are used for communication are capped not by the2351* number of entries in the queue, but by the sum of the size of the2352* prefetches associated with them. The limit on the amount of data that the2353* thread can prefetch beyond what the main thread has reached is controlled2354* by the global variable zfs_send_queue_length. In addition, to prevent poor2355* performance in the beginning of a send, we also limit the distance ahead2356* that the traversal threads can be. That distance is controlled by the2357* zfs_send_no_prefetch_queue_length tunable.2358*2359* Note: Releases dp using the specified tag.2360*/2361static int2362dmu_send_impl(struct dmu_send_params *dspp)2363{2364objset_t *os;2365dmu_replay_record_t *drr;2366dmu_sendstatus_t *dssp;2367dmu_send_cookie_t dsc = {0};2368int err;2369uint64_t fromtxg = dspp->ancestor_zb.zbm_creation_txg;2370uint64_t featureflags = 0;2371struct redact_list_thread_arg *from_arg;2372struct send_thread_arg *to_arg;2373struct redact_list_thread_arg *rlt_arg;2374struct send_merge_thread_arg *smt_arg;2375struct send_reader_thread_arg *srt_arg;2376struct send_range *range;2377redaction_list_t *from_rl = NULL;2378redaction_list_t *redact_rl = NULL;2379boolean_t resuming = (dspp->resumeobj != 0 || dspp->resumeoff != 0);2380boolean_t book_resuming = resuming;23812382dsl_dataset_t *to_ds = dspp->to_ds;2383zfs_bookmark_phys_t *ancestor_zb = &dspp->ancestor_zb;2384dsl_pool_t *dp = dspp->dp;2385const void *tag = dspp->tag;23862387err = dmu_objset_from_ds(to_ds, &os);2388if (err != 0) {2389dsl_pool_rele(dp, tag);2390return (err);2391}23922393/*2394* If this is a non-raw send of an encrypted ds, we can ensure that2395* the objset_phys_t is authenticated. 
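/*
 * Illustrative sketch (not part of this file): the queue sizing described in
 * the comment above and used by the setup_*_thread() helpers. The inter-thread
 * queues are capped by the summed size of the queued entries rather than by
 * entry count, and the cap is the larger of the relevant tunable and twice the
 * maximum record size, so at least two full-sized records always fit. The
 * tunable and record-size values below are assumed examples, not defaults.
 */
#include <stdint.h>
#include <stdio.h>

#define	SKETCH_MAX(a, b)	((a) > (b) ? (a) : (b))

int
main(void)
{
	uint64_t max_recordsize = 1ULL << 20;		/* assume 1M records */
	uint64_t send_queue_length = 16ULL << 20;	/* reader queue tunable */
	uint64_t no_prefetch_queue_length = 1ULL << 20;	/* traversal queues */

	printf("reader (prefetch) queue cap: %llu bytes\n",
	    (unsigned long long)SKETCH_MAX(send_queue_length,
	    2 * max_recordsize));
	printf("traversal/merge queue cap:   %llu bytes\n",
	    (unsigned long long)SKETCH_MAX(no_prefetch_queue_length,
	    2 * max_recordsize));
	return (0);
}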
This is safe because this is2396* either a snapshot or we have owned the dataset, ensuring that2397* it can't be modified.2398*/2399if (!dspp->rawok && os->os_encrypted &&2400arc_is_unauthenticated(os->os_phys_buf)) {2401zbookmark_phys_t zb;24022403SET_BOOKMARK(&zb, to_ds->ds_object, ZB_ROOT_OBJECT,2404ZB_ROOT_LEVEL, ZB_ROOT_BLKID);2405err = arc_untransform(os->os_phys_buf, os->os_spa,2406&zb, B_FALSE);2407if (err != 0) {2408dsl_pool_rele(dp, tag);2409return (err);2410}24112412ASSERT0(arc_is_unauthenticated(os->os_phys_buf));2413}24142415if ((err = setup_featureflags(dspp, os, &featureflags)) != 0) {2416dsl_pool_rele(dp, tag);2417return (err);2418}24192420/*2421* If we're doing a redacted send, hold the bookmark's redaction list.2422*/2423if (dspp->redactbook != NULL) {2424err = dsl_redaction_list_hold_obj(dp,2425dspp->redactbook->zbm_redaction_obj, FTAG,2426&redact_rl);2427if (err != 0) {2428dsl_pool_rele(dp, tag);2429return (SET_ERROR(EINVAL));2430}2431dsl_redaction_list_long_hold(dp, redact_rl, FTAG);2432}24332434/*2435* If we're sending from a redaction bookmark, hold the redaction list2436* so that we can consider sending the redacted blocks.2437*/2438if (ancestor_zb->zbm_redaction_obj != 0) {2439err = dsl_redaction_list_hold_obj(dp,2440ancestor_zb->zbm_redaction_obj, FTAG, &from_rl);2441if (err != 0) {2442if (redact_rl != NULL) {2443dsl_redaction_list_long_rele(redact_rl, FTAG);2444dsl_redaction_list_rele(redact_rl, FTAG);2445}2446dsl_pool_rele(dp, tag);2447return (SET_ERROR(EINVAL));2448}2449dsl_redaction_list_long_hold(dp, from_rl, FTAG);2450}24512452dsl_dataset_long_hold(to_ds, FTAG);24532454from_arg = kmem_zalloc(sizeof (*from_arg), KM_SLEEP);2455to_arg = kmem_zalloc(sizeof (*to_arg), KM_SLEEP);2456rlt_arg = kmem_zalloc(sizeof (*rlt_arg), KM_SLEEP);2457smt_arg = kmem_zalloc(sizeof (*smt_arg), KM_SLEEP);2458srt_arg = kmem_zalloc(sizeof (*srt_arg), KM_SLEEP);24592460drr = create_begin_record(dspp, os, featureflags);2461dssp = setup_send_progress(dspp);24622463dsc.dsc_drr = drr;2464dsc.dsc_dso = dspp->dso;2465dsc.dsc_os = os;2466dsc.dsc_off = dspp->off;2467dsc.dsc_toguid = dsl_dataset_phys(to_ds)->ds_guid;2468dsc.dsc_fromtxg = fromtxg;2469dsc.dsc_pending_op = PENDING_NONE;2470dsc.dsc_featureflags = featureflags;2471dsc.dsc_resume_object = dspp->resumeobj;2472dsc.dsc_resume_offset = dspp->resumeoff;24732474dsl_pool_rele(dp, tag);24752476void *payload = NULL;2477size_t payload_len = 0;2478nvlist_t *nvl = fnvlist_alloc();24792480/*2481* If we're doing a redacted send, we include the snapshots we're2482* redacted with respect to so that the target system knows what send2483* streams can be correctly received on top of this dataset. 
If we're2484* instead sending a redacted dataset, we include the snapshots that the2485* dataset was created with respect to.2486*/2487if (dspp->redactbook != NULL) {2488fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS,2489redact_rl->rl_phys->rlp_snaps,2490redact_rl->rl_phys->rlp_num_snaps);2491} else if (dsl_dataset_feature_is_active(to_ds,2492SPA_FEATURE_REDACTED_DATASETS)) {2493uint64_t *tods_guids;2494uint64_t length;2495VERIFY(dsl_dataset_get_uint64_array_feature(to_ds,2496SPA_FEATURE_REDACTED_DATASETS, &length, &tods_guids));2497fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_SNAPS, tods_guids,2498length);2499}25002501/*2502* If we're sending from a redaction bookmark, then we should retrieve2503* the guids of that bookmark so we can send them over the wire.2504*/2505if (from_rl != NULL) {2506fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,2507from_rl->rl_phys->rlp_snaps,2508from_rl->rl_phys->rlp_num_snaps);2509}25102511/*2512* If the snapshot we're sending from is redacted, include the redaction2513* list in the stream.2514*/2515if (dspp->numfromredactsnaps != NUM_SNAPS_NOT_REDACTED) {2516ASSERT0P(from_rl);2517fnvlist_add_uint64_array(nvl, BEGINNV_REDACT_FROM_SNAPS,2518dspp->fromredactsnaps, (uint_t)dspp->numfromredactsnaps);2519if (dspp->numfromredactsnaps > 0) {2520kmem_free(dspp->fromredactsnaps,2521dspp->numfromredactsnaps * sizeof (uint64_t));2522dspp->fromredactsnaps = NULL;2523}2524}25252526if (resuming || book_resuming) {2527err = setup_resume_points(dspp, to_arg, from_arg,2528rlt_arg, smt_arg, resuming, os, redact_rl, nvl);2529if (err != 0)2530goto out;2531}25322533if (featureflags & DMU_BACKUP_FEATURE_RAW) {2534uint64_t ivset_guid = ancestor_zb->zbm_ivset_guid;2535nvlist_t *keynvl = NULL;2536ASSERT(os->os_encrypted);25372538err = dsl_crypto_populate_key_nvlist(os, ivset_guid,2539&keynvl);2540if (err != 0) {2541fnvlist_free(nvl);2542goto out;2543}25442545fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);2546fnvlist_free(keynvl);2547}25482549if (!nvlist_empty(nvl)) {2550payload = fnvlist_pack(nvl, &payload_len);2551drr->drr_payloadlen = payload_len;2552}25532554fnvlist_free(nvl);2555err = dump_record(&dsc, payload, payload_len);2556fnvlist_pack_free(payload, payload_len);2557if (err != 0) {2558err = dsc.dsc_err;2559goto out;2560}25612562setup_to_thread(to_arg, os, dssp, fromtxg, dspp->rawok);2563setup_from_thread(from_arg, from_rl, dssp);2564setup_redact_list_thread(rlt_arg, dspp, redact_rl, dssp);2565setup_merge_thread(smt_arg, dspp, from_arg, to_arg, rlt_arg, os);2566setup_reader_thread(srt_arg, dspp, smt_arg, featureflags);25672568range = bqueue_dequeue(&srt_arg->q);2569while (err == 0 && !range->eos_marker) {2570err = do_dump(&dsc, range);2571range = get_next_range(&srt_arg->q, range);2572if (issig())2573err = SET_ERROR(EINTR);2574}25752576/*2577* If we hit an error or are interrupted, cancel our worker threads and2578* clear the queue of any pending records. 
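/*
 * Illustrative sketch (not part of this file): the shape of the main pump loop
 * and the cancel/drain path above, with the bqueue and send_range machinery
 * replaced by a plain array so it runs in user space. On error the consumer
 * still walks forward to the end-of-stream marker, mirroring how
 * dmu_send_impl() drains pending records after setting the cancel flag.
 */
#include <stdbool.h>
#include <stdio.h>

struct sketch_range {
	bool eos_marker;
	int payload;
};

static int
process(const struct sketch_range *r)
{
	/* Pretend the record with payload 3 fails, like a dump_record() error. */
	return (r->payload == 3 ? -1 : 0);
}

int
main(void)
{
	struct sketch_range q[] = {
		{ false, 1 }, { false, 2 }, { false, 3 }, { false, 4 },
		{ true, 0 },	/* end-of-stream marker */
	};
	int err = 0;
	size_t i = 0;

	while (!q[i].eos_marker && err == 0) {
		err = process(&q[i]);
		i++;
	}
	if (err != 0) {
		/* Cancelled: skip (in the real code, also free) the rest. */
		while (!q[i].eos_marker)
			i++;
	}
	printf("stopped with err=%d at the eos marker (index %zu)\n", err, i);
	return (0);
}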
The threads will pass the2579* cancel up the tree of worker threads, and each one will clean up any2580* pending records before exiting.2581*/2582if (err != 0) {2583srt_arg->cancel = B_TRUE;2584while (!range->eos_marker) {2585range = get_next_range(&srt_arg->q, range);2586}2587}2588range_free(range);25892590bqueue_destroy(&srt_arg->q);2591bqueue_destroy(&smt_arg->q);2592if (dspp->redactbook != NULL)2593bqueue_destroy(&rlt_arg->q);2594bqueue_destroy(&to_arg->q);2595bqueue_destroy(&from_arg->q);25962597if (err == 0 && srt_arg->error != 0)2598err = srt_arg->error;25992600if (err != 0)2601goto out;26022603if (dsc.dsc_pending_op != PENDING_NONE)2604if (dump_record(&dsc, NULL, 0) != 0)2605err = SET_ERROR(EINTR);26062607if (err != 0) {2608if (err == EINTR && dsc.dsc_err != 0)2609err = dsc.dsc_err;2610goto out;2611}26122613/*2614* Send the DRR_END record if this is not a saved stream.2615* Otherwise, the omitted DRR_END record will signal to2616* the receive side that the stream is incomplete.2617*/2618if (!dspp->savedok) {2619memset(drr, 0, sizeof (dmu_replay_record_t));2620drr->drr_type = DRR_END;2621drr->drr_u.drr_end.drr_checksum = dsc.dsc_zc;2622drr->drr_u.drr_end.drr_toguid = dsc.dsc_toguid;26232624if (dump_record(&dsc, NULL, 0) != 0)2625err = dsc.dsc_err;2626}2627out:2628mutex_enter(&to_ds->ds_sendstream_lock);2629list_remove(&to_ds->ds_sendstreams, dssp);2630mutex_exit(&to_ds->ds_sendstream_lock);26312632VERIFY(err != 0 || (dsc.dsc_sent_begin &&2633(dsc.dsc_sent_end || dspp->savedok)));26342635kmem_free(drr, sizeof (dmu_replay_record_t));2636kmem_free(dssp, sizeof (dmu_sendstatus_t));2637kmem_free(from_arg, sizeof (*from_arg));2638kmem_free(to_arg, sizeof (*to_arg));2639kmem_free(rlt_arg, sizeof (*rlt_arg));2640kmem_free(smt_arg, sizeof (*smt_arg));2641kmem_free(srt_arg, sizeof (*srt_arg));26422643dsl_dataset_long_rele(to_ds, FTAG);2644if (from_rl != NULL) {2645dsl_redaction_list_long_rele(from_rl, FTAG);2646dsl_redaction_list_rele(from_rl, FTAG);2647}2648if (redact_rl != NULL) {2649dsl_redaction_list_long_rele(redact_rl, FTAG);2650dsl_redaction_list_rele(redact_rl, FTAG);2651}26522653return (err);2654}26552656int2657dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,2658boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,2659boolean_t rawok, boolean_t savedok, int outfd, offset_t *off,2660dmu_send_outparams_t *dsop)2661{2662int err;2663dsl_dataset_t *fromds;2664ds_hold_flags_t dsflags;2665struct dmu_send_params dspp = {0};2666dspp.embedok = embedok;2667dspp.large_block_ok = large_block_ok;2668dspp.compressok = compressok;2669dspp.outfd = outfd;2670dspp.off = off;2671dspp.dso = dsop;2672dspp.tag = FTAG;2673dspp.rawok = rawok;2674dspp.savedok = savedok;26752676dsflags = (rawok) ? 
DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;2677err = dsl_pool_hold(pool, FTAG, &dspp.dp);2678if (err != 0)2679return (err);26802681err = dsl_dataset_hold_obj_flags(dspp.dp, tosnap, dsflags, FTAG,2682&dspp.to_ds);2683if (err != 0) {2684dsl_pool_rele(dspp.dp, FTAG);2685return (err);2686}26872688if (fromsnap != 0) {2689err = dsl_dataset_hold_obj(dspp.dp, fromsnap, FTAG, &fromds);26902691if (err != 0) {2692dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);2693dsl_pool_rele(dspp.dp, FTAG);2694return (err);2695}2696dspp.ancestor_zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;2697dspp.ancestor_zb.zbm_creation_txg =2698dsl_dataset_phys(fromds)->ds_creation_txg;2699dspp.ancestor_zb.zbm_creation_time =2700dsl_dataset_phys(fromds)->ds_creation_time;27012702if (dsl_dataset_is_zapified(fromds)) {2703(void) zap_lookup(dspp.dp->dp_meta_objset,2704fromds->ds_object, DS_FIELD_IVSET_GUID, 8, 1,2705&dspp.ancestor_zb.zbm_ivset_guid);2706}27072708/* See dmu_send for the reasons behind this. */2709uint64_t *fromredact;27102711if (!dsl_dataset_get_uint64_array_feature(fromds,2712SPA_FEATURE_REDACTED_DATASETS,2713&dspp.numfromredactsnaps,2714&fromredact)) {2715dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;2716} else if (dspp.numfromredactsnaps > 0) {2717uint64_t size = dspp.numfromredactsnaps *2718sizeof (uint64_t);2719dspp.fromredactsnaps = kmem_zalloc(size, KM_SLEEP);2720memcpy(dspp.fromredactsnaps, fromredact, size);2721}27222723boolean_t is_before =2724dsl_dataset_is_before(dspp.to_ds, fromds, 0);2725dspp.is_clone = (dspp.to_ds->ds_dir !=2726fromds->ds_dir);2727dsl_dataset_rele(fromds, FTAG);2728if (!is_before) {2729dsl_pool_rele(dspp.dp, FTAG);2730err = SET_ERROR(EXDEV);2731} else {2732err = dmu_send_impl(&dspp);2733}2734} else {2735dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;2736err = dmu_send_impl(&dspp);2737}2738if (dspp.fromredactsnaps)2739kmem_free(dspp.fromredactsnaps,2740dspp.numfromredactsnaps * sizeof (uint64_t));27412742dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);2743return (err);2744}27452746int2747dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,2748boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,2749boolean_t savedok, uint64_t resumeobj, uint64_t resumeoff,2750const char *redactbook, int outfd, offset_t *off,2751dmu_send_outparams_t *dsop)2752{2753int err = 0;2754ds_hold_flags_t dsflags;2755boolean_t owned = B_FALSE;2756dsl_dataset_t *fromds = NULL;2757zfs_bookmark_phys_t book = {0};2758struct dmu_send_params dspp = {0};27592760dsflags = (rawok) ? DS_HOLD_FLAG_NONE : DS_HOLD_FLAG_DECRYPT;2761dspp.tosnap = tosnap;2762dspp.embedok = embedok;2763dspp.large_block_ok = large_block_ok;2764dspp.compressok = compressok;2765dspp.outfd = outfd;2766dspp.off = off;2767dspp.dso = dsop;2768dspp.tag = FTAG;2769dspp.resumeobj = resumeobj;2770dspp.resumeoff = resumeoff;2771dspp.rawok = rawok;2772dspp.savedok = savedok;27732774if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)2775return (SET_ERROR(EINVAL));27762777err = dsl_pool_hold(tosnap, FTAG, &dspp.dp);2778if (err != 0)2779return (err);27802781if (strchr(tosnap, '@') == NULL && spa_writeable(dspp.dp->dp_spa)) {2782/*2783* We are sending a filesystem or volume. Ensure2784* that it doesn't change by owning the dataset.2785*/27862787if (savedok) {2788/*2789* We are looking for the dataset that represents the2790* partially received send stream. If this stream was2791* received as a new snapshot of an existing dataset,2792* this will be saved in a hidden clone named2793* "<pool>/<dataset>/%recv". 
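/*
 * Illustrative sketch (not part of this file): the quick argument check that
 * dmu_send() performs above before taking any holds. An incremental source
 * must name a snapshot ("pool/fs@snap") or a bookmark ("pool/fs#bm"), so a
 * name containing neither '@' nor '#' is rejected with EINVAL.
 */
#include <stdio.h>
#include <string.h>

static int
valid_from_name(const char *fromsnap)
{
	/* Mirrors: fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL */
	if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
		return (0);	/* would be SET_ERROR(EINVAL) */
	return (1);
}

int
main(void)
{
	printf("%d\n", valid_from_name("tank/fs@snap1"));	/* 1: snapshot */
	printf("%d\n", valid_from_name("tank/fs#book1"));	/* 1: bookmark */
	printf("%d\n", valid_from_name("tank/fs"));		/* 0: EINVAL */
	return (0);
}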
Otherwise, the stream2794* will be saved in the live dataset itself. In2795* either case we need to use dsl_dataset_own_force()2796* because the stream is marked as inconsistent,2797* which would normally make it unavailable to be2798* owned.2799*/2800char *name = kmem_asprintf("%s/%s", tosnap,2801recv_clone_name);2802err = dsl_dataset_own_force(dspp.dp, name, dsflags,2803FTAG, &dspp.to_ds);2804if (err == ENOENT) {2805err = dsl_dataset_own_force(dspp.dp, tosnap,2806dsflags, FTAG, &dspp.to_ds);2807}28082809if (err == 0) {2810owned = B_TRUE;2811err = zap_lookup(dspp.dp->dp_meta_objset,2812dspp.to_ds->ds_object,2813DS_FIELD_RESUME_TOGUID, 8, 1,2814&dspp.saved_guid);2815}28162817if (err == 0) {2818err = zap_lookup(dspp.dp->dp_meta_objset,2819dspp.to_ds->ds_object,2820DS_FIELD_RESUME_TONAME, 1,2821sizeof (dspp.saved_toname),2822dspp.saved_toname);2823}2824/* Only disown if there was an error in the lookups */2825if (owned && (err != 0))2826dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);28272828kmem_strfree(name);2829} else {2830err = dsl_dataset_own(dspp.dp, tosnap, dsflags,2831FTAG, &dspp.to_ds);2832if (err == 0)2833owned = B_TRUE;2834}2835} else {2836err = dsl_dataset_hold_flags(dspp.dp, tosnap, dsflags, FTAG,2837&dspp.to_ds);2838}28392840if (err != 0) {2841/* Note: dsl dataset is not owned at this point */2842dsl_pool_rele(dspp.dp, FTAG);2843return (err);2844}28452846if (redactbook != NULL) {2847char path[ZFS_MAX_DATASET_NAME_LEN];2848(void) strlcpy(path, tosnap, sizeof (path));2849char *at = strchr(path, '@');2850if (at == NULL) {2851err = EINVAL;2852} else {2853(void) snprintf(at, sizeof (path) - (at - path), "#%s",2854redactbook);2855err = dsl_bookmark_lookup(dspp.dp, path,2856NULL, &book);2857dspp.redactbook = &book;2858}2859}28602861if (err != 0) {2862dsl_pool_rele(dspp.dp, FTAG);2863if (owned)2864dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);2865else2866dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);2867return (err);2868}28692870if (fromsnap != NULL) {2871zfs_bookmark_phys_t *zb = &dspp.ancestor_zb;2872int fsnamelen;2873if (strpbrk(tosnap, "@#") != NULL)2874fsnamelen = strpbrk(tosnap, "@#") - tosnap;2875else2876fsnamelen = strlen(tosnap);28772878/*2879* If the fromsnap is in a different filesystem, then2880* mark the send stream as a clone.2881*/2882if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||2883(fromsnap[fsnamelen] != '@' &&2884fromsnap[fsnamelen] != '#')) {2885dspp.is_clone = B_TRUE;2886}28872888if (strchr(fromsnap, '@') != NULL) {2889err = dsl_dataset_hold(dspp.dp, fromsnap, FTAG,2890&fromds);28912892if (err != 0) {2893ASSERT0P(fromds);2894} else {2895/*2896* We need to make a deep copy of the redact2897* snapshots of the from snapshot, because the2898* array will be freed when we evict from_ds.2899*/2900uint64_t *fromredact;2901if (!dsl_dataset_get_uint64_array_feature(2902fromds, SPA_FEATURE_REDACTED_DATASETS,2903&dspp.numfromredactsnaps,2904&fromredact)) {2905dspp.numfromredactsnaps =2906NUM_SNAPS_NOT_REDACTED;2907} else if (dspp.numfromredactsnaps > 0) {2908uint64_t size =2909dspp.numfromredactsnaps *2910sizeof (uint64_t);2911dspp.fromredactsnaps = kmem_zalloc(size,2912KM_SLEEP);2913memcpy(dspp.fromredactsnaps, fromredact,2914size);2915}2916if (!dsl_dataset_is_before(dspp.to_ds, fromds,29170)) {2918err = SET_ERROR(EXDEV);2919} else {2920zb->zbm_creation_txg =2921dsl_dataset_phys(fromds)->2922ds_creation_txg;2923zb->zbm_creation_time =2924dsl_dataset_phys(fromds)->2925ds_creation_time;2926zb->zbm_guid =2927dsl_dataset_phys(fromds)->ds_guid;2928zb->zbm_redaction_obj = 
0;29292930if (dsl_dataset_is_zapified(fromds)) {2931(void) zap_lookup(2932dspp.dp->dp_meta_objset,2933fromds->ds_object,2934DS_FIELD_IVSET_GUID, 8, 1,2935&zb->zbm_ivset_guid);2936}2937}2938dsl_dataset_rele(fromds, FTAG);2939}2940} else {2941dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;2942err = dsl_bookmark_lookup(dspp.dp, fromsnap, dspp.to_ds,2943zb);2944if (err == EXDEV && zb->zbm_redaction_obj != 0 &&2945zb->zbm_guid ==2946dsl_dataset_phys(dspp.to_ds)->ds_guid)2947err = 0;2948}29492950if (err == 0) {2951/* dmu_send_impl will call dsl_pool_rele for us. */2952err = dmu_send_impl(&dspp);2953} else {2954if (dspp.fromredactsnaps)2955kmem_free(dspp.fromredactsnaps,2956dspp.numfromredactsnaps *2957sizeof (uint64_t));2958dsl_pool_rele(dspp.dp, FTAG);2959}2960} else {2961dspp.numfromredactsnaps = NUM_SNAPS_NOT_REDACTED;2962err = dmu_send_impl(&dspp);2963}2964if (owned)2965dsl_dataset_disown(dspp.to_ds, dsflags, FTAG);2966else2967dsl_dataset_rele_flags(dspp.to_ds, dsflags, FTAG);2968return (err);2969}29702971static int2972dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,2973uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)2974{2975int err = 0;2976uint64_t size;2977/*2978* Assume that space (both on-disk and in-stream) is dominated by2979* data. We will adjust for indirect blocks and the copies property,2980* but ignore per-object space used (eg, dnodes and DRR_OBJECT records).2981*/29822983uint64_t recordsize;2984uint64_t record_count;2985objset_t *os;2986VERIFY0(dmu_objset_from_ds(ds, &os));29872988/* Assume all (uncompressed) blocks are recordsize. */2989if (zfs_override_estimate_recordsize != 0) {2990recordsize = zfs_override_estimate_recordsize;2991} else if (os->os_phys->os_type == DMU_OST_ZVOL) {2992err = dsl_prop_get_int_ds(ds,2993zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);2994} else {2995err = dsl_prop_get_int_ds(ds,2996zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);2997}2998if (err != 0)2999return (err);3000record_count = uncompressed / recordsize;30013002/*3003* If we're estimating a send size for a compressed stream, use the3004* compressed data size to estimate the stream size. Otherwise, use the3005* uncompressed data size.3006*/3007size = stream_compressed ? compressed : uncompressed;30083009/*3010* Subtract out approximate space used by indirect blocks.3011* Assume most space is used by data blocks (non-indirect, non-dnode).3012* Assume no ditto blocks or internal fragmentation.3013*3014* Therefore, space used by indirect blocks is sizeof(blkptr_t) per3015* block.3016*/3017size -= record_count * sizeof (blkptr_t);30183019/* Add in the space for the record associated with each block. 
*/3020size += record_count * sizeof (dmu_replay_record_t);30213022*sizep = size;30233024return (0);3025}30263027int3028dmu_send_estimate_fast(dsl_dataset_t *origds, dsl_dataset_t *fromds,3029zfs_bookmark_phys_t *frombook, boolean_t stream_compressed,3030boolean_t saved, uint64_t *sizep)3031{3032int err;3033dsl_dataset_t *ds = origds;3034uint64_t uncomp, comp;30353036ASSERT(dsl_pool_config_held(origds->ds_dir->dd_pool));3037ASSERT(fromds == NULL || frombook == NULL);30383039/*3040* If this is a saved send we may actually be sending3041* from the %recv clone used for resuming.3042*/3043if (saved) {3044objset_t *mos = origds->ds_dir->dd_pool->dp_meta_objset;3045uint64_t guid;3046char dsname[ZFS_MAX_DATASET_NAME_LEN + 6];30473048dsl_dataset_name(origds, dsname);3049(void) strcat(dsname, "/");3050(void) strlcat(dsname, recv_clone_name, sizeof (dsname));30513052err = dsl_dataset_hold(origds->ds_dir->dd_pool,3053dsname, FTAG, &ds);3054if (err != ENOENT && err != 0) {3055return (err);3056} else if (err == ENOENT) {3057ds = origds;3058}30593060/* check that this dataset has partially received data */3061err = zap_lookup(mos, ds->ds_object,3062DS_FIELD_RESUME_TOGUID, 8, 1, &guid);3063if (err != 0) {3064err = SET_ERROR(err == ENOENT ? EINVAL : err);3065goto out;3066}30673068err = zap_lookup(mos, ds->ds_object,3069DS_FIELD_RESUME_TONAME, 1, sizeof (dsname), dsname);3070if (err != 0) {3071err = SET_ERROR(err == ENOENT ? EINVAL : err);3072goto out;3073}3074}30753076/* tosnap must be a snapshot or the target of a saved send */3077if (!ds->ds_is_snapshot && ds == origds)3078return (SET_ERROR(EINVAL));30793080if (fromds != NULL) {3081uint64_t used;3082if (!fromds->ds_is_snapshot) {3083err = SET_ERROR(EINVAL);3084goto out;3085}30863087if (!dsl_dataset_is_before(ds, fromds, 0)) {3088err = SET_ERROR(EXDEV);3089goto out;3090}30913092err = dsl_dataset_space_written(fromds, ds, &used, &comp,3093&uncomp);3094if (err != 0)3095goto out;3096} else if (frombook != NULL) {3097uint64_t used;3098err = dsl_dataset_space_written_bookmark(frombook, ds, &used,3099&comp, &uncomp);3100if (err != 0)3101goto out;3102} else {3103uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;3104comp = dsl_dataset_phys(ds)->ds_compressed_bytes;3105}31063107err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,3108stream_compressed, sizep);3109/*3110* Add the size of the BEGIN and END records to the estimate.3111*/3112*sizep += 2 * sizeof (dmu_replay_record_t);31133114out:3115if (ds != origds)3116dsl_dataset_rele(ds, FTAG);3117return (err);3118}31193120ZFS_MODULE_PARAM(zfs_send, zfs_send_, corrupt_data, INT, ZMOD_RW,3121"Allow sending corrupt data");31223123ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_length, UINT, ZMOD_RW,3124"Maximum send queue length");31253126ZFS_MODULE_PARAM(zfs_send, zfs_send_, unmodified_spill_blocks, INT, ZMOD_RW,3127"Send unmodified spill blocks");31283129ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_length, UINT, ZMOD_RW,3130"Maximum send queue length for non-prefetch queues");31313132ZFS_MODULE_PARAM(zfs_send, zfs_send_, queue_ff, UINT, ZMOD_RW,3133"Send queue fill fraction");31343135ZFS_MODULE_PARAM(zfs_send, zfs_send_, no_prefetch_queue_ff, UINT, ZMOD_RW,3136"Send queue fill fraction for non-prefetch queues");31373138ZFS_MODULE_PARAM(zfs_send, zfs_, override_estimate_recordsize, UINT, ZMOD_RW,3139"Override block size estimate with fixed size");314031413142
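/*
 * Illustrative sketch (not part of this file): the stream-size estimate
 * arithmetic from dmu_adjust_send_estimate_for_indirects() and
 * dmu_send_estimate_fast() above. blkptr_t is 128 bytes; the replay record
 * size below is only an assumed stand-in, and per-object costs (dnodes,
 * DRR_OBJECT records) are ignored, just as the real estimate ignores them.
 */
#include <stdint.h>
#include <stdio.h>

#define	SKETCH_BLKPTR_SIZE	128	/* sizeof (blkptr_t) */
#define	SKETCH_DRR_SIZE		312	/* assumed sizeof (dmu_replay_record_t) */

static uint64_t
estimate_stream_size(uint64_t uncompressed, uint64_t compressed,
    int stream_compressed, uint64_t recordsize)
{
	uint64_t record_count = uncompressed / recordsize;
	uint64_t size = stream_compressed ? compressed : uncompressed;

	/* Indirect blocks cost roughly one block pointer per data block. */
	size -= record_count * SKETCH_BLKPTR_SIZE;
	/* Each data block is wrapped in one replay record in the stream. */
	size += record_count * SKETCH_DRR_SIZE;
	/* BEGIN and END records bracket the stream. */
	size += 2 * SKETCH_DRR_SIZE;
	return (size);
}

int
main(void)
{
	/* 1 GiB of data in 128K records, estimating an uncompressed stream. */
	uint64_t est = estimate_stream_size(1ULL << 30, 600ULL << 20, 0,
	    128 * 1024);
	printf("estimated stream size: %llu bytes\n", (unsigned long long)est);
	return (0);
}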