Skip to content

Commit b6c2a98

Browse files
authored
JAVA-2459: Improve extensibility of existing load balancing policies (apache#1346)
1 parent e7be2e9 commit b6c2a98

21 files changed

Lines changed: 1946 additions & 590 deletions

changelog/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### 4.3.0 (in progress)
66

7+
- [improvement] JAVA-2447: Mention programmatic local DC method in Default LBP error message
8+
- [improvement] JAVA-2459: Improve extensibility of existing load balancing policies
79
- [documentation] JAVA-2428: Add developer docs
810
- [documentation] JAVA-2503: Migrate Cloud "getting started" page to driver manual
911
- [improvement] JAVA-2484: Add errors for cloud misconfiguration
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.loadbalancing;
17+
18+
import com.datastax.oss.driver.api.core.CqlIdentifier;
19+
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
20+
import com.datastax.oss.driver.api.core.context.DriverContext;
21+
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
22+
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
23+
import com.datastax.oss.driver.api.core.metadata.Node;
24+
import com.datastax.oss.driver.api.core.metadata.NodeState;
25+
import com.datastax.oss.driver.api.core.metadata.TokenMap;
26+
import com.datastax.oss.driver.api.core.metadata.token.Token;
27+
import com.datastax.oss.driver.api.core.session.Request;
28+
import com.datastax.oss.driver.api.core.session.Session;
29+
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
30+
import com.datastax.oss.driver.internal.core.loadbalancing.helper.DefaultNodeFilterHelper;
31+
import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalDcHelper;
32+
import com.datastax.oss.driver.internal.core.util.ArrayUtils;
33+
import com.datastax.oss.driver.internal.core.util.collection.QueryPlan;
34+
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
35+
import edu.umd.cs.findbugs.annotations.NonNull;
36+
import edu.umd.cs.findbugs.annotations.Nullable;
37+
import java.nio.ByteBuffer;
38+
import java.util.Collections;
39+
import java.util.Map;
40+
import java.util.Optional;
41+
import java.util.Queue;
42+
import java.util.Set;
43+
import java.util.UUID;
44+
import java.util.concurrent.CopyOnWriteArraySet;
45+
import java.util.concurrent.atomic.AtomicInteger;
46+
import java.util.function.IntUnaryOperator;
47+
import java.util.function.Predicate;
48+
import net.jcip.annotations.ThreadSafe;
49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
51+
52+
/**
53+
* A basic implementation of {@link LoadBalancingPolicy} that can serve as a building block for more
54+
* advanced use cases.
55+
*
56+
* <p>To activate this policy, modify the {@code basic.load-balancing-policy} section in the driver
57+
* configuration, for example:
58+
*
59+
* <pre>
60+
* datastax-java-driver {
61+
* basic.load-balancing-policy {
62+
* class = BasicLoadBalancingPolicy
63+
* local-datacenter = datacenter1 # optional
64+
* }
65+
* }
66+
* </pre>
67+
*
68+
* See {@code reference.conf} (in the manual or core driver JAR) for more details.
69+
*
70+
* <p><b>Local datacenter</b>: This implementation will only define a local datacenter if it is
71+
* explicitly set either through configuration or programmatically; if the local datacenter is
72+
* unspecified, this implementation will effectively act as a datacenter-agnostic load balancing
73+
* policy and will consider all nodes in the cluster when creating query plans, regardless of their
74+
* datacenter.
75+
*
76+
* <p><b>Query plan</b>: This implementation prioritizes replica nodes over non-replica ones; if
77+
* more than one replica is available, the replicas will be shuffled. Non-replica nodes will be
78+
* included in a round-robin fashion. If the local datacenter is defined (see above), query plans
79+
* will only include local nodes, never remote ones; if it is unspecified however, query plans may
80+
* contain nodes from different datacenters.
81+
*
82+
* <p><b>This class is not recommended for normal users who should always prefer {@link
83+
* DefaultLoadBalancingPolicy}</b>.
84+
*/
85+
@ThreadSafe
86+
public class BasicLoadBalancingPolicy implements LoadBalancingPolicy {
87+
88+
private static final Logger LOG = LoggerFactory.getLogger(BasicLoadBalancingPolicy.class);
89+
90+
protected static final IntUnaryOperator INCREMENT = i -> (i == Integer.MAX_VALUE) ? 0 : i + 1;
91+
92+
@NonNull protected final InternalDriverContext context;
93+
@NonNull protected final DriverExecutionProfile profile;
94+
@NonNull protected final String logPrefix;
95+
96+
protected final AtomicInteger roundRobinAmount = new AtomicInteger();
97+
protected final CopyOnWriteArraySet<Node> liveNodes = new CopyOnWriteArraySet<>();
98+
99+
// private because they should be set in init() and never be modified after
100+
private volatile DistanceReporter distanceReporter;
101+
private volatile Predicate<Node> filter;
102+
private volatile String localDc;
103+
104+
public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) {
105+
this.context = (InternalDriverContext) context;
106+
profile = context.getConfig().getProfile(profileName);
107+
logPrefix = context.getSessionName() + "|" + profileName;
108+
}
109+
110+
/** @return The local datacenter, if known; empty otherwise. */
111+
public Optional<String> getLocalDatacenter() {
112+
return Optional.ofNullable(localDc);
113+
}
114+
115+
/**
116+
* @return An immutable copy of the nodes currently considered as live; if the local datacenter is
117+
* known, this set will contain only nodes belonging to that datacenter.
118+
*/
119+
public Set<Node> getLiveNodes() {
120+
return ImmutableSet.copyOf(liveNodes);
121+
}
122+
123+
@Override
124+
public void init(@NonNull Map<UUID, Node> nodes, @NonNull DistanceReporter distanceReporter) {
125+
this.distanceReporter = distanceReporter;
126+
localDc = discoverLocalDc(nodes).orElse(null);
127+
filter = createNodeFilter(localDc, nodes);
128+
for (Node node : nodes.values()) {
129+
if (filter.test(node)) {
130+
distanceReporter.setDistance(node, NodeDistance.LOCAL);
131+
if (node.getState() != NodeState.DOWN) {
132+
// This includes state == UNKNOWN. If the node turns out to be unreachable, this will be
133+
// detected when we try to open a pool to it, it will get marked down and this will be
134+
// signaled back to this policy
135+
liveNodes.add(node);
136+
}
137+
} else {
138+
distanceReporter.setDistance(node, NodeDistance.IGNORED);
139+
}
140+
}
141+
}
142+
143+
/**
144+
* Returns the local datacenter, if it can be discovered, or returns {@link Optional#empty empty}
145+
* otherwise.
146+
*
147+
* <p>This method is called only once, during {@linkplain LoadBalancingPolicy#init(Map,
148+
* LoadBalancingPolicy.DistanceReporter) initialization}.
149+
*
150+
* <p>Implementors may choose to throw {@link IllegalStateException} instead of returning {@link
151+
* Optional#empty empty}, if they require a local datacenter to be defined in order to operate
152+
* properly.
153+
*
154+
* @param nodes All the nodes that were known to exist in the cluster (regardless of their state)
155+
* when the load balancing policy was initialized. This argument is provided in case
156+
* implementors need to inspect the cluster topology to discover the local datacenter.
157+
* @return The local datacenter, or {@link Optional#empty empty} if none found.
158+
* @throws IllegalStateException if the local datacenter could not be discovered, and this policy
159+
* cannot operate without it.
160+
*/
161+
@NonNull
162+
protected Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
163+
return new OptionalLocalDcHelper(context, profile, logPrefix).discoverLocalDc(nodes);
164+
}
165+
166+
/**
167+
* Creates a new node filter to use with this policy.
168+
*
169+
* <p>This method is called only once, during {@linkplain LoadBalancingPolicy#init(Map,
170+
* LoadBalancingPolicy.DistanceReporter) initialization}, and only after local datacenter
171+
* discovery has been attempted.
172+
*
173+
* @param localDc The local datacenter that was just discovered, or null if none found.
174+
* @param nodes All the nodes that were known to exist in the cluster (regardless of their state)
175+
* when the load balancing policy was initialized. This argument is provided in case
176+
* implementors need to inspect the cluster topology to create the node filter.
177+
* @return the node filter to use.
178+
*/
179+
@NonNull
180+
protected Predicate<Node> createNodeFilter(
181+
@Nullable String localDc, @NonNull Map<UUID, Node> nodes) {
182+
return new DefaultNodeFilterHelper(context, profile, logPrefix)
183+
.createNodeFilter(localDc, nodes);
184+
}
185+
186+
@NonNull
187+
@Override
188+
public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) {
189+
// Take a snapshot since the set is concurrent:
190+
Object[] currentNodes = liveNodes.toArray();
191+
192+
Set<Node> allReplicas = getReplicas(request, session);
193+
int replicaCount = 0; // in currentNodes
194+
195+
if (!allReplicas.isEmpty()) {
196+
// Move replicas to the beginning
197+
for (int i = 0; i < currentNodes.length; i++) {
198+
Node node = (Node) currentNodes[i];
199+
if (allReplicas.contains(node)) {
200+
ArrayUtils.bubbleUp(currentNodes, i, replicaCount);
201+
replicaCount += 1;
202+
}
203+
}
204+
205+
if (replicaCount > 1) {
206+
shuffleHead(currentNodes, replicaCount);
207+
}
208+
}
209+
210+
LOG.trace("[{}] Prioritizing {} local replicas", logPrefix, replicaCount);
211+
212+
// Round-robin the remaining nodes
213+
ArrayUtils.rotate(
214+
currentNodes,
215+
replicaCount,
216+
currentNodes.length - replicaCount,
217+
roundRobinAmount.getAndUpdate(INCREMENT));
218+
219+
return new QueryPlan(currentNodes);
220+
}
221+
222+
@NonNull
223+
protected Set<Node> getReplicas(@Nullable Request request, @Nullable Session session) {
224+
if (request == null || session == null) {
225+
return Collections.emptySet();
226+
}
227+
228+
// Note: we're on the hot path and the getXxx methods are potentially more than simple getters,
229+
// so we only call each method when strictly necessary (which is why the code below looks a bit
230+
// weird).
231+
CqlIdentifier keyspace = request.getKeyspace();
232+
if (keyspace == null) {
233+
keyspace = request.getRoutingKeyspace();
234+
}
235+
if (keyspace == null && session.getKeyspace().isPresent()) {
236+
keyspace = session.getKeyspace().get();
237+
}
238+
if (keyspace == null) {
239+
return Collections.emptySet();
240+
}
241+
242+
Token token = request.getRoutingToken();
243+
ByteBuffer key = (token == null) ? request.getRoutingKey() : null;
244+
if (token == null && key == null) {
245+
return Collections.emptySet();
246+
}
247+
248+
Optional<TokenMap> maybeTokenMap = context.getMetadataManager().getMetadata().getTokenMap();
249+
if (maybeTokenMap.isPresent()) {
250+
TokenMap tokenMap = maybeTokenMap.get();
251+
return (token != null)
252+
? tokenMap.getReplicas(keyspace, token)
253+
: tokenMap.getReplicas(keyspace, key);
254+
} else {
255+
return Collections.emptySet();
256+
}
257+
}
258+
259+
/** Exposed as a protected method so that it can be accessed by tests */
260+
protected void shuffleHead(Object[] currentNodes, int replicaCount) {
261+
ArrayUtils.shuffleHead(currentNodes, replicaCount);
262+
}
263+
264+
@Override
265+
public void onAdd(@NonNull Node node) {
266+
if (filter.test(node)) {
267+
LOG.debug("[{}] {} was added, setting distance to LOCAL", logPrefix, node);
268+
// Setting to a non-ignored distance triggers the session to open a pool, which will in turn
269+
// set the node UP when the first channel gets opened.
270+
distanceReporter.setDistance(node, NodeDistance.LOCAL);
271+
} else {
272+
distanceReporter.setDistance(node, NodeDistance.IGNORED);
273+
}
274+
}
275+
276+
@Override
277+
public void onUp(@NonNull Node node) {
278+
if (filter.test(node)) {
279+
// Normally this is already the case, but the filter could be dynamic and have ignored the
280+
// node previously.
281+
distanceReporter.setDistance(node, NodeDistance.LOCAL);
282+
if (liveNodes.add(node)) {
283+
LOG.debug("[{}] {} came back UP, added to live set", logPrefix, node);
284+
}
285+
} else {
286+
distanceReporter.setDistance(node, NodeDistance.IGNORED);
287+
}
288+
}
289+
290+
@Override
291+
public void onDown(@NonNull Node node) {
292+
if (liveNodes.remove(node)) {
293+
LOG.debug("[{}] {} went DOWN, removed from live set", logPrefix, node);
294+
}
295+
}
296+
297+
@Override
298+
public void onRemove(@NonNull Node node) {
299+
if (liveNodes.remove(node)) {
300+
LOG.debug("[{}] {} was removed, removed from live set", logPrefix, node);
301+
}
302+
}
303+
304+
@Override
305+
public void close() {
306+
// nothing to do
307+
}
308+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.driver.internal.core.loadbalancing;
17+
18+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
19+
import com.datastax.oss.driver.api.core.context.DriverContext;
20+
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
21+
import com.datastax.oss.driver.api.core.metadata.Node;
22+
import com.datastax.oss.driver.internal.core.loadbalancing.helper.InferringLocalDcHelper;
23+
import edu.umd.cs.findbugs.annotations.NonNull;
24+
import java.util.Map;
25+
import java.util.Optional;
26+
import java.util.UUID;
27+
import net.jcip.annotations.ThreadSafe;
28+
29+
/**
30+
* An implementation of {@link LoadBalancingPolicy} that infers the local datacenter from the
31+
* contact points, if no datacenter was provided neither through configuration nor programmatically.
32+
*
33+
* <p>To activate this policy, modify the {@code basic.load-balancing-policy} section in the driver
34+
* configuration, for example:
35+
*
36+
* <pre>
37+
* datastax-java-driver {
38+
* basic.load-balancing-policy {
39+
* class = DcInferringLoadBalancingPolicy
40+
* local-datacenter = datacenter1 # optional
41+
* }
42+
* }
43+
* </pre>
44+
*
45+
* See {@code reference.conf} (in the manual or core driver JAR) for more details.
46+
*
47+
* <p><b>Local datacenter</b>: This implementation requires a local datacenter to be defined,
48+
* otherwise it will throw an {@link IllegalStateException}. A local datacenter can be supplied
49+
* either:
50+
*
51+
* <ol>
52+
* <li>Programmatically with {@link
53+
* com.datastax.oss.driver.api.core.session.SessionBuilder#withLocalDatacenter(String)
54+
* SessionBuilder#withLocalDatacenter(String)};
55+
* <li>Through configuration, by defining the option {@link
56+
* DefaultDriverOption#LOAD_BALANCING_LOCAL_DATACENTER
57+
* basic.load-balancing-policy.local-datacenter};
58+
* <li>Or implicitly: in this case this implementation will infer the local datacenter from the
59+
* provided contact points, if and only if they are all located in the same datacenter.
60+
* </ol>
61+
*
62+
* <p><b>Query plan</b>: see {@link BasicLoadBalancingPolicy} for details on the computation of
63+
* query plans.
64+
*
65+
* <p><b>This class is not recommended for normal users who should always prefer {@link
66+
* DefaultLoadBalancingPolicy}</b>.
67+
*/
68+
@ThreadSafe
69+
public class DcInferringLoadBalancingPolicy extends BasicLoadBalancingPolicy {
70+
71+
public DcInferringLoadBalancingPolicy(
72+
@NonNull DriverContext context, @NonNull String profileName) {
73+
super(context, profileName);
74+
}
75+
76+
@NonNull
77+
@Override
78+
protected Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
79+
return new InferringLocalDcHelper(context, profile, logPrefix).discoverLocalDc(nodes);
80+
}
81+
}

0 commit comments

Comments
 (0)