Skip to content

Commit 2675079

Browse files
committed
Merge branch 'keydbpro' into keydbpro_collab
Former-commit-id: e4e5c6696c6d831924f314a198b266b10d831e14
2 parents ca920fe + 795da38 commit 2675079

9 files changed

Lines changed: 239 additions & 20 deletions

File tree

.gitlab-ci.yml

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
build:
2+
rules:
3+
- if: '$COVERAGE'
4+
when: never
5+
- if: '$ENDURANCE'
6+
when: never
7+
- when: always
8+
tags:
9+
- docker
10+
stage: build
11+
script:
12+
- git submodule init && git submodule update
13+
- make distclean
14+
- make -j
15+
16+
make-test:
17+
rules:
18+
- if: '$COVERAGE'
19+
when: never
20+
- if: '$ENDURANCE'
21+
when: never
22+
- when: always
23+
tags:
24+
- docker
25+
stage: test
26+
script:
27+
- git submodule init && git submodule update
28+
- make distclean
29+
- make -j
30+
- make test -j
31+
32+
node-redis-test:
33+
rules:
34+
- if: '$COVERAGE'
35+
when: never
36+
- if: '$ENDURANCE'
37+
when: never
38+
- when: always
39+
tags:
40+
- docker
41+
- ipv6
42+
stage: test
43+
script:
44+
- git submodule init && git submodule update
45+
- make distclean
46+
- make -j
47+
- make install
48+
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/node-redis.git
49+
- cd node-redis
50+
- npm install
51+
- npm run test
52+
53+
jedis-test:
54+
rules:
55+
- if: '$COVERAGE'
56+
when: never
57+
- if: '$ENDURANCE'
58+
when: never
59+
- when: always
60+
tags:
61+
- docker
62+
- ipv4
63+
stage: test
64+
script:
65+
- git submodule init && git submodule update
66+
- make distclean
67+
- make -j
68+
- make install
69+
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/jedis.git
70+
- cd jedis
71+
- make test
72+
73+
redis-rs-test:
74+
rules:
75+
- if: '$COVERAGE'
76+
when: never
77+
- if: '$ENDURANCE'
78+
when: never
79+
- when: always
80+
tags:
81+
- docker
82+
stage: test
83+
script:
84+
- git submodule init && git submodule update
85+
- make distclean
86+
- make -j
87+
- make install
88+
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/redis-rs.git
89+
- cd redis-rs
90+
- make test
91+
92+
endurance-test:
93+
rules:
94+
- if: '$ENDURANCE'
95+
tags:
96+
- docker
97+
stage: test
98+
script:
99+
- git submodule init && git submodule update
100+
- make distclean
101+
- make -j
102+
- ./runtest --loop --stop
103+
104+
coverage-test:
105+
rules:
106+
- if: '$COVERAGE'
107+
tags:
108+
- docker
109+
stage: test
110+
script:
111+
- git submodule init && git submodule update
112+
- make distclean
113+
- make gcov -j
114+
- make install
115+
- ./runtest || true
116+
- pkill keydb-server || true
117+
- pkill stunnel || true
118+
- ./runtest-cluster || true
119+
- pkill keydb-server || true
120+
- pkill stunnel || true
121+
- ./runtest-sentinel || true
122+
- pkill keydb-server || true
123+
- pkill stunnel || true
124+
- ./runtest-moduleapi || true
125+
- pkill keydb-server || true
126+
- pkill stunnel || true
127+
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/redis-rs.git
128+
- cd redis-rs
129+
- make test || true
130+
- pkill keydb-server || true
131+
- pkill stunnel || true
132+
- cd ..
133+
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/jedis.git
134+
- cd jedis
135+
- make test || true
136+
- pkill keydb-server || true
137+
- pkill stunnel || true
138+
- cd ..
139+
- git clone https://gitlab-ci-token:${CI_JOB_TOKEN}@gitlab.eqalpha.com/keydb-dev/node-redis.git
140+
- cd node-redis
141+
- npm install
142+
- npm run test || true
143+
- pkill keydb-server || true
144+
- pkill stunnel || true
145+
- cd ..
146+
- geninfo -o KeyDB.info --no-external .
147+
- genhtml --legend -o lcov-html KeyDB.info

