Path: blob/main/sys/contrib/openzfs/cmd/zstream/zstream_redup.c
48383 views
// SPDX-License-Identifier: CDDL-1.01/*2* CDDL HEADER START3*4* This file and its contents are supplied under the terms of the5* Common Development and Distribution License ("CDDL"), version 1.0.6* You may only use this file in accordance with the terms of version7* 1.0 of the CDDL.8*9* A full copy of the text of the CDDL should have accompanied this10* source. A copy of the CDDL is also available via the Internet at11* http://www.illumos.org/license/CDDL.12*13* CDDL HEADER END14*/1516/*17* Copyright (c) 2020 by Delphix. All rights reserved.18*/1920#include <assert.h>21#include <cityhash.h>22#include <ctype.h>23#include <errno.h>24#include <fcntl.h>25#include <libzfs.h>26#include <libzutil.h>27#include <stddef.h>28#include <stdio.h>29#include <stdlib.h>30#include <string.h>31#include <umem.h>32#include <unistd.h>33#include <sys/debug.h>34#include <sys/stat.h>35#include <sys/zfs_ioctl.h>36#include <sys/zio_checksum.h>37#include "zfs_fletcher.h"38#include "zstream.h"394041#define MAX_RDT_PHYSMEM_PERCENT 2042#define SMALLEST_POSSIBLE_MAX_RDT_MB 1284344typedef struct redup_entry {45struct redup_entry *rde_next;46uint64_t rde_guid;47uint64_t rde_object;48uint64_t rde_offset;49uint64_t rde_stream_offset;50} redup_entry_t;5152typedef struct redup_table {53redup_entry_t **redup_hash_array;54umem_cache_t *ddecache;55uint64_t ddt_count;56int numhashbits;57} redup_table_t;5859void *60safe_calloc(size_t n)61{62void *rv = calloc(1, n);63if (rv == NULL) {64fprintf(stderr,65"Error: could not allocate %u bytes of memory\n",66(int)n);67exit(1);68}69return (rv);70}7172/*73* Safe version of fread(), exits on error.74*/75int76sfread(void *buf, size_t size, FILE *fp)77{78int rv = fread(buf, size, 1, fp);79if (rv == 0 && ferror(fp)) {80(void) fprintf(stderr, "Error while reading file: %s\n",81strerror(errno));82exit(1);83}84return (rv);85}8687/*88* Safe version of pread(), exits on error.89*/90static void91spread(int fd, void *buf, size_t count, off_t offset)92{93ssize_t err = pread(fd, buf, count, offset);94if (err == -1) {95(void) fprintf(stderr,96"Error while reading file: %s\n",97strerror(errno));98exit(1);99} else if (err != count) {100(void) fprintf(stderr,101"Error while reading file: short read\n");102exit(1);103}104}105106static int107dump_record(dmu_replay_record_t *drr, void *payload, int payload_len,108zio_cksum_t *zc, int outfd)109{110assert(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum)111== sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));112fletcher_4_incremental_native(drr,113offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum), zc);114if (drr->drr_type != DRR_BEGIN) {115assert(ZIO_CHECKSUM_IS_ZERO(&drr->drr_u.116drr_checksum.drr_checksum));117drr->drr_u.drr_checksum.drr_checksum = *zc;118}119fletcher_4_incremental_native(&drr->drr_u.drr_checksum.drr_checksum,120sizeof (zio_cksum_t), zc);121if (write(outfd, drr, sizeof (*drr)) == -1)122return (errno);123if (payload_len != 0) {124fletcher_4_incremental_native(payload, payload_len, zc);125if (write(outfd, payload, payload_len) == -1)126return (errno);127}128return (0);129}130131static void132rdt_insert(redup_table_t *rdt,133uint64_t guid, uint64_t object, uint64_t offset, uint64_t stream_offset)134{135uint64_t ch = cityhash3(guid, object, offset);136uint64_t hashcode = BF64_GET(ch, 0, rdt->numhashbits);137redup_entry_t **rdepp;138139rdepp = &(rdt->redup_hash_array[hashcode]);140redup_entry_t *rde = umem_cache_alloc(rdt->ddecache, UMEM_NOFAIL);141rde->rde_next = *rdepp;142rde->rde_guid = guid;143rde->rde_object = object;144rde->rde_offset = offset;145rde->rde_stream_offset = stream_offset;146*rdepp = rde;147rdt->ddt_count++;148}149150static void151rdt_lookup(redup_table_t *rdt,152uint64_t guid, uint64_t object, uint64_t offset,153uint64_t *stream_offsetp)154{155uint64_t ch = cityhash3(guid, object, offset);156uint64_t hashcode = BF64_GET(ch, 0, rdt->numhashbits);157158for (redup_entry_t *rde = rdt->redup_hash_array[hashcode];159rde != NULL; rde = rde->rde_next) {160if (rde->rde_guid == guid &&161rde->rde_object == object &&162rde->rde_offset == offset) {163*stream_offsetp = rde->rde_stream_offset;164return;165}166}167assert(!"could not find expected redup table entry");168}169170/*171* Convert a dedup stream (generated by "zfs send -D") to a172* non-deduplicated stream. The entire infd will be converted, including173* any substreams in a stream package (generated by "zfs send -RD"). The174* infd must be seekable.175*/176static void177zfs_redup_stream(int infd, int outfd, boolean_t verbose)178{179int bufsz = SPA_MAXBLOCKSIZE;180dmu_replay_record_t thedrr;181dmu_replay_record_t *drr = &thedrr;182redup_table_t rdt;183zio_cksum_t stream_cksum;184uint64_t numbuckets;185uint64_t num_records = 0;186uint64_t num_write_byref_records = 0;187188memset(&thedrr, 0, sizeof (dmu_replay_record_t));189190#ifdef _ILP32191uint64_t max_rde_size = SMALLEST_POSSIBLE_MAX_RDT_MB << 20;192#else193uint64_t physmem = sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE);194uint64_t max_rde_size =195MAX((physmem * MAX_RDT_PHYSMEM_PERCENT) / 100,196SMALLEST_POSSIBLE_MAX_RDT_MB << 20);197#endif198199numbuckets = max_rde_size / (sizeof (redup_entry_t));200201/*202* numbuckets must be a power of 2. Increase number to203* a power of 2 if necessary.204*/205if (!ISP2(numbuckets))206numbuckets = 1ULL << highbit64(numbuckets);207208rdt.redup_hash_array =209safe_calloc(numbuckets * sizeof (redup_entry_t *));210rdt.ddecache = umem_cache_create("rde", sizeof (redup_entry_t), 0,211NULL, NULL, NULL, NULL, NULL, 0);212rdt.numhashbits = highbit64(numbuckets) - 1;213rdt.ddt_count = 0;214215char *buf = safe_calloc(bufsz);216FILE *ofp = fdopen(infd, "r");217long offset = ftell(ofp);218int begin = 0;219boolean_t seen = B_FALSE;220while (sfread(drr, sizeof (*drr), ofp) != 0) {221num_records++;222223/*224* We need to regenerate the checksum.225*/226if (drr->drr_type != DRR_BEGIN) {227memset(&drr->drr_u.drr_checksum.drr_checksum, 0,228sizeof (drr->drr_u.drr_checksum.drr_checksum));229}230231uint64_t payload_size = 0;232switch (drr->drr_type) {233case DRR_BEGIN:234{235struct drr_begin *drrb = &drr->drr_u.drr_begin;236int fflags;237ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);238VERIFY0(begin++);239seen = B_TRUE;240241assert(drrb->drr_magic == DMU_BACKUP_MAGIC);242243/* clear the DEDUP feature flag for this stream */244fflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);245fflags &= ~(DMU_BACKUP_FEATURE_DEDUP |246DMU_BACKUP_FEATURE_DEDUPPROPS);247/* cppcheck-suppress syntaxError */248DMU_SET_FEATUREFLAGS(drrb->drr_versioninfo, fflags);249250uint32_t sz = drr->drr_payloadlen;251252VERIFY3U(sz, <=, 1U << 28);253254if (sz != 0) {255if (sz > bufsz) {256free(buf);257buf = safe_calloc(sz);258bufsz = sz;259}260(void) sfread(buf, sz, ofp);261}262payload_size = sz;263break;264}265266case DRR_END:267{268struct drr_end *drre = &drr->drr_u.drr_end;269/*270* We would prefer to just check --begin == 0, but271* replication streams have an end of stream END272* record, so we must avoid tripping it.273*/274VERIFY3B(seen, ==, B_TRUE);275begin--;276/*277* Use the recalculated checksum, unless this is278* the END record of a stream package, which has279* no checksum.280*/281if (!ZIO_CHECKSUM_IS_ZERO(&drre->drr_checksum))282drre->drr_checksum = stream_cksum;283break;284}285286case DRR_OBJECT:287{288struct drr_object *drro = &drr->drr_u.drr_object;289VERIFY3S(begin, ==, 1);290291if (drro->drr_bonuslen > 0) {292payload_size = DRR_OBJECT_PAYLOAD_SIZE(drro);293(void) sfread(buf, payload_size, ofp);294}295break;296}297298case DRR_SPILL:299{300struct drr_spill *drrs = &drr->drr_u.drr_spill;301VERIFY3S(begin, ==, 1);302payload_size = DRR_SPILL_PAYLOAD_SIZE(drrs);303(void) sfread(buf, payload_size, ofp);304break;305}306307case DRR_WRITE_BYREF:308{309struct drr_write_byref drrwb =310drr->drr_u.drr_write_byref;311VERIFY3S(begin, ==, 1);312313num_write_byref_records++;314315/*316* Look up in hash table by drrwb->drr_refguid,317* drr_refobject, drr_refoffset. Replace this318* record with the found WRITE record, but with319* drr_object,drr_offset,drr_toguid replaced with ours.320*/321uint64_t stream_offset = 0;322rdt_lookup(&rdt, drrwb.drr_refguid,323drrwb.drr_refobject, drrwb.drr_refoffset,324&stream_offset);325326spread(infd, drr, sizeof (*drr), stream_offset);327328assert(drr->drr_type == DRR_WRITE);329struct drr_write *drrw = &drr->drr_u.drr_write;330assert(drrw->drr_toguid == drrwb.drr_refguid);331assert(drrw->drr_object == drrwb.drr_refobject);332assert(drrw->drr_offset == drrwb.drr_refoffset);333334payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);335spread(infd, buf, payload_size,336stream_offset + sizeof (*drr));337338drrw->drr_toguid = drrwb.drr_toguid;339drrw->drr_object = drrwb.drr_object;340drrw->drr_offset = drrwb.drr_offset;341break;342}343344case DRR_WRITE:345{346struct drr_write *drrw = &drr->drr_u.drr_write;347VERIFY3S(begin, ==, 1);348payload_size = DRR_WRITE_PAYLOAD_SIZE(drrw);349(void) sfread(buf, payload_size, ofp);350351rdt_insert(&rdt, drrw->drr_toguid,352drrw->drr_object, drrw->drr_offset, offset);353break;354}355356case DRR_WRITE_EMBEDDED:357{358struct drr_write_embedded *drrwe =359&drr->drr_u.drr_write_embedded;360VERIFY3S(begin, ==, 1);361payload_size =362P2ROUNDUP((uint64_t)drrwe->drr_psize, 8);363(void) sfread(buf, payload_size, ofp);364break;365}366367case DRR_FREEOBJECTS:368case DRR_FREE:369case DRR_OBJECT_RANGE:370VERIFY3S(begin, ==, 1);371break;372373default:374(void) fprintf(stderr, "INVALID record type 0x%x\n",375drr->drr_type);376/* should never happen, so assert */377assert(B_FALSE);378}379380if (feof(ofp)) {381fprintf(stderr, "Error: unexpected end-of-file\n");382exit(1);383}384if (ferror(ofp)) {385fprintf(stderr, "Error while reading file: %s\n",386strerror(errno));387exit(1);388}389390/*391* We need to recalculate the checksum, and it needs to be392* initially zero to do that. BEGIN records don't have393* a checksum.394*/395if (drr->drr_type != DRR_BEGIN) {396memset(&drr->drr_u.drr_checksum.drr_checksum, 0,397sizeof (drr->drr_u.drr_checksum.drr_checksum));398}399if (dump_record(drr, buf, payload_size,400&stream_cksum, outfd) != 0)401break;402if (drr->drr_type == DRR_END) {403/*404* Typically the END record is either the last405* thing in the stream, or it is followed406* by a BEGIN record (which also zeros the checksum).407* However, a stream package ends with two END408* records. The last END record's checksum starts409* from zero.410*/411ZIO_SET_CHECKSUM(&stream_cksum, 0, 0, 0, 0);412}413offset = ftell(ofp);414}415416if (verbose) {417char mem_str[16];418zfs_nicenum(rdt.ddt_count * sizeof (redup_entry_t),419mem_str, sizeof (mem_str));420fprintf(stderr, "converted stream with %llu total records, "421"including %llu dedup records, using %sB memory.\n",422(long long)num_records,423(long long)num_write_byref_records,424mem_str);425}426427umem_cache_destroy(rdt.ddecache);428free(rdt.redup_hash_array);429free(buf);430(void) fclose(ofp);431}432433int434zstream_do_redup(int argc, char *argv[])435{436boolean_t verbose = B_FALSE;437int c;438439while ((c = getopt(argc, argv, "v")) != -1) {440switch (c) {441case 'v':442verbose = B_TRUE;443break;444case '?':445(void) fprintf(stderr, "invalid option '%c'\n",446optopt);447zstream_usage();448break;449}450}451452argc -= optind;453argv += optind;454455if (argc != 1)456zstream_usage();457458const char *filename = argv[0];459460if (isatty(STDOUT_FILENO)) {461(void) fprintf(stderr,462"Error: Stream can not be written to a terminal.\n"463"You must redirect standard output.\n");464return (1);465}466467int fd = open(filename, O_RDONLY);468if (fd == -1) {469(void) fprintf(stderr,470"Error while opening file '%s': %s\n",471filename, strerror(errno));472exit(1);473}474475fletcher_4_init();476zfs_redup_stream(fd, STDOUT_FILENO, verbose);477fletcher_4_fini();478479close(fd);480481return (0);482}483484485