Lock-free Queue
In multithreaded programming, queues are frequently used, especially in producer-consumer models. However, using locks (e.g., std::mutex
) to manage shared queues can degrade performance. In high-concurrency scenarios, lock contention leads to frequent context switching, reducing efficiency. A lock-free queue provides a solution by using atomic operations to ensure thread safety without locks.
Here I am going to show how to implement a thread-safe and lock-free queue using std::atomic
and a ring buffer in C++ to improve performance (when crital section is relatively small).
Data Structure for Lock-free Queue
LockFreeQueue
uses std::atomic
to manage producer and consumer states, while using a ring buffer for data storage to avoid frequent memory allocations. To enable circular buffer, it uses modulo, ensuring pointers wrap around without exceeding boundaries, thereby eliminating the overhead of dynamic resizing.
private:
std::atomic<int> producerHead_;
std::atomic<int> producerTail_;
std::atomic<int> consumerHead_;
std::vector<T> data_{SIZE};
inline int getIndex(int t) const {
return t % SIZE;
}
Push
Operation
The main logic of the push
operation is to update producerHead_
through the fetch_add
atomic operation and ensure that the space that the consumer has not read can be written to the data.
std::optional<int> push(const T& elem) {
int p_head = producerHead_.fetch_add(1, std::memory_order_seq_cst);
int p_next = p_head + 1;
if (consumerHead_.load(std::memory_order_seq_cst) == p_next) {
// The queue is full.
return std::nullopt;
}
data_[getIndex(p_head)] = std::move(elem);
// Wait other threads to finish the push operation.
int wait_cycles = 0;
while (producerTail_.load(std::memory_order_seq_cst) != p_head) {
if (wait_cycles % 1000 == 0) {
std::this_thread::yield();
}
wait_cycles++;
}
producerTail_.store(p_next, std::memory_order_seq_cst);
return getIndex(p_head);
}
The queue is full by checking if consumerHead_
is equal to p_next
. If the consumer’s read pointer catches up with the producer’s write pointer, the queue is full and no more data can be written.
Pop
Operation
When dequeuing, the consumer reads the data from the location pointed to by consumerHead_
and advances the pointer.
bool pop(T& elem) {
int c_head = consumerHead_.load(std::memory_order_seq_cst);
int c_next = c_head + 1;
if (producerTail_.load(std::memory_order_seq_cst) == c_head) {
return false; // The queue is empty.
}
elem = std::move(data_[getIndex(c_head)]);
consumerHead_.store(c_next, std::memory_order_seq_cst);
return true;
}
Time Wheel Algorithm
The time wheel is a ring-like structure that divides time into multiple slots (MAX_TIMEOUT_VALUE
indicates the number of slots), each of which can hold a timer. The time wheel “turns” forward one space after each time period. Whenever the time wheel pointer points to a slot, it checks whether there is an expired timer in the slot. If there is an expired timer, the corresponding callback function is executed.
Data Structure for Time Wheel
private:
std::atomic<uint32_t> now_;
Callback callback_;
std::vector<LockFreeQueue<MAX_TIMEOUT_VALUE, uint32_t>> timers_;
now_
: Current time, which is accessed safely in a multi-threaded environment using the atomic typestd::atomic
. Each time tick is called,now_
is incremented to advance the time wheel.callback_
: This is the callback function that needs to be called when the timer times out.timers
: Each element is a queue used to store the timer ID.
Set
Function
The timer is set through the set
method. The timer is assigned to a slot according to its timeout period and stored in the lock-free queue timers_[]
.
void set(uint32_t id, uint64_t timeout) {
assert(timeout > 0);
assert(timeout < MAX_TIMEOUT_VALUE);
uint32_t targetTime = (now_ + timeout) % MAX_TIMEOUT_VALUE;
timers_[targetTime].push(id);
}
Tick
Function
The tick
method is used to advance the time wheel. When the time wheel advances to a certain slot, it checks the timer task in the slot and triggers the callback function.
void tick(uint32_t milliseconds = 1) {
for (uint32_t ms = 0; ms < milliseconds; ++ms) {
now_ = (now_ + 1) % MAX_TIMEOUT_VALUE;
uint32_t id;
while (timers_[now_].pop(id)) {
callback_(id);
}
}
}