Implement an ElasticSearchActivationStore#4724
|
FTR, with this change, we can take advantage of ElasticSearch. |
| dir: | ||
| become: "{{ elastic_dir_become | default(false) }}" | ||
| base_volume: "{{ elastic_base_volume | default('esdata') }}" | ||
| cluster_name: "{{ elastic_cluster_name | default('lambda') }}" |
| @@ -0,0 +1,20 @@ | |||
| # Licensed to the Apache Software Foundation (ASF) under one or more contributor | |||
We don't use the short version of the Apache license header anymore.
| with ActivationStoreBehavior { | ||
|
|
||
| override def checkDeleteActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = { | ||
| retry(super.checkDeleteActivation(activation), 10) |
Sometimes delete returns a not-found error even though a get on the same activation returns a 200 response. I don't know the reason yet.
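A minimal sketch of the retry idea used in the test override above, assuming a simple "try again after a short wait" policy. The object name `RetrySketch`, the `waitMillis` parameter, and its default are hypothetical; the `retry` helper actually used in the PR may behave differently.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Hypothetical helper: evaluate `op` up to `attempts` times,
  // sleeping `waitMillis` between failed attempts.
  @tailrec
  def retry[T](op: => T, attempts: Int, waitMillis: Long = 100L): T =
    Try(op) match {
      case Success(value) => value
      case Failure(_) if attempts > 1 =>
        Thread.sleep(waitMillis)
        retry(op, attempts - 1, waitMillis)
      // No attempts left: surface the last error to the caller.
      case Failure(e) => throw e
    }
}
```

With this shape, a transient not-found on delete is retried, while a persistent failure still fails the test after the last attempt.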
| export ELASTIC_INDEX_PATTERN="unittest-%s" | ||
| export ELASTIC_USERNAME="admin" | ||
| export ELASTIC_PASSWORD="admin" | ||
|
|
I'm not sure whether it is OK to add code here, but without these lines the tests for ElasticSearchActivationStore will be skipped, which can introduce some risk to the system.
There was a problem hiding this comment.
For tests we can also look into using the TestContainers ElasticSearch support. Then you would not need to deploy an ES container via ansible just for test runs.
There was a problem hiding this comment.
thanks, this looks great! I will look into it
There was a problem hiding this comment.
When using TestContainers, the gradle process times out because of a "GC overhead limit exceeded" error.
There was a problem hiding this comment.
@jiangpengcheng could you please give more details? Even better if you can push a branch.
Testcontainers is not very memory greedy, so I believe this is something else
There was a problem hiding this comment.
@bsideup Sorry for misleading you. The error was actually related to my environment; after running ./gradlew clean, it worked.
| new Batcher(500, maxOpenDbRequests)(doStore(_)(TransactionId.dbBatcher)) | ||
|
|
||
| private val minStart = 0L | ||
| private val maxStart = Instant.now.toEpochMilli + 100L * 365 * 24 * 60 * 60 * 1000 //100 years from now |
These are used as the bounds of the range query when since or upto is None.
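A minimal sketch of how such default bounds could be applied, assuming `since` and `upto` arrive as `Option[Instant]`. The object name `RangeBoundsSketch` and the method `startRange` are hypothetical and not taken from the PR.

```scala
import java.time.Instant

object RangeBoundsSketch {
  val minStart: Long = 0L
  // The leading 100L forces Long arithmetic; with Int operands this product would overflow.
  val maxStart: Long = Instant.now.toEpochMilli + 100L * 365 * 24 * 60 * 60 * 1000

  // Hypothetical helper: a missing bound falls back to minStart or maxStart,
  // so the range query always has both ends.
  def startRange(since: Option[Instant], upto: Option[Instant]): (Long, Long) =
    (since.map(_.toEpochMilli).getOrElse(minStart), upto.map(_.toEpochMilli).getOrElse(maxStart))
}
```

The offset added to the current time is 3,153,600,000,000 ms (100 years of 365 days).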
| } | ||
|
|
||
| private def generateIndex(namespace: String): String = { | ||
| elasticSearchConfig.indexPattern.dropWhile(_ == '/') format namespace.toLowerCase |
We can provide a custom pattern to store activations in specific indices.
For example, with the pattern "openwhisk-%s", activations in different namespaces are saved into different indices.
With "openwhisk" (no "%s"), all activations are saved into the "openwhisk" index.
We can even use index aliases, like:
- openwhisk-whisk.system (this is an alias; below are the indices it points to)
  - openwhisk-whisk.system_201911 (write index; activations are written to this index only, while list/get searches all indices)
  - openwhisk-whisk.system_201910
  - openwhisk-whisk.system_201909
- openwhisk-ns1
  - openwhisk-ns1_201911 (write index)
  - openwhisk-ns1_201910
  - openwhisk-ns1_201909
- openwhisk-ns2 (we can use shared indices for namespaces which are not very active)
  - openwhisk@shared_201911 (write index)
  - openwhisk@shared_201910
  - openwhisk@shared_201909
- openwhisk-ns3
  - openwhisk@shared_201911 (write index)
  - openwhisk@shared_201910
  - openwhisk@shared_201909
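The pattern behaviour described above can be sketched as a standalone function. This mirrors the `generateIndex` line from the diff, but takes the pattern as a parameter instead of reading it from `elasticSearchConfig`; the object name `IndexPatternSketch` is hypothetical.

```scala
object IndexPatternSketch {
  // Strip leading slashes from the pattern, then substitute the
  // lower-cased namespace for "%s" if the pattern contains one.
  def generateIndex(indexPattern: String, namespace: String): String =
    indexPattern.dropWhile(_ == '/').format(namespace.toLowerCase)
}
```

A pattern without "%s" still works because the underlying java.util.Formatter ignores surplus arguments, so every namespace maps to the same index name.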
chetanmeh
left a comment
Did a first pass on the code part (excluding the ansible setup). Looks good! Wanted to confirm if caching support is actually needed for activation store?
| ErrorLevel)) | ||
| } | ||
|
|
||
| override def delete(activationId: ActivationId, context: UserContext)( |
Later we should review the usage for delete as activation records are immutable and removed via expiry instead of explicit removal. So technically we can drop the delete support
|
Would be useful to have the actual record structure in ElasticSearch documented as part of PR description |
|
@jiangpengcheng Could you address @chetanmeh's comment? |
|
@chetanmeh Any other opinion on this? |
|
This is fantastic! |
Missed reviewing it. Added few more comments |
|
@chetanmeh Thank you for the comments. |
|
@jiangpengcheng Could you handle the review points? |
|
already updated @style95 |
chetanmeh
left a comment
Mostly looks good. Added few more comments.
Looking at coverage report one thing looks odd
The 2 lines without any coverage suggest that the flow hits the Try part first but, due to some exception, falls back to the else part. Is that an issue on the coverage side, or do the tests only exercise one part of the flow?
| start, | ||
| s"[PUT] 'activations' failed to put document: '${activation.docid}'; http status: '${res.status}'", | ||
| ErrorLevel) | ||
| throw new Exception("Unexpected http response code: " + res.status) |
Better to use a specific exception here. In general, when store implementations throw exceptions, they throw ones extending ArtifactStoreException so that StoreUtils.reportFailure does not handle them again. In the current implementation, transid.failed would be invoked twice, which would result in wrong metrics being recorded.
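A simplified sketch of the double-reporting concern, using stand-in types rather than the real OpenWhisk classes. `StoreException`, `PutFailed`, `markFailed`, and the synchronous `Try`-based flow are all assumptions made for illustration; the actual `ArtifactStoreException` hierarchy and `StoreUtils.reportFailure` are not reproduced here.

```scala
import scala.util.{Failure, Try}

object StoreFailureSketch {
  // Stand-ins for illustration only; not the OpenWhisk definitions.
  abstract class StoreException(msg: String) extends Exception(msg)
  case class PutFailed(msg: String) extends StoreException(msg)

  // Counts how often a failure metric would be recorded (stand-in for transid.failed).
  var failuresRecorded = 0
  def markFailed(): Unit = failuresRecorded += 1

  // The store records the failure itself, then throws a store-specific exception.
  def put(status: Int): Try[Unit] = Try {
    if (status != 200 && status != 201) {
      markFailed()
      throw PutFailed(s"Unexpected http response code: $status")
    }
  }

  // The outer reporter skips store-specific exceptions, so they are counted once.
  def reportFailure[T](result: Try[T]): Try[T] = {
    result match {
      case Failure(_: StoreException) => // already recorded by the store
      case Failure(_)                 => markFailed()
      case _                          =>
    }
    result
  }
}
```

If `put` threw a plain `Exception` instead, the second case would match and the same failure would be counted twice, which is the metrics problem described in the comment.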
There was a problem hiding this comment.
oh, great point, sorry I missed it
About the odd coverage report: currently it only runs the unit tests from org.apache.openwhisk.core.database.test.behavior.ActivationStoreBehavior, in which the mock activations' annotations and response.result fields are all null, so the Try fails and falls back to the else part. I will add some fake results and annotations to the mock activations.
|
@jiangpengcheng Thanks for taking care of the review feedback. Overall (barring ansible part which I was not able to have a closer look) it looks good to me. Feel free to merge once you are done with this |
rabbah
left a comment
do you mind adding a short set of instructions for how to try this with a local deployment?
| {{ docker_machine_ip }} ansible_host={{ docker_machine_ip }} | ||
|
|
||
| [elasticsearch:children] | ||
| db |
That means elasticsearch nodes will be deployed on the same hosts specified in the db section.
Sorry for the late response. Shall I write the instructions in a document or in this PR's description? |
|
@jiangpengcheng I think we anyway need a document to describe how to use it. |
|
Thanks for adding the instructions. I have restarted Travis. |
rabbah
left a comment
LGTM. Remove vagrant change since we removed all vagrant mentions in the project. Also did you consider adding a new target for wskdev to start es?
b8e392e to
541fce1
Compare
|
It seems it is ready to merge. |

Implement an ElasticSearchActivationStore
Description
Although we can currently use ArtifactWithFileStorageActivationStore to store activations in ES, this requires configuring logstash manually, and activations are still saved in CouchDB.
This PR saves activations into ElasticSearch directly, so that CouchDB only serves subjects and actions; someone may need this.
It also provides ansible scripts to deploy a simple ES cluster for test/dev purposes.
An activation stored in ES looks like the following (see the _source field):

    {
      "_index": "openwhisk-test_@shared_1911",
      "_type": "_doc",
      "_id": "jiangpengcheng/6138d67781a34b9fb8d67781a35b9f80",
      "_version": 1,
      "_score": 11.635652,
      "_source": {
        "@timestamp": "2019-11-20T06:57:04.282Z",
        "activationId": "6138d67781a34b9fb8d67781a35b9f80",
        "annotations": {
          "path": "jiangpengcheng/hello",
          "waitTime": 78,
          "kind": "nodejs:6",
          "timeout": false,
          "limits": {
            "concurrency": 1,
            "logs": 1,
            "memory": 256,
            "timeout": 60000
          },
          "initTime": 42
        },
        "duration": 57,
        "end": 1574233024339,
        "entityType": "activation",
        "logs": [
          "2019-11-20T06:57:04.337733554Z stdout: undefined"
        ],
        "name": "hello",
        "namespace": "jiangpengcheng",
        "path": "jiangpengcheng/hello",
        "publish": false,
        "response": {
          "result": "{}",
          "statusCode": 0
        },
        "start": 1574233024282,
        "subject": "jiangpengcheng",
        "updated": 1574233024340,
        "version": "0.0.1"
      },
      "fields": {
        "@timestamp": [
          "2019-11-20T06:57:04.282Z"
        ]
      },
      "highlight": {
        "namespace": [
          "@kibana-highlighted-field@jiangpengcheng@/kibana-highlighted-field@"
        ]
      }
    }

Related issue and scope
My changes affect the following components
Types of changes
Checklist: