Skip to content

Commit a850002

Browse files
authored
JAVA-2105: Transient Replication Support (apache#1414)
* JAVA-2105: Transient Replication Support
1 parent bf27238 commit a850002

6 files changed

Lines changed: 144 additions & 16 deletions

File tree

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.6.0 (in progress)
66

7+
- [new feature] JAVA-2105: Add support for transient replication
78
- [new feature] JAVA-2670: Provide base class for mapped custom codecs
89
- [new feature] JAVA-2633: Add execution profile argument to DAO mapper factory methods
910
- [improvement] JAVA-2667: Add ability to fail the build when integration tests fail

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/token/NetworkTopologyReplicationStrategy.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,15 @@ class NetworkTopologyReplicationStrategy implements ReplicationStrategy {
4040
LoggerFactory.getLogger(NetworkTopologyReplicationStrategy.class);
4141

4242
private final Map<String, String> replicationConfig;
43-
private final Map<String, Integer> replicationFactors;
43+
private final Map<String, ReplicationFactor> replicationFactors;
4444
private final String logPrefix;
4545

4646
NetworkTopologyReplicationStrategy(Map<String, String> replicationConfig, String logPrefix) {
4747
this.replicationConfig = replicationConfig;
48-
ImmutableMap.Builder<String, Integer> factorsBuilder = ImmutableMap.builder();
48+
ImmutableMap.Builder<String, ReplicationFactor> factorsBuilder = ImmutableMap.builder();
4949
for (Map.Entry<String, String> entry : replicationConfig.entrySet()) {
5050
if (!entry.getKey().equals("class")) {
51-
factorsBuilder.put(entry.getKey(), Integer.parseInt(entry.getValue()));
51+
factorsBuilder.put(entry.getKey(), ReplicationFactor.fromString(entry.getValue()));
5252
}
5353
}
5454
this.replicationFactors = factorsBuilder.build();
@@ -88,7 +88,7 @@ public SetMultimap<Token, Node> computeReplicasByToken(
8888
if (dc == null || !allDcReplicas.containsKey(dc)) {
8989
continue;
9090
}
91-
Integer rf = replicationFactors.get(dc);
91+
Integer rf = replicationFactors.get(dc).fullReplicas();
9292
Set<Node> dcReplicas = allDcReplicas.get(dc);
9393
if (rf == null || dcReplicas.size() >= rf) {
9494
continue;
@@ -123,7 +123,7 @@ public SetMultimap<Token, Node> computeReplicasByToken(
123123
// Warn the user because that leads to quadratic performance of this method (JAVA-702).
124124
for (Map.Entry<String, Set<Node>> entry : allDcReplicas.entrySet()) {
125125
String dcName = entry.getKey();
126-
int expectedFactor = replicationFactors.get(dcName);
126+
int expectedFactor = replicationFactors.get(dcName).fullReplicas();
127127
int achievedFactor = entry.getValue().size();
128128
if (achievedFactor < expectedFactor && !warnedDcs.contains(dcName)) {
129129
LOG.warn(
@@ -148,7 +148,7 @@ private boolean allDone(Map<String, Set<Node>> map, Map<String, Integer> dcNodeC
148148
for (Map.Entry<String, Set<Node>> entry : map.entrySet()) {
149149
String dc = entry.getKey();
150150
int dcCount = (dcNodeCount.get(dc) == null) ? 0 : dcNodeCount.get(dc);
151-
if (entry.getValue().size() < Math.min(replicationFactors.get(dc), dcCount)) {
151+
if (entry.getValue().size() < Math.min(replicationFactors.get(dc).fullReplicas(), dcCount)) {
152152
return false;
153153
}
154154
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.datastax.oss.driver.internal.core.metadata.token;
2+
/*
3+
* Copyright DataStax, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
18+
import java.util.Objects;
19+
20+
// This class is a subset of server version at org.apache.cassandra.locator.ReplicationFactor
21+
public class ReplicationFactor {
22+
private final int allReplicas;
23+
private final int fullReplicas;
24+
private final int transientReplicas;
25+
26+
public ReplicationFactor(int allReplicas, int transientReplicas) {
27+
this.allReplicas = allReplicas;
28+
this.transientReplicas = transientReplicas;
29+
this.fullReplicas = allReplicas - transientReplicas;
30+
}
31+
32+
public ReplicationFactor(int allReplicas) {
33+
this(allReplicas, 0);
34+
}
35+
36+
public int fullReplicas() {
37+
return fullReplicas;
38+
}
39+
40+
public int transientReplicas() {
41+
return transientReplicas;
42+
}
43+
44+
public boolean hasTransientReplicas() {
45+
return allReplicas != fullReplicas;
46+
}
47+
48+
public static ReplicationFactor fromString(String s) {
49+
if (s.contains("/")) {
50+
51+
int slash = s.indexOf('/');
52+
String allPart = s.substring(0, slash);
53+
String transientPart = s.substring(slash + 1);
54+
Preconditions.checkArgument(
55+
allPart != null && transientPart != null,
56+
"Replication factor format is <replicas> or <replicas>/<transient>");
57+
return new ReplicationFactor(Integer.parseInt(allPart), Integer.parseInt(transientPart));
58+
} else {
59+
return new ReplicationFactor(Integer.parseInt(s), 0);
60+
}
61+
}
62+
63+
@Override
64+
public String toString() {
65+
return allReplicas + (hasTransientReplicas() ? "/" + transientReplicas() : "");
66+
}
67+
68+
@Override
69+
public boolean equals(Object o) {
70+
if (this == o) {
71+
return true;
72+
}
73+
if (!(o instanceof ReplicationFactor)) {
74+
return false;
75+
}
76+
ReplicationFactor that = (ReplicationFactor) o;
77+
return allReplicas == that.allReplicas && fullReplicas == that.fullReplicas;
78+
}
79+
80+
@Override
81+
public int hashCode() {
82+
return Objects.hash(allReplicas, fullReplicas);
83+
}
84+
}

core/src/main/java/com/datastax/oss/driver/internal/core/metadata/token/SimpleReplicationStrategy.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,22 +30,22 @@
3030
@ThreadSafe
3131
class SimpleReplicationStrategy implements ReplicationStrategy {
3232

33-
private final int replicationFactor;
33+
private final ReplicationFactor replicationFactor;
3434

3535
SimpleReplicationStrategy(Map<String, String> replicationConfig) {
3636
this(extractReplicationFactor(replicationConfig));
3737
}
3838

3939
@VisibleForTesting
40-
SimpleReplicationStrategy(int replicationFactor) {
40+
SimpleReplicationStrategy(ReplicationFactor replicationFactor) {
4141
this.replicationFactor = replicationFactor;
4242
}
4343

4444
@Override
4545
public SetMultimap<Token, Node> computeReplicasByToken(
4646
Map<Token, Node> tokenToPrimary, List<Token> ring) {
4747

48-
int rf = Math.min(replicationFactor, ring.size());
48+
int rf = Math.min(replicationFactor.fullReplicas(), ring.size());
4949

5050
ImmutableSetMultimap.Builder<Token, Node> result = ImmutableSetMultimap.builder();
5151
for (int i = 0; i < ring.size(); i++) {
@@ -63,9 +63,9 @@ private static Token getTokenWrapping(int i, List<Token> ring) {
6363
return ring.get(i % ring.size());
6464
}
6565

66-
private static int extractReplicationFactor(Map<String, String> replicationConfig) {
66+
private static ReplicationFactor extractReplicationFactor(Map<String, String> replicationConfig) {
6767
String factorString = replicationConfig.get("replication_factor");
6868
Preconditions.checkNotNull(factorString, "Missing replication factor in " + replicationConfig);
69-
return Integer.parseInt(factorString);
69+
return ReplicationFactor.fromString(factorString);
7070
}
7171
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.metadata.token;
17+
18+
import static com.datastax.oss.driver.Assertions.assertThat;
19+
20+
import org.junit.Test;
21+
22+
public class ReplicationFactorTest {
23+
@Test
24+
public void should_parse_factor_from_string() {
25+
ReplicationFactor transFactor = ReplicationFactor.fromString("3/1");
26+
assertThat(transFactor.fullReplicas()).isEqualTo(2);
27+
assertThat(transFactor.hasTransientReplicas()).isTrue();
28+
assertThat(transFactor.transientReplicas()).isEqualTo(1);
29+
30+
ReplicationFactor factor = ReplicationFactor.fromString("3");
31+
assertThat(factor.fullReplicas()).isEqualTo(3);
32+
assertThat(factor.hasTransientReplicas()).isFalse();
33+
assertThat(factor.transientReplicas()).isEqualTo(0);
34+
}
35+
36+
@Test
37+
public void should_create_string_from_factor() {
38+
ReplicationFactor transFactor = new ReplicationFactor(3, 1);
39+
assertThat(transFactor.toString()).isEqualTo("3/1");
40+
ReplicationFactor factor = new ReplicationFactor(3);
41+
assertThat(factor.toString()).isEqualTo("3");
42+
}
43+
}

core/src/test/java/com/datastax/oss/driver/internal/core/metadata/token/SimpleReplicationStrategyTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void should_compute_for_simple_layout() {
6161
List<Token> ring = ImmutableList.of(TOKEN01, TOKEN06, TOKEN14, TOKEN19);
6262
Map<Token, Node> tokenToPrimary =
6363
ImmutableMap.of(TOKEN01, node1, TOKEN06, node2, TOKEN14, node1, TOKEN19, node2);
64-
SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(2);
64+
SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(new ReplicationFactor(2));
6565

6666
// When
6767
SetMultimap<Token, Node> replicasByToken =
@@ -83,7 +83,7 @@ public void should_compute_when_nodes_own_consecutive_tokens() {
8383
List<Token> ring = ImmutableList.of(TOKEN01, TOKEN06, TOKEN14, TOKEN19);
8484
Map<Token, Node> tokenToPrimary =
8585
ImmutableMap.of(TOKEN01, node1, TOKEN06, node1, TOKEN14, node2, TOKEN19, node2);
86-
SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(2);
86+
SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(new ReplicationFactor(2));
8787

8888
// When
8989
SetMultimap<Token, Node> replicasByToken =
@@ -104,7 +104,7 @@ public void should_compute_when_ring_unbalanced() {
104104
List<Token> ring = ImmutableList.of(TOKEN01, TOKEN06, TOKEN14, TOKEN19);
105105
Map<Token, Node> tokenToPrimary =
106106
ImmutableMap.of(TOKEN01, node1, TOKEN06, node1, TOKEN14, node2, TOKEN19, node1);
107-
SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(2);
107+
SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(new ReplicationFactor(2));
108108

109109
// When
110110
SetMultimap<Token, Node> replicasByToken =
@@ -125,7 +125,7 @@ public void should_compute_when_replication_factor_is_larger_than_cluster_size()
125125
List<Token> ring = ImmutableList.of(TOKEN01, TOKEN06, TOKEN14, TOKEN19);
126126
Map<Token, Node> tokenToPrimary =
127127
ImmutableMap.of(TOKEN01, node1, TOKEN06, node2, TOKEN14, node1, TOKEN19, node2);
128-
SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(6);
128+
SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(new ReplicationFactor(6));
129129

130130
// When
131131
SetMultimap<Token, Node> replicasByToken =
@@ -185,7 +185,7 @@ public void should_compute_for_complex_layout() {
185185
.put(TOKEN18, node6)
186186
.build();
187187

188-
SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(3);
188+
SimpleReplicationStrategy strategy = new SimpleReplicationStrategy(new ReplicationFactor(3));
189189

190190
// When
191191
SetMultimap<Token, Node> replicasByToken =

0 commit comments

Comments
 (0)