diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 5d21091a..759d5cc9 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -14,7 +14,8 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mInputCompleted = false; mFilename = filename; - mPwriteMode = !isSTDOUT && ends_with(filename, ".gz") && mOptions->thread > 1; + mGzipMode = ends_with(filename, ".gz"); + mPwriteMode = !isSTDOUT && mOptions->thread > 1; mFd = -1; mOffsetRing = NULL; mNextSeq = NULL; @@ -31,15 +32,17 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mNextSeq = new size_t[mOptions->thread]; for (int t = 0; t < mOptions->thread; t++) mNextSeq[t] = t; - mCompressors = new libdeflate_compressor*[mOptions->thread]; - for (int t = 0; t < mOptions->thread; t++) - mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression); - size_t initBufSize = PACK_SIZE * 500; - mCompBufs = new char*[mOptions->thread]; - mCompBufSizes = new size_t[mOptions->thread]; - for (int t = 0; t < mOptions->thread; t++) { - mCompBufs[t] = new char[initBufSize]; - mCompBufSizes[t] = initBufSize; + if (mGzipMode) { + mCompressors = new libdeflate_compressor*[mOptions->thread]; + for (int t = 0; t < mOptions->thread; t++) + mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression); + size_t initBufSize = PACK_SIZE * 500; + mCompBufs = new char*[mOptions->thread]; + mCompBufSizes = new size_t[mOptions->thread]; + for (int t = 0; t < mOptions->thread; t++) { + mCompBufs[t] = new char[initBufSize]; + mCompBufSizes[t] = initBufSize; + } } mWorkingBufferList = 0; mBufferLength = 0; @@ -116,20 +119,22 @@ void WriterThread::input(int tid, string* data) { } void WriterThread::inputPwrite(int tid, string* data) { - size_t bound = libdeflate_gzip_compress_bound(mCompressors[tid], data->size()); - // Grow per-worker buffer if needed - if (bound > mCompBufSizes[tid]) { - delete[] mCompBufs[tid]; - mCompBufs[tid] = new char[bound]; - mCompBufSizes[tid] = bound; + const char* writeData = data->data(); + size_t wsize = data->size(); + if (mGzipMode) { + size_t bound = libdeflate_gzip_compress_bound(mCompressors[tid], data->size()); + if (bound > mCompBufSizes[tid]) { + delete[] mCompBufs[tid]; + mCompBufs[tid] = new char[bound]; + mCompBufSizes[tid] = bound; + } + size_t outsize = libdeflate_gzip_compress(mCompressors[tid], data->data(), data->size(), + mCompBufs[tid], bound); + if (outsize == 0) + error_exit("libdeflate gzip compression failed"); + writeData = mCompBufs[tid]; + wsize = outsize; } - size_t outsize = libdeflate_gzip_compress(mCompressors[tid], data->data(), data->size(), - mCompBufs[tid], bound); - if (outsize == 0) - error_exit("libdeflate gzip compression failed"); - delete data; - const char* writeData = mCompBufs[tid]; - size_t wsize = outsize; size_t seq = mNextSeq[tid]; @@ -163,6 +168,7 @@ void WriterThread::inputPwrite(int tid, string* data) { written += ret; } } + delete data; mNextSeq[tid] += mOptions->thread; } diff --git a/src/writerthread.h b/src/writerthread.h index 053d27f6..9da5e031 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -55,14 +55,14 @@ class WriterThread{ SingleProducerSingleConsumerList** mBufferLists; int mWorkingBufferList; - // pwrite mode: parallel libdeflate gz compression + direct file write bool mPwriteMode; + bool mGzipMode; int mFd; OffsetSlot* mOffsetRing; size_t* mNextSeq; libdeflate_compressor** mCompressors; - char** mCompBufs; // per-worker pre-allocated compress output buffers - size_t* mCompBufSizes; // per-worker buffer sizes + char** mCompBufs; + size_t* mCompBufSizes; }; #endif