Sharing memory between processes: learnings from Postgres

This post summarizes some Claude-assisted learnings about how PostgreSQL shares memory between its workers, which are essentially forked processes.

  • Two-layer design: Static shared memory for core structures + Dynamic shared memory for parallel operations
// Static allocation for core structures: ShmemInitStruct() looks the name up
// in the main shared memory segment and reports through `found` whether the
// structure already existed.
void *buffer_descriptors = ShmemInitStruct("Buffer Descriptors", size, &found);

// Dynamic allocation for parallel workers: a separate 1 MB segment is created
// on demand, and its address is obtained from the segment handle.
// NOTE(review): unlike the main segment, a dynamic segment may be mapped at a
// different address in each process that attaches it - confirm before storing
// raw pointers inside it.
dsm_segment *seg = dsm_create(1024 * 1024, 0);
void *parallel_data = dsm_segment_address(seg);
  • Single segment inheritance: the postmaster creates the main segment before forking, so every backend inherits the mapping at the same virtual address via fork() and raw pointers into it stay valid
// In postmaster: create the main segment once, before any backend is forked
ShmemBase = PGSharedMemoryCreate(total_size, &shim);

// In backend: inherit same virtual address automatically
// (fork() copies the parent's mappings, so the segment sits at the same
// address in every backend.)
// No remapping needed - pointers work across processes
// Register named structure in shared memory index
// (`found` reports whether some process already created it)
void *my_data = ShmemInitStruct("Buffer Strategy Status", sizeof(BufferStrategyControl), &found);

// Later processes find by name
// NOTE(review): ShmemIndex lookups normally run under ShmemIndexLock, as in
// the HASH_ENTER_NULL example further down - confirm before copying this.
result = hash_search(ShmemIndex, "Buffer Strategy Status", HASH_FIND, NULL);

Anti-Thundering Herd Mechanisms

LWLockAcquire(ShmemIndexLock, LW_EXCLUSIVE);

// Atomic under ShmemIndexLock: either find existing or create new entry
// (never both). HASH_ENTER_NULL returns NULL instead of raising an error when
// the index has no room for a new entry.
result = hash_search(ShmemIndex, "MyExpensiveData", HASH_ENTER_NULL, &found);
if (result == NULL) {
    LWLockRelease(ShmemIndexLock);  // never leave with the lock still held
    return NULL;
}

if (found) {
    // Someone else already computed it. Copy the pointer out and drop the
    // lock BEFORE returning: returning with it held would leave it locked and
    // block every other process waiting for ShmemIndexLock.
    ptr = result->location;
    LWLockRelease(ShmemIndexLock);
    return ptr;  // Just return existing pointer
} else {
    // We won the race - allocate and initialize
    ptr = ShmemAllocRaw(size, &allocated_size);
    if (ptr == NULL) {
        // Out of shared memory: remove the half-created index entry so a
        // later caller does not "find" an entry with no storage behind it.
        hash_search(ShmemIndex, "MyExpensiveData", HASH_REMOVE, NULL);
        LWLockRelease(ShmemIndexLock);
        return NULL;
    }
    result->location = ptr;
    // Now do expensive initialization work (other callers are parked on
    // ShmemIndexLock meanwhile, so none of them sees a half-built structure)
}
LWLockRelease(ShmemIndexLock);
// Waiter pattern
ConditionVariablePrepareToSleep(&slot->active_cv);
while (slot->active_pid != 0)  // While someone else owns the slot
    ConditionVariableSleep(&slot->active_cv, WAIT_EVENT_REPLICATION_SLOT_DROP);
ConditionVariableCancelSleep();

// Signaler pattern
slot->active_pid = 0;  // Release the resource
ConditionVariableBroadcast(&slot->active_cv);  // Wake all waiters
// Producer: the message bytes are copied into the ring first, then the new
// write position is published. The write barrier must come BEFORE the counter
// update so that a consumer which observes the new counter also observes the
// data it covers (a barrier after the store orders nothing useful).
pg_write_barrier();
pg_atomic_write_u64(&mq->mq_bytes_written, new_write_pos);