src/StorageCache.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ StorageCache::StorageCache(IStorage *storage, bool fCache)
2525
m_pdict = dictCreate(&dbStorageCacheType, nullptr);
2626
}
2727

28+
StorageCache::~StorageCache()
29+
{
30+
if (m_pdict != nullptr)
31+
dictRelease(m_pdict);
32+
}
33+
2834
void StorageCache::clear()
2935
{
3036
std::unique_lock<fastlock> ul(m_lock);

src/StorageCache.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ class StorageCache
2929
}
3030

3131
public:
32+
~StorageCache();
33+
3234
static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) {
3335
StorageCache *cache = new StorageCache(nullptr, pfactory->FSlow() /*fCache*/);
3436
load_iter_data data = {cache, fn, privdata};

src/config.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2470,6 +2470,7 @@ static int updateReplBacklogSize(long long val, long long prev, const char **err
24702470
* being able to tell when the size changes, so restore prev before calling it. */
24712471
UNUSED(err);
24722472
g_pserver->repl_backlog_size = prev;
2473+
g_pserver->repl_backlog_config_size = val;
24732474
resizeReplicationBacklog(val);
24742475
return 1;
24752476
}

src/db.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2963,13 +2963,13 @@ dict_iter redisDbPersistentData::random()
29632963
return dict_iter(m_pdict, de);
29642964
}
29652965

2966-
size_t redisDbPersistentData::size() const
2966+
size_t redisDbPersistentData::size(bool fCachedOnly) const
29672967
{
2968-
if (m_spstorage != nullptr && !m_fAllChanged)
2968+
if (m_spstorage != nullptr && !m_fAllChanged && !fCachedOnly)
29692969
return m_spstorage->count() + m_cnewKeysPending;
29702970

29712971
return dictSize(m_pdict)
2972-
+ (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
2972+
+ (m_pdbSnapshot ? (m_pdbSnapshot->size(fCachedOnly) - dictSize(m_pdictTombstone)) : 0);
29732973
}
29742974

29752975
bool redisDbPersistentData::removeCachedValue(const char *key, dictEntry **ppde)
@@ -3024,13 +3024,13 @@ void redisDbPersistentData::removeAllCachedValues()
30243024
trackChanges(false);
30253025
}
30263026

3027-
if (m_pdict->pauserehash == 0) {
3027+
if (m_pdict->pauserehash == 0 && m_pdict->refcount == 1) {
30283028
dict *dT = m_pdict;
30293029
m_pdict = dictCreate(&dbDictType, this);
30303030
dictExpand(m_pdict, dictSize(dT)/2, false); // Make room for about half so we don't excessively rehash
30313031
g_pserver->asyncworkqueue->AddWorkFunction([dT]{
30323032
dictRelease(dT);
3033-
}, true);
3033+
}, false);
30343034
} else {
30353035
dictEmpty(m_pdict, nullptr);
30363036
}

src/rdb.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2884,6 +2884,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
28842884
do this every 16 keys to limit the perf impact */
28852885
if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0)
28862886
{
2887+
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
2888+
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
28872889
bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
28882890
if (fHighMemory || (ckeysLoaded % (1024)) == 0)
28892891
{

src/replication.cpp

Lines changed: 65 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -254,9 +254,11 @@ void resizeReplicationBacklog(long long newsize) {
254254
zfree(g_pserver->repl_backlog);
255255
g_pserver->repl_backlog = backlog;
256256
g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
257-
g_pserver->repl_batch_idxStart -= earliest_idx;
258-
if (g_pserver->repl_batch_idxStart < 0)
259-
g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size;
257+
if (g_pserver->repl_batch_idxStart >= 0) {
258+
g_pserver->repl_batch_idxStart -= earliest_idx;
259+
if (g_pserver->repl_batch_idxStart < 0)
260+
g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size;
261+
}
260262
g_pserver->repl_backlog_start = earliest_off;
261263
} else {
262264
zfree(g_pserver->repl_backlog);
@@ -301,19 +303,56 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
301303
if (lower_bound == -1)
302304
lower_bound = g_pserver->repl_batch_offStart;
303305
long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
306+
304307
if (minimumsize > g_pserver->repl_backlog_size) {
305-
flushReplBacklogToClients();
306-
lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst);
307-
if (lower_bound == -1)
308-
lower_bound = g_pserver->repl_batch_offStart;
308+
listIter li;
309+
listNode *ln;
310+
listRewind(g_pserver->slaves, &li);
311+
long long maxClientBuffer = (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes;
312+
if (maxClientBuffer <= 0)
313+
maxClientBuffer = LLONG_MAX; // infinite essentially
314+
long long min_offset = LLONG_MAX;
315+
int listening_replicas = 0;
316+
while ((ln = listNext(&li))) {
317+
client *replica = (client*)listNodeValue(ln);
318+
if (!canFeedReplicaReplBuffer(replica)) continue;
319+
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
309320

310-
minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
321+
std::unique_lock<fastlock> ul(replica->lock);
311322

312-
if (minimumsize > g_pserver->repl_backlog_size && minimumsize < (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes) {
323+
// Would this client overflow? If so close it
324+
long long neededBuffer = g_pserver->master_repl_offset + len - replica->repl_curr_off + 1;
325+
if (neededBuffer > maxClientBuffer) {
326+
327+
sds clientInfo = catClientInfoString(sdsempty(),replica);
328+
freeClientAsync(replica);
329+
serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP due to exceeding output buffer hard limit.", clientInfo);
330+
sdsfree(clientInfo);
331+
continue;
332+
}
333+
min_offset = std::min(min_offset, replica->repl_curr_off);
334+
++listening_replicas;
335+
}
336+
337+
if (min_offset == LLONG_MAX) {
338+
min_offset = g_pserver->repl_batch_offStart;
339+
g_pserver->repl_lowest_off = -1;
340+
} else {
341+
g_pserver->repl_lowest_off = min_offset;
342+
}
343+
344+
minimumsize = g_pserver->master_repl_offset + len - min_offset + 1;
345+
serverAssert(listening_replicas == 0 || minimumsize <= maxClientBuffer);
346+
347+
if (minimumsize > g_pserver->repl_backlog_size && listening_replicas) {
313348
// This is an emergency overflow, we better resize to fit
314349
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
315-
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
350+
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld bytes", newsize);
316351
resizeReplicationBacklog(newsize);
352+
} else if (!listening_replicas) {
353+
// We need to update a few variables or later asserts will notice we dropped data
354+
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset + len;
355+
g_pserver->repl_lowest_off = -1;
317356
}
318357
}
319358
}
@@ -4318,6 +4357,8 @@ void replicationCron(void) {
43184357

43194358
replicationStartPendingFork();
43204359

4360+
trimReplicationBacklog();
4361+
43214362
/* Remove the RDB file used for replication if Redis is not running
43224363
* with any persistence. */
43234364
removeRDBUsedToSyncReplicas();
@@ -5076,3 +5117,17 @@ void updateFailoverStatus(void) {
50765117
g_pserver->target_replica_port);
50775118
}
50785119
}
5120+
5121+
// If we automatically grew the backlog we need to trim it back to
5122+
// the config setting when possible
5123+
void trimReplicationBacklog() {
5124+
serverAssert(GlobalLocksAcquired());
5125+
serverAssert(g_pserver->repl_batch_offStart < 0); // we shouldn't be in a batch
5126+
if (g_pserver->repl_backlog_size <= g_pserver->repl_backlog_config_size)
5127+
return; // We're already a good size
5128+
if (g_pserver->repl_lowest_off > 0 && (g_pserver->master_repl_offset - g_pserver->repl_lowest_off + 1) > g_pserver->repl_backlog_config_size)
5129+
return; // There is untransmitted data we can't truncate
5130+
5131+
serverLog(LL_NOTICE, "Reclaiming %lld replication backlog bytes", g_pserver->repl_backlog_size - g_pserver->repl_backlog_config_size);
5132+
resizeReplicationBacklog(g_pserver->repl_backlog_config_size);
5133+
}

src/server.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2124,7 +2124,7 @@ void databasesCron(bool fMainThread) {
21242124
::dict *dict = g_pserver->db[rehash_db]->dictUnsafeKeyOnly();
21252125
/* Are we async rehashing? And if so is it time to re-calibrate? */
21262126
/* The recalibration limit is a prime number to ensure balancing across threads */
2127-
if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1) {
2127+
if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1 && dictSize(dict) > 2048 && dictIsRehashing(dict) && !g_pserver->loading) {
21282128
serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms);
21292129
++async_rehashes;
21302130
}
@@ -6112,10 +6112,11 @@ sds genRedisInfoString(const char *section) {
61126112
if (sections++) info = sdscat(info,"\r\n");
61136113
info = sdscatprintf(info, "# Keyspace\r\n");
61146114
for (j = 0; j < cserver.dbnum; j++) {
6115-
long long keys, vkeys;
6115+
long long keys, vkeys, cachedKeys;
61166116

61176117
keys = g_pserver->db[j]->size();
61186118
vkeys = g_pserver->db[j]->expireSize();
6119+
cachedKeys = g_pserver->db[j]->size(true /* fCachedOnly */);
61196120

61206121
// Adjust TTL by the current time
61216122
mstime_t mstime;
@@ -6127,8 +6128,8 @@ sds genRedisInfoString(const char *section) {
61276128

61286129
if (keys || vkeys) {
61296130
info = sdscatprintf(info,
6130-
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
6131-
j, keys, vkeys, static_cast<long long>(g_pserver->db[j]->avg_ttl));
6131+
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld,cached_keys=%lld\r\n",
6132+
j, keys, vkeys, static_cast<long long>(g_pserver->db[j]->avg_ttl), cachedKeys);
61326133
}
61336134
}
61346135
}
@@ -7100,6 +7101,8 @@ static void validateConfiguration()
71007101
serverLog(LL_WARNING, "\tKeyDB will now exit. Please update your configuration file.");
71017102
exit(EXIT_FAILURE);
71027103
}
7104+
7105+
g_pserver->repl_backlog_config_size = g_pserver->repl_backlog_size; // this is normally set in the update logic, but not on initial config
71037106
}
71047107

71057108
int iAmMaster(void) {

src/server.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1095,7 +1095,7 @@ class redisDbPersistentData
10951095
redisDbPersistentData(redisDbPersistentData &&) = default;
10961096

10971097
size_t slots() const { return dictSlots(m_pdict); }
1098-
size_t size() const;
1098+
size_t size(bool fCachedOnly = false) const;
10991099
void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
11001100

11011101
void trackkey(robj_roptr o, bool fUpdate)
@@ -2362,6 +2362,7 @@ struct redisServer {
23622362
int repl_ping_slave_period; /* Master pings the replica every N seconds */
23632363
char *repl_backlog; /* Replication backlog for partial syncs */
23642364
long long repl_backlog_size; /* Backlog circular buffer size */
2365+
long long repl_backlog_config_size; /* The repl backlog may grow but we want to know what the user set it to */
23652366
long long repl_backlog_histlen; /* Backlog actual data length */
23662367
long long repl_backlog_idx; /* Backlog circular buffer current offset,
23672368
that is the next byte will'll write to.*/
@@ -3028,6 +3029,8 @@ void clearFailoverState(void);
30283029
void updateFailoverStatus(void);
30293030
void abortFailover(redisMaster *mi, const char *err);
30303031
const char *getFailoverStateString();
3032+
int canFeedReplicaReplBuffer(client *replica);
3033+
void trimReplicationBacklog();
30313034

30323035
/* Generic persistence functions */
30333036
void startLoadingFile(FILE* fp, const char * filename, int rdbflags);

0 commit comments

Comments
 (0)