Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce schema pull request volume #582

Open
wants to merge 8 commits into
base: palantir-cassandra-2.2.18
Choose a base branch
from

Conversation

andybradshaw
Copy link
Contributor

Reduce schema pull request volume by only scheduling one request per schema version at a time. The thought is that any successful request for a schema version will return the same set of mutations, so there's no need to flood other nodes with unnecessary requests.

Copy link
Contributor Author

@andybradshaw andybradshaw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested on a staging stack with a large number of nodes... we see the following log lines on a node that needs to catch up with some schema changes:

{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:21.939519Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"MigrationStage:1","message":"Gossiping my schema version {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":"6d9ec00a8aad1bfe","stacktrace":null,"unsafeParams":{"0":"4211f487-13c9-3da0-8f09-77ff6dce868b"},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:22.547598Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Evaluating schema pull criteria: other schema empty {}, no scheduled requests {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":true,"1":true},"tags":{}}
// ... many lines of the following ...
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:22.551392Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Not pulling schema because versions match or shouldPullSchemaFrom returned false","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:22.553306Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Evaluating schema pull criteria: other schema empty {}, no scheduled requests {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":true,"1":false},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:22.553320Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Not pulling schema because versions match or shouldPullSchemaFrom returned false","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:00:22.555249Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Evaluating schema pull criteria: other schema empty {}, no scheduled requests {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":true,"1":false},"tags":{}}
// ... finally actually submitting the migration task ...
{"type":"service.1","level":"DEBUG","time":"2024-11-20T01:01:41.249743Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"NonPeriodicTasks:1","message":"submitting migration task for endpoint {}, endpoint schema version {}, and our schema version {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":"/10.100.169.237","1":"f4e89c16-e5a0-3cac-9aa8-93ea575fb472","2":"60a8b5bb-1e41-32ef-a40a-3112b19b6238"},"tags":{}} 

*/
boolean isOtherSchemaNonEmpty = !Schema.emptyVersion.equals(theirVersion);
boolean noScheduledRequests = !scheduledSchemaPulls.contains(theirVersion);
logger.debug("Evaluating schema pull criteria: other schema empty {}, no scheduled requests {}", isOtherSchemaNonEmpty, noScheduledRequests);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added for testing purposes... reminder to remove before merging. Also, got the log param message wrong 🤦‍♂️

@@ -62,10 +62,13 @@ public class MigrationManager

private static final RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();

public static final Set<UUID> scheduledSchemaPulls = new ConcurrentSkipListSet<>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't love the visibility of this, but we needed access from the migration task. Will probably update the task itself to hold a reference to this.

* Don't request schema from bootstrapping nodes (?)
* Don't request schema if we have scheduled a pull request for that schema version
*/
boolean isOtherSchemaNonEmpty = !Schema.emptyVersion.equals(theirVersion);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an additional check to prevent making schema pull requests to new nodes. It's probably not that important, since the work done in response to that is pretty small.

@andybradshaw
Copy link
Contributor Author

After adding some additional logging and moving where we remove from the outstanding schema pull request set:

{"type":"service.1","level":"DEBUG","time":"2024-11-25T18:33:05.569981Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Evaluating schema pull criteria: currently scheduled requests for version {}: {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":"1f90579d-5612-3a6d-885c-f13f31e2027a","1":"[10.100.133.103/10.100.133.103, 10.100.137.98/10.100.137.98, 10.100.195.98/10.100.195.98, 10.100.98.98/10.100.98.98]"},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-25T18:33:05.570005Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Not pulling schema because versions match or shouldPullSchemaFrom returned false","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-25T18:33:05.666239Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"InternalResponseStage:33","message":"Gossiping my schema version {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":"3e697fcfb98def74","stacktrace":null,"unsafeParams":{"0":"1f90579d-5612-3a6d-885c-f13f31e2027a"},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-25T18:33:05.686966Z","origin":"org.apache.cassandra.service.MigrationTask","safe":true,"thread":"InternalResponseStage:33","message":"Successfully processed response to schema pull, removing endpoint from scheduled schema pulls {}: {} ({})","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":"10.100.133.103/10.100.133.103","1":"1f90579d-5612-3a6d-885c-f13f31e2027a","2":"[10.100.133.103/10.100.133.103, 10.100.137.98/10.100.137.98, 10.100.195.98/10.100.195.98, 10.100.98.98/10.100.98.98]"},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-25T18:33:05.712994Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Not pulling schema because versions match or shouldPullSchemaFrom returned false","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-25T18:33:05.919488Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"GossipStage:1","message":"Not pulling schema because versions match or shouldPullSchemaFrom returned false","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-25T18:33:09.138266Z","origin":"org.apache.cassandra.service.MigrationManager","safe":true,"thread":"InternalResponseStage:34","message":"Gossiping my schema version {}","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":"59dccc2121923d02","stacktrace":null,"unsafeParams":{"0":"1f90579d-5612-3a6d-885c-f13f31e2027a"},"tags":{}}
{"type":"service.1","level":"DEBUG","time":"2024-11-25T18:33:09.138320Z","origin":"org.apache.cassandra.service.MigrationTask","safe":true,"thread":"InternalResponseStage:34","message":"Successfully processed response to schema pull, removing endpoint from scheduled schema pulls {}: {} ({})","params":{},"uid":null,"sid":null,"tokenId":null,"traceId":null,"stacktrace":null,"unsafeParams":{"0":"10.100.137.98/10.100.137.98","1":"1f90579d-5612-3a6d-885c-f13f31e2027a","2":"[10.100.195.98/10.100.195.98, 10.100.98.98/10.100.98.98]"},"tags":{}}

Will test the timeout case and see if there are any easy ways to add unit tests for this

Comment on lines 101 to 105
// always attempt to clean up our outstanding schema pull request if created with a version
version.ifPresent(v -> {
logger.debug("Successfully processed response to schema pull, removing endpoint from scheduled schema pulls {}: {}", endpoint, v);
MigrationManager.scheduledSchemaPulls.computeIfPresent(v, MigrationManager.removeEndpointFromSchemaPulls(endpoint));
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't know if "we don't have multiple parallel requests per node" will be foolproof at the cluster level vs also having some configured backoff duration.

if this node is slow to merge schema, you could still be spamming the other node w/ schema pulls so long as it was respodining quickly enough, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this node is slow to merge schema

Is this node here the local node or the endpoint? The upside would be that we won't make more requests until either we've merged the schema, or our callback has timed out and we need to make a new request anyways.

The thing I'm more concerned about is that it makes the response caching change less effective. For example, with the previous behavior:

  • Node A on schema version 1, Nodes B, C, and D on sv2
  • Node A makes pulls to B/C/D -> request takes too long and callback is timed out, but response is cached on all nodes
  • Node A makes pulls to B/C/D -> response from cache is fast and callback merges the schema

Now that we're limiting the number of nodes we hit, in the pathological case:

  • Node A on schema version 1, Nodes B, C, and D on sv2
  • Node A makes pulls to B -> request takes too long and callback is timed out, but response is cached B
  • Node A makes pulls to C -> request takes too long and callback is timed out, but response is cached C
  • Cache expires on B
  • ... continue making requests until we happen to hit a node with the cached response ...

This could be made less problematic with the "seed node" approach, but could also be more invasive depending on how we want to handle the endpoint selection bits.

public static boolean shouldPullSchemaFrom(InetAddress endpoint, UUID theirVersion)
{
/*
* Don't request schema from nodes with a differnt or unknonw major version (may have incompatible schema)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit typos in this + the original comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

ScheduledExecutors.nonPeriodicTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
}
}

static BiFunction<UUID, Set<InetAddress>, Set<InetAddress>> removeEndpointFromSchemaPulls(InetAddress endpoint) {
return (v, s) -> {
logger.debug("Removing endpoint from scheduled schema pulls {}: {} ({})", endpoint, v, s);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can use safeargs for all these now!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated!

endpoint,
currentVersion,
Schema.instance.getVersion());
submitMigrationTask(endpoint);
submitMigrationTask(endpoint, currentVersion);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be theirVersion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/palantir/cassandra/pull/582/files#diff-8191bf68e083d923ddd54125087c550e0fa3136ea0c62d722e79a5d1a1fbe030R149

This should prevent us from having this drift, so I think currentVersion should be fine.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants