forked from juju/juju
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlifewatcher.go
103 lines (88 loc) · 3.27 KB
/
lifewatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright 2023 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package domain
import (
"context"
"github.com/juju/collections/set"
"github.com/juju/errors"
"github.com/juju/juju/core/changestream"
coredatabase "github.com/juju/juju/core/database"
"github.com/juju/juju/core/logger"
"github.com/juju/juju/core/watcher/eventsource"
"github.com/juju/juju/domain/life"
)
// LifeGetter is a function which looks up life values of the entities with the
// specified IDs. The db argument is the transaction runner made available for
// the lookup. The returned map is keyed on entity ID; when used with
// LifeStringsWatcherMapperFunc, any requested ID that is absent from the
// returned map is treated as not of interest and its event is dropped.
type LifeGetter func(ctx context.Context, db coredatabase.TxnRunner, ids []string) (map[string]life.Life, error)
// LifeStringsWatcherMapperFunc returns a namespace watcher mapper function which emits
// events when the life of an entity changes. The supplied lifeGetter func is used to
// retrieve a map of life values, keyed on IDs which must match those supplied by the
// source event stream.
// The source event stream may supply ids the caller is not interested in. These are
// filtered out after loading the current values from state.
//
// The returned mapper keeps the last life value it reported for each id in a
// map captured by the closure, so each call to LifeStringsWatcherMapperFunc
// yields a mapper with its own independent state. Multiple changes for the
// same id within one batch are coalesced to the last such change, and the
// emitted events keep the relative order they had in the incoming batch.
// Any error from lifeGetter is logged and returned (traced) to the caller.
func LifeStringsWatcherMapperFunc(logger logger.Logger, lifeGetter LifeGetter) eventsource.Mapper {
	// knownLife holds the last non-dead life value reported for each id.
	knownLife := make(map[string]life.Life)

	return func(ctx context.Context, db coredatabase.TxnRunner, changes []changestream.ChangeEvent) (_ []changestream.ChangeEvent, err error) {
		defer func() {
			if err != nil {
				logger.Errorf("running life watcher mapper func: %v", err)
			}
		}()

		// Coalesce the changes per id, remembering the position of the last
		// change for each id. Tracking positions (rather than the events
		// themselves) lets the result be rebuilt in stream order below
		// instead of in random map iteration order.
		pending := make(map[string]int, len(changes))
		ids := set.NewStrings()
		for i, change := range changes {
			id := change.Changed()
			pending[id] = i
			ids.Add(id)
		}
		logger.Debugf("got changes for ids: %v", ids.Values())

		// First record any deleted entities as dead and remove them from
		// the set of ids whose life needs to be looked up.
		latest := make(map[string]life.Life)
		for id, i := range pending {
			if changes[i].Type() == changestream.Delete {
				latest[id] = life.Dead
				ids.Remove(id)
			}
		}

		// Gather the latest life values of the ids not known to be deleted.
		currentValues, err := lifeGetter(ctx, db, ids.Values())
		if err != nil {
			return nil, errors.Trace(err)
		}

		// We queried the ids that were not removed. The result contains
		// only those we're interested in, so any id that was queried but
		// not returned is dropped from subsequent processing.
		unknownIDs := set.NewStrings(ids.Values()...)
		for id, l := range currentValues {
			unknownIDs.Remove(id)
			latest[id] = l
		}
		logger.Debugf("ignoring unknown ids %v", unknownIDs.Values())
		for _, id := range unknownIDs.Values() {
			delete(latest, id)
			delete(pending, id)
		}
		logger.Debugf("processing latest life values for %v", latest)

		// Keep only the events for ids whose life is known to have changed,
		// updating the remembered life values as we go.
		for id, newLife := range latest {
			gone := newLife == life.Dead
			oldLife, known := knownLife[id]
			switch {
			case known && gone:
				// A tracked entity is now dead: report it and forget it.
				delete(knownLife, id)
			case !known && !gone:
				// First sighting of a live entity: report and track it.
				knownLife[id] = newLife
			case known && newLife != oldLife:
				// A tracked entity moved to a different life value.
				knownLife[id] = newLife
			default:
				// Nothing changed (or an untracked entity is already
				// dead), so there is nothing to report.
				delete(pending, id)
			}
		}

		// Preserve the nil result when there is nothing to emit.
		if len(pending) == 0 {
			return nil, nil
		}

		// Emit the surviving events in the order they appeared in the
		// incoming batch; only the last change for each id is kept.
		result := make([]changestream.ChangeEvent, 0, len(pending))
		for i, change := range changes {
			if last, ok := pending[change.Changed()]; ok && last == i {
				result = append(result, change)
			}
		}
		return result, nil
	}
}