POSIX Shared Memory Data Structures 1.0
High-performance lock-free data structures for inter-process communication
Loading...
Searching...
No Matches
shm_ring_buffer.h
Go to the documentation of this file.
#pragma once

#include "posix_shm.h"
#include "shm_table.h"

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <new>
#include <optional>
#include <span>
#include <stdexcept>
#include <string_view>
7
22template<typename T, typename TableType = shm_table>
23 requires std::is_trivially_copyable_v<T>
25private:
26 struct RingHeader {
27 std::atomic<uint64_t> write_pos{0}; // Total elements written (never wraps)
28 std::atomic<uint64_t> read_pos{0}; // Total elements read (never wraps)
29 uint32_t capacity;
30 uint32_t _padding; // Align to 8 bytes
31 // Data follows immediately after header
32 };
33
34 RingHeader* header_{nullptr};
35 T* data_{nullptr};
36 const typename TableType::entry* table_entry_{nullptr};
37
38 // Convert absolute position to ring buffer index
39 [[nodiscard]] uint32_t to_index(uint64_t pos) const noexcept {
40 return pos % header_->capacity;
41 }
42
43public:
44 using value_type = T;
45 using size_type = size_t;
46
    /**
     * @brief Attach to a named ring buffer in shared memory, creating it if absent.
     *
     * Looks the name up in the segment's table. If found, maps the existing
     * header/data. If not found and @p capacity > 0, carves out space after
     * the currently allocated region, placement-news the header, and registers
     * the buffer in the table.
     *
     * @param shm      Shared-memory wrapper; its table type must match TableType.
     * @param name     Buffer name (truncated to MAX_NAME_SIZE - 1 characters).
     * @param capacity Element capacity when creating; 0 means "open only".
     * @throws std::runtime_error if registration fails, or if the buffer does
     *         not exist and no capacity was given.
     *
     * NOTE(review): find-then-add is not atomic — two processes creating the
     * same name concurrently could race; presumably creation is externally
     * serialized — confirm with callers.
     * NOTE(review): no check that offset + total_size fits inside the mapped
     * segment; assumes the segment was sized generously — TODO confirm.
     */
    template<typename ShmType>
    shm_ring_buffer(ShmType& shm, std::string_view name, size_t capacity = 0) {
        static_assert(std::is_same_v<typename ShmType::table_type, TableType>,
            "SharedMemory table type must match ring buffer table type");

        // The table lives at the very start of the segment.
        auto* table = static_cast<TableType*>(shm.get_base_addr());

        // Copy the name into a fixed, zero-filled buffer so lookups always
        // see a NUL-terminated string, even if `name` is not.
        char name_buf[TableType::MAX_NAME_SIZE]{};
        size_t copy_len = std::min(name.size(), sizeof(name_buf) - 1);
        std::copy_n(name.begin(), copy_len, name_buf);

        auto* entry = table->find(name_buf);

        if (entry) {
            // Open existing ring buffer: header is at the recorded offset
            // from the segment base, data immediately follows it.
            header_ = reinterpret_cast<RingHeader*>(
                static_cast<char*>(shm.get_base_addr()) + entry->offset
            );
            data_ = reinterpret_cast<T*>(header_ + 1);
            table_entry_ = entry;
        } else if (capacity > 0) {
            // Create new ring buffer just past everything allocated so far.
            size_t total_size = sizeof(RingHeader) + sizeof(T) * capacity;
            size_t table_size = sizeof(TableType);
            size_t current_used = table->get_total_allocated_size();
            size_t offset = table_size + current_used;

            header_ = reinterpret_cast<RingHeader*>(
                static_cast<char*>(shm.get_base_addr()) + offset
            );

            // Initialize header in place (atomics start at 0) before the
            // buffer becomes discoverable through the table.
            new (header_) RingHeader();
            header_->capacity = static_cast<uint32_t>(capacity);

            data_ = reinterpret_cast<T*>(header_ + 1);

            // Register in table
            if (!table->add(name_buf, offset, total_size, sizeof(T), capacity)) {
                throw std::runtime_error("Failed to add ring buffer to table");
            }
            table_entry_ = table->find(name_buf);
        } else {
            throw std::runtime_error("Ring buffer not found and capacity not specified");
        }
    }
93
98 [[nodiscard]] bool push(const T& value) noexcept {
99 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
100 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
101
102 if (write - read >= header_->capacity) {
103 return false; // Buffer full
104 }
105
106 data_[to_index(write)] = value;
107 header_->write_pos.store(write + 1, std::memory_order_release);
108 return true;
109 }
110
115 size_t push_bulk(std::span<const T> values) noexcept {
116 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
117 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
118
119 size_t available = header_->capacity - (write - read);
120 size_t to_write = std::min(values.size(), available);
121
122 for (size_t i = 0; i < to_write; ++i) {
123 data_[to_index(write + i)] = values[i];
124 }
125
126 header_->write_pos.store(write + to_write, std::memory_order_release);
127 return to_write;
128 }
129
134 [[nodiscard]] std::optional<T> pop() noexcept {
135 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
136 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
137
138 if (read >= write) {
139 return std::nullopt; // Empty
140 }
141
142 T value = data_[to_index(read)];
143 header_->read_pos.store(read + 1, std::memory_order_release);
144 return value;
145 }
146
152 size_t pop_bulk(std::span<T> values) noexcept {
153 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
154 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
155
156 size_t available = write - read;
157 size_t to_read = std::min(values.size(), available);
158
159 for (size_t i = 0; i < to_read; ++i) {
160 values[i] = data_[to_index(read + i)];
161 }
162
163 header_->read_pos.store(read + to_read, std::memory_order_release);
164 return to_read;
165 }
166
173 size_t peek_bulk(size_t offset, std::span<T> values) const noexcept {
174 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
175 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
176
177 if (read + offset >= write) {
178 return 0; // Offset beyond available data
179 }
180
181 size_t available = write - (read + offset);
182 size_t to_peek = std::min(values.size(), available);
183
184 for (size_t i = 0; i < to_peek; ++i) {
185 values[i] = data_[to_index(read + offset + i)];
186 }
187
188 return to_peek;
189 }
190
195 size_t get_last_n(size_t n, std::span<T> values) const noexcept {
196 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
197 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
198
199 size_t available = write - read;
200 size_t to_get = std::min({n, values.size(), available});
201
202 uint64_t start = write - to_get;
203 for (size_t i = 0; i < to_get; ++i) {
204 values[i] = data_[to_index(start + i)];
205 }
206
207 return to_get;
208 }
209
213 void skip(size_t count) noexcept {
214 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
215 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
216
217 size_t available = write - read;
218 size_t to_skip = std::min(count, available);
219
220 header_->read_pos.store(read + to_skip, std::memory_order_release);
221 }
222
226 void clear() noexcept {
227 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
228 header_->read_pos.store(write, std::memory_order_release);
229 }
230
    // Capacity and size queries.
    // NOTE(review): under concurrent use these are inherently approximate
    // snapshots — positions may move between the two atomic loads.

    /// Fixed element capacity set at creation.
    [[nodiscard]] size_t capacity() const noexcept {
        return header_->capacity;
    }

    /// Current occupancy as write - read (exact only from the consumer side,
    /// where read_pos cannot advance concurrently; load order matters).
    [[nodiscard]] size_t size() const noexcept {
        uint64_t write = header_->write_pos.load(std::memory_order_acquire);
        uint64_t read = header_->read_pos.load(std::memory_order_acquire);
        return write - read;
    }

    /// True when no elements are buffered.
    [[nodiscard]] bool empty() const noexcept {
        return size() == 0;
    }

    /// True when the buffer holds capacity() elements.
    [[nodiscard]] bool full() const noexcept {
        return size() == capacity();
    }

    /// Free slots remaining before push() would fail.
    [[nodiscard]] size_t available_space() const noexcept {
        return capacity() - size();
    }

    /// Total elements ever written (monotonic; useful for data-rate monitoring).
    [[nodiscard]] uint64_t total_written() const noexcept {
        return header_->write_pos.load(std::memory_order_acquire);
    }

    /// Total elements ever read (monotonic).
    [[nodiscard]] uint64_t total_read() const noexcept {
        return header_->read_pos.load(std::memory_order_acquire);
    }

    /// Registered name, or empty view if this buffer was never registered.
    /// Assumes the table entry's name is NUL-terminated — the constructor
    /// zero-fills its name buffer; confirm the table preserves that.
    [[nodiscard]] std::string_view name() const noexcept {
        return table_entry_ ? std::string_view(table_entry_->name.data()) : std::string_view{};
    }
268
273 void push_overwrite(const T& value) noexcept {
274 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
275 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
276
277 data_[to_index(write)] = value;
278
279 // If full, advance read pointer (lose oldest data)
280 if (write - read >= header_->capacity) {
281 header_->read_pos.store(read + 1, std::memory_order_release);
282 }
283
284 header_->write_pos.store(write + 1, std::memory_order_release);
285 }
286};
Lock-free ring buffer for high-throughput streaming data.
size_t push_bulk(std::span< const T > values) noexcept
Push multiple elements efficiently.
void clear() noexcept
Clear the buffer (reset read/write positions)
size_t available_space() const noexcept
std::string_view name() const noexcept
size_t capacity() const noexcept
size_t pop_bulk(std::span< T > values) noexcept
Pop multiple elements efficiently.
size_t peek_bulk(size_t offset, std::span< T > values) const noexcept
Peek at elements without consuming them.
void push_overwrite(const T &value) noexcept
Force overwrite when full (circular overwrite mode). Useful for continuous sensor streams where the newest data matters more than the oldest.
bool push(const T &value) noexcept
Push a single element.
void skip(size_t count) noexcept
Skip elements without reading them.
std::optional< T > pop() noexcept
Pop a single element.
shm_ring_buffer(ShmType &shm, std::string_view name, size_t capacity=0)
size_t get_last_n(size_t n, std::span< T > values) const noexcept
Get the last N (most recent) elements. Useful for retrieving trailing sensor data.
bool full() const noexcept
bool empty() const noexcept
uint64_t total_read() const noexcept
size_t size() const noexcept
uint64_t total_written() const noexcept
Get total elements written (useful for monitoring data rate)
Core POSIX shared memory management with automatic reference counting.