// Consumer atomically reads without locks
uint64 bytes_written = pg_atomic_read_u64(&mq->mq_bytes_written);
uint64 bytes_available = bytes_written - mq_bytes_read;
// Pairs with the producer's write barrier: only read ring data after the
// counter has been read, so the data is at least as new as the counter.
pg_read_barrier();

TTL Implementation Patterns

  • Expiry check with immediate invalidation:
/*
 * True when a populated entry has outlived its TTL. An entry that was never
 * populated has no age, so it never counts as expired.
 */
bool is_expired(CacheEntry *entry) {
    return entry->initialized
        && (time(NULL) - entry->created_at) > entry->ttl_seconds;
}
// In get operation: expiry is checked under the entry's mutex, and an expired
// entry is invalidated in place so the rest of the lookup treats it as a miss.
// (is_expired() already returns false for uninitialized entries, so the
// `initialized` test below is redundant but harmless.)
// The mutex presumably stays held for the rest of the get operation (not shown).
pthread_mutex_lock(&entry->mutex);
if (entry->initialized && is_expired(entry)) {
    // NOTE(review): %ld assumes time_t is long; cast both arguments to (long)
    // for portability.
    printf("Cache expired (age: %lds > %lds TTL)\n",
            time(NULL) - entry->created_at, entry->ttl_seconds);
    entry->initialized = false;  // Treat as cache miss
}
  • Per-key TTL with timestamp tracking:
// When storing result: record the creation time alongside the value and set
// `initialized` last, once the value and timestamps are in place.
// NOTE(review): in concurrent code these stores belong under the entry's
// mutex, as in the complete pattern later in this post.
entry->value = computed_result;
entry->created_at = time(NULL);
entry->ttl_seconds = 300;  // 5 minute TTL
entry->initialized = true;

// When checking freshness: an entry is fresh while its age is at most
// ttl_seconds - the exact negation of is_expired()'s `age > ttl` test.
time_t age = time(NULL) - entry->created_at;
bool fresh = age <= entry->ttl_seconds;
  • Combined with thundering herd protection:
// Cache lookup combining TTL with thundering-herd protection (sketch; the
// complete mutex-protected version appears under "Coordination Patterns").
if (entry->initialized && !is_expired(entry)) {
    return entry->value;  // Fresh cache hit
}
if (entry->initializing) {
    wait_for_computation();  // Someone else recomputing expired data
    // NOTE(review): after waiting, a caller would normally return the freshly
    // computed value; as written, control falls through to the recompute path.
}
// Expired + not computing = we recompute

Safety and Synchronization

// Header stored in the shared memory segment (simplified). A process that
// attaches to an existing segment reads it to check that the segment really
// is the expected one before trusting anything else in it.
typedef struct PGShmemHeader {
    int32 magic;  // #define PGShmemMagic 679834894 - fixed marker value
    pid_t creatorPID;  // set to getpid() by the creating process
    Size totalsize;    // presumably the total segment size in bytes
    // ... (further fields, including the device/inode identity checked below)
} PGShmemHeader;

// Validation on attach: a wrong magic number means the memory was not set up
// by this code, so refuse to use it.
if (header->magic != PGShmemMagic)
    elog(FATAL, "wrong shared memory segment");
  • Ownership validation with creator PID and data-directory identity (device/inode):
// Store creator info
// Device + inode together identify one file/directory on the host.
// NOTE(review): stat_buf presumably comes from stat() on the data
// directory - confirm.
header->creatorPID = getpid();
header->device = stat_buf.st_dev;     // Prevent cross-instance sharing
header->inode = stat_buf.st_ino;

// Validate on attach: a segment whose recorded identity differs from ours
// belongs to another instance and must not be used.
if (header->device != our_device || header->inode != our_inode)
    elog(FATAL, "shared memory from different data directory");
