POSIX Shared Memory Data Structures 1.0
High-performance lock-free data structures for inter-process communication
Loading...
Searching...
No Matches
shm_queue.h
Go to the documentation of this file.
#pragma once
#include "posix_shm.h"
#include "shm_span.h"
#include "shm_table.h"
#include <algorithm>
#include <atomic>
#include <concepts>
#include <cstddef>
#include <limits>
#include <new>
#include <optional>
#include <span>
#include <stdexcept>
#include <string_view>
#include <type_traits>
10
22template<typename T, typename TableType = shm_table>
23 requires std::is_trivially_copyable_v<T>
24class shm_queue : public shm_span<T, posix_shm_impl<TableType>> {
25private:
26 struct QueueHeader {
27 std::atomic<size_t> head{0};
28 std::atomic<size_t> tail{0};
29 size_t capacity;
30 };
31
32 QueueHeader* header() {
33 return reinterpret_cast<QueueHeader*>(
34 static_cast<char*>(this->shm.get_base_addr()) + this->offset
35 );
36 }
37
38 const QueueHeader* header() const {
39 return reinterpret_cast<const QueueHeader*>(
40 static_cast<const char*>(this->shm.get_base_addr()) + this->offset
41 );
42 }
43
44 T* data_start() {
45 return reinterpret_cast<T*>(
46 reinterpret_cast<char*>(header()) + sizeof(QueueHeader)
47 );
48 }
49
50 const typename TableType::entry* _table_entry{nullptr};
51
52public:
56 template<typename ShmType>
57 shm_queue(ShmType& shm, std::string_view name, size_t capacity = 0)
58 : shm_span<T, posix_shm_impl<TableType>>(shm, 0, 0) {
59 static_assert(std::is_same_v<typename ShmType::table_type, TableType>,
60 "SharedMemory table type must match queue table type");
61
62 auto* table = static_cast<TableType*>(shm.get_base_addr());
63
64 // Convert string_view to null-terminated string for find
65 char name_buf[TableType::MAX_NAME_SIZE]{};
66 size_t copy_len = std::min(name.size(), sizeof(name_buf) - 1);
67 std::copy_n(name.begin(), copy_len, name_buf);
68
69 auto* entry = table->find(name_buf);
70
71 if (entry) {
72 // Open existing queue
73 if (capacity != 0 && entry->num_elem != capacity) {
74 throw std::runtime_error("Queue capacity mismatch");
75 }
76 this->offset = entry->offset;
77 this->num_elem = entry->num_elem;
78 _table_entry = entry;
79
80 // Validate that the header has been initialized
81 auto* hdr = header();
82 if (hdr->capacity == 0) {
83 // Header wasn't properly initialized, fix it
84 hdr->capacity = this->num_elem + 1; // Internal capacity
85 hdr->head.store(0, std::memory_order_relaxed);
86 hdr->tail.store(0, std::memory_order_relaxed);
87 }
88 } else if (capacity > 0) {
89 // Create new queue
90 // Allocate capacity+1 to allow full use of capacity (circular queue needs one empty slot)
91 size_t actual_capacity = capacity + 1;
92 size_t required_size = sizeof(QueueHeader) + actual_capacity * sizeof(T);
93 size_t current_used = table->get_total_allocated_size();
94
95 // Data must come after the table
96 this->offset = sizeof(TableType) + current_used;
97 this->num_elem = capacity;
98
99 // Initialize header with actual capacity
100 new (header()) QueueHeader{.capacity = actual_capacity};
101
102 // Register in table
103 table->add(name_buf, this->offset, required_size, sizeof(T), capacity);
104 // Find the entry we just added
105 _table_entry = table->find(name_buf);
106 } else {
107 throw std::runtime_error("Queue not found and capacity not specified");
108 }
109 }
110
115 [[nodiscard]] bool enqueue(const T& value) noexcept {
116 auto* hdr = header();
117 size_t current_tail = hdr->tail.load(std::memory_order_relaxed);
118 size_t next_tail = (current_tail + 1) % hdr->capacity;
119
120 if (next_tail == hdr->head.load(std::memory_order_acquire)) {
121 return false; // Queue is full
122 }
123
124 data_start()[current_tail] = value;
125 hdr->tail.store(next_tail, std::memory_order_release);
126 return true;
127 }
128
133 [[nodiscard]] std::optional<T> dequeue() noexcept {
134 auto* hdr = header();
135 size_t current_head = hdr->head.load(std::memory_order_relaxed);
136
137 if (current_head == hdr->tail.load(std::memory_order_acquire)) {
138 return std::nullopt; // Queue is empty
139 }
140
141 T value = data_start()[current_head];
142 size_t next_head = (current_head + 1) % hdr->capacity;
143 hdr->head.store(next_head, std::memory_order_release);
144
145 return value;
146 }
147
151 [[nodiscard]] bool dequeue(T& out_value) noexcept {
152 if (auto val = dequeue()) {
153 out_value = *val;
154 return true;
155 }
156 return false;
157 }
158
159 [[nodiscard]] bool empty() const noexcept {
160 const auto* hdr = header();
161 return hdr->head.load(std::memory_order_acquire) ==
162 hdr->tail.load(std::memory_order_acquire);
163 }
164
165 [[nodiscard]] bool full() const noexcept {
166 const auto* hdr = header();
167 size_t current_tail = hdr->tail.load(std::memory_order_acquire);
168 size_t next_tail = (current_tail + 1) % hdr->capacity;
169 return next_tail == hdr->head.load(std::memory_order_acquire);
170 }
171
172 [[nodiscard]] size_t size() const noexcept {
173 const auto* hdr = header();
174 size_t h = hdr->head.load(std::memory_order_acquire);
175 size_t t = hdr->tail.load(std::memory_order_acquire);
176
177 if (t >= h) {
178 return t - h;
179 } else {
180 return hdr->capacity - h + t;
181 }
182 }
183
184 [[nodiscard]] size_t capacity() const noexcept {
185 // Return user-visible capacity
186 // For new queues: stored in num_elem
187 // For consistency, use the stored value
188 return this->num_elem;
189 }
190
191 [[nodiscard]] std::string_view name() const noexcept {
192 return _table_entry ? std::string_view(_table_entry->name.data()) : std::string_view{};
193 }
194};
POSIX shared memory wrapper with RAII and reference counting.
Definition posix_shm.h:62
Lock-free circular queue for shared memory IPC.
Definition shm_queue.h:24
bool dequeue(T &out_value) noexcept
Try dequeue with output parameter (for compatibility)
Definition shm_queue.h:151
std::string_view name() const noexcept
Definition shm_queue.h:191
bool empty() const noexcept
Definition shm_queue.h:159
shm_queue(ShmType &shm, std::string_view name, size_t capacity=0)
Create or open a shared memory queue.
Definition shm_queue.h:57
size_t size() const noexcept
Definition shm_queue.h:172
bool enqueue(const T &value) noexcept
Enqueue an element (lock-free)
Definition shm_queue.h:115
size_t capacity() const noexcept
Definition shm_queue.h:184
bool full() const noexcept
Definition shm_queue.h:165
std::optional< T > dequeue() noexcept
Dequeue an element (lock-free)
Definition shm_queue.h:133
Base class for shared memory data structures that span a region.
Definition shm_span.h:11
ShmType & shm
Definition shm_span.h:13
size_t offset
Definition shm_span.h:14
size_t num_elem
Definition shm_span.h:15
Core POSIX shared memory management with automatic reference counting.