|
| 1 | +/* |
| 2 | + * Copyright DataStax, Inc. |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | +package com.datastax.oss.driver.internal.core.loadbalancing; |
| 17 | + |
| 18 | +import com.datastax.oss.driver.api.core.CqlIdentifier; |
| 19 | +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; |
| 20 | +import com.datastax.oss.driver.api.core.context.DriverContext; |
| 21 | +import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; |
| 22 | +import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; |
| 23 | +import com.datastax.oss.driver.api.core.metadata.Node; |
| 24 | +import com.datastax.oss.driver.api.core.metadata.NodeState; |
| 25 | +import com.datastax.oss.driver.api.core.metadata.TokenMap; |
| 26 | +import com.datastax.oss.driver.api.core.metadata.token.Token; |
| 27 | +import com.datastax.oss.driver.api.core.session.Request; |
| 28 | +import com.datastax.oss.driver.api.core.session.Session; |
| 29 | +import com.datastax.oss.driver.internal.core.context.InternalDriverContext; |
| 30 | +import com.datastax.oss.driver.internal.core.loadbalancing.helper.DefaultNodeFilterHelper; |
| 31 | +import com.datastax.oss.driver.internal.core.loadbalancing.helper.OptionalLocalDcHelper; |
| 32 | +import com.datastax.oss.driver.internal.core.util.ArrayUtils; |
| 33 | +import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; |
| 34 | +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; |
| 35 | +import edu.umd.cs.findbugs.annotations.NonNull; |
| 36 | +import edu.umd.cs.findbugs.annotations.Nullable; |
| 37 | +import java.nio.ByteBuffer; |
| 38 | +import java.util.Collections; |
| 39 | +import java.util.Map; |
| 40 | +import java.util.Optional; |
| 41 | +import java.util.Queue; |
| 42 | +import java.util.Set; |
| 43 | +import java.util.UUID; |
| 44 | +import java.util.concurrent.CopyOnWriteArraySet; |
| 45 | +import java.util.concurrent.atomic.AtomicInteger; |
| 46 | +import java.util.function.IntUnaryOperator; |
| 47 | +import java.util.function.Predicate; |
| 48 | +import net.jcip.annotations.ThreadSafe; |
| 49 | +import org.slf4j.Logger; |
| 50 | +import org.slf4j.LoggerFactory; |
| 51 | + |
| 52 | +/** |
| 53 | + * A basic implementation of {@link LoadBalancingPolicy} that can serve as a building block for more |
| 54 | + * advanced use cases. |
| 55 | + * |
| 56 | + * <p>To activate this policy, modify the {@code basic.load-balancing-policy} section in the driver |
| 57 | + * configuration, for example: |
| 58 | + * |
| 59 | + * <pre> |
| 60 | + * datastax-java-driver { |
| 61 | + * basic.load-balancing-policy { |
| 62 | + * class = BasicLoadBalancingPolicy |
| 63 | + * local-datacenter = datacenter1 # optional |
| 64 | + * } |
| 65 | + * } |
| 66 | + * </pre> |
| 67 | + * |
| 68 | + * See {@code reference.conf} (in the manual or core driver JAR) for more details. |
| 69 | + * |
| 70 | + * <p><b>Local datacenter</b>: This implementation will only define a local datacenter if it is |
| 71 | + * explicitly set either through configuration or programmatically; if the local datacenter is |
| 72 | + * unspecified, this implementation will effectively act as a datacenter-agnostic load balancing |
| 73 | + * policy and will consider all nodes in the cluster when creating query plans, regardless of their |
| 74 | + * datacenter. |
| 75 | + * |
| 76 | + * <p><b>Query plan</b>: This implementation prioritizes replica nodes over non-replica ones; if |
| 77 | + * more than one replica is available, the replicas will be shuffled. Non-replica nodes will be |
| 78 | + * included in a round-robin fashion. If the local datacenter is defined (see above), query plans |
| 79 | + * will only include local nodes, never remote ones; if it is unspecified however, query plans may |
| 80 | + * contain nodes from different datacenters. |
| 81 | + * |
| 82 | + * <p><b>This class is not recommended for normal users who should always prefer {@link |
| 83 | + * DefaultLoadBalancingPolicy}</b>. |
| 84 | + */ |
| 85 | +@ThreadSafe |
| 86 | +public class BasicLoadBalancingPolicy implements LoadBalancingPolicy { |
| 87 | + |
| 88 | + private static final Logger LOG = LoggerFactory.getLogger(BasicLoadBalancingPolicy.class); |
| 89 | + |
| 90 | + protected static final IntUnaryOperator INCREMENT = i -> (i == Integer.MAX_VALUE) ? 0 : i + 1; |
| 91 | + |
| 92 | + @NonNull protected final InternalDriverContext context; |
| 93 | + @NonNull protected final DriverExecutionProfile profile; |
| 94 | + @NonNull protected final String logPrefix; |
| 95 | + |
| 96 | + protected final AtomicInteger roundRobinAmount = new AtomicInteger(); |
| 97 | + protected final CopyOnWriteArraySet<Node> liveNodes = new CopyOnWriteArraySet<>(); |
| 98 | + |
| 99 | + // private because they should be set in init() and never be modified after |
| 100 | + private volatile DistanceReporter distanceReporter; |
| 101 | + private volatile Predicate<Node> filter; |
| 102 | + private volatile String localDc; |
| 103 | + |
| 104 | + public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) { |
| 105 | + this.context = (InternalDriverContext) context; |
| 106 | + profile = context.getConfig().getProfile(profileName); |
| 107 | + logPrefix = context.getSessionName() + "|" + profileName; |
| 108 | + } |
| 109 | + |
| 110 | + /** @return The local datacenter, if known; empty otherwise. */ |
| 111 | + public Optional<String> getLocalDatacenter() { |
| 112 | + return Optional.ofNullable(localDc); |
| 113 | + } |
| 114 | + |
| 115 | + /** |
| 116 | + * @return An immutable copy of the nodes currently considered as live; if the local datacenter is |
| 117 | + * known, this set will contain only nodes belonging to that datacenter. |
| 118 | + */ |
| 119 | + public Set<Node> getLiveNodes() { |
| 120 | + return ImmutableSet.copyOf(liveNodes); |
| 121 | + } |
| 122 | + |
| 123 | + @Override |
| 124 | + public void init(@NonNull Map<UUID, Node> nodes, @NonNull DistanceReporter distanceReporter) { |
| 125 | + this.distanceReporter = distanceReporter; |
| 126 | + localDc = discoverLocalDc(nodes).orElse(null); |
| 127 | + filter = createNodeFilter(localDc, nodes); |
| 128 | + for (Node node : nodes.values()) { |
| 129 | + if (filter.test(node)) { |
| 130 | + distanceReporter.setDistance(node, NodeDistance.LOCAL); |
| 131 | + if (node.getState() != NodeState.DOWN) { |
| 132 | + // This includes state == UNKNOWN. If the node turns out to be unreachable, this will be |
| 133 | + // detected when we try to open a pool to it, it will get marked down and this will be |
| 134 | + // signaled back to this policy |
| 135 | + liveNodes.add(node); |
| 136 | + } |
| 137 | + } else { |
| 138 | + distanceReporter.setDistance(node, NodeDistance.IGNORED); |
| 139 | + } |
| 140 | + } |
| 141 | + } |
| 142 | + |
| 143 | + /** |
| 144 | + * Returns the local datacenter, if it can be discovered, or returns {@link Optional#empty empty} |
| 145 | + * otherwise. |
| 146 | + * |
| 147 | + * <p>This method is called only once, during {@linkplain LoadBalancingPolicy#init(Map, |
| 148 | + * LoadBalancingPolicy.DistanceReporter) initialization}. |
| 149 | + * |
| 150 | + * <p>Implementors may choose to throw {@link IllegalStateException} instead of returning {@link |
| 151 | + * Optional#empty empty}, if they require a local datacenter to be defined in order to operate |
| 152 | + * properly. |
| 153 | + * |
| 154 | + * @param nodes All the nodes that were known to exist in the cluster (regardless of their state) |
| 155 | + * when the load balancing policy was initialized. This argument is provided in case |
| 156 | + * implementors need to inspect the cluster topology to discover the local datacenter. |
| 157 | + * @return The local datacenter, or {@link Optional#empty empty} if none found. |
| 158 | + * @throws IllegalStateException if the local datacenter could not be discovered, and this policy |
| 159 | + * cannot operate without it. |
| 160 | + */ |
| 161 | + @NonNull |
| 162 | + protected Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) { |
| 163 | + return new OptionalLocalDcHelper(context, profile, logPrefix).discoverLocalDc(nodes); |
| 164 | + } |
| 165 | + |
| 166 | + /** |
| 167 | + * Creates a new node filter to use with this policy. |
| 168 | + * |
| 169 | + * <p>This method is called only once, during {@linkplain LoadBalancingPolicy#init(Map, |
| 170 | + * LoadBalancingPolicy.DistanceReporter) initialization}, and only after local datacenter |
| 171 | + * discovery has been attempted. |
| 172 | + * |
| 173 | + * @param localDc The local datacenter that was just discovered, or null if none found. |
| 174 | + * @param nodes All the nodes that were known to exist in the cluster (regardless of their state) |
| 175 | + * when the load balancing policy was initialized. This argument is provided in case |
| 176 | + * implementors need to inspect the cluster topology to create the node filter. |
| 177 | + * @return the node filter to use. |
| 178 | + */ |
| 179 | + @NonNull |
| 180 | + protected Predicate<Node> createNodeFilter( |
| 181 | + @Nullable String localDc, @NonNull Map<UUID, Node> nodes) { |
| 182 | + return new DefaultNodeFilterHelper(context, profile, logPrefix) |
| 183 | + .createNodeFilter(localDc, nodes); |
| 184 | + } |
| 185 | + |
| 186 | + @NonNull |
| 187 | + @Override |
| 188 | + public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) { |
| 189 | + // Take a snapshot since the set is concurrent: |
| 190 | + Object[] currentNodes = liveNodes.toArray(); |
| 191 | + |
| 192 | + Set<Node> allReplicas = getReplicas(request, session); |
| 193 | + int replicaCount = 0; // in currentNodes |
| 194 | + |
| 195 | + if (!allReplicas.isEmpty()) { |
| 196 | + // Move replicas to the beginning |
| 197 | + for (int i = 0; i < currentNodes.length; i++) { |
| 198 | + Node node = (Node) currentNodes[i]; |
| 199 | + if (allReplicas.contains(node)) { |
| 200 | + ArrayUtils.bubbleUp(currentNodes, i, replicaCount); |
| 201 | + replicaCount += 1; |
| 202 | + } |
| 203 | + } |
| 204 | + |
| 205 | + if (replicaCount > 1) { |
| 206 | + shuffleHead(currentNodes, replicaCount); |
| 207 | + } |
| 208 | + } |
| 209 | + |
| 210 | + LOG.trace("[{}] Prioritizing {} local replicas", logPrefix, replicaCount); |
| 211 | + |
| 212 | + // Round-robin the remaining nodes |
| 213 | + ArrayUtils.rotate( |
| 214 | + currentNodes, |
| 215 | + replicaCount, |
| 216 | + currentNodes.length - replicaCount, |
| 217 | + roundRobinAmount.getAndUpdate(INCREMENT)); |
| 218 | + |
| 219 | + return new QueryPlan(currentNodes); |
| 220 | + } |
| 221 | + |
| 222 | + @NonNull |
| 223 | + protected Set<Node> getReplicas(@Nullable Request request, @Nullable Session session) { |
| 224 | + if (request == null || session == null) { |
| 225 | + return Collections.emptySet(); |
| 226 | + } |
| 227 | + |
| 228 | + // Note: we're on the hot path and the getXxx methods are potentially more than simple getters, |
| 229 | + // so we only call each method when strictly necessary (which is why the code below looks a bit |
| 230 | + // weird). |
| 231 | + CqlIdentifier keyspace = request.getKeyspace(); |
| 232 | + if (keyspace == null) { |
| 233 | + keyspace = request.getRoutingKeyspace(); |
| 234 | + } |
| 235 | + if (keyspace == null && session.getKeyspace().isPresent()) { |
| 236 | + keyspace = session.getKeyspace().get(); |
| 237 | + } |
| 238 | + if (keyspace == null) { |
| 239 | + return Collections.emptySet(); |
| 240 | + } |
| 241 | + |
| 242 | + Token token = request.getRoutingToken(); |
| 243 | + ByteBuffer key = (token == null) ? request.getRoutingKey() : null; |
| 244 | + if (token == null && key == null) { |
| 245 | + return Collections.emptySet(); |
| 246 | + } |
| 247 | + |
| 248 | + Optional<TokenMap> maybeTokenMap = context.getMetadataManager().getMetadata().getTokenMap(); |
| 249 | + if (maybeTokenMap.isPresent()) { |
| 250 | + TokenMap tokenMap = maybeTokenMap.get(); |
| 251 | + return (token != null) |
| 252 | + ? tokenMap.getReplicas(keyspace, token) |
| 253 | + : tokenMap.getReplicas(keyspace, key); |
| 254 | + } else { |
| 255 | + return Collections.emptySet(); |
| 256 | + } |
| 257 | + } |
| 258 | + |
| 259 | + /** Exposed as a protected method so that it can be accessed by tests */ |
| 260 | + protected void shuffleHead(Object[] currentNodes, int replicaCount) { |
| 261 | + ArrayUtils.shuffleHead(currentNodes, replicaCount); |
| 262 | + } |
| 263 | + |
| 264 | + @Override |
| 265 | + public void onAdd(@NonNull Node node) { |
| 266 | + if (filter.test(node)) { |
| 267 | + LOG.debug("[{}] {} was added, setting distance to LOCAL", logPrefix, node); |
| 268 | + // Setting to a non-ignored distance triggers the session to open a pool, which will in turn |
| 269 | + // set the node UP when the first channel gets opened. |
| 270 | + distanceReporter.setDistance(node, NodeDistance.LOCAL); |
| 271 | + } else { |
| 272 | + distanceReporter.setDistance(node, NodeDistance.IGNORED); |
| 273 | + } |
| 274 | + } |
| 275 | + |
| 276 | + @Override |
| 277 | + public void onUp(@NonNull Node node) { |
| 278 | + if (filter.test(node)) { |
| 279 | + // Normally this is already the case, but the filter could be dynamic and have ignored the |
| 280 | + // node previously. |
| 281 | + distanceReporter.setDistance(node, NodeDistance.LOCAL); |
| 282 | + if (liveNodes.add(node)) { |
| 283 | + LOG.debug("[{}] {} came back UP, added to live set", logPrefix, node); |
| 284 | + } |
| 285 | + } else { |
| 286 | + distanceReporter.setDistance(node, NodeDistance.IGNORED); |
| 287 | + } |
| 288 | + } |
| 289 | + |
| 290 | + @Override |
| 291 | + public void onDown(@NonNull Node node) { |
| 292 | + if (liveNodes.remove(node)) { |
| 293 | + LOG.debug("[{}] {} went DOWN, removed from live set", logPrefix, node); |
| 294 | + } |
| 295 | + } |
| 296 | + |
| 297 | + @Override |
| 298 | + public void onRemove(@NonNull Node node) { |
| 299 | + if (liveNodes.remove(node)) { |
| 300 | + LOG.debug("[{}] {} was removed, removed from live set", logPrefix, node); |
| 301 | + } |
| 302 | + } |
| 303 | + |
| 304 | + @Override |
| 305 | + public void close() { |
| 306 | + // nothing to do |
| 307 | + } |
| 308 | +} |
0 commit comments