Reduce schema pull request volume #582
base: palantir-cassandra-2.2.18
Tested on a staging stack with a large number of nodes... we see the following log lines on a node that needs to catch up with some schema changes:
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:21.939519Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"MigrationStage:1","message":"Gossiping my schema version {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":"6d9ec00a8aad1bfe","stacktrace":null,"unsafeParams":{"0":"4211f487-13c9-3da0-8f09-77ff6dce868b"},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:22.547598Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Evaluating schema pull criteria: other schema empty {}, no scheduled requests {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":true,"1":true},"tags":{}}
// ... many lines of the following ...
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:22.551392Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Not pulling schema because versions match or shouldPullSchemaFrom returned false","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:22.553306Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Evaluating schema pull criteria: other schema empty {}, no scheduled requests {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":true,"1":false},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:22.553320Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Not pulling schema because versions match or shouldPullSchemaFrom returned false","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:22.555249Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Evaluating schema pull criteria: other schema empty {}, no scheduled requests {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":true,"1":false},"tags":{}}
// ... finally actually submitting the migration task ...
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:01:41.249743Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"NonPeriodicTasks:1","message":"submitting migration task for endpoint {}, endpoint schema version {}, and our schema version {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":"/10.100.169.237","1":"f4e89c16-e5a0-3cac-9aa8-93ea575fb472","2":"60a8b5bb-1e41-32ef-a40a-3112b19b6238"},"tags":{}}
*/
boolean isOtherSchemaNonEmpty = !Schema.emptyVersion.equals(theirVersion);
boolean noScheduledRequests = !scheduledSchemaPulls.contains(theirVersion);
logger.debug("Evaluating schema pull criteria: other schema empty {}, no scheduled requests {}", isOtherSchemaNonEmpty, noScheduledRequests);
This was added for testing purposes... reminder to remove before merging. Also, got the log param message wrong 🤦♂️
@@ -62,10 +62,13 @@ public class MigrationManager

private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();

public static final Set<UUID> scheduledSchemaPulls = new ConcurrentSkipListSet<>();
Don't love the visibility of this, but we needed access from the migration task. Will probably update the task itself to hold a reference to this.
* Don't request schema from bootstrapping nodes (?)
* Don't request schema if we have scheduled a pull request for that schema version
*/
boolean isOtherSchemaNonEmpty = !Schema.emptyVersion.equals(theirVersion);
This is an additional check to prevent making schema pull requests to new nodes. It's probably not that important, since the work done in response to that is pretty small.
After adding some additional logging and moving where we remove from the outstanding schema pull request set:
Will test the timeout case and see if there are any easy ways to add unit tests for this
// always attempt to clean up our outstanding schema pull request if created with a version
version.ifPresent(v -> {
logger.debug("Successfully processed response to schema pull, removing endpoint from scheduled schema pulls {}: {}", endpoint, v);
MigrationManager.scheduledSchemaPulls.computeIfPresent(v, MigrationManager.removeEndpointFromSchemaPulls(endpoint));
});
I still don't know if "we don't have multiple parallel requests per node" will be foolproof at the cluster level vs also having some configured backoff duration.
if this node is slow to merge schema, you could still be spamming the other node w/ schema pulls so long as it was responding quickly enough, no?
"if this node is slow to merge schema"
Is "this node" here the local node or the endpoint? The upside would be that we won't make more requests until either we've merged the schema, or our callback has timed out and we need to make a new request anyways.
The thing I'm more concerned about is that it makes the response caching change less effective. For example, with the previous behavior:
- Node A on schema version 1, Nodes B, C, and D on sv2
- Node A makes pulls to B/C/D -> request takes too long and callback is timed out, but response is cached on all nodes
- Node A makes pulls to B/C/D -> response from cache is fast and callback merges the schema
Now that we're limiting the number of nodes we hit, in the pathological case:
- Node A on schema version 1, Nodes B, C, and D on sv2
- Node A makes pulls to B -> request takes too long and callback is timed out, but response is cached on B
- Node A makes pulls to C -> request takes too long and callback is timed out, but response is cached on C
- Cache expires on B
- ... continue making requests until we happen to hit a node with the cached response ...
This could be made less problematic with the "seed node" approach, but could also be more invasive depending on how we want to handle the endpoint selection bits.
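For illustration only, here is a minimal sketch of the "configured backoff duration" idea raised earlier in this thread: a per-endpoint gate that allows at most one pull per interval, regardless of how quickly the endpoint responds. Everything here is an assumption and not part of the PR: the `PullBackoff` class, the `tryAcquire` method, the string endpoint key, and the injected clock are hypothetical names chosen for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongSupplier;

// Hypothetical helper, not from the PR: rate-limits schema pulls per endpoint.
final class PullBackoff {
    // Last time (in clock units) a pull was allowed for each endpoint.
    private final ConcurrentMap<String, Long> lastPull = new ConcurrentHashMap<>();
    private final long minInterval;
    private final LongSupplier clock;

    PullBackoff(long minInterval, LongSupplier clock) {
        this.minInterval = minInterval;
        this.clock = clock;
    }

    /** Returns true if a pull to this endpoint is allowed now, and records it. */
    boolean tryAcquire(String endpoint) {
        long now = clock.getAsLong();
        boolean[] allowed = {false};
        // compute() is atomic per key on ConcurrentHashMap, so two threads
        // cannot both be granted a pull inside the same interval.
        lastPull.compute(endpoint, (k, last) -> {
            if (last == null || now - last >= minInterval) {
                allowed[0] = true;
                return now;
            }
            return last;
        });
        return allowed[0];
    }
}
```

Injecting the clock keeps the gate easy to unit test. A gate like this would sit alongside the one-request-per-version check, not replace it, and it does nothing for the cache-expiry scenario described above.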
public static boolean shouldPullSchemaFrom(InetAddress endpoint, UUID theirVersion)
{
/*
* Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
nit typos in this + the original comment
Fixed
ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
}
}

static BiFunction<UUID, Set<InetAddress>, Set<InetAddress>> removeEndpointFromSchemaPulls(InetAddress endpoint) {
return (v, s) -> {
logger.debug("Removing endpoint from scheduled schema pulls {}: {} ({})", endpoint, v, s);
nit: we can use safeargs for all these now!
Updated!
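Since the body of `removeEndpointFromSchemaPulls` is cut off in the hunk above, here is a minimal sketch of how such a `BiFunction` can pair with `computeIfPresent` on a version-to-endpoints map. It relies on the documented `Map.computeIfPresent` behavior that returning `null` removes the mapping. The `ScheduledPulls` class and its method names are hypothetical, and whether the PR's function returns `null` for an empty set is an assumption.

```java
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

// Hypothetical container, not from the PR: outstanding pulls keyed by schema version.
final class ScheduledPulls {
    private final ConcurrentMap<UUID, Set<InetAddress>> pulls = new ConcurrentHashMap<>();

    /** Records that a pull for this version was sent to this endpoint. */
    void add(UUID version, InetAddress endpoint) {
        pulls.compute(version, (v, s) -> {
            Set<InetAddress> next = (s == null) ? new HashSet<>() : new HashSet<>(s);
            next.add(endpoint);
            return next;
        });
    }

    /** Builds the remapping function; returning null drops the whole entry. */
    static BiFunction<UUID, Set<InetAddress>, Set<InetAddress>> removeEndpoint(InetAddress endpoint) {
        return (v, s) -> {
            Set<InetAddress> next = new HashSet<>(s);
            next.remove(endpoint);
            // Assumption: an empty set means no outstanding pulls, so remove the key.
            return next.isEmpty() ? null : next;
        };
    }

    /** Clears one endpoint; a no-op if the version has no entry. */
    void remove(UUID version, InetAddress endpoint) {
        pulls.computeIfPresent(version, removeEndpoint(endpoint));
    }

    boolean hasOutstanding(UUID version) {
        return pulls.containsKey(version);
    }
}
```

Copying the set inside the remapping function avoids mutating a value that another thread may be reading, at the cost of an allocation per update.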
endpoint,
currentVersion,
Schema.instance.getVersion());
submitMigrationTask(endpoint);
submitMigrationTask(endpoint, currentVersion);
should this be theirVersion?
This should prevent us from having this drift, so I think currentVersion should be fine.
Reduce schema pull request volume by only scheduling one request per schema version at a time. The thought is that any successful request for a schema version will return the same set of mutations, so there's no need to flood other nodes with unnecessary requests.
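As a rough sketch of the "one request per schema version at a time" idea, the class below tracks in-flight versions in a concurrent set and uses the return value of `Set.add` as an atomic check-and-claim. The names `SchemaPullDeduper`, `tryBegin`, and `finish` are hypothetical and do not appear in the PR; the real change wires this logic into `MigrationManager` and the migration task.

```java
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not from the PR: at most one in-flight pull per schema version.
final class SchemaPullDeduper {
    private final Set<UUID> inFlight = ConcurrentHashMap.newKeySet();

    /**
     * Claims the version. Returns true only for the first caller, who should
     * issue the pull; later callers get false until finish() is called.
     */
    boolean tryBegin(UUID version) {
        // add() is atomic, so there is no gap between "check" and "claim".
        return inFlight.add(version);
    }

    /** Releases the version on success, failure, or callback timeout. */
    void finish(UUID version) {
        inFlight.remove(version);
    }
}
```

The caller has to invoke `finish` on every exit path, including the timeout path; otherwise a lost response would block all further pulls for that version.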