Path: blob/master/Utilities/cmzstd/lib/compress/zstdmt_compress.c
3158 views
/*1* Copyright (c) Meta Platforms, Inc. and affiliates.2* All rights reserved.3*4* This source code is licensed under both the BSD-style license (found in the5* LICENSE file in the root directory of this source tree) and the GPLv2 (found6* in the COPYING file in the root directory of this source tree).7* You may select, at your option, one of the above-listed licenses.8*/91011/* ====== Compiler specifics ====== */12#if defined(_MSC_VER)13# pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */14#endif151617/* ====== Constants ====== */18#define ZSTDMT_OVERLAPLOG_DEFAULT 0192021/* ====== Dependencies ====== */22#include "../common/allocations.h" /* ZSTD_customMalloc, ZSTD_customCalloc, ZSTD_customFree */23#include "../common/zstd_deps.h" /* ZSTD_memcpy, ZSTD_memset, INT_MAX, UINT_MAX */24#include "../common/mem.h" /* MEM_STATIC */25#include "../common/pool.h" /* threadpool */26#include "../common/threading.h" /* mutex */27#include "zstd_compress_internal.h" /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */28#include "zstd_ldm.h"29#include "zstdmt_compress.h"3031/* Guards code to support resizing the SeqPool.32* We will want to resize the SeqPool to save memory in the future.33* Until then, comment the code out since it is unused.34*/35#define ZSTD_RESIZE_SEQPOOL 03637/* ====== Debug ====== */38#if defined(DEBUGLEVEL) && (DEBUGLEVEL>=2) \39&& !defined(_MSC_VER) \40&& !defined(__MINGW32__)4142# include <stdio.h>43# include <unistd.h>44# include <sys/times.h>4546# define DEBUG_PRINTHEX(l,p,n) { \47unsigned debug_u; \48for (debug_u=0; debug_u<(n); debug_u++) \49RAWLOG(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \50RAWLOG(l, " \n"); \51}5253static unsigned long long GetCurrentClockTimeMicroseconds(void)54{55static clock_t _ticksPerSecond = 0;56if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);5758{ struct tms junk; clock_t newTicks = (clock_t) times(&junk);59return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond);60} }6162#define MUTEX_WAIT_TIME_DLEVEL 663#define ZSTD_PTHREAD_MUTEX_LOCK(mutex) { \64if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) { \65unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \66ZSTD_pthread_mutex_lock(mutex); \67{ unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \68unsigned long long const elapsedTime = (afterTime-beforeTime); \69if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \70DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \71elapsedTime, #mutex); \72} } \73} else { \74ZSTD_pthread_mutex_lock(mutex); \75} \76}7778#else7980# define ZSTD_PTHREAD_MUTEX_LOCK(m) ZSTD_pthread_mutex_lock(m)81# define DEBUG_PRINTHEX(l,p,n) {}8283#endif848586/* ===== Buffer Pool ===== */87/* a single Buffer Pool can be invoked from multiple threads in parallel */8889typedef struct buffer_s {90void* start;91size_t capacity;92} buffer_t;9394static const buffer_t g_nullBuffer = { NULL, 0 };9596typedef struct ZSTDMT_bufferPool_s {97ZSTD_pthread_mutex_t poolMutex;98size_t bufferSize;99unsigned totalBuffers;100unsigned nbBuffers;101ZSTD_customMem cMem;102buffer_t bTable[1]; /* variable size */103} ZSTDMT_bufferPool;104105static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned maxNbBuffers, ZSTD_customMem cMem)106{107ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_customCalloc(108sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);109if (bufPool==NULL) return NULL;110if (ZSTD_pthread_mutex_init(&bufPool->poolMutex, NULL)) {111ZSTD_customFree(bufPool, cMem);112return NULL;113}114bufPool->bufferSize = 64 KB;115bufPool->totalBuffers = maxNbBuffers;116bufPool->nbBuffers = 0;117bufPool->cMem = cMem;118return bufPool;119}120121static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)122{123unsigned u;124DEBUGLOG(3, "ZSTDMT_freeBufferPool (address:%08X)", (U32)(size_t)bufPool);125if (!bufPool) return; /* compatibility with free on NULL */126for (u=0; u<bufPool->totalBuffers; u++) {127DEBUGLOG(4, "free buffer %2u (address:%08X)", u, (U32)(size_t)bufPool->bTable[u].start);128ZSTD_customFree(bufPool->bTable[u].start, bufPool->cMem);129}130ZSTD_pthread_mutex_destroy(&bufPool->poolMutex);131ZSTD_customFree(bufPool, bufPool->cMem);132}133134/* only works at initialization, not during compression */135static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)136{137size_t const poolSize = sizeof(*bufPool)138+ (bufPool->totalBuffers - 1) * sizeof(buffer_t);139unsigned u;140size_t totalBufferSize = 0;141ZSTD_pthread_mutex_lock(&bufPool->poolMutex);142for (u=0; u<bufPool->totalBuffers; u++)143totalBufferSize += bufPool->bTable[u].capacity;144ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);145146return poolSize + totalBufferSize;147}148149/* ZSTDMT_setBufferSize() :150* all future buffers provided by this buffer pool will have _at least_ this size151* note : it's better for all buffers to have same size,152* as they become freely interchangeable, reducing malloc/free usages and memory fragmentation */153static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize)154{155ZSTD_pthread_mutex_lock(&bufPool->poolMutex);156DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize);157bufPool->bufferSize = bSize;158ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);159}160161162static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, unsigned maxNbBuffers)163{164if (srcBufPool==NULL) return NULL;165if (srcBufPool->totalBuffers >= maxNbBuffers) /* good enough */166return srcBufPool;167/* need a larger buffer pool */168{ ZSTD_customMem const cMem = srcBufPool->cMem;169size_t const bSize = srcBufPool->bufferSize; /* forward parameters */170ZSTDMT_bufferPool* newBufPool;171ZSTDMT_freeBufferPool(srcBufPool);172newBufPool = ZSTDMT_createBufferPool(maxNbBuffers, cMem);173if (newBufPool==NULL) return newBufPool;174ZSTDMT_setBufferSize(newBufPool, bSize);175return newBufPool;176}177}178179/** ZSTDMT_getBuffer() :180* assumption : bufPool must be valid181* @return : a buffer, with start pointer and size182* note: allocation may fail, in this case, start==NULL and size==0 */183static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)184{185size_t const bSize = bufPool->bufferSize;186DEBUGLOG(5, "ZSTDMT_getBuffer: bSize = %u", (U32)bufPool->bufferSize);187ZSTD_pthread_mutex_lock(&bufPool->poolMutex);188if (bufPool->nbBuffers) { /* try to use an existing buffer */189buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];190size_t const availBufferSize = buf.capacity;191bufPool->bTable[bufPool->nbBuffers] = g_nullBuffer;192if ((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) {193/* large enough, but not too much */194DEBUGLOG(5, "ZSTDMT_getBuffer: provide buffer %u of size %u",195bufPool->nbBuffers, (U32)buf.capacity);196ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);197return buf;198}199/* size conditions not respected : scratch this buffer, create new one */200DEBUGLOG(5, "ZSTDMT_getBuffer: existing buffer does not meet size conditions => freeing");201ZSTD_customFree(buf.start, bufPool->cMem);202}203ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);204/* create new buffer */205DEBUGLOG(5, "ZSTDMT_getBuffer: create a new buffer");206{ buffer_t buffer;207void* const start = ZSTD_customMalloc(bSize, bufPool->cMem);208buffer.start = start; /* note : start can be NULL if malloc fails ! */209buffer.capacity = (start==NULL) ? 0 : bSize;210if (start==NULL) {211DEBUGLOG(5, "ZSTDMT_getBuffer: buffer allocation failure !!");212} else {213DEBUGLOG(5, "ZSTDMT_getBuffer: created buffer of size %u", (U32)bSize);214}215return buffer;216}217}218219#if ZSTD_RESIZE_SEQPOOL220/** ZSTDMT_resizeBuffer() :221* assumption : bufPool must be valid222* @return : a buffer that is at least the buffer pool buffer size.223* If a reallocation happens, the data in the input buffer is copied.224*/225static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)226{227size_t const bSize = bufPool->bufferSize;228if (buffer.capacity < bSize) {229void* const start = ZSTD_customMalloc(bSize, bufPool->cMem);230buffer_t newBuffer;231newBuffer.start = start;232newBuffer.capacity = start == NULL ? 0 : bSize;233if (start != NULL) {234assert(newBuffer.capacity >= buffer.capacity);235ZSTD_memcpy(newBuffer.start, buffer.start, buffer.capacity);236DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize);237return newBuffer;238}239DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!");240}241return buffer;242}243#endif244245/* store buffer for later re-use, up to pool capacity */246static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)247{248DEBUGLOG(5, "ZSTDMT_releaseBuffer");249if (buf.start == NULL) return; /* compatible with release on NULL */250ZSTD_pthread_mutex_lock(&bufPool->poolMutex);251if (bufPool->nbBuffers < bufPool->totalBuffers) {252bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */253DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u",254(U32)buf.capacity, (U32)(bufPool->nbBuffers-1));255ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);256return;257}258ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);259/* Reached bufferPool capacity (should not happen) */260DEBUGLOG(5, "ZSTDMT_releaseBuffer: pool capacity reached => freeing ");261ZSTD_customFree(buf.start, bufPool->cMem);262}263264/* We need 2 output buffers per worker since each dstBuff must be flushed after it is released.265* The 3 additional buffers are as follows:266* 1 buffer for input loading267* 1 buffer for "next input" when submitting current one268* 1 buffer stuck in queue */269#define BUF_POOL_MAX_NB_BUFFERS(nbWorkers) (2*(nbWorkers) + 3)270271/* After a worker releases its rawSeqStore, it is immediately ready for reuse.272* So we only need one seq buffer per worker. */273#define SEQ_POOL_MAX_NB_BUFFERS(nbWorkers) (nbWorkers)274275/* ===== Seq Pool Wrapper ====== */276277typedef ZSTDMT_bufferPool ZSTDMT_seqPool;278279static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool)280{281return ZSTDMT_sizeof_bufferPool(seqPool);282}283284static rawSeqStore_t bufferToSeq(buffer_t buffer)285{286rawSeqStore_t seq = kNullRawSeqStore;287seq.seq = (rawSeq*)buffer.start;288seq.capacity = buffer.capacity / sizeof(rawSeq);289return seq;290}291292static buffer_t seqToBuffer(rawSeqStore_t seq)293{294buffer_t buffer;295buffer.start = seq.seq;296buffer.capacity = seq.capacity * sizeof(rawSeq);297return buffer;298}299300static rawSeqStore_t ZSTDMT_getSeq(ZSTDMT_seqPool* seqPool)301{302if (seqPool->bufferSize == 0) {303return kNullRawSeqStore;304}305return bufferToSeq(ZSTDMT_getBuffer(seqPool));306}307308#if ZSTD_RESIZE_SEQPOOL309static rawSeqStore_t ZSTDMT_resizeSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)310{311return bufferToSeq(ZSTDMT_resizeBuffer(seqPool, seqToBuffer(seq)));312}313#endif314315static void ZSTDMT_releaseSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)316{317ZSTDMT_releaseBuffer(seqPool, seqToBuffer(seq));318}319320static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)321{322ZSTDMT_setBufferSize(seqPool, nbSeq * sizeof(rawSeq));323}324325static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)326{327ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(SEQ_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);328if (seqPool == NULL) return NULL;329ZSTDMT_setNbSeq(seqPool, 0);330return seqPool;331}332333static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)334{335ZSTDMT_freeBufferPool(seqPool);336}337338static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)339{340return ZSTDMT_expandBufferPool(pool, SEQ_POOL_MAX_NB_BUFFERS(nbWorkers));341}342343344/* ===== CCtx Pool ===== */345/* a single CCtx Pool can be invoked from multiple threads in parallel */346347typedef struct {348ZSTD_pthread_mutex_t poolMutex;349int totalCCtx;350int availCCtx;351ZSTD_customMem cMem;352ZSTD_CCtx* cctx[1]; /* variable size */353} ZSTDMT_CCtxPool;354355/* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */356static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)357{358int cid;359for (cid=0; cid<pool->totalCCtx; cid++)360ZSTD_freeCCtx(pool->cctx[cid]); /* note : compatible with free on NULL */361ZSTD_pthread_mutex_destroy(&pool->poolMutex);362ZSTD_customFree(pool, pool->cMem);363}364365/* ZSTDMT_createCCtxPool() :366* implies nbWorkers >= 1 , checked by caller ZSTDMT_createCCtx() */367static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(int nbWorkers,368ZSTD_customMem cMem)369{370ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_customCalloc(371sizeof(ZSTDMT_CCtxPool) + (nbWorkers-1)*sizeof(ZSTD_CCtx*), cMem);372assert(nbWorkers > 0);373if (!cctxPool) return NULL;374if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) {375ZSTD_customFree(cctxPool, cMem);376return NULL;377}378cctxPool->cMem = cMem;379cctxPool->totalCCtx = nbWorkers;380cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */381cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem);382if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }383DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers);384return cctxPool;385}386387static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,388int nbWorkers)389{390if (srcPool==NULL) return NULL;391if (nbWorkers <= srcPool->totalCCtx) return srcPool; /* good enough */392/* need a larger cctx pool */393{ ZSTD_customMem const cMem = srcPool->cMem;394ZSTDMT_freeCCtxPool(srcPool);395return ZSTDMT_createCCtxPool(nbWorkers, cMem);396}397}398399/* only works during initialization phase, not during compression */400static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)401{402ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);403{ unsigned const nbWorkers = cctxPool->totalCCtx;404size_t const poolSize = sizeof(*cctxPool)405+ (nbWorkers-1) * sizeof(ZSTD_CCtx*);406unsigned u;407size_t totalCCtxSize = 0;408for (u=0; u<nbWorkers; u++) {409totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]);410}411ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);412assert(nbWorkers > 0);413return poolSize + totalCCtxSize;414}415}416417static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)418{419DEBUGLOG(5, "ZSTDMT_getCCtx");420ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);421if (cctxPool->availCCtx) {422cctxPool->availCCtx--;423{ ZSTD_CCtx* const cctx = cctxPool->cctx[cctxPool->availCCtx];424ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);425return cctx;426} }427ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);428DEBUGLOG(5, "create one more CCtx");429return ZSTD_createCCtx_advanced(cctxPool->cMem); /* note : can be NULL, when creation fails ! */430}431432static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)433{434if (cctx==NULL) return; /* compatibility with release on NULL */435ZSTD_pthread_mutex_lock(&pool->poolMutex);436if (pool->availCCtx < pool->totalCCtx)437pool->cctx[pool->availCCtx++] = cctx;438else {439/* pool overflow : should not happen, since totalCCtx==nbWorkers */440DEBUGLOG(4, "CCtx pool overflow : free cctx");441ZSTD_freeCCtx(cctx);442}443ZSTD_pthread_mutex_unlock(&pool->poolMutex);444}445446/* ==== Serial State ==== */447448typedef struct {449void const* start;450size_t size;451} range_t;452453typedef struct {454/* All variables in the struct are protected by mutex. */455ZSTD_pthread_mutex_t mutex;456ZSTD_pthread_cond_t cond;457ZSTD_CCtx_params params;458ldmState_t ldmState;459XXH64_state_t xxhState;460unsigned nextJobID;461/* Protects ldmWindow.462* Must be acquired after the main mutex when acquiring both.463*/464ZSTD_pthread_mutex_t ldmWindowMutex;465ZSTD_pthread_cond_t ldmWindowCond; /* Signaled when ldmWindow is updated */466ZSTD_window_t ldmWindow; /* A thread-safe copy of ldmState.window */467} serialState_t;468469static int470ZSTDMT_serialState_reset(serialState_t* serialState,471ZSTDMT_seqPool* seqPool,472ZSTD_CCtx_params params,473size_t jobSize,474const void* dict, size_t const dictSize,475ZSTD_dictContentType_e dictContentType)476{477/* Adjust parameters */478if (params.ldmParams.enableLdm == ZSTD_ps_enable) {479DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);480ZSTD_ldm_adjustParameters(¶ms.ldmParams, ¶ms.cParams);481assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);482assert(params.ldmParams.hashRateLog < 32);483} else {484ZSTD_memset(¶ms.ldmParams, 0, sizeof(params.ldmParams));485}486serialState->nextJobID = 0;487if (params.fParams.checksumFlag)488XXH64_reset(&serialState->xxhState, 0);489if (params.ldmParams.enableLdm == ZSTD_ps_enable) {490ZSTD_customMem cMem = params.customMem;491unsigned const hashLog = params.ldmParams.hashLog;492size_t const hashSize = ((size_t)1 << hashLog) * sizeof(ldmEntry_t);493unsigned const bucketLog =494params.ldmParams.hashLog - params.ldmParams.bucketSizeLog;495unsigned const prevBucketLog =496serialState->params.ldmParams.hashLog -497serialState->params.ldmParams.bucketSizeLog;498size_t const numBuckets = (size_t)1 << bucketLog;499/* Size the seq pool tables */500ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize));501/* Reset the window */502ZSTD_window_init(&serialState->ldmState.window);503/* Resize tables and output space if necessary. */504if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) {505ZSTD_customFree(serialState->ldmState.hashTable, cMem);506serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_customMalloc(hashSize, cMem);507}508if (serialState->ldmState.bucketOffsets == NULL || prevBucketLog < bucketLog) {509ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem);510serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_customMalloc(numBuckets, cMem);511}512if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets)513return 1;514/* Zero the tables */515ZSTD_memset(serialState->ldmState.hashTable, 0, hashSize);516ZSTD_memset(serialState->ldmState.bucketOffsets, 0, numBuckets);517518/* Update window state and fill hash table with dict */519serialState->ldmState.loadedDictEnd = 0;520if (dictSize > 0) {521if (dictContentType == ZSTD_dct_rawContent) {522BYTE const* const dictEnd = (const BYTE*)dict + dictSize;523ZSTD_window_update(&serialState->ldmState.window, dict, dictSize, /* forceNonContiguous */ 0);524ZSTD_ldm_fillHashTable(&serialState->ldmState, (const BYTE*)dict, dictEnd, ¶ms.ldmParams);525serialState->ldmState.loadedDictEnd = params.forceWindow ? 0 : (U32)(dictEnd - serialState->ldmState.window.base);526} else {527/* don't even load anything */528}529}530531/* Initialize serialState's copy of ldmWindow. */532serialState->ldmWindow = serialState->ldmState.window;533}534535serialState->params = params;536serialState->params.jobSize = (U32)jobSize;537return 0;538}539540static int ZSTDMT_serialState_init(serialState_t* serialState)541{542int initError = 0;543ZSTD_memset(serialState, 0, sizeof(*serialState));544initError |= ZSTD_pthread_mutex_init(&serialState->mutex, NULL);545initError |= ZSTD_pthread_cond_init(&serialState->cond, NULL);546initError |= ZSTD_pthread_mutex_init(&serialState->ldmWindowMutex, NULL);547initError |= ZSTD_pthread_cond_init(&serialState->ldmWindowCond, NULL);548return initError;549}550551static void ZSTDMT_serialState_free(serialState_t* serialState)552{553ZSTD_customMem cMem = serialState->params.customMem;554ZSTD_pthread_mutex_destroy(&serialState->mutex);555ZSTD_pthread_cond_destroy(&serialState->cond);556ZSTD_pthread_mutex_destroy(&serialState->ldmWindowMutex);557ZSTD_pthread_cond_destroy(&serialState->ldmWindowCond);558ZSTD_customFree(serialState->ldmState.hashTable, cMem);559ZSTD_customFree(serialState->ldmState.bucketOffsets, cMem);560}561562static void ZSTDMT_serialState_update(serialState_t* serialState,563ZSTD_CCtx* jobCCtx, rawSeqStore_t seqStore,564range_t src, unsigned jobID)565{566/* Wait for our turn */567ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);568while (serialState->nextJobID < jobID) {569DEBUGLOG(5, "wait for serialState->cond");570ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);571}572/* A future job may error and skip our job */573if (serialState->nextJobID == jobID) {574/* It is now our turn, do any processing necessary */575if (serialState->params.ldmParams.enableLdm == ZSTD_ps_enable) {576size_t error;577assert(seqStore.seq != NULL && seqStore.pos == 0 &&578seqStore.size == 0 && seqStore.capacity > 0);579assert(src.size <= serialState->params.jobSize);580ZSTD_window_update(&serialState->ldmState.window, src.start, src.size, /* forceNonContiguous */ 0);581error = ZSTD_ldm_generateSequences(582&serialState->ldmState, &seqStore,583&serialState->params.ldmParams, src.start, src.size);584/* We provide a large enough buffer to never fail. */585assert(!ZSTD_isError(error)); (void)error;586/* Update ldmWindow to match the ldmState.window and signal the main587* thread if it is waiting for a buffer.588*/589ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex);590serialState->ldmWindow = serialState->ldmState.window;591ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);592ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);593}594if (serialState->params.fParams.checksumFlag && src.size > 0)595XXH64_update(&serialState->xxhState, src.start, src.size);596}597/* Now it is the next jobs turn */598serialState->nextJobID++;599ZSTD_pthread_cond_broadcast(&serialState->cond);600ZSTD_pthread_mutex_unlock(&serialState->mutex);601602if (seqStore.size > 0) {603size_t const err = ZSTD_referenceExternalSequences(604jobCCtx, seqStore.seq, seqStore.size);605assert(serialState->params.ldmParams.enableLdm == ZSTD_ps_enable);606assert(!ZSTD_isError(err));607(void)err;608}609}610611static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,612unsigned jobID, size_t cSize)613{614ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);615if (serialState->nextJobID <= jobID) {616assert(ZSTD_isError(cSize)); (void)cSize;617DEBUGLOG(5, "Skipping past job %u because of error", jobID);618serialState->nextJobID = jobID + 1;619ZSTD_pthread_cond_broadcast(&serialState->cond);620621ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex);622ZSTD_window_clear(&serialState->ldmWindow);623ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);624ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);625}626ZSTD_pthread_mutex_unlock(&serialState->mutex);627628}629630631/* ------------------------------------------ */632/* ===== Worker thread ===== */633/* ------------------------------------------ */634635static const range_t kNullRange = { NULL, 0 };636637typedef struct {638size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */639size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */640ZSTD_pthread_mutex_t job_mutex; /* Thread-safe - used by mtctx and worker */641ZSTD_pthread_cond_t job_cond; /* Thread-safe - used by mtctx and worker */642ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */643ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */644ZSTDMT_seqPool* seqPool; /* Thread-safe - used by mtctx and (all) workers */645serialState_t* serial; /* Thread-safe - used by mtctx and (all) workers */646buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */647range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */648range_t src; /* set by mtctx, then read by worker & mtctx => no barrier */649unsigned jobID; /* set by mtctx, then read by worker => no barrier */650unsigned firstJob; /* set by mtctx, then read by worker => no barrier */651unsigned lastJob; /* set by mtctx, then read by worker => no barrier */652ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */653const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */654unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */655size_t dstFlushed; /* used only by mtctx */656unsigned frameChecksumNeeded; /* used only by mtctx */657} ZSTDMT_jobDescription;658659#define JOB_ERROR(e) { \660ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); \661job->cSize = e; \662ZSTD_pthread_mutex_unlock(&job->job_mutex); \663goto _endJob; \664}665666/* ZSTDMT_compressionJob() is a POOL_function type */667static void ZSTDMT_compressionJob(void* jobDescription)668{669ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;670ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */671ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);672rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);673buffer_t dstBuff = job->dstBuff;674size_t lastCBlockSize = 0;675676/* resources */677if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));678if (dstBuff.start == NULL) { /* streaming job : doesn't provide a dstBuffer */679dstBuff = ZSTDMT_getBuffer(job->bufPool);680if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation));681job->dstBuff = dstBuff; /* this value can be read in ZSTDMT_flush, when it copies the whole job */682}683if (jobParams.ldmParams.enableLdm == ZSTD_ps_enable && rawSeqStore.seq == NULL)684JOB_ERROR(ERROR(memory_allocation));685686/* Don't compute the checksum for chunks, since we compute it externally,687* but write it in the header.688*/689if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;690/* Don't run LDM for the chunks, since we handle it externally */691jobParams.ldmParams.enableLdm = ZSTD_ps_disable;692/* Correct nbWorkers to 0. */693jobParams.nbWorkers = 0;694695696/* init */697if (job->cdict) {698size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize);699assert(job->firstJob); /* only allowed for first job */700if (ZSTD_isError(initError)) JOB_ERROR(initError);701} else { /* srcStart points at reloaded section */702U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;703{ size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);704if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);705}706if (!job->firstJob) {707size_t const err = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_deterministicRefPrefix, 0);708if (ZSTD_isError(err)) JOB_ERROR(err);709}710{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,711job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */712ZSTD_dtlm_fast,713NULL, /*cdict*/714&jobParams, pledgedSrcSize);715if (ZSTD_isError(initError)) JOB_ERROR(initError);716} }717718/* Perform serial step as early as possible, but after CCtx initialization */719ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);720721if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */722size_t const hSize = ZSTD_compressContinue_public(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);723if (ZSTD_isError(hSize)) JOB_ERROR(hSize);724DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);725ZSTD_invalidateRepCodes(cctx);726}727728/* compress */729{ size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX;730int const nbChunks = (int)((job->src.size + (chunkSize-1)) / chunkSize);731const BYTE* ip = (const BYTE*) job->src.start;732BYTE* const ostart = (BYTE*)dstBuff.start;733BYTE* op = ostart;734BYTE* oend = op + dstBuff.capacity;735int chunkNb;736if (sizeof(size_t) > sizeof(int)) assert(job->src.size < ((size_t)INT_MAX) * chunkSize); /* check overflow */737DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->src.size, nbChunks);738assert(job->cSize == 0);739for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {740size_t const cSize = ZSTD_compressContinue_public(cctx, op, oend-op, ip, chunkSize);741if (ZSTD_isError(cSize)) JOB_ERROR(cSize);742ip += chunkSize;743op += cSize; assert(op < oend);744/* stats */745ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);746job->cSize += cSize;747job->consumed = chunkSize * chunkNb;748DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)",749(U32)cSize, (U32)job->cSize);750ZSTD_pthread_cond_signal(&job->job_cond); /* warns some more data is ready to be flushed */751ZSTD_pthread_mutex_unlock(&job->job_mutex);752}753/* last block */754assert(chunkSize > 0);755assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */756if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) {757size_t const lastBlockSize1 = job->src.size & (chunkSize-1);758size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1;759size_t const cSize = (job->lastJob) ?760ZSTD_compressEnd_public(cctx, op, oend-op, ip, lastBlockSize) :761ZSTD_compressContinue_public(cctx, op, oend-op, ip, lastBlockSize);762if (ZSTD_isError(cSize)) JOB_ERROR(cSize);763lastCBlockSize = cSize;764} }765if (!job->firstJob) {766/* Double check that we don't have an ext-dict, because then our767* repcode invalidation doesn't work.768*/769assert(!ZSTD_window_hasExtDict(cctx->blockState.matchState.window));770}771ZSTD_CCtx_trace(cctx, 0);772773_endJob:774ZSTDMT_serialState_ensureFinished(job->serial, job->jobID, job->cSize);775if (job->prefix.size > 0)776DEBUGLOG(5, "Finished with prefix: %zx", (size_t)job->prefix.start);777DEBUGLOG(5, "Finished with source: %zx", (size_t)job->src.start);778/* release resources */779ZSTDMT_releaseSeq(job->seqPool, rawSeqStore);780ZSTDMT_releaseCCtx(job->cctxPool, cctx);781/* report */782ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);783if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0);784job->cSize += lastCBlockSize;785job->consumed = job->src.size; /* when job->consumed == job->src.size , compression job is presumed completed */786ZSTD_pthread_cond_signal(&job->job_cond);787ZSTD_pthread_mutex_unlock(&job->job_mutex);788}789790791/* ------------------------------------------ */792/* ===== Multi-threaded compression ===== */793/* ------------------------------------------ */794795typedef struct {796range_t prefix; /* read-only non-owned prefix buffer */797buffer_t buffer;798size_t filled;799} inBuff_t;800801typedef struct {802BYTE* buffer; /* The round input buffer. All jobs get references803* to pieces of the buffer. ZSTDMT_tryGetInputRange()804* handles handing out job input buffers, and makes805* sure it doesn't overlap with any pieces still in use.806*/807size_t capacity; /* The capacity of buffer. */808size_t pos; /* The position of the current inBuff in the round809* buffer. Updated past the end if the inBuff once810* the inBuff is sent to the worker thread.811* pos <= capacity.812*/813} roundBuff_t;814815static const roundBuff_t kNullRoundBuff = {NULL, 0, 0};816817#define RSYNC_LENGTH 32818/* Don't create chunks smaller than the zstd block size.819* This stops us from regressing compression ratio too much,820* and ensures our output fits in ZSTD_compressBound().821*822* If this is shrunk < ZSTD_BLOCKSIZELOG_MIN then823* ZSTD_COMPRESSBOUND() will need to be updated.824*/825#define RSYNC_MIN_BLOCK_LOG ZSTD_BLOCKSIZELOG_MAX826#define RSYNC_MIN_BLOCK_SIZE (1<<RSYNC_MIN_BLOCK_LOG)827828typedef struct {829U64 hash;830U64 hitMask;831U64 primePower;832} rsyncState_t;833834struct ZSTDMT_CCtx_s {835POOL_ctx* factory;836ZSTDMT_jobDescription* jobs;837ZSTDMT_bufferPool* bufPool;838ZSTDMT_CCtxPool* cctxPool;839ZSTDMT_seqPool* seqPool;840ZSTD_CCtx_params params;841size_t targetSectionSize;842size_t targetPrefixSize;843int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */844inBuff_t inBuff;845roundBuff_t roundBuff;846serialState_t serial;847rsyncState_t rsync;848unsigned jobIDMask;849unsigned doneJobID;850unsigned nextJobID;851unsigned frameEnded;852unsigned allJobsCompleted;853unsigned long long frameContentSize;854unsigned long long consumed;855unsigned long long produced;856ZSTD_customMem cMem;857ZSTD_CDict* cdictLocal;858const ZSTD_CDict* cdict;859unsigned providedFactory: 1;860};861862static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem)863{864U32 jobNb;865if (jobTable == NULL) return;866for (jobNb=0; jobNb<nbJobs; jobNb++) {867ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);868ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond);869}870ZSTD_customFree(jobTable, cMem);871}872873/* ZSTDMT_allocJobsTable()874* allocate and init a job table.875* update *nbJobsPtr to next power of 2 value, as size of table */876static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)877{878U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;879U32 const nbJobs = 1 << nbJobsLog2;880U32 jobNb;881ZSTDMT_jobDescription* const jobTable = (ZSTDMT_jobDescription*)882ZSTD_customCalloc(nbJobs * sizeof(ZSTDMT_jobDescription), cMem);883int initError = 0;884if (jobTable==NULL) return NULL;885*nbJobsPtr = nbJobs;886for (jobNb=0; jobNb<nbJobs; jobNb++) {887initError |= ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL);888initError |= ZSTD_pthread_cond_init(&jobTable[jobNb].job_cond, NULL);889}890if (initError != 0) {891ZSTDMT_freeJobsTable(jobTable, nbJobs, cMem);892return NULL;893}894return jobTable;895}896897static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {898U32 nbJobs = nbWorkers + 2;899if (nbJobs > mtctx->jobIDMask+1) { /* need more job capacity */900ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);901mtctx->jobIDMask = 0;902mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);903if (mtctx->jobs==NULL) return ERROR(memory_allocation);904assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0)); /* ensure nbJobs is a power of 2 */905mtctx->jobIDMask = nbJobs - 1;906}907return 0;908}909910911/* ZSTDMT_CCtxParam_setNbWorkers():912* Internal use only */913static size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)914{915return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);916}917918MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)919{920ZSTDMT_CCtx* mtctx;921U32 nbJobs = nbWorkers + 2;922int initError;923DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbWorkers = %u)", nbWorkers);924925if (nbWorkers < 1) return NULL;926nbWorkers = MIN(nbWorkers , ZSTDMT_NBWORKERS_MAX);927if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))928/* invalid custom allocator */929return NULL;930931mtctx = (ZSTDMT_CCtx*) ZSTD_customCalloc(sizeof(ZSTDMT_CCtx), cMem);932if (!mtctx) return NULL;933ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);934mtctx->cMem = cMem;935mtctx->allJobsCompleted = 1;936if (pool != NULL) {937mtctx->factory = pool;938mtctx->providedFactory = 1;939}940else {941mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem);942mtctx->providedFactory = 0;943}944mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);945assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */946mtctx->jobIDMask = nbJobs - 1;947mtctx->bufPool = ZSTDMT_createBufferPool(BUF_POOL_MAX_NB_BUFFERS(nbWorkers), cMem);948mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);949mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);950initError = ZSTDMT_serialState_init(&mtctx->serial);951mtctx->roundBuff = kNullRoundBuff;952if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) {953ZSTDMT_freeCCtx(mtctx);954return NULL;955}956DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers);957return mtctx;958}959960ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem, ZSTD_threadPool* pool)961{962#ifdef ZSTD_MULTITHREAD963return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem, pool);964#else965(void)nbWorkers;966(void)cMem;967(void)pool;968return NULL;969#endif970}971972973/* ZSTDMT_releaseAllJobResources() :974* note : ensure all workers are killed first ! */975static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)976{977unsigned jobID;978DEBUGLOG(3, "ZSTDMT_releaseAllJobResources");979for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {980/* Copy the mutex/cond out */981ZSTD_pthread_mutex_t const mutex = mtctx->jobs[jobID].job_mutex;982ZSTD_pthread_cond_t const cond = mtctx->jobs[jobID].job_cond;983984DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);985ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);986987/* Clear the job description, but keep the mutex/cond */988ZSTD_memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID]));989mtctx->jobs[jobID].job_mutex = mutex;990mtctx->jobs[jobID].job_cond = cond;991}992mtctx->inBuff.buffer = g_nullBuffer;993mtctx->inBuff.filled = 0;994mtctx->allJobsCompleted = 1;995}996997static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)998{999DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");1000while (mtctx->doneJobID < mtctx->nextJobID) {1001unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;1002ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);1003while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {1004DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */1005ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);1006}1007ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);1008mtctx->doneJobID++;1009}1010}10111012size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)1013{1014if (mtctx==NULL) return 0; /* compatible with free on NULL */1015if (!mtctx->providedFactory)1016POOL_free(mtctx->factory); /* stop and free worker threads */1017ZSTDMT_releaseAllJobResources(mtctx); /* release job resources into pools first */1018ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);1019ZSTDMT_freeBufferPool(mtctx->bufPool);1020ZSTDMT_freeCCtxPool(mtctx->cctxPool);1021ZSTDMT_freeSeqPool(mtctx->seqPool);1022ZSTDMT_serialState_free(&mtctx->serial);1023ZSTD_freeCDict(mtctx->cdictLocal);1024if (mtctx->roundBuff.buffer)1025ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem);1026ZSTD_customFree(mtctx, mtctx->cMem);1027return 0;1028}10291030size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)1031{1032if (mtctx == NULL) return 0; /* supports sizeof NULL */1033return sizeof(*mtctx)1034+ POOL_sizeof(mtctx->factory)1035+ ZSTDMT_sizeof_bufferPool(mtctx->bufPool)1036+ (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)1037+ ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)1038+ ZSTDMT_sizeof_seqPool(mtctx->seqPool)1039+ ZSTD_sizeof_CDict(mtctx->cdictLocal)1040+ mtctx->roundBuff.capacity;1041}104210431044/* ZSTDMT_resize() :1045* @return : error code if fails, 0 on success */1046static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)1047{1048if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);1049FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) , "");1050mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, BUF_POOL_MAX_NB_BUFFERS(nbWorkers));1051if (mtctx->bufPool == NULL) return ERROR(memory_allocation);1052mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);1053if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);1054mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers);1055if (mtctx->seqPool == NULL) return ERROR(memory_allocation);1056ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);1057return 0;1058}105910601061/*! ZSTDMT_updateCParams_whileCompressing() :1062* Updates a selected set of compression parameters, remaining compatible with currently active frame.1063* New parameters will be applied to next compression job. */1064void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams)1065{1066U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compressing */1067int const compressionLevel = cctxParams->compressionLevel;1068DEBUGLOG(5, "ZSTDMT_updateCParams_whileCompressing (level:%i)",1069compressionLevel);1070mtctx->params.compressionLevel = compressionLevel;1071{ ZSTD_compressionParameters cParams = ZSTD_getCParamsFromCCtxParams(cctxParams, ZSTD_CONTENTSIZE_UNKNOWN, 0, ZSTD_cpm_noAttachDict);1072cParams.windowLog = saved_wlog;1073mtctx->params.cParams = cParams;1074}1075}10761077/* ZSTDMT_getFrameProgression():1078* tells how much data has been consumed (input) and produced (output) for current frame.1079* able to count progression inside worker threads.1080* Note : mutex will be acquired during statistics collection inside workers. */1081ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)1082{1083ZSTD_frameProgression fps;1084DEBUGLOG(5, "ZSTDMT_getFrameProgression");1085fps.ingested = mtctx->consumed + mtctx->inBuff.filled;1086fps.consumed = mtctx->consumed;1087fps.produced = fps.flushed = mtctx->produced;1088fps.currentJobID = mtctx->nextJobID;1089fps.nbActiveWorkers = 0;1090{ unsigned jobNb;1091unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);1092DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",1093mtctx->doneJobID, lastJobNb, mtctx->jobReady)1094for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {1095unsigned const wJobID = jobNb & mtctx->jobIDMask;1096ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];1097ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);1098{ size_t const cResult = jobPtr->cSize;1099size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;1100size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;1101assert(flushed <= produced);1102fps.ingested += jobPtr->src.size;1103fps.consumed += jobPtr->consumed;1104fps.produced += produced;1105fps.flushed += flushed;1106fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size);1107}1108ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);1109}1110}1111return fps;1112}111311141115size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)1116{1117size_t toFlush;1118unsigned const jobID = mtctx->doneJobID;1119assert(jobID <= mtctx->nextJobID);1120if (jobID == mtctx->nextJobID) return 0; /* no active job => nothing to flush */11211122/* look into oldest non-fully-flushed job */1123{ unsigned const wJobID = jobID & mtctx->jobIDMask;1124ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];1125ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);1126{ size_t const cResult = jobPtr->cSize;1127size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;1128size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;1129assert(flushed <= produced);1130assert(jobPtr->consumed <= jobPtr->src.size);1131toFlush = produced - flushed;1132/* if toFlush==0, nothing is available to flush.1133* However, jobID is expected to still be active:1134* if jobID was already completed and fully flushed,1135* ZSTDMT_flushProduced() should have already moved onto next job.1136* Therefore, some input has not yet been consumed. */1137if (toFlush==0) {1138assert(jobPtr->consumed < jobPtr->src.size);1139}1140}1141ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);1142}11431144return toFlush;1145}114611471148/* ------------------------------------------ */1149/* ===== Multi-threaded compression ===== */1150/* ------------------------------------------ */11511152static unsigned ZSTDMT_computeTargetJobLog(const ZSTD_CCtx_params* params)1153{1154unsigned jobLog;1155if (params->ldmParams.enableLdm == ZSTD_ps_enable) {1156/* In Long Range Mode, the windowLog is typically oversized.1157* In which case, it's preferable to determine the jobSize1158* based on cycleLog instead. */1159jobLog = MAX(21, ZSTD_cycleLog(params->cParams.chainLog, params->cParams.strategy) + 3);1160} else {1161jobLog = MAX(20, params->cParams.windowLog + 2);1162}1163return MIN(jobLog, (unsigned)ZSTDMT_JOBLOG_MAX);1164}11651166static int ZSTDMT_overlapLog_default(ZSTD_strategy strat)1167{1168switch(strat)1169{1170case ZSTD_btultra2:1171return 9;1172case ZSTD_btultra:1173case ZSTD_btopt:1174return 8;1175case ZSTD_btlazy2:1176case ZSTD_lazy2:1177return 7;1178case ZSTD_lazy:1179case ZSTD_greedy:1180case ZSTD_dfast:1181case ZSTD_fast:1182default:;1183}1184return 6;1185}11861187static int ZSTDMT_overlapLog(int ovlog, ZSTD_strategy strat)1188{1189assert(0 <= ovlog && ovlog <= 9);1190if (ovlog == 0) return ZSTDMT_overlapLog_default(strat);1191return ovlog;1192}11931194static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params)1195{1196int const overlapRLog = 9 - ZSTDMT_overlapLog(params->overlapLog, params->cParams.strategy);1197int ovLog = (overlapRLog >= 8) ? 0 : (params->cParams.windowLog - overlapRLog);1198assert(0 <= overlapRLog && overlapRLog <= 8);1199if (params->ldmParams.enableLdm == ZSTD_ps_enable) {1200/* In Long Range Mode, the windowLog is typically oversized.1201* In which case, it's preferable to determine the jobSize1202* based on chainLog instead.1203* Then, ovLog becomes a fraction of the jobSize, rather than windowSize */1204ovLog = MIN(params->cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2)1205- overlapRLog;1206}1207assert(0 <= ovLog && ovLog <= ZSTD_WINDOWLOG_MAX);1208DEBUGLOG(4, "overlapLog : %i", params->overlapLog);1209DEBUGLOG(4, "overlap size : %i", 1 << ovLog);1210return (ovLog==0) ? 0 : (size_t)1 << ovLog;1211}12121213/* ====================================== */1214/* ======= Streaming API ======= */1215/* ====================================== */12161217size_t ZSTDMT_initCStream_internal(1218ZSTDMT_CCtx* mtctx,1219const void* dict, size_t dictSize, ZSTD_dictContentType_e dictContentType,1220const ZSTD_CDict* cdict, ZSTD_CCtx_params params,1221unsigned long long pledgedSrcSize)1222{1223DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)",1224(U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx);12251226/* params supposed partially fully validated at this point */1227assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));1228assert(!((dict) && (cdict))); /* either dict or cdict, not both */12291230/* init */1231if (params.nbWorkers != mtctx->params.nbWorkers)1232FORWARD_IF_ERROR( ZSTDMT_resize(mtctx, params.nbWorkers) , "");12331234if (params.jobSize != 0 && params.jobSize < ZSTDMT_JOBSIZE_MIN) params.jobSize = ZSTDMT_JOBSIZE_MIN;1235if (params.jobSize > (size_t)ZSTDMT_JOBSIZE_MAX) params.jobSize = (size_t)ZSTDMT_JOBSIZE_MAX;12361237DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers);12381239if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */1240ZSTDMT_waitForAllJobsCompleted(mtctx);1241ZSTDMT_releaseAllJobResources(mtctx);1242mtctx->allJobsCompleted = 1;1243}12441245mtctx->params = params;1246mtctx->frameContentSize = pledgedSrcSize;1247if (dict) {1248ZSTD_freeCDict(mtctx->cdictLocal);1249mtctx->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,1250ZSTD_dlm_byCopy, dictContentType, /* note : a loadPrefix becomes an internal CDict */1251params.cParams, mtctx->cMem);1252mtctx->cdict = mtctx->cdictLocal;1253if (mtctx->cdictLocal == NULL) return ERROR(memory_allocation);1254} else {1255ZSTD_freeCDict(mtctx->cdictLocal);1256mtctx->cdictLocal = NULL;1257mtctx->cdict = cdict;1258}12591260mtctx->targetPrefixSize = ZSTDMT_computeOverlapSize(¶ms);1261DEBUGLOG(4, "overlapLog=%i => %u KB", params.overlapLog, (U32)(mtctx->targetPrefixSize>>10));1262mtctx->targetSectionSize = params.jobSize;1263if (mtctx->targetSectionSize == 0) {1264mtctx->targetSectionSize = 1ULL << ZSTDMT_computeTargetJobLog(¶ms);1265}1266assert(mtctx->targetSectionSize <= (size_t)ZSTDMT_JOBSIZE_MAX);12671268if (params.rsyncable) {1269/* Aim for the targetsectionSize as the average job size. */1270U32 const jobSizeKB = (U32)(mtctx->targetSectionSize >> 10);1271U32 const rsyncBits = (assert(jobSizeKB >= 1), ZSTD_highbit32(jobSizeKB) + 10);1272/* We refuse to create jobs < RSYNC_MIN_BLOCK_SIZE bytes, so make sure our1273* expected job size is at least 4x larger. */1274assert(rsyncBits >= RSYNC_MIN_BLOCK_LOG + 2);1275DEBUGLOG(4, "rsyncLog = %u", rsyncBits);1276mtctx->rsync.hash = 0;1277mtctx->rsync.hitMask = (1ULL << rsyncBits) - 1;1278mtctx->rsync.primePower = ZSTD_rollingHash_primePower(RSYNC_LENGTH);1279}1280if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */1281DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), (U32)params.jobSize);1282DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));1283ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));1284{1285/* If ldm is enabled we need windowSize space. */1286size_t const windowSize = mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable ? (1U << mtctx->params.cParams.windowLog) : 0;1287/* Two buffers of slack, plus extra space for the overlap1288* This is the minimum slack that LDM works with. One extra because1289* flush might waste up to targetSectionSize-1 bytes. Another extra1290* for the overlap (if > 0), then one to fill which doesn't overlap1291* with the LDM window.1292*/1293size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);1294size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers;1295/* Compute the total size, and always have enough slack */1296size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);1297size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers;1298size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;1299if (mtctx->roundBuff.capacity < capacity) {1300if (mtctx->roundBuff.buffer)1301ZSTD_customFree(mtctx->roundBuff.buffer, mtctx->cMem);1302mtctx->roundBuff.buffer = (BYTE*)ZSTD_customMalloc(capacity, mtctx->cMem);1303if (mtctx->roundBuff.buffer == NULL) {1304mtctx->roundBuff.capacity = 0;1305return ERROR(memory_allocation);1306}1307mtctx->roundBuff.capacity = capacity;1308}1309}1310DEBUGLOG(4, "roundBuff capacity : %u KB", (U32)(mtctx->roundBuff.capacity>>10));1311mtctx->roundBuff.pos = 0;1312mtctx->inBuff.buffer = g_nullBuffer;1313mtctx->inBuff.filled = 0;1314mtctx->inBuff.prefix = kNullRange;1315mtctx->doneJobID = 0;1316mtctx->nextJobID = 0;1317mtctx->frameEnded = 0;1318mtctx->allJobsCompleted = 0;1319mtctx->consumed = 0;1320mtctx->produced = 0;1321if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize,1322dict, dictSize, dictContentType))1323return ERROR(memory_allocation);1324return 0;1325}132613271328/* ZSTDMT_writeLastEmptyBlock()1329* Write a single empty block with an end-of-frame to finish a frame.1330* Job must be created from streaming variant.1331* This function is always successful if expected conditions are fulfilled.1332*/1333static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)1334{1335assert(job->lastJob == 1);1336assert(job->src.size == 0); /* last job is empty -> will be simplified into a last empty block */1337assert(job->firstJob == 0); /* cannot be first job, as it also needs to create frame header */1338assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */1339job->dstBuff = ZSTDMT_getBuffer(job->bufPool);1340if (job->dstBuff.start == NULL) {1341job->cSize = ERROR(memory_allocation);1342return;1343}1344assert(job->dstBuff.capacity >= ZSTD_blockHeaderSize); /* no buffer should ever be that small */1345job->src = kNullRange;1346job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.capacity);1347assert(!ZSTD_isError(job->cSize));1348assert(job->consumed == 0);1349}13501351static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)1352{1353unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;1354int const endFrame = (endOp == ZSTD_e_end);13551356if (mtctx->nextJobID > mtctx->doneJobID + mtctx->jobIDMask) {1357DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full");1358assert((mtctx->nextJobID & mtctx->jobIDMask) == (mtctx->doneJobID & mtctx->jobIDMask));1359return 0;1360}13611362if (!mtctx->jobReady) {1363BYTE const* src = (BYTE const*)mtctx->inBuff.buffer.start;1364DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",1365mtctx->nextJobID, (U32)srcSize, (U32)mtctx->inBuff.prefix.size);1366mtctx->jobs[jobID].src.start = src;1367mtctx->jobs[jobID].src.size = srcSize;1368assert(mtctx->inBuff.filled >= srcSize);1369mtctx->jobs[jobID].prefix = mtctx->inBuff.prefix;1370mtctx->jobs[jobID].consumed = 0;1371mtctx->jobs[jobID].cSize = 0;1372mtctx->jobs[jobID].params = mtctx->params;1373mtctx->jobs[jobID].cdict = mtctx->nextJobID==0 ? mtctx->cdict : NULL;1374mtctx->jobs[jobID].fullFrameSize = mtctx->frameContentSize;1375mtctx->jobs[jobID].dstBuff = g_nullBuffer;1376mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;1377mtctx->jobs[jobID].bufPool = mtctx->bufPool;1378mtctx->jobs[jobID].seqPool = mtctx->seqPool;1379mtctx->jobs[jobID].serial = &mtctx->serial;1380mtctx->jobs[jobID].jobID = mtctx->nextJobID;1381mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);1382mtctx->jobs[jobID].lastJob = endFrame;1383mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);1384mtctx->jobs[jobID].dstFlushed = 0;13851386/* Update the round buffer pos and clear the input buffer to be reset */1387mtctx->roundBuff.pos += srcSize;1388mtctx->inBuff.buffer = g_nullBuffer;1389mtctx->inBuff.filled = 0;1390/* Set the prefix */1391if (!endFrame) {1392size_t const newPrefixSize = MIN(srcSize, mtctx->targetPrefixSize);1393mtctx->inBuff.prefix.start = src + srcSize - newPrefixSize;1394mtctx->inBuff.prefix.size = newPrefixSize;1395} else { /* endFrame==1 => no need for another input buffer */1396mtctx->inBuff.prefix = kNullRange;1397mtctx->frameEnded = endFrame;1398if (mtctx->nextJobID == 0) {1399/* single job exception : checksum is already calculated directly within worker thread */1400mtctx->params.fParams.checksumFlag = 0;1401} }14021403if ( (srcSize == 0)1404&& (mtctx->nextJobID>0)/*single job must also write frame header*/ ) {1405DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");1406assert(endOp == ZSTD_e_end); /* only possible case : need to end the frame with an empty last block */1407ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);1408mtctx->nextJobID++;1409return 0;1410}1411}14121413DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))",1414mtctx->nextJobID,1415(U32)mtctx->jobs[jobID].src.size,1416mtctx->jobs[jobID].lastJob,1417mtctx->nextJobID,1418jobID);1419if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {1420mtctx->nextJobID++;1421mtctx->jobReady = 0;1422} else {1423DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID);1424mtctx->jobReady = 1;1425}1426return 0;1427}142814291430/*! ZSTDMT_flushProduced() :1431* flush whatever data has been produced but not yet flushed in current job.1432* move to next job if current one is fully flushed.1433* `output` : `pos` will be updated with amount of data flushed .1434* `blockToFlush` : if >0, the function will block and wait if there is no data available to flush .1435* @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */1436static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)1437{1438unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;1439DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u , job %u <= %u)",1440blockToFlush, mtctx->doneJobID, mtctx->nextJobID);1441assert(output->size >= output->pos);14421443ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);1444if ( blockToFlush1445&& (mtctx->doneJobID < mtctx->nextJobID) ) {1446assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);1447while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { /* nothing to flush */1448if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].src.size) {1449DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond, there will be none",1450mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].src.size);1451break;1452}1453DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",1454mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);1455ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex); /* block when nothing to flush but some to come */1456} }14571458/* try to flush something */1459{ size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */1460size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */1461size_t const srcSize = mtctx->jobs[wJobID].src.size; /* read-only, could be done after mutex lock, but no-declaration-after-statement */1462ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);1463if (ZSTD_isError(cSize)) {1464DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",1465mtctx->doneJobID, ZSTD_getErrorName(cSize));1466ZSTDMT_waitForAllJobsCompleted(mtctx);1467ZSTDMT_releaseAllJobResources(mtctx);1468return cSize;1469}1470/* add frame checksum if necessary (can only happen once) */1471assert(srcConsumed <= srcSize);1472if ( (srcConsumed == srcSize) /* job completed -> worker no longer active */1473&& mtctx->jobs[wJobID].frameChecksumNeeded ) {1474U32 const checksum = (U32)XXH64_digest(&mtctx->serial.xxhState);1475DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);1476MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum);1477cSize += 4;1478mtctx->jobs[wJobID].cSize += 4; /* can write this shared value, as worker is no longer active */1479mtctx->jobs[wJobID].frameChecksumNeeded = 0;1480}14811482if (cSize > 0) { /* compression is ongoing or completed */1483size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);1484DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)",1485(U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize);1486assert(mtctx->doneJobID < mtctx->nextJobID);1487assert(cSize >= mtctx->jobs[wJobID].dstFlushed);1488assert(mtctx->jobs[wJobID].dstBuff.start != NULL);1489if (toFlush > 0) {1490ZSTD_memcpy((char*)output->dst + output->pos,1491(const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed,1492toFlush);1493}1494output->pos += toFlush;1495mtctx->jobs[wJobID].dstFlushed += toFlush; /* can write : this value is only used by mtctx */14961497if ( (srcConsumed == srcSize) /* job is completed */1498&& (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */1499DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",1500mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);1501ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);1502DEBUGLOG(5, "dstBuffer released");1503mtctx->jobs[wJobID].dstBuff = g_nullBuffer;1504mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */1505mtctx->consumed += srcSize;1506mtctx->produced += cSize;1507mtctx->doneJobID++;1508} }15091510/* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */1511if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed);1512if (srcSize > srcConsumed) return 1; /* current job not completely compressed */1513}1514if (mtctx->doneJobID < mtctx->nextJobID) return 1; /* some more jobs ongoing */1515if (mtctx->jobReady) return 1; /* one job is ready to push, just not yet in the list */1516if (mtctx->inBuff.filled > 0) return 1; /* input is not empty, and still needs to be converted into a job */1517mtctx->allJobsCompleted = mtctx->frameEnded; /* all jobs are entirely flushed => if this one is last one, frame is completed */1518if (end == ZSTD_e_end) return !mtctx->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */1519return 0; /* internal buffers fully flushed */1520}15211522/**1523* Returns the range of data used by the earliest job that is not yet complete.1524* If the data of the first job is broken up into two segments, we cover both1525* sections.1526*/1527static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx)1528{1529unsigned const firstJobID = mtctx->doneJobID;1530unsigned const lastJobID = mtctx->nextJobID;1531unsigned jobID;15321533for (jobID = firstJobID; jobID < lastJobID; ++jobID) {1534unsigned const wJobID = jobID & mtctx->jobIDMask;1535size_t consumed;15361537ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);1538consumed = mtctx->jobs[wJobID].consumed;1539ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);15401541if (consumed < mtctx->jobs[wJobID].src.size) {1542range_t range = mtctx->jobs[wJobID].prefix;1543if (range.size == 0) {1544/* Empty prefix */1545range = mtctx->jobs[wJobID].src;1546}1547/* Job source in multiple segments not supported yet */1548assert(range.start <= mtctx->jobs[wJobID].src.start);1549return range;1550}1551}1552return kNullRange;1553}15541555/**1556* Returns non-zero iff buffer and range overlap.1557*/1558static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range)1559{1560BYTE const* const bufferStart = (BYTE const*)buffer.start;1561BYTE const* const rangeStart = (BYTE const*)range.start;15621563if (rangeStart == NULL || bufferStart == NULL)1564return 0;15651566{1567BYTE const* const bufferEnd = bufferStart + buffer.capacity;1568BYTE const* const rangeEnd = rangeStart + range.size;15691570/* Empty ranges cannot overlap */1571if (bufferStart == bufferEnd || rangeStart == rangeEnd)1572return 0;15731574return bufferStart < rangeEnd && rangeStart < bufferEnd;1575}1576}15771578static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)1579{1580range_t extDict;1581range_t prefix;15821583DEBUGLOG(5, "ZSTDMT_doesOverlapWindow");1584extDict.start = window.dictBase + window.lowLimit;1585extDict.size = window.dictLimit - window.lowLimit;15861587prefix.start = window.base + window.dictLimit;1588prefix.size = window.nextSrc - (window.base + window.dictLimit);1589DEBUGLOG(5, "extDict [0x%zx, 0x%zx)",1590(size_t)extDict.start,1591(size_t)extDict.start + extDict.size);1592DEBUGLOG(5, "prefix [0x%zx, 0x%zx)",1593(size_t)prefix.start,1594(size_t)prefix.start + prefix.size);15951596return ZSTDMT_isOverlapped(buffer, extDict)1597|| ZSTDMT_isOverlapped(buffer, prefix);1598}15991600static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)1601{1602if (mtctx->params.ldmParams.enableLdm == ZSTD_ps_enable) {1603ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;1604DEBUGLOG(5, "ZSTDMT_waitForLdmComplete");1605DEBUGLOG(5, "source [0x%zx, 0x%zx)",1606(size_t)buffer.start,1607(size_t)buffer.start + buffer.capacity);1608ZSTD_PTHREAD_MUTEX_LOCK(mutex);1609while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {1610DEBUGLOG(5, "Waiting for LDM to finish...");1611ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);1612}1613DEBUGLOG(6, "Done waiting for LDM to finish");1614ZSTD_pthread_mutex_unlock(mutex);1615}1616}16171618/**1619* Attempts to set the inBuff to the next section to fill.1620* If any part of the new section is still in use we give up.1621* Returns non-zero if the buffer is filled.1622*/1623static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)1624{1625range_t const inUse = ZSTDMT_getInputDataInUse(mtctx);1626size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos;1627size_t const target = mtctx->targetSectionSize;1628buffer_t buffer;16291630DEBUGLOG(5, "ZSTDMT_tryGetInputRange");1631assert(mtctx->inBuff.buffer.start == NULL);1632assert(mtctx->roundBuff.capacity >= target);16331634if (spaceLeft < target) {1635/* ZSTD_invalidateRepCodes() doesn't work for extDict variants.1636* Simply copy the prefix to the beginning in that case.1637*/1638BYTE* const start = (BYTE*)mtctx->roundBuff.buffer;1639size_t const prefixSize = mtctx->inBuff.prefix.size;16401641buffer.start = start;1642buffer.capacity = prefixSize;1643if (ZSTDMT_isOverlapped(buffer, inUse)) {1644DEBUGLOG(5, "Waiting for buffer...");1645return 0;1646}1647ZSTDMT_waitForLdmComplete(mtctx, buffer);1648ZSTD_memmove(start, mtctx->inBuff.prefix.start, prefixSize);1649mtctx->inBuff.prefix.start = start;1650mtctx->roundBuff.pos = prefixSize;1651}1652buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos;1653buffer.capacity = target;16541655if (ZSTDMT_isOverlapped(buffer, inUse)) {1656DEBUGLOG(5, "Waiting for buffer...");1657return 0;1658}1659assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));16601661ZSTDMT_waitForLdmComplete(mtctx, buffer);16621663DEBUGLOG(5, "Using prefix range [%zx, %zx)",1664(size_t)mtctx->inBuff.prefix.start,1665(size_t)mtctx->inBuff.prefix.start + mtctx->inBuff.prefix.size);1666DEBUGLOG(5, "Using source range [%zx, %zx)",1667(size_t)buffer.start,1668(size_t)buffer.start + buffer.capacity);166916701671mtctx->inBuff.buffer = buffer;1672mtctx->inBuff.filled = 0;1673assert(mtctx->roundBuff.pos + buffer.capacity <= mtctx->roundBuff.capacity);1674return 1;1675}16761677typedef struct {1678size_t toLoad; /* The number of bytes to load from the input. */1679int flush; /* Boolean declaring if we must flush because we found a synchronization point. */1680} syncPoint_t;16811682/**1683* Searches through the input for a synchronization point. If one is found, we1684* will instruct the caller to flush, and return the number of bytes to load.1685* Otherwise, we will load as many bytes as possible and instruct the caller1686* to continue as normal.1687*/1688static syncPoint_t1689findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)1690{1691BYTE const* const istart = (BYTE const*)input.src + input.pos;1692U64 const primePower = mtctx->rsync.primePower;1693U64 const hitMask = mtctx->rsync.hitMask;16941695syncPoint_t syncPoint;1696U64 hash;1697BYTE const* prev;1698size_t pos;16991700syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled);1701syncPoint.flush = 0;1702if (!mtctx->params.rsyncable)1703/* Rsync is disabled. */1704return syncPoint;1705if (mtctx->inBuff.filled + input.size - input.pos < RSYNC_MIN_BLOCK_SIZE)1706/* We don't emit synchronization points if it would produce too small blocks.1707* We don't have enough input to find a synchronization point, so don't look.1708*/1709return syncPoint;1710if (mtctx->inBuff.filled + syncPoint.toLoad < RSYNC_LENGTH)1711/* Not enough to compute the hash.1712* We will miss any synchronization points in this RSYNC_LENGTH byte1713* window. However, since it depends only in the internal buffers, if the1714* state is already synchronized, we will remain synchronized.1715* Additionally, the probability that we miss a synchronization point is1716* low: RSYNC_LENGTH / targetSectionSize.1717*/1718return syncPoint;1719/* Initialize the loop variables. */1720if (mtctx->inBuff.filled < RSYNC_MIN_BLOCK_SIZE) {1721/* We don't need to scan the first RSYNC_MIN_BLOCK_SIZE positions1722* because they can't possibly be a sync point. So we can start1723* part way through the input buffer.1724*/1725pos = RSYNC_MIN_BLOCK_SIZE - mtctx->inBuff.filled;1726if (pos >= RSYNC_LENGTH) {1727prev = istart + pos - RSYNC_LENGTH;1728hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH);1729} else {1730assert(mtctx->inBuff.filled >= RSYNC_LENGTH);1731prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH;1732hash = ZSTD_rollingHash_compute(prev + pos, (RSYNC_LENGTH - pos));1733hash = ZSTD_rollingHash_append(hash, istart, pos);1734}1735} else {1736/* We have enough bytes buffered to initialize the hash,1737* and have processed enough bytes to find a sync point.1738* Start scanning at the beginning of the input.1739*/1740assert(mtctx->inBuff.filled >= RSYNC_MIN_BLOCK_SIZE);1741assert(RSYNC_MIN_BLOCK_SIZE >= RSYNC_LENGTH);1742pos = 0;1743prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH;1744hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH);1745if ((hash & hitMask) == hitMask) {1746/* We're already at a sync point so don't load any more until1747* we're able to flush this sync point.1748* This likely happened because the job table was full so we1749* couldn't add our job.1750*/1751syncPoint.toLoad = 0;1752syncPoint.flush = 1;1753return syncPoint;1754}1755}1756/* Starting with the hash of the previous RSYNC_LENGTH bytes, roll1757* through the input. If we hit a synchronization point, then cut the1758* job off, and tell the compressor to flush the job. Otherwise, load1759* all the bytes and continue as normal.1760* If we go too long without a synchronization point (targetSectionSize)1761* then a block will be emitted anyways, but this is okay, since if we1762* are already synchronized we will remain synchronized.1763*/1764assert(pos < RSYNC_LENGTH || ZSTD_rollingHash_compute(istart + pos - RSYNC_LENGTH, RSYNC_LENGTH) == hash);1765for (; pos < syncPoint.toLoad; ++pos) {1766BYTE const toRemove = pos < RSYNC_LENGTH ? prev[pos] : istart[pos - RSYNC_LENGTH];1767/* This assert is very expensive, and Debian compiles with asserts enabled.1768* So disable it for now. We can get similar coverage by checking it at the1769* beginning & end of the loop.1770* assert(pos < RSYNC_LENGTH || ZSTD_rollingHash_compute(istart + pos - RSYNC_LENGTH, RSYNC_LENGTH) == hash);1771*/1772hash = ZSTD_rollingHash_rotate(hash, toRemove, istart[pos], primePower);1773assert(mtctx->inBuff.filled + pos >= RSYNC_MIN_BLOCK_SIZE);1774if ((hash & hitMask) == hitMask) {1775syncPoint.toLoad = pos + 1;1776syncPoint.flush = 1;1777++pos; /* for assert */1778break;1779}1780}1781assert(pos < RSYNC_LENGTH || ZSTD_rollingHash_compute(istart + pos - RSYNC_LENGTH, RSYNC_LENGTH) == hash);1782return syncPoint;1783}17841785size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)1786{1787size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled;1788if (hintInSize==0) hintInSize = mtctx->targetSectionSize;1789return hintInSize;1790}17911792/** ZSTDMT_compressStream_generic() :1793* internal use only - exposed to be invoked from zstd_compress.c1794* assumption : output and input are valid (pos <= size)1795* @return : minimum amount of data remaining to flush, 0 if none */1796size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,1797ZSTD_outBuffer* output,1798ZSTD_inBuffer* input,1799ZSTD_EndDirective endOp)1800{1801unsigned forwardInputProgress = 0;1802DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)",1803(U32)endOp, (U32)(input->size - input->pos));1804assert(output->pos <= output->size);1805assert(input->pos <= input->size);18061807if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {1808/* current frame being ended. Only flush/end are allowed */1809return ERROR(stage_wrong);1810}18111812/* fill input buffer */1813if ( (!mtctx->jobReady)1814&& (input->size > input->pos) ) { /* support NULL input */1815if (mtctx->inBuff.buffer.start == NULL) {1816assert(mtctx->inBuff.filled == 0); /* Can't fill an empty buffer */1817if (!ZSTDMT_tryGetInputRange(mtctx)) {1818/* It is only possible for this operation to fail if there are1819* still compression jobs ongoing.1820*/1821DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed");1822assert(mtctx->doneJobID != mtctx->nextJobID);1823} else1824DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);1825}1826if (mtctx->inBuff.buffer.start != NULL) {1827syncPoint_t const syncPoint = findSynchronizationPoint(mtctx, *input);1828if (syncPoint.flush && endOp == ZSTD_e_continue) {1829endOp = ZSTD_e_flush;1830}1831assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);1832DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",1833(U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize);1834ZSTD_memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad);1835input->pos += syncPoint.toLoad;1836mtctx->inBuff.filled += syncPoint.toLoad;1837forwardInputProgress = syncPoint.toLoad>0;1838}1839}1840if ((input->pos < input->size) && (endOp == ZSTD_e_end)) {1841/* Can't end yet because the input is not fully consumed.1842* We are in one of these cases:1843* - mtctx->inBuff is NULL & empty: we couldn't get an input buffer so don't create a new job.1844* - We filled the input buffer: flush this job but don't end the frame.1845* - We hit a synchronization point: flush this job but don't end the frame.1846*/1847assert(mtctx->inBuff.filled == 0 || mtctx->inBuff.filled == mtctx->targetSectionSize || mtctx->params.rsyncable);1848endOp = ZSTD_e_flush;1849}18501851if ( (mtctx->jobReady)1852|| (mtctx->inBuff.filled >= mtctx->targetSectionSize) /* filled enough : let's compress */1853|| ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0)) /* something to flush : let's go */1854|| ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size block */1855size_t const jobSize = mtctx->inBuff.filled;1856assert(mtctx->inBuff.filled <= mtctx->targetSectionSize);1857FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) , "");1858}18591860/* check for potential compressed data ready to be flushed */1861{ size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */1862if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not end flush yet */1863DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);1864return remainingToFlush;1865}1866}186718681869