// Excerpt from the shm_queue header. The surrounding file (not shown in this
// excerpt) provides the usual includes: <atomic>, <optional>, <string_view>,
// <stdexcept>, <algorithm>, <cstddef>.

template <typename T, typename TableType = shm_table>
    requires std::is_trivially_copyable_v<T>
class shm_queue
    // The base class (the "shared memory data structure that spans a region";
    // its name is not visible in this excerpt) supplies the `shm` reference
    // and the `offset` member used below.
{
    // Per-queue control block stored at the start of the queue's region.
    struct QueueHeader {
        std::atomic<size_t> head{0};
        std::atomic<size_t> tail{0};
        size_t capacity{0};   // physical slot count, including the one kept empty
    };

    QueueHeader* header() {
        return reinterpret_cast<QueueHeader*>(
            static_cast<char*>(this->shm.get_base_addr()) + this->offset);
    }

    const QueueHeader* header() const {
        return reinterpret_cast<const QueueHeader*>(
            static_cast<const char*>(this->shm.get_base_addr()) + this->offset);
    }

    // Element storage begins immediately after the header.
    T* data_start() {
        return reinterpret_cast<T*>(
            reinterpret_cast<char*>(header()) + sizeof(QueueHeader));
    }

    const typename TableType::entry* _table_entry{nullptr};

public:
    /// Create or open a shared memory queue.
    template <typename ShmType>
    shm_queue(ShmType& shm, std::string_view name, size_t capacity = 0)
    // (the base-class initialisation that stores the `shm` reference is elided)
    {
        static_assert(std::is_same_v<typename ShmType::table_type, TableType>,
                      "SharedMemory table type must match queue table type");

        // The allocation table lives at the very start of the mapping.
        auto* table = static_cast<TableType*>(shm.get_base_addr());

        // Copy the name into a fixed-size, NUL-terminated buffer.
        char name_buf[TableType::MAX_NAME_SIZE]{};
        size_t copy_len = std::min(name.size(), sizeof(name_buf) - 1);
        std::copy_n(name.begin(), copy_len, name_buf);

        auto* entry = table->find(name_buf);
        if (entry != nullptr) {
            // Open an existing queue. A non-zero requested capacity that
            // disagrees with the registered one is rejected (the exact field
            // compared here is elided in this excerpt).
            if (capacity != 0 /* && capacity != <registered capacity> */) {
                throw std::runtime_error("Queue capacity mismatch");
            }
            this->offset = entry->offset;
            _table_entry = entry;

            auto* hdr = header();
            if (hdr->capacity == 0) {
                // Header not initialised yet: reset the ring indices.
                // (further initialisation on the elided lines is not shown)
                hdr->head.store(0, std::memory_order_relaxed);
                hdr->tail.store(0, std::memory_order_relaxed);
            }
        } else if (capacity > 0) {
            // Create a new queue directly after everything already allocated.
            size_t actual_capacity = capacity + 1;  // one slot stays empty to tell full from empty
            size_t required_size = sizeof(QueueHeader) + actual_capacity * sizeof(T);
            size_t current_used = table->get_total_allocated_size();
            // (any size/overflow checks on the elided lines are not shown)

            this->offset = sizeof(TableType) + current_used;

            new (header()) QueueHeader{.capacity = actual_capacity};

            table->add(name_buf, this->offset, required_size, sizeof(T), capacity);
            _table_entry = table->find(name_buf);
        } else {
            throw std::runtime_error("Queue not found and capacity not specified");
        }
    }

    /// Enqueue an element (lock-free). Returns false if the queue is full.
    [[nodiscard]] bool enqueue(const T& value) noexcept {
        auto* hdr = header();
        size_t current_tail = hdr->tail.load(std::memory_order_relaxed);
        size_t next_tail = (current_tail + 1) % hdr->capacity;

        if (next_tail == hdr->head.load(std::memory_order_acquire)) {
            return false;   // full: advancing tail would collide with head
        }

        data_start()[current_tail] = value;
        hdr->tail.store(next_tail, std::memory_order_release);
        return true;
    }

    /// Dequeue an element (lock-free). Returns std::nullopt if the queue is empty.
    [[nodiscard]] std::optional<T> dequeue() noexcept {
        auto* hdr = header();
        size_t current_head = hdr->head.load(std::memory_order_relaxed);

        if (current_head == hdr->tail.load(std::memory_order_acquire)) {
            return std::nullopt;   // empty
        }

        T value = data_start()[current_head];
        size_t next_head = (current_head + 1) % hdr->capacity;
        hdr->head.store(next_head, std::memory_order_release);
        return value;
    }

    /// Try dequeue with output parameter (for compatibility).
    /// (body elided in this excerpt; the natural forwarding form is shown)
    [[nodiscard]] bool dequeue(T& out_value) noexcept {
        auto result = dequeue();
        if (!result) {
            return false;
        }
        out_value = *result;
        return true;
    }

    [[nodiscard]] bool empty() const noexcept {
        const auto* hdr = header();
        return hdr->head.load(std::memory_order_acquire) ==
               hdr->tail.load(std::memory_order_acquire);
    }

    [[nodiscard]] bool full() const noexcept {
        const auto* hdr = header();
        size_t current_tail = hdr->tail.load(std::memory_order_acquire);
        size_t next_tail = (current_tail + 1) % hdr->capacity;
        return next_tail == hdr->head.load(std::memory_order_acquire);
    }

    [[nodiscard]] size_t size() const noexcept {
        const auto* hdr = header();
        size_t h = hdr->head.load(std::memory_order_acquire);
        size_t t = hdr->tail.load(std::memory_order_acquire);
        if (t >= h) {
            return t - h;                  // no wrap-around
        }
        return hdr->capacity - h + t;      // tail has wrapped past the end of the ring
    }

    // size_t capacity() const noexcept is also part of the interface; its body
    // falls on lines elided from this excerpt.

    [[nodiscard]] std::string_view name() const noexcept {
        return _table_entry ? std::string_view(_table_entry->name.data())
                            : std::string_view{};
    }
};
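The ring keeps one slot permanently unused: a requested capacity of N is stored as actual_capacity = N + 1, the queue is empty when head == tail, and it is full when (tail + 1) % actual_capacity == head, so at most N elements are ever resident. A small standalone sketch of that index arithmetic (plain local indices, no shared memory or atomics involved):

    #include <cstddef>
    #include <cstdio>

    int main() {
        // Mirrors the header fields: 4 usable slots -> 5 physical slots.
        const std::size_t capacity = 4 + 1;
        std::size_t head = 0, tail = 0;

        auto size  = [&] { return tail >= head ? tail - head : capacity - head + tail; };
        auto full  = [&] { return (tail + 1) % capacity == head; };
        auto empty = [&] { return head == tail; };

        // Fill the ring: the 5th push would make tail collide with head, so it is refused.
        int pushed = 0;
        while (!full()) { tail = (tail + 1) % capacity; ++pushed; }
        std::printf("pushed=%d size=%zu full=%d\n", pushed, size(), full());  // pushed=4

        // Drain two elements; head advances exactly like shm_queue::dequeue.
        head = (head + 1) % capacity;
        head = (head + 1) % capacity;
        std::printf("size=%zu empty=%d\n", size(), empty());                  // size=2
        return 0;
    }

The wrap-around case in this lambda is the same one the final branch of size() handles: once tail has wrapped past the end of the array, the occupied count is capacity - head + tail.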
Interface summary (from the accompanying documentation):

shm_queue: lock-free circular queue for shared memory IPC.
    shm_queue(ShmType& shm, std::string_view name, size_t capacity = 0): create or open a shared memory queue
    bool enqueue(const T& value) noexcept: enqueue an element (lock-free)
    std::optional<T> dequeue() noexcept: dequeue an element (lock-free)
    bool dequeue(T& out_value) noexcept: try dequeue with output parameter (for compatibility)
    bool empty() const noexcept
    bool full() const noexcept
    size_t size() const noexcept
    size_t capacity() const noexcept
    std::string_view name() const noexcept

Related types:
    SharedMemory: POSIX shared memory wrapper with RAII and automatic reference counting (core POSIX shared memory management).
    Region base class: base class for shared memory data structures that span a region; it provides the mapping reference and offset used by shm_queue.
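For orientation, a minimal usage sketch. Only the shm_queue calls follow the interface summarised above; the SharedMemory construction (the header name, the "/telemetry" region name, and the 1 MiB size argument) is an assumption made purely for illustration, since that class's constructor is not shown here.

    #include <cstdint>
    #include <cstdio>
    // #include "shm_queue.hpp"   // header name assumed

    int main() {
        // Assumed constructor: a named POSIX shared memory region of a fixed size.
        SharedMemory shm("/telemetry", 1 << 20);          // hypothetical arguments

        // Create (or open, if it already exists) a queue of 1024 uint64_t samples.
        shm_queue<std::uint64_t> q(shm, "samples", 1024);

        // Producer side: enqueue() returns false when the ring is full.
        if (!q.enqueue(42)) {
            std::puts("queue full, sample dropped");
        }

        // Consumer side (typically another process mapping the same region):
        while (auto v = q.dequeue()) {
            std::printf("got %llu\n", static_cast<unsigned long long>(*v));
        }
        return 0;
    }

Note that enqueue() publishes tail with a single release store and dequeue() does the same for head, the usual single-producer/single-consumer arrangement; concurrent writers (or concurrent readers) on the same side would need external coordination.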