// Register cleanup function
// The pointer is wrapped in a Datum here and unwrapped in the callback.
// NOTE(review): in a real file, cleanup_my_resources must be declared before
// this call, since its definition appears after it.
on_shmem_exit(cleanup_my_resources, PointerGetDatum(my_data));

// Automatic cleanup on process exit
// Mirrors the signaler pattern: release ownership, then wake waiters, so
// processes sleeping on data->cv are not left waiting on an owner that died.
// `code` is presumably the exit code passed by on_shmem_exit (unused here).
static void cleanup_my_resources(int code, Datum arg) {
    MyStruct *data = (MyStruct*) DatumGetPointer(arg);
    data->active_pid = 0;  // Release ownership
    ConditionVariableBroadcast(&data->cv);  // Wake waiters
}
  • Spinlock protection for short critical sections:
SpinLockAcquire(&entry->mutex);
entry->reference_count++;
entry->last_accessed = GetCurrentTimestamp();
SpinLockRelease(&entry->mutex);

Coordination Patterns

  • Complete initialization race pattern:
pthread_mutex_lock(&entry->mutex);

if (entry->initialized && !is_expired(entry)) {
    result = entry->value;  // Cache hit
    pthread_mutex_unlock(&entry->mutex);
    return result;
}

if (entry->initializing) {
    // Another process is computing. Wait for the *computation* to finish
    // (initializing cleared) rather than for `initialized`: while an expired
    // entry is being refreshed, `initialized` is still true, and a loop on it
    // would return the stale value immediately without waiting.
    while (entry->initializing) {
        pthread_cond_wait(&entry->init_cv, &entry->mutex);
    }
    result = entry->value;
    pthread_mutex_unlock(&entry->mutex);
    return result;
}

// We won the race: mark the computation in progress so later callers wait
entry->initializing = true;
pthread_mutex_unlock(&entry->mutex);

// Expensive work OUTSIDE the lock
expensive_result = compute_expensive_thing();

// Store result and notify; clearing `initializing` is what releases waiters
pthread_mutex_lock(&entry->mutex);
entry->value = expensive_result;
entry->created_at = time(NULL);
entry->initialized = true;
entry->initializing = false;
pthread_mutex_unlock(&entry->mutex);

// Broadcasting after unlocking is allowed: waiters re-check the flag under
// the mutex before returning.
pthread_cond_broadcast(&entry->init_cv);  // Wake all waiters
  • State transitions: uninitialized → initializing → initialized → expired → uninitialized

  • Retry with goto pattern (PostgreSQL style):

// Retry loop: check ownership under the lock; if someone else holds the
// origin, sleep on its condition variable and start over from scratch.
retry:
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
if (origin->acquired_by != 0) {
    // Take the CV pointer while still holding the lock...
    cv = &origin->origin_cv;
    // ...then release the lock so it is not held across an unbounded wait.
    LWLockRelease(ReplicationOriginLock);
    ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
    goto retry;  // Start over: state may have changed while we slept
}
// Proceed with acquisition (ReplicationOriginLock is still held here)
// NOTE(review): once done waiting, call ConditionVariableCancelSleep(), as in
// the waiter pattern earlier in this post.

Performance Optimizations

  • NUMA-aware allocation: Track which memory nodes hold data for optimal CPU affinity
  • Huge pages: Use 2MB/1GB pages instead of 4KB to reduce TLB pressure
  • Computation outside locks: Always release locks before expensive operations:
// BAD: holding lock during expensive work - every other thread or process
// that needs `mutex` stalls for the whole computation.
pthread_mutex_lock(&mutex);
expensive_result = heavy_computation();  // Blocks everyone
data->value = expensive_result;
pthread_mutex_unlock(&mutex);

// GOOD: compute outside lock - the mutex only guards the brief flag and value
// updates. `computing` tells other callers a result is on its way; they must
// test it under the same mutex and wait (e.g. on a condition variable).
pthread_mutex_lock(&mutex);
data->computing = true;
pthread_mutex_unlock(&mutex);

expensive_result = heavy_computation();  // Others can proceed

pthread_mutex_lock(&mutex);
data->value = expensive_result;
data->computing = false;
pthread_mutex_unlock(&mutex);
  • Atomic operations for hot paths: Use lock-free techniques for frequently accessed counters
  • Cache-line padding: Separate frequently modified fields to different cache lines
/*
 * Two counters written by different CPUs. Aligning each field to its own
 * 64-byte boundary (a common cache-line size - check your target) puts each
 * counter on its own cache line no matter where the struct itself is placed.
 * A write to one counter therefore never invalidates the line holding the
 * other, or neighbouring data (false sharing). Manual char padding cannot
 * guarantee this, because the struct's start address is only int-aligned.
 *
 * NOTE: `volatile` is not a synchronization primitive. For counters really
 * shared between threads or processes, use C11 `_Atomic int` (or
 * pg_atomic_uint32 inside PostgreSQL).
 */
typedef struct {
    _Alignas(64) volatile int reader_count;
    _Alignas(64) volatile int writer_count;
} CounterStruct;

The gist

So that I can refer back to it without digging through PostgreSQL again, below is a bare-bones implementation of the tricks above.

Some day this might become a Ruby gem.

/*
 * thundering_herd_demo.c
 * 
 * Generic key/value storage with anti-thundering herd pattern
 * using condition variables and shared memory.
 * 
 * Features:
 * - Key/value storage where values are encoded bytes (msgpack in a real
 *   implementation; a 4-byte big-endian int stand-in in this demo)
 * - Only one process computes expensive value per key
 * - Other processes wait efficiently using condition variables
 * - No PostgreSQL dependencies
 */

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/wait.h>
#include <pthread.h>
#include <time.h>
#include <string.h>
#include <stdint.h>

/* Maximum number of distinct keys the shared table can hold. */
#define MAX_ENTRIES 64
/* Size of the key buffer, including the terminating NUL. */
#define MAX_KEY_SIZE 48
/* Capacity of each entry's encoded-value buffer, in bytes. */
#define MAX_VALUE_SIZE 1024

/*
 * One key/value slot in the shared table. Slots live inside the mmap'ed
 * region shared by all worker processes, which is why mutex and init_cv are
 * initialized with PTHREAD_PROCESS_SHARED (see find_or_create_entry).
 */
typedef struct {
    char key[MAX_KEY_SIZE];  /* NUL-terminated key */
    bool initialized;        /* value_data holds a computed value */
    bool initializing;       /* some worker has claimed the computation */
    pthread_mutex_t mutex;   /* guards every field below the key */
    pthread_cond_t init_cv;  /* broadcast when a computation is stored */
    time_t created_at;  // When value was computed
    time_t ttl_seconds; // TTL in seconds
    size_t value_size;       /* number of meaningful bytes in value_data */
    uint8_t value_data[MAX_VALUE_SIZE];  // encoded value bytes (4-byte big-endian int in this demo)
} StorageEntry;

/*
 * The whole shared table. Only the first num_entries slots are in use;
 * entries are appended under global_mutex and never removed.
 */
typedef struct {
    pthread_mutex_t global_mutex;  /* guards num_entries and slot creation */
    int num_entries;
    StorageEntry entries[MAX_ENTRIES];
} SharedStorage;

/*
 * Mapped by main() with MAP_SHARED | MAP_ANONYMOUS before any fork(), so every
 * worker inherits this pointer to the same shared region.
 */
static SharedStorage *storage;

/*
 * Encode an int as 4 bytes in big-endian (network) byte order.
 *
 * This is a stand-in for real msgpack: there is no type tag and no length
 * prefix, just the low 32 bits of the value in two's-complement form. A real
 * implementation should use a proper msgpack library.
 *
 * buffer must have room for at least 4 bytes. Returns the number of bytes
 * written (always 4).
 */
static size_t encode_int_to_msgpack(int value, uint8_t *buffer)
{
    // Shift the unsigned bit pattern: right-shifting a negative int is
    // implementation-defined, while int -> uint32_t conversion is exact
    // (modulo 2^32).
    uint32_t bits = (uint32_t)value;

    buffer[0] = (uint8_t)(bits >> 24);
    buffer[1] = (uint8_t)(bits >> 16);
    buffer[2] = (uint8_t)(bits >> 8);
    buffer[3] = (uint8_t)bits;
    return 4;
}

/*
 * Decode a 4-byte big-endian value produced by encode_int_to_msgpack().
 *
 * Returns -1 when fewer than 4 bytes are available. Note that -1 is also a
 * valid decoded value, so the return value alone cannot distinguish the two
 * cases; this demo only decodes buffers it encoded itself.
 */
static int decode_int_from_msgpack(const uint8_t *buffer, size_t size)
{
    if (size < 4) {
        return -1;
    }

    // Assemble in unsigned arithmetic: `buffer[0] << 24` on a promoted int
    // overflows (undefined behavior) whenever the top byte is >= 0x80.
    uint32_t bits = ((uint32_t)buffer[0] << 24) | ((uint32_t)buffer[1] << 16) |
                    ((uint32_t)buffer[2] << 8) | (uint32_t)buffer[3];

    // Map the two's-complement pattern back to a signed value without relying
    // on implementation-defined unsigned -> signed conversion.
    if (bits & 0x80000000u) {
        uint32_t magnitude = ~bits;  // in [0, 0x7FFFFFFF]
        return -(int)magnitude - 1;
    }
    return (int)bits;
}

/*
 * Report whether an entry's cached value has outlived its TTL.
 *
 * An entry holding no value (initialized == false) is never expired. Otherwise
 * it expires once strictly more than ttl_seconds have elapsed since created_at.
 */
static bool is_entry_expired(StorageEntry *entry)
{
    if (entry->initialized) {
        time_t elapsed = time(NULL) - entry->created_at;
        return elapsed > entry->ttl_seconds;
    }
    return false;
}

/*
 * Return the entry for `key`, creating an empty one if the key is new.
 *
 * Lookup and creation both happen under storage->global_mutex, so concurrent
 * callers asking for the same new key end up with the same slot. Entries are
 * never removed, which is why the returned pointer stays valid after the
 * global mutex is released.
 *
 * A new entry starts with no value (initialized and initializing both false),
 * the default 3-second TTL, and its own process-shared mutex and condition
 * variable.
 *
 * Returns NULL when the table already holds MAX_ENTRIES keys, or when `key`
 * is longer than MAX_KEY_SIZE - 1 characters. Over-long keys are rejected
 * rather than silently truncated: a truncated stored key never compares equal
 * to the full key again, so every lookup of such a key would consume a fresh
 * slot until the table filled up.
 */
static StorageEntry* find_or_create_entry(const char *key)
{
    size_t key_len = strlen(key);
    if (key_len >= MAX_KEY_SIZE) {
        return NULL;
    }

    pthread_mutex_lock(&storage->global_mutex);

    StorageEntry *entry = NULL;

    // Look for existing entry
    for (int i = 0; i < storage->num_entries; i++) {
        if (strcmp(storage->entries[i].key, key) == 0) {
            entry = &storage->entries[i];
            break;
        }
    }

    // Create new entry if not found and space is available
    if (entry == NULL && storage->num_entries < MAX_ENTRIES) {
        entry = &storage->entries[storage->num_entries];
        memcpy(entry->key, key, key_len + 1);  // fits (checked above), incl. NUL
        entry->initialized = false;
        entry->initializing = false;
        entry->created_at = 0;
        entry->ttl_seconds = 3;  // Default 3 second TTL
        entry->value_size = 0;

        // Per-entry synchronization must be PTHREAD_PROCESS_SHARED: the entry
        // lives in memory shared by several forked processes.
        pthread_mutexattr_t mutex_attr;
        pthread_mutexattr_init(&mutex_attr);
        pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);
        pthread_mutex_init(&entry->mutex, &mutex_attr);
        pthread_mutexattr_destroy(&mutex_attr);

        pthread_condattr_t cond_attr;
        pthread_condattr_init(&cond_attr);
        pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
        pthread_cond_init(&entry->init_cv, &cond_attr);
        pthread_condattr_destroy(&cond_attr);

        storage->num_entries++;
    }

    pthread_mutex_unlock(&storage->global_mutex);
    return entry;  // NULL if the key is new and the table is full
}

