-
Notifications
You must be signed in to change notification settings - Fork 16
feat: capture delayed events #4104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…re.capture-delayed-events
e2448ee to
9513fd6
Compare
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #4104 +/- ##
==========================================
+ Coverage 72.44% 72.49% +0.04%
==========================================
Files 382 383 +1
Lines 55420 55476 +56
==========================================
+ Hits 40149 40216 +67
+ Misses 12940 12931 -9
+ Partials 2331 2329 -2 ☔ View full report in Codecov by Sentry. |
Co-authored-by: Akash Chetty <[email protected]>
…rudder-server into chore.capture-delayed-events
…re.capture-delayed-events
…rudder-server into chore.capture-delayed-events
…re.capture-delayed-events
| source, err := proc.getSourceBySourceID(string(sourceID)) | ||
| if err != nil { | ||
| continue | ||
| } | ||
|
|
||
| for _, obs := range proc.sourceObservers { | ||
| obs.ObserveSourceEvents(source, events) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trivial: alternative with less lines
| source, err := proc.getSourceBySourceID(string(sourceID)) | |
| if err != nil { | |
| continue | |
| } | |
| for _, obs := range proc.sourceObservers { | |
| obs.ObserveSourceEvents(source, events) | |
| } | |
| if source, err := proc.getSourceBySourceID(string(sourceID)); err == nil { | |
| for _, obs := range proc.sourceObservers { | |
| obs.ObserveSourceEvents(source, events) | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find the usage of if ...; err == nil less readable. I am in favor of aligning the happy path to the left. https://medium.com/@matryer/line-of-sight-in-code-186dd7cdea88
| sdkContext, err := misc.NestedMapLookup(event.Message, "context", "library") | ||
| if err == nil { | ||
| m, ok := sdkContext.(map[string]interface{}) | ||
| if ok { | ||
| sdkLibVersion, _ := m["version"].(string) | ||
| sdkLibName, _ := m["name"].(string) | ||
|
|
||
| if sdkLibName != "" || sdkLibVersion != "" { | ||
| sdkVersion = fmt.Sprintf("%s/%s", sdkLibName, sdkLibVersion) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| sdkContext, err := misc.NestedMapLookup(event.Message, "context", "library") | |
| if err == nil { | |
| m, ok := sdkContext.(map[string]interface{}) | |
| if ok { | |
| sdkLibVersion, _ := m["version"].(string) | |
| sdkLibName, _ := m["name"].(string) | |
| if sdkLibName != "" || sdkLibVersion != "" { | |
| sdkVersion = fmt.Sprintf("%s/%s", sdkLibName, sdkLibVersion) | |
| } | |
| } | |
| } | |
| if sdkContext, err := misc.NestedMapLookup(event.Message, "context", "library"); err == nil { | |
| if m, ok := sdkContext.(map[string]interface{}); ok { | |
| sdkVersion = strings.Join([]string{m["version"].(string), m["name"].(string)}, "/") | |
| } | |
| } |
…re.capture-delayed-events
…rudder-server into chore.capture-delayed-events
…re.capture-delayed-events
Description
WARNING: Requires rudderlabs/rudder-go-kit#210 to be merged first
Motivation
Gather statistics about delayed events. Events that are being delivered - usually due to retries - many days after their initial creation.
The intention is to gather stats, that can be used for decision making and further help investigation of deduplicate records.
Implementation
I created a separate
delayedpackage for encapsulation and testability.sourceObserverinterface provides a generic way to inspect events on the source level.Linear Ticket
Completes PIPE-516
Security