diff --git a/src/bgzf.h b/src/bgzf.h index 9b02852..849f39b 100644 --- a/src/bgzf.h +++ b/src/bgzf.h @@ -66,7 +66,18 @@ class BgzfMtReader { } ~BgzfMtReader() { - mStop = true; + // Publish mStop while holding every CV's mutex before notifying. mStop is a + // wait predicate for all three CVs; a thread that has evaluated its predicate + // as false but not yet parked still holds that mutex, so taking all three here + // serializes the stop+notify against that gap. Storing mStop outside the locks + // (as before) lets notify_all() slip into that window and be missed, leaving a + // worker parked forever and the join() below hung — an intermittent deadlock. + { + std::lock_guard l1(mProduceMtx); + std::lock_guard l2(mDecompMtx); + std::lock_guard l3(mConsumeMtx); + mStop = true; + } mDecompCv.notify_all(); mProduceCv.notify_all(); mConsumeCv.notify_all(); @@ -95,7 +106,12 @@ class BgzfMtReader { mConsumeOffset += tocopy; if (mConsumeOffset >= s.decompLen) { - s.state.store(FREE, std::memory_order_release); + // Publish FREE under mProduceMtx so readerLoop, which waits on + // mProduceCv for this slot to free up, cannot miss the wakeup. + { + std::lock_guard lk(mProduceMtx); + s.state.store(FREE, std::memory_order_release); + } mConsumeOffset = 0; mConsumeIdx++; mProduceCv.notify_one(); @@ -134,8 +150,13 @@ class BgzfMtReader { } s.compLen = bsize; - s.state.store(COMPRESSED, std::memory_order_release); - mProduceIdx++; + // Publish the COMPRESSED slot (state + mProduceIdx, which claimSlot scans) + // under mDecompMtx so a decompWorker parked on mDecompCv cannot miss it. + { + std::lock_guard lk(mDecompMtx); + s.state.store(COMPRESSED, std::memory_order_release); + mProduceIdx++; + } mDecompCv.notify_all(); } } @@ -162,7 +183,12 @@ class BgzfMtReader { int ret = isal_inflate_stateless(&ist); target->decompLen = (ret == ISAL_DECOMP_OK) ? (int)ist.total_out : 0; - target->state.store(READY, std::memory_order_release); + // Publish READY under mConsumeMtx so the consumer parked in read() on + // mConsumeCv cannot miss the wakeup for this slot. + { + std::lock_guard lk(mConsumeMtx); + target->state.store(READY, std::memory_order_release); + } mConsumeCv.notify_one(); } } @@ -183,7 +209,14 @@ class BgzfMtReader { } void markDone(Slot& s) { - s.state.store(DONE, std::memory_order_release); + // Publish DONE under mConsumeMtx before notifying: the consumer in read() + // waits on mConsumeCv for READY/DONE, and at EOF this is the terminal + // notification (no further producer follows), so a lost wakeup here would + // strand the reader thread in read() forever. + { + std::lock_guard lk(mConsumeMtx); + s.state.store(DONE, std::memory_order_release); + } mConsumeCv.notify_all(); mDecompCv.notify_all(); }