This post summarizes some Claude-assisted notes on how PostgreSQL shares memory between its worker processes, which are essentially forked children of the postmaster.
- Two-layer design: Static shared memory for core structures + Dynamic shared memory for parallel operations
// Static allocation for core structures void *buffer_descriptors = ShmemInitStruct("Buffer Descriptors", size, &found); // Dynamic allocation for parallel workers dsm_segment *seg = dsm_create(1024 * 1024, 0); void *parallel_data = dsm_segment_address(seg);
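- Single segment inheritance: the postmaster creates the main shared memory segment before forking, so every backend inherits the mapping at the same virtual address via fork() and raw pointers into it stay valid (EXEC_BACKEND builds instead reattach the segment at the same address)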
// In postmaster: create at consistent virtual address ShmemBase = PGSharedMemoryCreate(total_size, &shim); // In backend: inherit same virtual address automatically // No remapping needed - pointers work across processes
- Name-based lookup: Hash table maps structure names to memory locations (https://github.com/postgres/postgres/blob/master/src/backend/storage/ipc/shmem.c#L91)
// Register named structure in shared memory index void *my_data = ShmemInitStruct("Buffer Strategy Status", sizeof(BufferStrategyControl), &found); // Later processes find by name result = hash_search(ShmemIndex, "Buffer Strategy Status", HASH_FIND, NULL);
- No deallocation: static shared memory is never freed; entries deleted from shared hash tables go onto a free list and their space is reused
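- Cache-line alignment: every allocation is rounded up to a cache-line boundary (CACHELINEALIGN, i.e. PG_CACHE_LINE_SIZE, 128 bytes by default) so critical structures don't straddle cache lines and false sharing is reduced (https://github.com/postgres/postgres/blob/master/src/backend/storage/ipc/shmem.c#L203)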
Anti-Thundering Herd Mechanisms
- Exclusive locking with atomic check-and-allocate (https://github.com/postgres/postgres/blob/master/src/backend/storage/ipc/shmem.c#L392-L429):
LWLockAcquire(ShmemIndexLock, LW_EXCLUSIVE); // Atomic: either find existing or create new entry (never both) result = hash_search(ShmemIndex, "MyExpensiveData", HASH_ENTER_NULL, &found); if (found) { // Someone else already computed it return result->location; // Just return existing pointer } else { // We won the race - allocate and initialize ptr = ShmemAllocRaw(size, &allocated_size); result->location = ptr; // Now do expensive initialization work } LWLockRelease(ShmemIndexLock);
- Condition variables for efficient waiting (https://github.com/postgres/postgres/blob/master/src/backend/storage/lmgr/condition_variable.c):
// Waiter pattern ConditionVariablePrepareToSleep(&slot->active_cv); while (slot->active_pid != 0) // While someone else owns the slot ConditionVariableSleep(&slot->active_cv, WAIT_EVENT_REPLICATION_SLOT_DROP); ConditionVariableCancelSleep(); // Signaler pattern slot->active_pid = 0; // Release the resource ConditionVariableBroadcast(&slot->active_cv); // Wake all waiters
- Lock-free message queues: Atomic counters + memory barriers (https://github.com/postgres/postgres/blob/master/src/backend/storage/ipc/shm_mq.c#L76-L77)
// Producer atomically updates write position pg_atomic_write_u64(&mq->mq_bytes_written, new_write_pos); pg_write_barrier(); // Ensure data visible before updating counter // Consumer atomically reads without locks uint64 bytes_written = pg_atomic_read_u64(&mq->mq_bytes_written); uint64 bytes_available = bytes_written - mq_bytes_read;
TTL Implementation Patterns
- Expiry check with immediate invalidation:
bool is_expired(CacheEntry *entry) {
if (!entry->initialized) return false;
return (time(NULL) - entry->created_at) > entry->ttl_seconds;
}
// In get operation pthread_mutex_lock(&entry->mutex); if (entry->initialized && is_expired(entry)) { printf("Cache expired (age: %lds > %lds TTL)\n", time(NULL) - entry->created_at, entry->ttl_seconds); entry->initialized = false; // Treat as cache miss }
- Per-key TTL with timestamp tracking:
// When storing result entry->value = computed_result; entry->created_at = time(NULL); entry->ttl_seconds = 300; // 5 minute TTL entry->initialized = true; // When checking freshness time_t age = time(NULL) - entry->created_at; bool fresh = age <= entry->ttl_seconds;
- Combined with thundering herd protection:
if (entry->initialized && !is_expired(entry)) { return entry->value; // Fresh cache hit } if (entry->initializing) { wait_for_computation(); // Someone else recomputing expired data } // Expired + not computing = we recompute
Safety and Synchronization
- Magic number validation (https://github.com/postgres/postgres/blob/master/src/include/storage/pg_shmem.h#L32):
typedef struct PGShmemHeader {
int32 magic; // #define PGShmemMagic 679834894
pid_t creatorPID;
Size totalsize;
// ...
} PGShmemHeader;
// Validation on attach
if (header->magic != PGShmemMagic)
elog(FATAL, "wrong shared memory segment");
- Process validation with creator PID:
// Store creator info header->creatorPID = getpid(); header->device = stat_buf.st_dev; // Prevent cross-instance sharing header->inode = stat_buf.st_ino; // Validate on attach if (header->device != our_device || header->inode != our_inode) elog(FATAL, "shared memory from different data directory");
- Resource cleanup with exit callbacks (https://github.com/postgres/postgres/blob/master/src/backend/storage/ipc/ipc.c#L79-L85):
// Register cleanup function
on_shmem_exit(cleanup_my_resources, PointerGetDatum(my_data));
// Automatic cleanup on process exit
static void cleanup_my_resources(int code, Datum arg) {
MyStruct *data = (MyStruct*) DatumGetPointer(arg);
data->active_pid = 0; // Release ownership
ConditionVariableBroadcast(&data->cv); // Wake waiters
}
- Spinlock protection for short critical sections:
SpinLockAcquire(&entry->mutex); entry->reference_count++; entry->last_accessed = GetCurrentTimestamp(); SpinLockRelease(&entry->mutex);
Coordination Patterns
- Complete initialization race pattern:
pthread_mutex_lock(&entry->mutex); if (entry->initialized && !is_expired(entry)) { result = entry->value; // Cache hit pthread_mutex_unlock(&entry->mutex); return result; } if (entry->initializing) { // Another process computing while (!entry->initialized) pthread_cond_wait(&entry->init_cv, &entry->mutex); result = entry->value; pthread_mutex_unlock(&entry->mutex); return result; } // We won the race entry->initializing = true; pthread_mutex_unlock(&entry->mutex); // Expensive work OUTSIDE the lock expensive_result = compute_expensive_thing(); // Store result and notify pthread_mutex_lock(&entry->mutex); entry->value = expensive_result; entry->created_at = time(NULL); entry->initialized = true; entry->initializing = false; pthread_mutex_unlock(&entry->mutex); pthread_cond_broadcast(&entry->init_cv); // Wake all waiters
- State transitions: uninitialized → initializing → initialized → expired → uninitialized
- Retry with goto pattern (PostgreSQL style):
retry: LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); if (origin->acquired_by != 0) { cv = &origin->origin_cv; LWLockRelease(ReplicationOriginLock); ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP); goto retry; // Start over } // Proceed with acquisition
Performance Optimizations
- NUMA-aware allocation: Track which memory nodes hold data for optimal CPU affinity
- Huge pages: Use 2MB/1GB pages instead of 4KB to reduce TLB pressure
- Computation outside locks: Always release locks before expensive operations:
// BAD: holding lock during expensive work pthread_mutex_lock(&mutex); expensive_result = heavy_computation(); // Blocks everyone data->value = expensive_result; pthread_mutex_unlock(&mutex); // GOOD: compute outside lock pthread_mutex_lock(&mutex); data->computing = true; pthread_mutex_unlock(&mutex); expensive_result = heavy_computation(); // Others can proceed pthread_mutex_lock(&mutex); data->value = expensive_result; data->computing = false; pthread_mutex_unlock(&mutex);
- Atomic operations for hot paths: Use lock-free techniques for frequently accessed counters
- Cache-line padding: Separate frequently modified fields to different cache lines
typedef struct {
volatile int reader_count;
char pad1[64]; // Prevent false sharing
volatile int writer_count;
char pad2[64];
} CounterStruct;
The gist
So that I can refer back to it without digging through PostgreSQL again, below is a bare-bones implementation of the tricks above.
Some day this might become a Ruby gem.
/*
 * thundering_herd_demo.c
 *
 * Generic key/value storage with an anti-thundering-herd pattern,
 * using process-shared condition variables and shared memory.
 *
 * Features:
 * - Key/value storage where values are stored as encoded bytes
 *   (a 4-byte big-endian integer standing in for msgpack)
 * - Only one process computes the expensive value for a key
 * - Other processes wait efficiently on a condition variable
 * - Values expire after a per-entry TTL and are then recomputed
 * - No PostgreSQL dependencies
 */
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <pthread.h>
#include <time.h>
#include <string.h>
#include <stdint.h>
/* Fixed capacity of the shared table; nothing is allocated dynamically. */
#define MAX_ENTRIES 64      /* maximum number of distinct keys */
#define MAX_KEY_SIZE 48     /* key buffer size in bytes, including the NUL */
#define MAX_VALUE_SIZE 1024 /* capacity of an entry's encoded value, in bytes */

/*
 * One cached key/value slot, stored inside the shared mapping.
 * Its mutex and condition variable are set up as PTHREAD_PROCESS_SHARED in
 * find_or_create_entry() so that forked workers can use them.
 */
typedef struct {
    char key[MAX_KEY_SIZE];             /* NUL-terminated lookup key */
    bool initialized;                   /* value_data holds a stored value */
    bool initializing;                  /* set by the process that claimed the computation */
    pthread_mutex_t mutex;              /* get_value() accesses the state fields under this lock */
    pthread_cond_t init_cv;             /* broadcast after a computed value is stored */
    time_t created_at;                  /* time(NULL) when the value was stored */
    time_t ttl_seconds;                 /* values older than this many seconds are expired */
    size_t value_size;                  /* number of valid bytes in value_data */
    uint8_t value_data[MAX_VALUE_SIZE]; /* encoded value (demo: 4-byte big-endian int) */
} StorageEntry;

/*
 * Root object placed in the shared mapping.
 * global_mutex serializes lookups and insertions in find_or_create_entry();
 * entries[0 .. num_entries) are in use and are only ever appended.
 */
typedef struct {
    pthread_mutex_t global_mutex;
    int num_entries;
    StorageEntry entries[MAX_ENTRIES];
} SharedStorage;

/* Shared mapping created by main() before forking; children inherit it. */
static SharedStorage *storage;
/*
 * Encode `value` into `buffer` for storage in an entry.
 *
 * Demo stand-in for msgpack: the value is written as exactly 4 bytes in
 * network (big-endian) byte order, with no type tag and no length prefix.
 * In a real implementation, use a proper msgpack library.
 *
 * `buffer` must have room for at least 4 bytes.
 * Returns the number of bytes written (always 4).
 */
static size_t encode_int_to_msgpack(int value, uint8_t *buffer)
{
    /* Convert to unsigned first: right-shifting a negative signed int is
     * implementation-defined, whereas conversion to uint32_t is defined to
     * wrap modulo 2^32 and yields the two's-complement bit pattern. */
    uint32_t bits = (uint32_t)value;

    buffer[0] = (uint8_t)(bits >> 24);
    buffer[1] = (uint8_t)(bits >> 16);
    buffer[2] = (uint8_t)(bits >> 8);
    buffer[3] = (uint8_t)bits;
    return 4;
}
/*
 * Decode a value written by encode_int_to_msgpack().
 *
 * Reads the first 4 bytes of `buffer` as a big-endian 32-bit two's-complement
 * integer. Returns -1 if fewer than 4 bytes are available; note that -1 is
 * also a valid decoded value, so callers cannot tell the two apart.
 */
static int decode_int_from_msgpack(const uint8_t *buffer, size_t size)
{
    if (size < 4) {
        return -1;
    }

    /* Assemble in uint32_t: a byte >= 0x80 promoted to int and shifted left
     * by 24 would overflow a signed int, which is undefined behavior. */
    uint32_t bits = ((uint32_t)buffer[0] << 24) | ((uint32_t)buffer[1] << 16) |
                    ((uint32_t)buffer[2] << 8) | (uint32_t)buffer[3];

    /* Map back to a signed value without relying on the implementation-defined
     * conversion of an out-of-range unsigned value to int. */
    if (bits <= (uint32_t)INT32_MAX) {
        return (int)bits;
    }
    return (int)(bits - 0x80000000u) + INT32_MIN;
}
/*
 * Report whether an entry's stored value has outlived its TTL.
 * An entry that holds no value is never reported as expired.
 */
static bool is_entry_expired(StorageEntry *entry)
{
    /* Short-circuit: the clock is only consulted when there is a value to age. */
    return entry->initialized &&
           time(NULL) - entry->created_at > entry->ttl_seconds;
}
/*
 * Initialize the process-shared mutex and condition variable of a new entry.
 * Returns true on success; on failure nothing in `entry` is left initialized.
 */
static bool init_entry_sync(StorageEntry *entry)
{
    pthread_mutexattr_t mutex_attr;
    pthread_condattr_t cond_attr;
    int rc;

    if (pthread_mutexattr_init(&mutex_attr) != 0) {
        return false;
    }
    /* PTHREAD_PROCESS_SHARED: the entry lives in memory shared across fork(). */
    rc = pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);
    if (rc == 0) {
        rc = pthread_mutex_init(&entry->mutex, &mutex_attr);
    }
    pthread_mutexattr_destroy(&mutex_attr);
    if (rc != 0) {
        return false;
    }

    if (pthread_condattr_init(&cond_attr) != 0) {
        pthread_mutex_destroy(&entry->mutex);
        return false;
    }
    rc = pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
    if (rc == 0) {
        rc = pthread_cond_init(&entry->init_cv, &cond_attr);
    }
    pthread_condattr_destroy(&cond_attr);
    if (rc != 0) {
        pthread_mutex_destroy(&entry->mutex);
        return false;
    }
    return true;
}

/*
 * Find the entry for `key`, creating it if it does not exist yet.
 *
 * Lookups and insertions are serialized by storage->global_mutex. New entries
 * are appended to storage->entries. A new entry starts empty (not initialized,
 * not initializing) with the default 3 second TTL and its own process-shared
 * mutex and condition variable.
 *
 * Returns NULL if key is NULL, if key is longer than MAX_KEY_SIZE - 1
 * characters, if the table is full, or if the entry's locks cannot be
 * initialized.
 */
static StorageEntry* find_or_create_entry(const char *key)
{
    if (key == NULL) {
        return NULL;
    }
    /* Reject keys that cannot be stored verbatim: a truncated stored key would
     * never compare equal to the full key again, so every lookup would append
     * a duplicate entry until the table filled up. */
    size_t key_len = strlen(key);
    if (key_len >= MAX_KEY_SIZE) {
        return NULL;
    }

    StorageEntry *entry = NULL;

    pthread_mutex_lock(&storage->global_mutex);

    /* Look for an existing entry */
    for (int i = 0; i < storage->num_entries; i++) {
        if (strcmp(storage->entries[i].key, key) == 0) {
            entry = &storage->entries[i];
            break;
        }
    }

    /* Create a new entry if absent and space is available. The slot is only
     * published (num_entries bumped) once its locks are usable. */
    if (entry == NULL && storage->num_entries < MAX_ENTRIES) {
        StorageEntry *slot = &storage->entries[storage->num_entries];
        if (init_entry_sync(slot)) {
            memcpy(slot->key, key, key_len + 1);  /* fits: length checked above */
            slot->initialized = false;
            slot->initializing = false;
            slot->created_at = 0;
            slot->ttl_seconds = 3;  /* Default 3 second TTL */
            slot->value_size = 0;
            storage->num_entries++;
            entry = slot;
        }
    }

    pthread_mutex_unlock(&storage->global_mutex);
    return entry;  /* NULL: no space (or lock initialization failed) */
}
/*
 * Simulate an expensive calculation for `key` (sleeps for 1 second).
 *
 * The result is a simple multiplicative string hash of the key reduced
 * modulo 1000, plus worker_id, so values computed by different workers for
 * the same key are distinguishable.
 */
static int calculate_expensive_value(const char *key, int worker_id)
{
    printf("Worker %d: Computing expensive value for key '%s'...\n", worker_id, key);
    sleep(1);  /* Simulate expensive work */

    /* Generate "expensive" result based on key hash. Unsigned arithmetic wraps
     * modulo 2^N, so the hash is well defined for keys of any length; a signed
     * int hash overflows (undefined behavior) already for 6-character keys
     * such as "config", and abs(INT_MIN) is undefined as well. */
    unsigned int hash = 0;
    for (const char *p = key; *p; p++) {
        hash = hash * 31u + (unsigned char)*p;
    }

    int result = (int)(hash % 1000u) + worker_id;
    printf("Worker %d: Finished computing for key '%s', result = %d\n",
           worker_id, key, result);
    return result;
}
/*
 * Get the value for `key`, computing it if necessary (anti-thundering herd).
 *
 * Only one caller computes a missing or expired value. It sets `initializing`,
 * drops the entry lock for the expensive work, then stores the result, clears
 * `initializing` and broadcasts init_cv. Callers that find `initializing` set
 * sleep on init_cv until the value is stored. A value older than its TTL is
 * treated as a cache miss.
 *
 * Returns the decoded value, or -1 if no entry could be obtained for the key.
 */
static int get_value(const char *key, int worker_id)
{
    StorageEntry *entry = find_or_create_entry(key);
    if (!entry) {
        printf("Worker %d: No space for key '%s'\n", worker_id, key);
        return -1;
    }

    pthread_mutex_lock(&entry->mutex);

    if (entry->initialized && !is_entry_expired(entry)) {
        /* Fresh cache hit: read under the lock, print after releasing it.
         * time_t has no portable printf conversion, so widen it explicitly. */
        int result = decode_int_from_msgpack(entry->value_data, entry->value_size);
        long long age = (long long)(time(NULL) - entry->created_at);
        pthread_mutex_unlock(&entry->mutex);
        printf("Worker %d: Key '%s' cache hit, result = %d (age: %llds)\n",
               worker_id, key, result, age);
        return result;
    }

    if (entry->initialized && is_entry_expired(entry)) {
        /* Value expired - treat as cache miss */
        printf("Worker %d: Key '%s' expired (age: %llds > %llds TTL), will recompute\n",
               worker_id, key, (long long)(time(NULL) - entry->created_at),
               (long long)entry->ttl_seconds);
        entry->initialized = false;  /* Mark as needing recomputation */
    }

    if (entry->initializing) {
        /* Someone else is computing - wait. The predicate is re-checked after
         * every wakeup because condition variables may wake spuriously. */
        printf("Worker %d: Key '%s' being computed by another worker, waiting...\n",
               worker_id, key);
        while (!entry->initialized) {
            pthread_cond_wait(&entry->init_cv, &entry->mutex);
        }
        int result = decode_int_from_msgpack(entry->value_data, entry->value_size);
        pthread_mutex_unlock(&entry->mutex);
        printf("Worker %d: Key '%s' computation complete, result = %d\n",
               worker_id, key, result);
        return result;
    }

    /* We won the race - claim the computation, then do it outside the lock */
    printf("Worker %d: Won race for key '%s', will compute\n", worker_id, key);
    entry->initializing = true;
    pthread_mutex_unlock(&entry->mutex);

    int computed_value = calculate_expensive_value(key, worker_id);

    /* Encode the result and store it */
    uint8_t encoded_data[MAX_VALUE_SIZE];
    size_t encoded_size = encode_int_to_msgpack(computed_value, encoded_data);

    pthread_mutex_lock(&entry->mutex);
    memcpy(entry->value_data, encoded_data, encoded_size);
    entry->value_size = encoded_size;
    entry->created_at = time(NULL);
    entry->initialized = true;
    /* Release the claim. If this flag stayed set, the next caller to find the
     * value expired would take the waiting branch above and block forever on
     * a computation that nobody is running. */
    entry->initializing = false;
    pthread_mutex_unlock(&entry->mutex);

    /* Wake up all waiting processes */
    pthread_cond_broadcast(&entry->init_cv);
    printf("Worker %d: Stored result for key '%s', notified waiters\n",
           worker_id, key);
    return computed_value;
}
/*
 * Entry point of a forked worker.
 *
 * After a short random delay, the worker fetches 2 or 3 consecutive keys from
 * a fixed pool, starting at an offset derived from its id. It then sleeps past
 * the default TTL and fetches the same keys again, so expired values get
 * recomputed.
 */
static void worker_process(int worker_id)
{
    static const char *const key_pool[] = {"config", "stats", "cache", "metadata"};
    const int pool_size = (int)(sizeof key_pool / sizeof key_pool[0]);

    printf("Worker %d: Started (PID %d)\n", worker_id, getpid());

    /* Random 0-199 ms start-up jitter so workers race on shared keys. */
    usleep((rand() % 200) * 1000);

    const int first = worker_id % pool_size;
    const int request_count = 2 + worker_id % 2;

    /* Phase 1: first fetch of each key (cold cache or concurrent compute). */
    int n = 0;
    while (n < request_count) {
        const char *k = key_pool[(first + n) % pool_size];
        int value = get_value(k, worker_id);
        printf("Worker %d: Got result %d for key '%s'\n", worker_id, value, k);
        usleep(100000);  /* 100 ms pause between requests */
        n++;
    }

    /* Phase 2: outlive the 3 s default TTL, then fetch the same keys again. */
    printf("Worker %d: Waiting 4 seconds to test TTL expiration...\n", worker_id);
    sleep(4);
    printf("Worker %d: Re-requesting keys after TTL expiration...\n", worker_id);
    for (n = 0; n < request_count; ++n) {
        const char *k = key_pool[(first + n) % pool_size];
        int value = get_value(k, worker_id);
        printf("Worker %d: Re-got result %d for key '%s'\n", worker_id, value, k);
    }

    printf("Worker %d: Completed all requests\n", worker_id);
}
/*
 * Print every entry that currently holds a value, with its age, TTL and
 * freshness. Entries are read without taking their locks, so this is only
 * meant to be called once no worker processes are running.
 */
static void print_storage_state(void)
{
    printf("\n=== Final Storage State ===\n");
    time_t now = time(NULL);
    for (int i = 0; i < storage->num_entries; i++) {
        StorageEntry *entry = &storage->entries[i];
        if (!entry->initialized) {
            continue;  /* no stored value to show */
        }
        int value = decode_int_from_msgpack(entry->value_data, entry->value_size);
        /* time_t has no portable printf conversion; widen it explicitly. */
        long long age = (long long)(now - entry->created_at);
        bool expired = is_entry_expired(entry);
        printf("Key: '%s' -> Value: %d (age: %llds, TTL: %llds) %s\n",
               entry->key, value, age, (long long)entry->ttl_seconds,
               expired ? "[EXPIRED]" : "[FRESH]");
    }
    printf("===========================\n");
}
/*
 * Demo driver. It creates the shared table in an anonymous shared mapping and
 * forks NUM_WORKERS workers that request overlapping keys. It then waits for
 * them, prints the final table and tears everything down.
 *
 * Returns 0 if every worker exited normally with status 0, otherwise 1.
 */
int main(int argc, char *argv[])
{
    enum { NUM_WORKERS = 2 };  /* compile-time constant: pids[] is not a VLA */
    pid_t pids[NUM_WORKERS];
    int started = 0;
    bool ok = true;

    (void)argc;
    (void)argv;

    printf("Creating shared key/value storage with anti-thundering herd...\n");

    /* MAP_SHARED | MAP_ANONYMOUS: the mapping is inherited by fork()ed
     * children at the same address, so pointers into it stay valid there. */
    storage = mmap(NULL, sizeof(SharedStorage),
                   PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (storage == MAP_FAILED) {
        perror("mmap failed");
        exit(1);
    }

    /* Initialize shared storage (num_entries = 0, no entries in use). */
    memset(storage, 0, sizeof(SharedStorage));
    pthread_mutexattr_t mutex_attr;
    pthread_mutexattr_init(&mutex_attr);
    pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);
    int rc = pthread_mutex_init(&storage->global_mutex, &mutex_attr);
    pthread_mutexattr_destroy(&mutex_attr);
    if (rc != 0) {
        fprintf(stderr, "pthread_mutex_init failed\n");
        munmap(storage, sizeof(SharedStorage));
        exit(1);
    }

    printf("Spawning %d worker processes...\n", NUM_WORKERS);

    for (int i = 0; i < NUM_WORKERS; i++) {
        /* Flush before fork(): when stdout is a pipe or file it is fully
         * buffered, and unflushed output would be copied into the child and
         * printed a second time when the child exits. */
        fflush(stdout);
        pid_t pid = fork();
        if (pid == 0) {
            /* Child process */
            srand(time(NULL) + getpid());
            worker_process(i + 1);
            exit(0);
        }
        if (pid < 0) {
            perror("fork failed");
            ok = false;
            break;  /* still reap the workers that did start */
        }
        pids[started++] = pid;
    }

    /* Wait for all started workers and check how they ended. */
    for (int i = 0; i < started; i++) {
        int status;
        if (waitpid(pids[i], &status, 0) < 0) {
            perror("waitpid failed");
            ok = false;
        } else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
            fprintf(stderr, "Worker process %d did not exit cleanly\n", (int)pids[i]);
            ok = false;
        }
    }

    print_storage_state();

    /* Cleanup */
    for (int i = 0; i < storage->num_entries; i++) {
        pthread_mutex_destroy(&storage->entries[i].mutex);
        pthread_cond_destroy(&storage->entries[i].init_cv);
    }
    pthread_mutex_destroy(&storage->global_mutex);
    munmap(storage, sizeof(SharedStorage));

    if (!ok) {
        fprintf(stderr, "Demo finished with errors\n");
        return 1;
    }
    printf("Demo completed successfully!\n");
    return 0;
}