[New Scheduler] Add DataManagementService#5063
Conversation
| case object GrantLease | ||
|
|
||
| // TBD | ||
| class LeaseKeepAliveService { |
There was a problem hiding this comment.
I realized this pr is also dependent on these modules.
It would be better to merge this PR after this kind of module is introduced.
| // normally these messages will be sent when queues are created. | ||
| case request: ElectLeader => | ||
| if (inProgressKeys.contains(request.key)) { | ||
| logging.info(this, s"save request $request into a buffer") |
There was a problem hiding this comment.
do these really need to be info level?
There was a problem hiding this comment.
In this case, it stores the request into a buffer because there is already precedent request processing. If any issue happens it would let us know if the request has processed or not.
There was a problem hiding this comment.
So I'm still learning what this is doing, but what does each request involve? Is it every activation or some sort of metadata setup? If it's every activation it would seem spammy to me otherwise I think it's fine
| .map { res => | ||
| parent ! FinishWork(request.key) | ||
| if (res.getSucceeded) { | ||
| logging.debug(this, s"data is stored.") |
|
I think we need to merge a PR for CI tests for scheduler components before merging this PR. |
| * In the event any issue occurs while storing data, the actor keeps trying until the data is stored guaranteeing delivery to ETCD. | ||
| * So it guarantees the data is eventually stored. | ||
| */ | ||
| class DataManagementService(watcherService: ActorRef, workerFactory: ActorRefFactory => ActorRef)( |
There was a problem hiding this comment.
I will include test cases into this PR after setting up the CI pipeline for scheduler components.
| } | ||
|
|
||
| // normally these messages will be sent when queues are created. | ||
| case request: ElectLeader => |
There was a problem hiding this comment.
Leader election happens when a queue is created.
This is to guarantee only one scheduler creates a certain queue.
So it happens relatively fewer times.
|
|
||
| // normally these messages will be sent when queues are created. | ||
| case request: ElectLeader => | ||
| if (inProgressKeys.contains(request.key)) { |
There was a problem hiding this comment.
With the retry nature of this component, if there is a precedent request(being retried), it would store the new request to a buffer.
| logging.info(this, s"save a request $request into a buffer") | ||
| operations.getOrElseUpdate(request.key, Queue.empty[Any]).enqueue(request) | ||
| } else { | ||
| worker ! request |
There was a problem hiding this comment.
Actual works would be delegated to ETCDWorker.
| inProgressKeys = inProgressKeys + request.key | ||
| } | ||
|
|
||
| case request: RegisterInitialData => |
There was a problem hiding this comment.
Actions under the same namespace share some data such as namespace throttling data.
So it is required to store the data if there is no data yet but not overwrite an existing one.
This case is for the case.
|
|
||
| case request: RegisterInitialData => | ||
| // send WatchEndpoint first as the put operation will be retried until success if failed | ||
| if (request.failoverEnabled) |
There was a problem hiding this comment.
If the failover is enabled, it would watch the key and if the key is deleted for some reason, it would try to restore it.
| inProgressKeys = inProgressKeys + request.key | ||
| } | ||
|
|
||
| case request: RegisterData => |
There was a problem hiding this comment.
This will overwrite the existing data in ETCD.
Generally, this is used for data that is not shared among actions.
| case WatchEndpointRemoved(_, key, value, true) => | ||
| logging.error(this, s"unexpected data received: ${WatchEndpoint(key, value, isPrefix = true, watcherName)}") | ||
|
|
||
| case msg: UpdateDataOnChange => |
There was a problem hiding this comment.
To reduce the loads against ETCD, it does not store data if there is no change in the value.
| actorSystem: ActorSystem) | ||
| extends Actor { | ||
| private implicit val ec = context.dispatcher | ||
|
|
There was a problem hiding this comment.
This class is used by both schedulers and invokers to store data to ETCD.
The following kinds of data are stored to ETCD.
- Throttling data(Action / Namespace)
- Queue endpoint(where a queue is running)
- Scheduler endpoint.
- Container data(running container, warmed container, data to describe how many containers are being created)
Dependent modules are Queue, ContainerProxy, CreationJobManager, etc.
|
Just comes up in my mind is it would be great to write down some documents for each component in Wiki. |
bab5ce6 to
fd4d4e3
Compare
a20e5d2 to
448753b
Compare
|
Waiting for this PR(#5067) to be merged. |
|
I wrote a document about this module: https://cwiki.apache.org/confluence/display/OPENWHISK/DataManagementService |
…DataManagementService.scala Apply comment Co-authored-by: Brendan Doyle <[email protected]>
Update comments Co-authored-by: Brendan Doyle <[email protected]>
448753b to
56a4f2c
Compare
Codecov Report
@@ Coverage Diff @@
## master #5063 +/- ##
==========================================
- Coverage 81.63% 75.03% -6.60%
==========================================
Files 205 214 +9
Lines 10013 10448 +435
Branches 442 470 +28
==========================================
- Hits 8174 7840 -334
- Misses 1839 2608 +769
Continue to review full report at Codecov.
|
|
It's ready to merge. |
|
@bdoyle0182 Do you have any other comments on this PR? |
|
Just one comment. LGTM |
Description
This component is in charge of storing data to ETCD.
It is based on eventual consistency.
If it fails to store data for some reason, it keeps retrying until data is stored.
Related issue and scope
My changes affect the following components
Types of changes
Checklist: