@@ -254,9 +254,11 @@ void resizeReplicationBacklog(long long newsize) {
254254 zfree (g_pserver->repl_backlog );
255255 g_pserver->repl_backlog = backlog;
256256 g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen ;
257- g_pserver->repl_batch_idxStart -= earliest_idx;
258- if (g_pserver->repl_batch_idxStart < 0 )
259- g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size ;
257+ if (g_pserver->repl_batch_idxStart >= 0 ) {
258+ g_pserver->repl_batch_idxStart -= earliest_idx;
259+ if (g_pserver->repl_batch_idxStart < 0 )
260+ g_pserver->repl_batch_idxStart += g_pserver->repl_backlog_size ;
261+ }
260262 g_pserver->repl_backlog_start = earliest_off;
261263 } else {
262264 zfree (g_pserver->repl_backlog );
@@ -301,19 +303,56 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
301303 if (lower_bound == -1 )
302304 lower_bound = g_pserver->repl_batch_offStart ;
303305 long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1 ;
306+
304307 if (minimumsize > g_pserver->repl_backlog_size ) {
305- flushReplBacklogToClients ();
306- lower_bound = g_pserver->repl_lowest_off .load (std::memory_order_seq_cst);
307- if (lower_bound == -1 )
308- lower_bound = g_pserver->repl_batch_offStart ;
308+ listIter li;
309+ listNode *ln;
310+ listRewind (g_pserver->slaves , &li);
311+ long long maxClientBuffer = (long long )cserver.client_obuf_limits [CLIENT_TYPE_SLAVE].hard_limit_bytes ;
312+ if (maxClientBuffer <= 0 )
313+ maxClientBuffer = LLONG_MAX; // infinite essentially
314+ long long min_offset = LLONG_MAX;
315+ int listening_replicas = 0 ;
316+ while ((ln = listNext (&li))) {
317+ client *replica = (client*)listNodeValue (ln);
318+ if (!canFeedReplicaReplBuffer (replica)) continue ;
319+ if (replica->flags & CLIENT_CLOSE_ASAP) continue ;
309320
310- minimumsize = g_pserver-> master_repl_offset + len - lower_bound + 1 ;
321+ std::unique_lock<fastlock> ul (replica-> lock ) ;
311322
312- if (minimumsize > g_pserver->repl_backlog_size && minimumsize < (long long )cserver.client_obuf_limits [CLIENT_TYPE_SLAVE].hard_limit_bytes ) {
323+ // Would this client overflow? If so close it
324+ long long neededBuffer = g_pserver->master_repl_offset + len - replica->repl_curr_off + 1 ;
325+ if (neededBuffer > maxClientBuffer) {
326+
327+ sds clientInfo = catClientInfoString (sdsempty (),replica);
328+ freeClientAsync (replica);
329+ serverLog (LL_WARNING," Client %s scheduled to be closed ASAP due to exceeding output buffer hard limit." , clientInfo);
330+ sdsfree (clientInfo);
331+ continue ;
332+ }
333+ min_offset = std::min (min_offset, replica->repl_curr_off );
334+ ++listening_replicas;
335+ }
336+
337+ if (min_offset == LLONG_MAX) {
338+ min_offset = g_pserver->repl_batch_offStart ;
339+ g_pserver->repl_lowest_off = -1 ;
340+ } else {
341+ g_pserver->repl_lowest_off = min_offset;
342+ }
343+
344+ minimumsize = g_pserver->master_repl_offset + len - min_offset + 1 ;
345+ serverAssert (listening_replicas == 0 || minimumsize <= maxClientBuffer);
346+
347+ if (minimumsize > g_pserver->repl_backlog_size && listening_replicas) {
313348 // This is an emergency overflow, we better resize to fit
314349 long long newsize = std::max (g_pserver->repl_backlog_size *2 , minimumsize);
315- serverLog (LL_WARNING, " Replication backlog is too small, resizing to: %lld" , newsize);
350+ serverLog (LL_WARNING, " Replication backlog is too small, resizing to: %lld bytes " , newsize);
316351 resizeReplicationBacklog (newsize);
352+ } else if (!listening_replicas) {
353+ // We need to update a few variables or later asserts will notice we dropped data
354+ g_pserver->repl_batch_offStart = g_pserver->master_repl_offset + len;
355+ g_pserver->repl_lowest_off = -1 ;
317356 }
318357 }
319358 }
@@ -4318,6 +4357,8 @@ void replicationCron(void) {
43184357
43194358 replicationStartPendingFork ();
43204359
4360+ trimReplicationBacklog ();
4361+
43214362 /* Remove the RDB file used for replication if Redis is not running
43224363 * with any persistence. */
43234364 removeRDBUsedToSyncReplicas ();
@@ -5076,3 +5117,17 @@ void updateFailoverStatus(void) {
50765117 g_pserver->target_replica_port );
50775118 }
50785119}
5120+
5121+ // If we automatically grew the backlog we need to trim it back to
5122+ // the config setting when possible
5123+ void trimReplicationBacklog () {
5124+ serverAssert (GlobalLocksAcquired ());
5125+ serverAssert (g_pserver->repl_batch_offStart < 0 ); // we shouldn't be in a batch
5126+ if (g_pserver->repl_backlog_size <= g_pserver->repl_backlog_config_size )
5127+ return ; // We're already a good size
5128+ if (g_pserver->repl_lowest_off > 0 && (g_pserver->master_repl_offset - g_pserver->repl_lowest_off + 1 ) > g_pserver->repl_backlog_config_size )
5129+ return ; // There is untransmitted data we can't truncate
5130+
5131+ serverLog (LL_NOTICE, " Reclaiming %lld replication backlog bytes" , g_pserver->repl_backlog_size - g_pserver->repl_backlog_config_size );
5132+ resizeReplicationBacklog (g_pserver->repl_backlog_config_size );
5133+ }
0 commit comments