Table of Contents
- Basic Concepts
- Simple Producer-Consumer
- Particle Simulation System
- Sensor Data Pipeline
- Multi-Process Coordination
- Error Handling & Recovery
Basic Concepts
The Power of the Table System
The metadata table is what makes our shared memory system powerful. It allows multiple data structures to coexist in the same shared memory segment, each discoverable by name. This is crucial for complex systems.
posix_shm shm("my_system", 100 * 1024 * 1024);
With the table in place, several kinds of structures can coexist in the same segment:
- Fixed-size arrays in shared memory with zero-overhead access
- Shared memory atomic values with auto-discovery
- High-performance object pools for shared memory
- Lock-free circular queues for shared memory IPC
- Lock-free ring buffers for high-throughput streaming data
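A minimal sketch of the discovery workflow, using made-up segment and structure names ("demo", "frame_count") and the constructor forms that appear in the examples further down this page:
// Process A: create the segment and register a named atomic in its table.
posix_shm shm("demo", 1024 * 1024);
shm_atomic_uint32 frame_count(shm, "frame_count", 0);
frame_count.store(42);
// Process B: attach to the same segment and look the atomic up by name.
posix_shm shm_b("demo");
shm_atomic_uint32 frame_count_b(shm_b, "frame_count");
std::cout << frame_count_b.load() << "\n";   // prints 42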
Creating Shared Memory
posix_shm shm("my_simulation", 10 * 1024 * 1024);
Core POSIX shared memory management with automatic reference counting.
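Other processes attach to an existing segment by passing only its name, as the later examples on this page do; a sketch:
posix_shm shm("my_simulation");   // attach to the segment created above; no size needed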
Choosing Table Configuration
my_shm shm("custom", 50 * 1024 * 1024);
Metadata table for managing shared memory data structures.
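Here my_shm is assumed to be a user-defined alias that selects a custom table configuration. The two configurations named elsewhere on this page are shm_table (shm_table_impl<32, 64>) and shm_table_large (shm_table_impl<64, 256>), the parameters presumably being the maximum number of table entries and the maximum name length. Individual structures pick up the table type through a template parameter, as in this line taken from the game-server example below:
shm_atomic_uint32<shm_table_large> player_count(shm, "players_online", 0);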
Complete Multi-Structure Example
Game Server State (Multiple Structures in One Segment)
This example shows how a game server uses multiple data structures in a single shared memory segment for different purposes:
class GameServer {
private:
    posix_shm shm;
    // The remaining members (object pools, arrays, queues, ring buffer and the
    // other atomics named in the initializer list below) are omitted in this excerpt.
    shm_atomic_uint32<shm_table_large> player_count;
public:
GameServer()
: shm("game_server", 500 * 1024 * 1024),
player_pool(shm, "players", 100),
monster_pool(shm, "monsters", 1000),
projectile_pool(shm, "projectiles", 500),
collision_grid(shm, "collision", 256 * 256),
damage_events(shm, "damage_queue", 1000),
spawn_requests(shm, "spawn_queue", 100),
network_msgs(shm, "network_queue", 5000),
frame_history(shm, "frame_stats", 3600),
tick_counter(shm, "tick", 0),
player_count(shm, "players_online", 0),
match_active(shm, "match_active", false),
leaderboard(shm, "leaderboard", 10) {
std::cout << "Game server initialized with structures:\n";
print_memory_layout();
}
    void print_memory_layout() {
        auto* table = shm.get_table();   // assumed: table accessor provided by posix_shm
        std::cout << "Shared Memory Layout:\n";
        std::cout << "Total size: 500MB\n";
        std::cout << "Structures allocated: " << table->get_entry_count() << "\n\n";
        std::cout << "Named structures:\n";
        std::cout << " - players (object pool): " << player_pool.capacity() << " slots\n";
        std::cout << " - monsters (object pool): " << monster_pool.capacity() << " slots\n";
        std::cout << " - projectiles (object pool): " << projectile_pool.capacity() << " slots\n";
        std::cout << " - collision (array): " << collision_grid.size() << " cells\n";
        std::cout << " - damage_queue: " << damage_events.capacity() << " capacity\n";
        std::cout << " - spawn_queue: " << spawn_requests.capacity() << " capacity\n";
        std::cout << " - network_queue: " << network_msgs.capacity() << " capacity\n";
        std::cout << " - frame_stats (ring): " << frame_history.capacity() << " samples\n";
        std::cout << " - tick (atomic): current=" << tick_counter.load() << "\n";
        std::cout << " - players_online (atomic): " << player_count.load() << "\n";
        std::cout << " - match_active (atomic): " << match_active.load() << "\n";
        std::cout << " - leaderboard (array): " << leaderboard.size() << " entries\n";
    }
};
class AISystem {
private:
    posix_shm shm;
    // Members for the attached structures (players, monsters, collision_grid,
    // spawn_queue, tick) are omitted in this excerpt.
public:
    AISystem()
        : shm("game_server"),
          players(shm, "players"),
          monsters(shm, "monsters"),
          collision_grid(shm, "collision"),
          spawn_queue(shm, "spawn_queue"),
          tick(shm, "tick") {
        std::cout << "AI System connected to game server\n";
        std::cout << "Found " << monsters.num_allocated() << " active monsters\n";
    }
    void update_ai() {
        uint64_t current_tick = tick.load();
        for (uint32_t handle = 0; handle < monsters.capacity(); ++handle) {
            if (auto* monster = monsters.get(handle)) {
                int grid_x = monster->x / CELL_SIZE;
                int grid_y = monster->y / CELL_SIZE;
                uint32_t cell = collision_grid[grid_y * 256 + grid_x];
                // current_tick and cell would feed the AI decision logic (elided here)
                if (should_spawn_adds(monster)) {
                    SpawnRequest req{
                        .type = SPAWN_MINION,
                        .x = monster->x,
                        .y = monster->y
                    };
                    spawn_queue.enqueue(req);   // assumed: hand the request to the game server
                }
            }
        }
    }
};
class Analytics {
private:
    posix_shm shm;
    // The frame_history and leaderboard members (and the FrameStats element
    // type) are omitted in this excerpt.
    shm_atomic_uint32<shm_table_large> player_count;
public:
    Analytics()
        : shm("game_server"),
          frame_history(shm, "frame_stats"),
          player_count(shm, "players_online"),
          leaderboard(shm, "leaderboard") {
    }
    void generate_report() {
        FrameStats stats[3600];
        size_t count = frame_history.get_last_n(3600, stats);
        double avg_fps = calculate_average_fps(stats, count);
        uint32_t current_players = player_count.load();
        std::cout << "=== Performance Report ===\n";
        std::cout << "Average FPS: " << avg_fps << "\n";
        std::cout << "Players online: " << current_players << "\n";
        std::cout << "\nTop Players:\n";
        for (size_t i = 0; i < leaderboard.size(); ++i) {
            auto& entry = leaderboard[i];
            if (entry.score > 0) {
                std::cout << i + 1 << ". " << entry.name << " - " << entry.score << "\n";
            }
        }
    }
};
API referenced in this example:
- TableType* get_table() - Get mutable pointer to metadata table.
- std::string_view name() const noexcept - Get array name from metadata table.
- size_t size() const noexcept - Get number of elements.
- T load(std::memory_order order = std::memory_order_seq_cst) const noexcept
- T* get(handle_type handle) noexcept
- size_t capacity() const noexcept / size_t num_allocated() const noexcept - Get pool statistics.
- bool enqueue(const T& value) noexcept - Enqueue an element (lock-free).
- size_t get_last_n(size_t n, std::span<T> values) const noexcept - Get the last N (most recent) elements; useful for trailing sensor data.
- Fixed-size shared memory array with STL compatibility (the array class used for collision_grid and leaderboard above).
- shm_table_large: shm_table_impl<64, 256>
Memory Layout Visualization
Here's what the shared memory segment looks like with all these structures:
┌────────────────────────────────────────────────┐
│ Shared Memory: "game_server" (500MB) │
├────────────────────────────────────────────────┤
│ Offset | Size | Structure │
├────────────────────────────────────────────────┤
│ 0x0000 | 4B | Reference Counter │
│ 0x0004 | 26KB | shm_table_large │
│ | | ├─ "players" │
│ | | ├─ "monsters" │
│ | | ├─ "projectiles" │
│ | | ├─ "collision" │
│ | | ├─ "damage_queue" │
│ | | ├─ "spawn_queue" │
│ | | ├─ "network_queue" │
│ | | ├─ "frame_stats" │
│ | | ├─ "tick" │
│ | | ├─ "players_online" │
│ | | ├─ "match_active" │
│ | | └─ "leaderboard" │
├────────────────────────────────────────────────┤
│ 0x6808 | ~4KB | Player Pool │
│ 0x7808 | ~40KB | Monster Pool │
│ 0x11408 | ~10KB | Projectile Pool │
│ 0x13C08 | 256KB | Collision Grid │
│ 0x53C08 | ~16KB | Damage Queue │
│ 0x57C08 | ~2KB | Spawn Queue │
│ 0x58408 | ~80KB | Network Queue │
│ 0x6C408 | ~56KB | Frame History │
│ 0x7A008 | 8B | Tick Counter │
│ 0x7A010 | 4B | Player Count │
│ 0x7A014 | 1B | Match Active │
│ 0x7A018 | 400B | Leaderboard │
│ ... | ... | (Unused space) │
└────────────────────────────────────────────────┘
Key Points About Multiple Structures
- Single Allocation: One posix_shm object manages the entire segment
- Automatic Layout: The table system tracks offsets automatically
- No Fragmentation: Structures are allocated sequentially
- Discovery by Name: Any process can find any structure
- Type Safety: Template parameters ensure consistency
- Independent Lifecycles: Each structure can be used independently
Simple Producer-Consumer
Producer Process
#include <iostream>
#include <thread>
#include <chrono>
// plus the library headers providing posix_shm and the shared-memory queue

struct Message {
    uint64_t timestamp;
    int producer_id;
    double value;
};

int main() {
    posix_shm shm("producer_consumer", 1024 * 1024);
    shm_queue<Message> queue(shm, "messages", 1000);   // queue type name, structure name and
                                                       // capacity are assumed for this sketch
    for (int i = 0; i < 1000; ++i) {
        Message msg{
            .timestamp = static_cast<uint64_t>(
                std::chrono::system_clock::now().time_since_epoch().count()),
            .producer_id = 1,
            .value = i * 3.14
        };
        while (!queue.enqueue(msg)) {
            // Queue full: back off briefly and retry
            std::this_thread::sleep_for(std::chrono::microseconds(10));
        }
        std::cout << "Produced message " << i << "\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return 0;
}
Consumer Process
int main() {
    posix_shm shm("producer_consumer");          // attach to the producer's segment
    shm_queue<Message> queue(shm, "messages");   // assumed type name; Message as in the producer

    int count = 0;
    while (count < 1000) {
        if (auto msg = queue.dequeue()) {
            std::cout << "Consumed message from producer "
                      << msg->producer_id
                      << " with value " << msg->value << "\n";
            count++;
        } else {
            std::this_thread::sleep_for(std::chrono::microseconds(10));
        }
    }
    return 0;
}
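Run the two programs as separate processes: the producer creates the "producer_consumer" segment and registers the queue, and the consumer then attaches to both by name.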
Particle Simulation System
Simulation Core
class ParticleSimulation {
private:
    posix_shm shm;
    // The particle_pool, active_particles and frame_counter members are
    // omitted in this excerpt.
    shm_atomic_uint32 active_count;
public:
    struct Particle {
        float position[3];
        float velocity[3];
        float mass;
        float lifetime;
        uint32_t type;
        uint32_t flags;
    };

    ParticleSimulation(const std::string& name, size_t max_particles)
        : shm(name, calculate_memory_size(max_particles)),
          particle_pool(shm, "particles", max_particles),
          active_particles(shm, "active_list", max_particles),
          frame_counter(shm, "frame", 0),
          active_count(shm, "active", 0) {
    }
    static size_t calculate_memory_size(size_t max_particles) {
        return max_particles * sizeof(Particle) +   // pool storage (assumed leading term)
               sizeof(uint32_t) * max_particles +   // active-handle index array
               sizeof(std::atomic<uint64_t>) +      // frame counter
               sizeof(std::atomic<uint32_t>) +      // active count
               1024 * 1024;                         // headroom for the table and alignment
    }
    uint32_t spawn_particle(const Particle& p) {
        auto handle = particle_pool.acquire();
        if (handle == particle_pool.invalid_handle) {
            return particle_pool.invalid_handle;   // pool exhausted (assumed handling)
        }
        particle_pool[handle] = p;
        uint32_t index = active_count.fetch_add(1);
        active_particles[index] = handle;
        return handle;
    }
void update_physics(float dt) {
uint32_t count = active_count.load();
#pragma omp parallel for
for (uint32_t i = 0; i < count; ++i) {
uint32_t handle = active_particles[i];
auto& p = particle_pool[handle];
p.position[0] += p.velocity[0] * dt;
p.position[1] += p.velocity[1] * dt;
p.position[2] += p.velocity[2] * dt;
p.velocity[1] -= 9.8f * dt;   // gravity
p.lifetime -= dt;             // assumed: age the particle so compaction can retire it
}
compact_active_list();
frame_counter++;
}
    void compact_active_list() {
        uint32_t read = 0, write = 0;
        uint32_t count = active_count.load();
        while (read < count) {
            uint32_t handle = active_particles[read];
            auto& p = particle_pool[handle];
            if (p.lifetime > 0.0f) {                // assumed liveness test
                active_particles[write++] = handle;
            } else {
                particle_pool.release(handle);      // expired: return the slot to the pool
            }
            read++;
        }
        active_count.store(write);
    }
};
API referenced in this example:
- handle_type acquire() noexcept - Acquire an object from the pool.
- void release(handle_type handle) noexcept - Release an object back to the pool.
- static constexpr handle_type invalid_handle
- shm_table: shm_table_impl<32, 64>
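A hypothetical driver process for the simulation side, tying the pieces above together (the segment name "particle_sim", the particle values and the 60 Hz step are all made up):
int main() {
    ParticleSimulation sim("particle_sim", 100000);

    ParticleSimulation::Particle p{};
    p.position[1] = 10.0f;   // start 10 m up
    p.velocity[0] = 2.0f;    // drifting sideways
    p.mass = 1.0f;
    p.lifetime = 5.0f;       // seconds until compaction retires it
    sim.spawn_particle(p);

    while (true) {
        sim.update_physics(1.0f / 60.0f);
        std::this_thread::sleep_for(std::chrono::milliseconds(16));
    }
}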
Renderer Process
int main() {
    // Attach to the segment created by ParticleSimulation. The attachments of
    // shm, the "particles" pool, the "active_list" array and the "frame"
    // counter are omitted in this excerpt; only active_count is shown.
    shm_atomic_uint32 active_count(shm, "active");

    uint64_t last_frame = 0;
    while (true) {
        uint64_t current_frame = frame.load();
        if (current_frame != last_frame) {
            // A new simulation frame is ready: draw every active particle
            uint32_t count = active_count.load();
            for (uint32_t i = 0; i < count; ++i) {
                uint32_t handle = active_list[i];
                const auto& p = particles[handle];
                render_particle(p.position, p.type);
            }
            present_frame();
            last_frame = current_frame;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(16));   // ~60 Hz
    }
}
Sensor Data Pipeline
High-Frequency Data Collection
class SensorCollector {
private:
    posix_shm shm;
    // The ring buffer (buffer) and the total/dropped sample counters are
    // omitted in this excerpt.
public:
    struct SensorReading {   // struct name assumed
        uint64_t timestamp_ns;
        float values[8];
        uint16_t status;
        uint16_t sensor_id;
    };

    SensorCollector()
        : shm("sensor_pipeline", 100 * 1024 * 1024),
          buffer(shm, "readings", 1000000),
          total_samples(shm, "total", 0),
          dropped_samples(shm, "dropped", 0) {
    }
    void collect_samples() {
        SensorReading batch[1000];
        while (true) {
            size_t collected = read_sensors(batch, 1000);   // read_sensors() supplied elsewhere
            size_t pushed = buffer.push_bulk({batch, collected});
            total_samples.fetch_add(pushed);                // running total (member shown above)
            if (pushed < collected) {
                dropped_samples.fetch_add(collected - pushed);   // buffer was full
            }
            std::this_thread::sleep_for(std::chrono::microseconds(1000));
        }
    }
};
API referenced in this example:
- T fetch_add(T arg, std::memory_order order = std::memory_order_seq_cst) noexcept
- size_t push_bulk(std::span<const T> values) noexcept - Push multiple elements efficiently.
- void push_overwrite(const T& value) noexcept - Force overwrite when full (circular overwrite mode); useful for continuous sensor streams ...
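If losing the newest samples is worse than losing the oldest, push_overwrite (documented above) offers an alternative to counting drops; a sketch of the overflow handling, reusing the batch from collect_samples:
for (size_t i = pushed; i < collected; ++i) {
    buffer.push_overwrite(batch[i]);   // overwrite the oldest samples instead of dropping these
}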
Data Processing Pipeline
class DataProcessor {
private:
    // The input_buffer and output_buffer members (shared-memory ring buffers)
    // are omitted in this excerpt; ProcessedData is defined elsewhere.
public:
    void process_pipeline() {
        SensorReading raw_batch[100];
        ProcessedData processed_batch[100];
        while (true) {
            size_t count = input_buffer.pop_bulk(raw_batch);
            if (count > 0) {
                for (size_t i = 0; i < count; ++i) {
                    processed_batch[i] = process_reading(raw_batch[i]);
                }
                output_buffer.push_bulk({processed_batch, count});
            } else {
                std::this_thread::sleep_for(std::chrono::microseconds(100));
            }
        }
    }

    ProcessedData process_reading(const SensorReading& reading) {
        ProcessedData result;
        // ... filtering / calibration applied to `reading` goes here ...
        return result;
    }
};
API referenced in this example:
- size_t pop_bulk(std::span<T> values) noexcept - Pop multiple elements efficiently.
Multi-Process Coordination
Master-Worker Pattern
class TaskScheduler {
private:
    // The task_queue, result_queue and shutdown flag members are omitted in
    // this excerpt.
    shm_atomic_uint32 workers_ready;
public:
    struct Task {
        uint64_t task_id;
        uint32_t type;
        char parameters[256];
    };

    struct Result {
        uint64_t task_id;
        uint32_t worker_id;
        bool success;
        char output[1024];
    };
    void run_master() {
        // Wait until all four workers have checked in
        while (workers_ready.load() < 4) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        // Distribute the tasks, draining results whenever the queue is full
        for (uint64_t i = 0; i < 1000; ++i) {
            Task t{.task_id = i, .type = static_cast<uint32_t>(i % 4)};
            while (!task_queue.enqueue(t)) {   // assumed: submit via the task queue
                process_results();
            }
        }
        // Collect the remaining results
        for (int i = 0; i < 1000; ++i) {
            process_results();
        }
    }
    void run_worker(uint32_t worker_id) {
        workers_ready++;
        while (!shutdown.load()) {
            if (auto task = task_queue.dequeue()) {
                Result r{
                    .task_id = task->task_id,
                    .worker_id = worker_id,
                    .success = true
                };
                execute_task(*task, r);
                // Assumed: publish the result, backing off while the queue is full
                while (!result_queue.enqueue(r)) {
                    if (shutdown.load())
                        break;
                    std::this_thread::yield();
                }
            } else {
                std::this_thread::sleep_for(std::chrono::microseconds(10));
            }
        }
    }
};
API referenced in this example:
- void store(T value, std::memory_order order = std::memory_order_seq_cst) noexcept
- std::optional<T> dequeue() noexcept - Dequeue an element (lock-free).
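A hypothetical launcher showing how the two roles might be started as separate processes (the command-line convention and the default TaskScheduler constructor are assumptions):
int main(int argc, char** argv) {
    TaskScheduler scheduler;                        // both roles open the same shared segment
    if (argc > 2 && std::string(argv[1]) == "worker") {
        scheduler.run_worker(std::atoi(argv[2]));   // e.g. ./scheduler worker 0
    } else {
        scheduler.run_master();
    }
    return 0;
}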
Error Handling & Recovery
Robust Initialization
class RobustSystem {
public:
    static std::unique_ptr<RobustSystem> create(const std::string& name) {
        try {
            // First attempt, assumed semantics: true = create a new segment
            return std::make_unique<RobustSystem>(name, true);
        } catch (const std::exception& e) {
            try {
                // Creation failed: try attaching to an existing segment instead
                return std::make_unique<RobustSystem>(name, false);
            } catch (...) {
                // Attaching failed too: remove the (likely corrupt) segment and start over
                cleanup_shared_memory(name);
                return std::make_unique<RobustSystem>(name, true);
            }
        }
    }
private:
    // The constructor RobustSystem(name, create_flag) and the structures it
    // sets up are omitted in this excerpt.
    static void cleanup_shared_memory(const std::string& name) {
        shm_unlink(name.c_str());   // POSIX call from <sys/mman.h>
    }
};
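Call sites then go through the factory instead of the constructor; a minimal usage sketch with a made-up segment name:
auto system = RobustSystem::create("telemetry_system");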
Crash Recovery
class CrashResilientQueue {
private:
    // The shared-memory message queue and the persistent last_processed_id
    // atomic are omitted in this excerpt.
public:
    void process_with_recovery() {
        uint64_t last_id = last_processed_id.load();
        while (true) {
            auto msg = queue.dequeue();   // dequeue from the shared queue (member elided above)
            if (!msg) {
                std::this_thread::sleep_for(std::chrono::microseconds(100));
                continue;
            }
            if (msg->id <= last_id) {
                continue;   // already handled before the crash; skip duplicates
            }
            try {
                process_message(*msg);
                last_processed_id.store(msg->id);   // record progress in shared memory
            } catch (const std::exception& e) {
                log_error(e.what());
            }
        }
    }
};
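Because last_processed_id lives in shared memory, a consumer that crashes and restarts resumes after the last message it recorded instead of reprocessing or double-applying earlier messages.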
Monitoring & Diagnostics
class SystemMonitor {
private:
    struct Stats {
        std::atomic<uint64_t> messages_processed{0};
        std::atomic<uint64_t> errors_count{0};
        std::atomic<uint64_t> queue_full_count{0};
        std::atomic<double> avg_latency_us{0};
        std::atomic<uint64_t> last_heartbeat{0};
    };
    // process_stats, a shared array of Stats (one entry per monitored process),
    // is omitted in this excerpt.
public:
    void monitor_system() {
        while (true) {
            auto now = std::chrono::system_clock::now().time_since_epoch().count();
            for (size_t i = 0; i < process_stats.size(); ++i) {
                auto& stats = process_stats[i];
                uint64_t last_heartbeat = stats.last_heartbeat.load();
                if (now - last_heartbeat > 5000000000) {   // ~5 s, assuming nanosecond clock ticks
                    std::cerr << "Process " << i << " appears hung!\n";
                    alert_operator(i);
                }
                std::cout << "Process " << i << ": "
                          << stats.messages_processed.load() << " messages, "
                          << stats.errors_count.load() << " errors, "
                          << stats.avg_latency_us.load() << " µs latency\n";
            }
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
};
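The monitored processes are expected to update their own Stats entry as they work. A sketch of that side, assuming process_stats is the same shared array the monitor iterates over and i identifies the current process:
auto& my_stats = process_stats[i];
my_stats.messages_processed.fetch_add(1);
my_stats.last_heartbeat.store(
    std::chrono::system_clock::now().time_since_epoch().count());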
Best Practices
1. <strong>Always Name Your Structures</strong>
void* get_base_addr() const - Get base address for user data (after the header).
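The point of naming is the contrast with raw base-address arithmetic via get_base_addr(); a sketch of the two styles, with assumed structure types and names:
// Fragile: hand-computed offsets from the raw base pointer break whenever the layout changes.
void* base = shm.get_base_addr();
auto* counters = reinterpret_cast<uint32_t*>(static_cast<char*>(base) + 4096);

// Robust: register the structure under a name so any process can rediscover it later.
shm_array<uint32_t> frame_counters(shm, "frame_counters", 1024);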
2. <strong>Handle Full Conditions Gracefully</strong>
// Assumed setup for the retry loop: the enqueue attempt, retry counter and
// initial backoff are not shown in the original excerpt.
int retries = 0;
auto backoff = std::chrono::milliseconds(1);
while (!queue.enqueue(msg)) {
    if (++retries > max_retries) {
        handle_overflow();   // e.g. drop, log, or spill to disk
        break;
    }
    std::this_thread::sleep_for(backoff);
    backoff *= 2;            // exponential backoff
}
3. <strong>Use Bulk Operations for Efficiency</strong>
// Good: one bulk operation moves up to 100 readings at a time
Reading batch[100];
size_t count = buffer.pop_bulk(batch);
process_batch(batch, count);

// Instead of: popping one element per call
for (int i = 0; i < 100; ++i) {
    if (auto val = buffer.pop()) {
        process_single(*val);
    }
}
std::optional<T> pop() noexcept - Pop a single element.
4. <strong>Monitor System Health</strong>
// Good: publish a heartbeat other processes can watch
while (running) {
    do_work();
    heartbeat.store(now());
}

// Instead of: a silent loop that cannot be told apart from a hung process
while (running) {
    do_work();
}
5. <strong>Plan for Growth</strong>
// Good: make the capacity configurable at startup (requires <cstdlib>;
// note this cannot be constexpr because std::getenv runs at runtime)
const size_t MAX_PARTICLES =
    std::getenv("MAX_PARTICLES") ?
    std::atoi(std::getenv("MAX_PARTICLES")) : 10000;

// Instead of: a hard-coded limit baked into the binary
constexpr size_t MAX_PARTICLES = 1000;