/*
 * Simulate an expensive calculation for `key` (sleeps for one second).
 *
 * The result is (|h| % 1000) + worker_id, where h is a 31-multiplier string
 * hash of the key taken as a signed 32-bit value. Values computed by different
 * workers for the same key therefore differ.
 */
static int calculate_expensive_value(const char *key, int worker_id)
{
    printf("Worker %d: Computing expensive value for key '%s'...\n", worker_id, key);
    sleep(1);  // Simulate expensive work

    // Hash in unsigned 32-bit arithmetic. With a signed int, even a short key
    // such as "config" overflows (undefined behavior), and abs(INT_MIN) is
    // undefined too. Wrapping modulo 2^32 reproduces the values the signed
    // version yields on common two's-complement targets.
    uint32_t hash = 0;
    for (const char *p = key; *p; p++) {
        hash = hash * 31u + (uint32_t)*p;
    }

    // Absolute value of `hash` read as a signed 32-bit number, without overflow.
    uint32_t magnitude = (hash & 0x80000000u) ? 0u - hash : hash;
    int result = (int)(magnitude % 1000u) + worker_id;

    printf("Worker %d: Finished computing for key '%s', result = %d\n",
           worker_id, key, result);
    return result;
}

/*
 * Return the value for `key`, computing it at most once per TTL window across
 * all worker processes (anti-thundering herd). Expired entries are treated as
 * cache misses.
 *
 * Under the entry's mutex, exactly one caller finds the entry neither fresh
 * nor being computed. That caller sets `initializing`, computes the value
 * outside the lock, stores it, clears `initializing` and broadcasts init_cv.
 * Any caller arriving in the meantime sleeps on init_cv until `initializing`
 * is cleared, then returns the stored value.
 *
 * Returns -1 if find_or_create_entry() cannot provide a slot for the key.
 *
 * NOTE(review): if the computing process dies before clearing `initializing`,
 * waiters block forever; production code needs an exit hook or robust mutexes.
 */
