Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions src/bgzf.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,18 @@ class BgzfMtReader {
}

~BgzfMtReader() {
mStop = true;
// Publish mStop while holding every CV's mutex before notifying. mStop is a
// wait predicate for all three CVs; a thread that has evaluated its predicate
// as false but not yet parked still holds that mutex, so taking all three here
// serializes the stop+notify against that gap. Storing mStop outside the locks
// (as before) lets notify_all() slip into that window and be missed, leaving a
// worker parked forever and the join() below hung — an intermittent deadlock.
{
std::lock_guard<std::mutex> l1(mProduceMtx);
std::lock_guard<std::mutex> l2(mDecompMtx);
std::lock_guard<std::mutex> l3(mConsumeMtx);
mStop = true;
}
mDecompCv.notify_all();
mProduceCv.notify_all();
mConsumeCv.notify_all();
Expand Down Expand Up @@ -95,7 +106,12 @@ class BgzfMtReader {
mConsumeOffset += tocopy;

if (mConsumeOffset >= s.decompLen) {
s.state.store(FREE, std::memory_order_release);
// Publish FREE under mProduceMtx so readerLoop, which waits on
// mProduceCv for this slot to free up, cannot miss the wakeup.
{
std::lock_guard<std::mutex> lk(mProduceMtx);
s.state.store(FREE, std::memory_order_release);
}
mConsumeOffset = 0;
mConsumeIdx++;
mProduceCv.notify_one();
Expand Down Expand Up @@ -134,8 +150,13 @@ class BgzfMtReader {
}

s.compLen = bsize;
s.state.store(COMPRESSED, std::memory_order_release);
mProduceIdx++;
// Publish the COMPRESSED slot (state + mProduceIdx, which claimSlot scans)
// under mDecompMtx so a decompWorker parked on mDecompCv cannot miss it.
{
std::lock_guard<std::mutex> lk(mDecompMtx);
s.state.store(COMPRESSED, std::memory_order_release);
mProduceIdx++;
}
mDecompCv.notify_all();
}
}
Expand All @@ -162,7 +183,12 @@ class BgzfMtReader {
int ret = isal_inflate_stateless(&ist);
target->decompLen = (ret == ISAL_DECOMP_OK) ? (int)ist.total_out : 0;

target->state.store(READY, std::memory_order_release);
// Publish READY under mConsumeMtx so the consumer parked in read() on
// mConsumeCv cannot miss the wakeup for this slot.
{
std::lock_guard<std::mutex> lk(mConsumeMtx);
target->state.store(READY, std::memory_order_release);
}
mConsumeCv.notify_one();
}
}
Expand All @@ -183,7 +209,14 @@ class BgzfMtReader {
}

void markDone(Slot& s) {
s.state.store(DONE, std::memory_order_release);
// Publish DONE under mConsumeMtx before notifying: the consumer in read()
// waits on mConsumeCv for READY/DONE, and at EOF this is the terminal
// notification (no further producer follows), so a lost wakeup here would
// strand the reader thread in read() forever.
{
std::lock_guard<std::mutex> lk(mConsumeMtx);
s.state.store(DONE, std::memory_order_release);
}
mConsumeCv.notify_all();
mDecompCv.notify_all();
}
Expand Down
Loading