Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 131 additions & 54 deletions src/memory_prefetch.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ typedef enum {
└───────────────────────┘
**********************************************************************************************************************/

typedef void *(*GetValueDataFunc)(const void *val);

typedef struct KeyPrefetchInfo {
PrefetchState state; /* Current state of the prefetch operation */
HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */
Expand All @@ -68,6 +66,18 @@ typedef struct KeyPrefetchInfo {
kvobj *current_kv; /* Pointer to the kv object being prefetched */
} KeyPrefetchInfo;

/* DictPrefetchCtx holds the minimal state needed to run the dict prefetch
 * state machine. It is used by both the cross-command batch path and the
 * new intra-command dictPrefetchKeys() API.
 *
 * The keys, dicts and info arrays are parallel: element i of each one
 * describes the same key, and the state machine indexes all three with
 * cur_idx. Both users in this file fill the context with pointers to arrays
 * that live elsewhere (the global batch, or the caller's arguments plus a
 * stack array), then call ctxInit() followed by dictPrefetchRun(). */
typedef struct DictPrefetchCtx {
size_t cur_idx; /* Round-robin index into key arrays; advanced modulo key_count after each prefetch */
size_t key_count; /* Number of keys being prefetched; used as the modulus when cur_idx wraps around */
void **keys; /* Array of key pointers (sds), handed to dictGetHash() and dictCompareKeys() */
dict **dicts; /* Per-key dictionary pointers; a NULL or empty dict marks its key as done in ctxInit() */
KeyPrefetchInfo *info; /* Per-key prefetch state, reset by ctxInit() before the state machine runs */
PrefetchGetValueDataFunc get_val_data; /* Optional value-data callback; when NULL no value data is prefetched */
} DictPrefetchCtx;

/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */
typedef struct PrefetchCommandsBatch {
size_t cur_idx; /* Index of the current key being processed */
Expand All @@ -79,9 +89,7 @@ typedef struct PrefetchCommandsBatch {
client **clients; /* Array of clients in the current batch */
pendingCommand **pending_cmds; /* Array of pending commands in the current batch */
dict **keys_dicts; /* Main dict for each key */
dict **current_dicts; /* Points to dict to prefetch from */
KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */
GetValueDataFunc get_value_data_func; /* Function to get the value data */
} PrefetchCommandsBatch;

static PrefetchCommandsBatch *batch = NULL;
Expand Down Expand Up @@ -131,86 +139,83 @@ void onMaxBatchSizeChange(void) {
prefetchCommandsBatchInit();
}

/* Prefetch the given pointer and move to the next key in the batch. */
static inline void prefetchAndMoveToNextKey(void *addr) {
/* ---- State machine helpers operating on DictPrefetchCtx ---- */

/* Prefetch the given pointer and advance to the next key (round-robin). */
static inline void ctxPrefetchAndAdvance(DictPrefetchCtx *ctx, void *addr) {
redis_prefetch_read(addr);
/* While the prefetch is in progress, we can continue to the next key */
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
ctx->cur_idx = (ctx->cur_idx + 1) % ctx->key_count;
}

static inline void markKeyAsdone(KeyPrefetchInfo *info) {
static inline void ctxMarkDone(KeyPrefetchInfo *info) {
info->state = PREFETCH_DONE;
server.stat_total_prefetch_entries++;
}

/* Returns the next KeyPrefetchInfo structure that needs to be processed. */
static KeyPrefetchInfo *getNextPrefetchInfo(void) {
size_t start_idx = batch->cur_idx;
/* Return the next KeyPrefetchInfo that still needs work, or NULL if all done. */
static KeyPrefetchInfo *ctxNextInfo(DictPrefetchCtx *ctx) {
size_t start = ctx->cur_idx;
do {
KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
KeyPrefetchInfo *info = &ctx->info[ctx->cur_idx];
if (info->state != PREFETCH_DONE) return info;
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
} while (batch->cur_idx != start_idx);
ctx->cur_idx = (ctx->cur_idx + 1) % ctx->key_count;
} while (ctx->cur_idx != start);
return NULL;
}

static void initBatchInfo(dict **dicts, GetValueDataFunc func) {
batch->current_dicts = dicts;
batch->get_value_data_func = func;

/* Initialize the prefetch info */
for (size_t i = 0; i < batch->key_count; i++) {
KeyPrefetchInfo *info = &batch->prefetch_info[i];
if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) {
static void ctxInit(DictPrefetchCtx *ctx) {
for (size_t i = 0; i < ctx->key_count; i++) {
KeyPrefetchInfo *info = &ctx->info[i];
if (!ctx->dicts[i] || dictSize(ctx->dicts[i]) == 0) {
info->state = PREFETCH_DONE;
continue;
}
info->ht_idx = HT_IDX_INVALID;
info->current_entry = NULL;
info->current_kv = NULL;
info->state = PREFETCH_BUCKET;
info->key_hash = dictGetHash(batch->current_dicts[i], batch->keys[i]);
info->key_hash = dictGetHash(ctx->dicts[i], ctx->keys[i]);
}
}

/* Prefetch the bucket of the next hash table index.
* If no tables are left, move to the PREFETCH_DONE state. */
static void prefetchBucket(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
static void ctxPrefetchBucket(DictPrefetchCtx *ctx, KeyPrefetchInfo *info) {
size_t i = ctx->cur_idx;

/* Determine which hash table to use */
if (info->ht_idx == HT_IDX_INVALID) {
info->ht_idx = HT_IDX_FIRST;
} else if (info->ht_idx == HT_IDX_FIRST && dictIsRehashing(batch->current_dicts[i])) {
} else if (info->ht_idx == HT_IDX_FIRST && dictIsRehashing(ctx->dicts[i])) {
info->ht_idx = HT_IDX_SECOND;
} else {
/* No more tables left - mark as done. */
markKeyAsdone(info);
ctxMarkDone(info);
return;
}

/* Prefetch the bucket */
info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]);
prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(ctx->dicts[i]->ht_size_exp[info->ht_idx]);
ctxPrefetchAndAdvance(ctx, &ctx->dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
info->current_entry = NULL;
info->state = PREFETCH_ENTRY;
}

/* Prefetch the entry in the bucket and move to the PREFETCH_KVOBJ state.
* If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */
static void prefetchEntry(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
static void ctxPrefetchEntry(DictPrefetchCtx *ctx, KeyPrefetchInfo *info) {
size_t i = ctx->cur_idx;

if (info->current_entry) {
/* We already found an entry in the bucket - move to the next entry */
info->current_entry = dictGetNext(info->current_entry);
} else {
/* Go to the first entry in the bucket */
info->current_entry = batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx];
info->current_entry = ctx->dicts[i]->ht_table[info->ht_idx][info->bucket_idx];
}

if (info->current_entry) {
prefetchAndMoveToNextKey(info->current_entry);
ctxPrefetchAndAdvance(ctx, info->current_entry);
info->current_kv = NULL;
info->state = PREFETCH_KVOBJ;
} else {
Expand All @@ -219,33 +224,33 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
}
}

/* Prefetch the kv object in the dict entry, and to the PREFETCH_VALDATA state. */
static inline void prefetchKVOject(KeyPrefetchInfo *info) {
/* Prefetch the kv object in the dict entry, and move to the PREFETCH_VALDATA state. */
static inline void ctxPrefetchKvobj(DictPrefetchCtx *ctx, KeyPrefetchInfo *info) {
kvobj *kv = dictGetKey(info->current_entry);
int is_kv = dictEntryIsKey(info->current_entry);

info->current_kv = kv;
info->state = PREFETCH_VALDATA;
/* If the entry is a pointer of kv object, we don't need to prefetch it */
if (!is_kv) prefetchAndMoveToNextKey(kv);
if (!is_kv) ctxPrefetchAndAdvance(ctx, kv);
}

/* Prefetch the value data of the kv object found in dict entry. */
static void prefetchValueData(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
static void ctxPrefetchValdata(DictPrefetchCtx *ctx, KeyPrefetchInfo *info) {
size_t i = ctx->cur_idx;
kvobj *kv = info->current_kv;
sds key = kvobjGetKey(kv);

/* 1. If this is the last element, we assume a hit and don't compare the keys
* 2. This kv object is the target of the lookup. */
if ((!dictGetNext(info->current_entry) && !dictIsRehashing(batch->current_dicts[i])) ||
dictCompareKeys(batch->current_dicts[i], batch->keys[i], key))
if ((!dictGetNext(info->current_entry) && !dictIsRehashing(ctx->dicts[i])) ||
dictCompareKeys(ctx->dicts[i], ctx->keys[i], key))
{
if (batch->get_value_data_func) {
void *value_data = batch->get_value_data_func(kv);
if (value_data) prefetchAndMoveToNextKey(value_data);
if (ctx->get_val_data) {
void *value_data = ctx->get_val_data(kv);
if (value_data) ctxPrefetchAndAdvance(ctx, value_data);
}
markKeyAsdone(info);
ctxMarkDone(info);
} else {
/* Not found in the current entry, move to the next entry */
info->state = PREFETCH_ENTRY;
Expand Down Expand Up @@ -273,22 +278,39 @@ static void prefetchValueData(KeyPrefetchInfo *info) {
* get_val_data_func - A callback function that dictPrefetch can invoke
* to bring the key's value data closer to the L1 cache as well.
*/
static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) {
initBatchInfo(dicts, get_val_data_func);
/* Run the prefetch state machine over all keys in the context until every
* key reaches PREFETCH_DONE. */
static void dictPrefetchRun(DictPrefetchCtx *ctx) {
KeyPrefetchInfo *info;
while ((info = getNextPrefetchInfo())) {
while ((info = ctxNextInfo(ctx))) {
switch (info->state) {
case PREFETCH_BUCKET: prefetchBucket(info); break;
case PREFETCH_ENTRY: prefetchEntry(info); break;
case PREFETCH_KVOBJ: prefetchKVOject(info); break;
case PREFETCH_VALDATA: prefetchValueData(info); break;
case PREFETCH_BUCKET: ctxPrefetchBucket(ctx, info); break;
case PREFETCH_ENTRY: ctxPrefetchEntry(ctx, info); break;
case PREFETCH_KVOBJ: ctxPrefetchKvobj(ctx, info); break;
case PREFETCH_VALDATA: ctxPrefetchValdata(ctx, info); break;
default: serverPanic("Unknown prefetch state %d", info->state);
}
}
}

/* Cross-command entry point, called from prefetchCommands().
 *
 * Builds a DictPrefetchCtx over the global batch's key array and per-key
 * prefetch state, using `dicts` as the per-key dictionaries and
 * `get_val_data_func` as the optional value-data callback, then resets the
 * per-key state and drives the state machine until every key is done. */
static void dictPrefetch(dict **dicts, PrefetchGetValueDataFunc get_val_data_func) {
    DictPrefetchCtx batch_ctx;

    /* Every field of the context is assigned explicitly below. */
    batch_ctx.cur_idx = batch->cur_idx;
    batch_ctx.key_count = batch->key_count;
    batch_ctx.keys = batch->keys;
    batch_ctx.dicts = dicts;
    batch_ctx.info = batch->prefetch_info;
    batch_ctx.get_val_data = get_val_data_func;

    ctxInit(&batch_ctx);
    dictPrefetchRun(&batch_ctx);

    /* The state machine advanced its own copy of the round-robin index;
     * store it back so the global batch reflects where it stopped. */
    batch->cur_idx = batch_ctx.cur_idx;
}

/* Helper function to get the value pointer of a kv object. */
static void *getObjectValuePtr(const void *value) {
void *prefetchGetObjectValuePtr(const void *value) {
kvobj *kv = (kvobj *)value;
return (kv->type == OBJ_STRING && kv->encoding == OBJ_ENCODING_RAW) ? kv->ptr : NULL;
}
Expand Down Expand Up @@ -375,10 +397,57 @@ void prefetchCommands(void) {
if (batch->key_count > 1) {
server.stat_total_prefetch_batches++;
/* Prefetch keys from the main dict */
dictPrefetch(batch->keys_dicts, getObjectValuePtr);
dictPrefetch(batch->keys_dicts, prefetchGetObjectValuePtr);
}
}

/* --------------------------------------------------------------------------
* Intra-command prefetch API
* --------------------------------------------------------------------------
* dictPrefetchKeys() allows a single multi-key command (e.g. MGET) to
* prefetch dict data for a batch of its own keys, reusing the same state
* machine that the cross-command path uses.
*
* Typical usage from a command implementation:
*
* #define BATCH 16
* void myMultiKeyCommand(client *c) {
* dict *d = kvstoreGetDict(c->db->keys, slot);
* for (int j = 0; j < numkeys; j += BATCH) {
* int n = MIN(BATCH, numkeys - j);
* void *keys[BATCH]; dict *dicts[BATCH];
* for (int k = 0; k < n; k++) {
* keys[k] = c->argv[j+k+1]->ptr;
* dicts[k] = d;
* }
* dictPrefetchKeys(dicts, keys, n, prefetchGetObjectValuePtr);
* // Now process these n keys — data is in cache.
* }
* }
* ----------------------------------------------------------------------- */
void dictPrefetchKeys(dict **dicts, void **keys, size_t nkeys,
PrefetchGetValueDataFunc get_val_data)
{
if (nkeys <= 1) return; /* No benefit from prefetching a single key. */

/* Stack-allocate per-key state — callers should keep nkeys bounded
* (typically ≤ 16–32) to avoid excessive stack usage. */
KeyPrefetchInfo pf_info[nkeys];
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uninitialized VLA when dictPrefetchKeys receives nkeys of 2

Low Severity

The KeyPrefetchInfo pf_info[nkeys] VLA is stack-allocated but never zero-initialized before ctxInit runs. While ctxInit does set state for every element, it only sets ht_idx, current_entry, and current_kv for keys whose dict is non-null and non-empty. For keys marked PREFETCH_DONE in ctxInit (empty/null dict), the remaining fields like key_hash contain indeterminate values. This is benign today since PREFETCH_DONE keys are skipped, but it's fragile — any future state machine change that reads those fields could silently use garbage data.

Fix in Cursor Fix in Web


DictPrefetchCtx ctx = {
.cur_idx = 0,
.key_count = nkeys,
.keys = keys,
.dicts = dicts,
.info = pf_info,
.get_val_data = get_val_data,
};

server.stat_total_prefetch_batches++;
ctxInit(&ctx);
dictPrefetchRun(&ctx);
}

/* Adds the client's command to the current batch.
*
* Returns C_OK if the command was added successfully, C_ERR otherwise. */
Expand Down Expand Up @@ -419,12 +488,20 @@ int addCommandToBatch(client *c) {
batch->pending_cmds[batch->pcmd_count++] = pcmd;

serverAssert(pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID);
size_t keys_before = batch->key_count;
for (int i = 0; i < pcmd->keys_result.numkeys && batch->key_count < batch->max_prefetch_size; i++) {
batch->keys[batch->key_count] = pcmd->argv[pcmd->keys_result.keys[i].pos];
batch->keys_dicts[batch->key_count] =
kvstoreGetDict(c->db->keys, pcmd->slot > 0 ? pcmd->slot : 0);
batch->key_count++;
}
/* Mark the command as prefetched only if ALL of its keys were
* added to the batch. If the batch ran out of space mid-command,
* the remaining keys were not prefetched and the intra-command
* path (e.g. dictPrefetchKeys in mgetCommand) must handle them. */
if (batch->key_count - keys_before == (size_t)pcmd->keys_result.numkeys) {
pcmd->flags |= PENDING_CMD_KEYS_PREFETCHED;
}
pcmd = pcmd->next;
}

Expand Down
17 changes: 17 additions & 0 deletions src/memory_prefetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,29 @@
#ifndef MEMORY_PREFETCH_H
#define MEMORY_PREFETCH_H

#include <stddef.h>

struct client;
struct dict;

/* Callback returning a pointer to prefetch for a kv object's value data,
 * or NULL if nothing needs prefetching.
 *
 * The prefetch state machine invokes it with the kv object it matched for a
 * key and issues a read prefetch on the result only when that result is
 * non-NULL. Callers may also pass a NULL callback, in which case no value
 * data is prefetched. */
typedef void *(*PrefetchGetValueDataFunc)(const void *val);

/* Cross-command batch prefetching (I/O-thread path) */
void prefetchCommandsBatchInit(void);
int determinePrefetchCount(int len);
/* Returns C_OK if the command was added successfully, C_ERR otherwise. */
int addCommandToBatch(struct client *c);
void resetCommandsBatch(void);
/* The dict prefetch step over the batched keys runs only when the batch
 * holds more than one key. */
void prefetchCommands(void);

/* Intra-command prefetch: prefetch dict lookup data for an array of keys.
 * Reuses the same state machine as the cross-command path.
 * Callers should keep nkeys bounded (e.g. <= 16-32) per call.
 *
 * dicts        - per-key dictionary pointers, parallel to keys; a key whose
 *                dict is NULL or empty is skipped.
 * keys         - key pointers, one per entry in dicts.
 * nkeys        - number of entries in dicts/keys; the call returns
 *                immediately when nkeys <= 1.
 * get_val_data - optional callback used to also prefetch value data; may be
 *                NULL.
 *
 * The per-key state is a stack array sized by nkeys, which is why nkeys must
 * stay small. Every call that gets past the nkeys check increments
 * server.stat_total_prefetch_batches. */
void dictPrefetchKeys(struct dict **dicts, void **keys, size_t nkeys,
PrefetchGetValueDataFunc get_val_data);

/* Default value-data callback for string kv objects (OBJ_STRING / OBJ_ENCODING_RAW).
 * val is interpreted as a kvobj pointer. Returns the object's ptr field when
 * its type is OBJ_STRING and its encoding is OBJ_ENCODING_RAW, and NULL for
 * every other type or encoding. */
void *prefetchGetObjectValuePtr(const void *val);

#endif /* MEMORY_PREFETCH_H */
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2578,6 +2578,7 @@ enum {
PENDING_CMD_FLAG_INCOMPLETE = 1 << 0, /* Command parsing is incomplete, still waiting for more data */
PENDING_CMD_FLAG_PREPROCESSED = 1 << 1, /* This command has passed pre-processing */
PENDING_CMD_KEYS_RESULT_VALID = 1 << 2, /* Command's keys_result is valid and cached */
PENDING_CMD_KEYS_PREFETCHED = 1 << 3, /* Command's keys were prefetched by the cross-command batch */
};

/* Parser state and parse result of a command from a client's input buffer. */
Expand Down
Loading