static int get_value(const char *key, int worker_id)
{
    StorageEntry *entry = find_or_create_entry(key);
    if (!entry) {
        printf("Worker %d: No space for key '%s'\n", worker_id, key);
        return -1;
    }

    pthread_mutex_lock(&entry->mutex);

    if (entry->initialized && !is_entry_expired(entry)) {
        // Value already computed and still fresh
        int result = decode_int_from_msgpack(entry->value_data, entry->value_size);
        time_t age = time(NULL) - entry->created_at;
        pthread_mutex_unlock(&entry->mutex);
        // time_t has no printf conversion of its own; cast to match %ld.
        printf("Worker %d: Key '%s' cache hit, result = %d (age: %lds)\n",
               worker_id, key, result, (long)age);
        return result;
    }

    if (entry->initialized) {
        // The freshness check above failed, so the value has expired - treat
        // it as a cache miss.
        printf("Worker %d: Key '%s' expired (age: %lds > %lds TTL), will recompute\n",
               worker_id, key, (long)(time(NULL) - entry->created_at),
               (long)entry->ttl_seconds);
        entry->initialized = false;  // Mark as needing recomputation
    }

    if (entry->initializing) {
        // Someone else is computing - wait
        printf("Worker %d: Key '%s' being computed by another worker, waiting...\n",
               worker_id, key);

        // Wait for the computation to finish (initializing cleared), not for
        // `initialized`: another worker may mark a freshly stored value
        // expired before we reacquire the mutex, and a loop on `initialized`
        // would then sleep until some later recomputation.
        while (entry->initializing) {
            pthread_cond_wait(&entry->init_cv, &entry->mutex);
        }

        int result = decode_int_from_msgpack(entry->value_data, entry->value_size);
        pthread_mutex_unlock(&entry->mutex);
        printf("Worker %d: Key '%s' computation complete, result = %d\n",
               worker_id, key, result);
        return result;
    }

    // We won the race - compute the value
    printf("Worker %d: Won race for key '%s', will compute\n", worker_id, key);
    entry->initializing = true;
    pthread_mutex_unlock(&entry->mutex);

    // Do expensive computation outside the lock
    int computed_value = calculate_expensive_value(key, worker_id);

    // Encode result and store
    uint8_t encoded_data[MAX_VALUE_SIZE];
    size_t encoded_size = encode_int_to_msgpack(computed_value, encoded_data);

    pthread_mutex_lock(&entry->mutex);
    memcpy(entry->value_data, encoded_data, encoded_size);
    entry->value_size = encoded_size;
    entry->created_at = time(NULL);
    entry->initialized = true;
    // Clear the in-progress flag. Without this the flag stays set forever, and
    // the first worker to find the entry expired would wait on init_cv for a
    // computation that never happens (deadlock on TTL expiry).
    entry->initializing = false;
    pthread_mutex_unlock(&entry->mutex);

    // Wake up all waiting processes
    pthread_cond_broadcast(&entry->init_cv);
    printf("Worker %d: Stored result for key '%s', notified waiters\n",
           worker_id, key);

    return computed_value;
}

/*
 * Body of one forked worker. It requests a few keys, sleeps past their TTL,
 * and requests them again, so the output shows both sharing and recomputation.
 *
 * The key subset depends on worker_id: it starts at key_pool[worker_id % 4]
 * and takes 2 + (worker_id % 2) consecutive keys, wrapping around the pool.
 */
static void worker_process(int worker_id)
{
    static const char *const key_pool[] = {"config", "stats", "cache", "metadata"};
    const int pool_size = (int)(sizeof key_pool / sizeof key_pool[0]);
    const int first = worker_id % pool_size;
    const int count = 2 + (worker_id % 2);

    printf("Worker %d: Started (PID %d)\n", worker_id, getpid());

    // Random 0-199 ms start-up jitter so workers race on keys at varied moments
    usleep((rand() % 200) * 1000);

    // First pass: populate (or share) the values, pausing 100 ms between keys
    for (int n = 0; n < count; n++) {
        const char *k = key_pool[(first + n) % pool_size];
        int value = get_value(k, worker_id);
        printf("Worker %d: Got result %d for key '%s'\n", worker_id, value, k);
        usleep(100000);
    }

    // Outlast the default 3-second TTL so the second pass sees expired entries
    printf("Worker %d: Waiting 4 seconds to test TTL expiration...\n", worker_id);
    sleep(4);

    // Second pass: same keys again, back to back
    printf("Worker %d: Re-requesting keys after TTL expiration...\n", worker_id);
    for (int n = 0; n < count; n++) {
        const char *k = key_pool[(first + n) % pool_size];
        int value = get_value(k, worker_id);
        printf("Worker %d: Re-got result %d for key '%s'\n", worker_id, value, k);
    }

    printf("Worker %d: Completed all requests\n", worker_id);
}

/*
 * Print every entry that currently holds a value, with its age, TTL and
 * freshness. Entries are read without taking their mutexes, so this is meant
 * to be called only after all workers have exited, as main() does.
 */
static void print_storage_state(void)
{
    printf("\n=== Final Storage State ===\n");
    time_t now = time(NULL);
    for (int i = 0; i < storage->num_entries; i++) {
        StorageEntry *entry = &storage->entries[i];
        if (!entry->initialized) {
            continue;  // never computed, or invalidated after expiring
        }
        int value = decode_int_from_msgpack(entry->value_data, entry->value_size);
        time_t age = now - entry->created_at;
        bool expired = is_entry_expired(entry);
        // time_t has no printf conversion of its own; cast to match %ld.
        printf("Key: '%s' -> Value: %d (age: %lds, TTL: %lds) %s\n",
               entry->key, value, (long)age, (long)entry->ttl_seconds,
               expired ? "[EXPIRED]" : "[FRESH]");
    }
    printf("===========================\n");
}

/*
 * Create the shared table, fork the workers, wait for them, print the final
 * table state and tear everything down. Exits with 1 if mmap or fork fails,
 * or if any worker does not exit cleanly.
 */
int main(int argc, char *argv[])
{
    (void)argc;  // no command-line options
    (void)argv;

    enum { NUM_WORKERS = 2 };  // compile-time constant, so pids is not a VLA
    pid_t pids[NUM_WORKERS];
    int exit_code = 0;

    printf("Creating shared key/value storage with anti-thundering herd...\n");

    // Create shared memory segment: MAP_SHARED | MAP_ANONYMOUS gives an unnamed
    // region that stays shared with every child forked afterwards.
    storage = mmap(NULL, sizeof(SharedStorage),
                   PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (storage == MAP_FAILED) {
        perror("mmap failed");
        exit(1);
    }

    // Initialize shared storage
    memset(storage, 0, sizeof(SharedStorage));

    // The global mutex is locked from several processes, so it must be
    // PTHREAD_PROCESS_SHARED.
    pthread_mutexattr_t mutex_attr;
    pthread_mutexattr_init(&mutex_attr);
    pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(&storage->global_mutex, &mutex_attr);
    pthread_mutexattr_destroy(&mutex_attr);

    storage->num_entries = 0;

    printf("Spawning %d worker processes...\n", NUM_WORKERS);

    // Flush before forking. Otherwise, when stdout is a pipe or file (fully
    // buffered), each child inherits the unflushed lines above and prints
    // them again when it exits.
    fflush(stdout);

    // Fork multiple worker processes
    int spawned = 0;
    for (int i = 0; i < NUM_WORKERS; i++) {
        pid_t pid = fork();
        if (pid == 0) {
            // Child process
            srand(time(NULL) + getpid());
            worker_process(i + 1);
            exit(0);
        }
        if (pid < 0) {
            perror("fork failed");
            // Reap the workers that did start instead of orphaning them.
            for (int j = 0; j < spawned; j++) {
                waitpid(pids[j], NULL, 0);
            }
            exit(1);
        }
        pids[spawned++] = pid;
    }

    // Wait for all workers to complete
    for (int i = 0; i < NUM_WORKERS; i++) {
        int status;
        if (waitpid(pids[i], &status, 0) < 0) {
            perror("waitpid failed");
            exit_code = 1;
        } else if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
            fprintf(stderr, "Worker %d did not exit cleanly\n", i + 1);
            exit_code = 1;
        }
    }

    print_storage_state();

    // Cleanup
    for (int i = 0; i < storage->num_entries; i++) {
        pthread_mutex_destroy(&storage->entries[i].mutex);
        pthread_cond_destroy(&storage->entries[i].init_cv);
    }
    pthread_mutex_destroy(&storage->global_mutex);
    munmap(storage, sizeof(SharedStorage));

    if (exit_code == 0) {
        printf("Demo completed successfully!\n");
    }
    return exit_code;
}
Written in August 2025.
Kir Shatrov

Kir Shatrov helps businesses to grow by scaling the infrastructure. He writes about software, scalability and the ecosystem. Follow him on Twitter to get the latest updates.