22template<
typename T,
typename TableType = shm_table>
23 requires std::is_trivially_copyable_v<T>
27 std::atomic<uint64_t> write_pos{0};
28 std::atomic<uint64_t> read_pos{0};
34 RingHeader* header_{
nullptr};
36 const typename TableType::entry* table_entry_{
nullptr};
39 [[nodiscard]] uint32_t to_index(uint64_t pos)
const noexcept {
40 return pos % header_->capacity;
47 template<
typename ShmType>
49 static_assert(std::is_same_v<typename ShmType::table_type, TableType>,
50 "SharedMemory table type must match ring buffer table type");
52 auto* table =
static_cast<TableType*
>(shm.get_base_addr());
54 char name_buf[TableType::MAX_NAME_SIZE]{};
55 size_t copy_len = std::min(
name.size(),
sizeof(name_buf) - 1);
56 std::copy_n(
name.begin(), copy_len, name_buf);
58 auto* entry = table->find(name_buf);
62 header_ =
reinterpret_cast<RingHeader*
>(
63 static_cast<char*
>(shm.get_base_addr()) + entry->offset
65 data_ =
reinterpret_cast<T*
>(header_ + 1);
69 size_t total_size =
sizeof(RingHeader) +
sizeof(T) *
capacity;
70 size_t table_size =
sizeof(TableType);
71 size_t current_used = table->get_total_allocated_size();
72 size_t offset = table_size + current_used;
74 header_ =
reinterpret_cast<RingHeader*
>(
75 static_cast<char*
>(shm.get_base_addr()) + offset
79 new (header_) RingHeader();
80 header_->capacity =
static_cast<uint32_t
>(
capacity);
82 data_ =
reinterpret_cast<T*
>(header_ + 1);
85 if (!table->add(name_buf, offset, total_size,
sizeof(T),
capacity)) {
86 throw std::runtime_error(
"Failed to add ring buffer to table");
88 table_entry_ = table->find(name_buf);
90 throw std::runtime_error(
"Ring buffer not found and capacity not specified");
98 [[nodiscard]]
bool push(
const T& value)
noexcept {
99 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
100 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
102 if (write - read >= header_->capacity) {
106 data_[to_index(write)] = value;
107 header_->write_pos.store(write + 1, std::memory_order_release);
116 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
117 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
119 size_t available = header_->capacity - (write - read);
120 size_t to_write = std::min(values.size(), available);
122 for (
size_t i = 0; i < to_write; ++i) {
123 data_[to_index(write + i)] = values[i];
126 header_->write_pos.store(write + to_write, std::memory_order_release);
134 [[nodiscard]] std::optional<T>
pop() noexcept {
135 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
136 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
142 T value = data_[to_index(read)];
143 header_->read_pos.store(read + 1, std::memory_order_release);
153 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
154 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
156 size_t available = write - read;
157 size_t to_read = std::min(values.size(), available);
159 for (
size_t i = 0; i < to_read; ++i) {
160 values[i] = data_[to_index(read + i)];
163 header_->read_pos.store(read + to_read, std::memory_order_release);
173 size_t peek_bulk(
size_t offset, std::span<T> values)
const noexcept {
174 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
175 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
177 if (read + offset >= write) {
181 size_t available = write - (read + offset);
182 size_t to_peek = std::min(values.size(), available);
184 for (
size_t i = 0; i < to_peek; ++i) {
185 values[i] = data_[to_index(read + offset + i)];
195 size_t get_last_n(
size_t n, std::span<T> values)
const noexcept {
196 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
197 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
199 size_t available = write - read;
200 size_t to_get = std::min({n, values.size(), available});
202 uint64_t start = write - to_get;
203 for (
size_t i = 0; i < to_get; ++i) {
204 values[i] = data_[to_index(start + i)];
213 void skip(
size_t count)
noexcept {
214 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
215 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
217 size_t available = write - read;
218 size_t to_skip = std::min(count, available);
220 header_->read_pos.store(read + to_skip, std::memory_order_release);
227 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
228 header_->read_pos.store(write, std::memory_order_release);
233 return header_->capacity;
236 [[nodiscard]]
size_t size() const noexcept {
237 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
238 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
242 [[nodiscard]]
bool empty() const noexcept {
246 [[nodiscard]]
bool full() const noexcept {
258 return header_->write_pos.load(std::memory_order_acquire);
262 return header_->read_pos.load(std::memory_order_acquire);
265 [[nodiscard]] std::string_view
name() const noexcept {
266 return table_entry_ ? std::string_view(table_entry_->name.data()) : std::string_view{};
274 uint64_t write = header_->write_pos.load(std::memory_order_acquire);
275 uint64_t read = header_->read_pos.load(std::memory_order_acquire);
277 data_[to_index(write)] = value;
280 if (write - read >= header_->capacity) {
281 header_->read_pos.store(read + 1, std::memory_order_release);
284 header_->write_pos.store(write + 1, std::memory_order_release);
Lock-free ring buffer for high-throughput streaming data.
size_t push_bulk(std::span< const T > values) noexcept
Push multiple elements efficiently.
void clear() noexcept
Clear the buffer (reset read/write positions)
size_t available_space() const noexcept
std::string_view name() const noexcept
size_t capacity() const noexcept
size_t pop_bulk(std::span< T > values) noexcept
Pop multiple elements efficiently.
size_t peek_bulk(size_t offset, std::span< T > values) const noexcept
Peek at elements without consuming them.
void push_overwrite(const T &value) noexcept
Force overwrite when full (converts to circular overwrite mode) Useful for continuous sensor streams ...
bool push(const T &value) noexcept
Push a single element.
void skip(size_t count) noexcept
Skip elements without reading them.
std::optional< T > pop() noexcept
Pop a single element.
shm_ring_buffer(ShmType &shm, std::string_view name, size_t capacity=0)
size_t get_last_n(size_t n, std::span< T > values) const noexcept
Get the last N elements (most recent) Useful for getting trailing sensor data.
bool full() const noexcept
bool empty() const noexcept
uint64_t total_read() const noexcept
size_t size() const noexcept
uint64_t total_written() const noexcept
Get total elements written (useful for monitoring data rate)
Core POSIX shared memory management with automatic reference counting.