Skip to content

Commit

Permalink
feat: consume interests events from queues
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Aug 1, 2024
1 parent 4a7809d commit 9cadb53
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 22 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f h1:RARaIm8pxYuxyNPbBQf5igT7XdOyCNtat1qAT2ZxjU4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf h1:liao9UHurZLtiEwBgT9LMOnKYsHze6eA6w1KQCMVN2Q=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
Expand Down
34 changes: 13 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"errors"
"fmt"
"github.com/awakari/client-sdk-go/api"
apiGrpc "github.com/awakari/int-mastodon/api/grpc"
Expand Down Expand Up @@ -105,6 +104,7 @@ func main() {
cfg.Api.Queue.InterestsCreated.BatchSize,
evtTypeInterestsCreated,
cfg,
log,
)
if err != nil {
panic(err)
Expand All @@ -125,6 +125,7 @@ func main() {
cfg.Api.Queue.InterestsUpdated.BatchSize,
evtTypeInterestsUpdated,
cfg,
log,
)
if err != nil {
panic(err)
Expand All @@ -146,16 +147,14 @@ func consumeQueue(
batchSize uint32,
typ evtTypeInterests,
cfg config.Config,
log *slog.Logger,
) (err error) {
for {
err = svcQueue.ReceiveMessages(ctx, name, subj, batchSize, func(evts []*pb.CloudEvent) (err error) {
return consumeEvents(ctx, svc, evts, typ, cfg)
_ = svcQueue.ReceiveMessages(ctx, name, subj, batchSize, func(evts []*pb.CloudEvent) (err error) {
consumeEvents(ctx, svc, evts, typ, cfg, log)
return
})
if err != nil {
break
}
}
return
}

func consumeEvents(
Expand All @@ -164,7 +163,8 @@ func consumeEvents(
evts []*pb.CloudEvent,
typ evtTypeInterests,
cfg config.Config,
) (err error) {
log *slog.Logger,
) {
for _, evt := range evts {
interestId := evt.GetTextData()
var queries []string
Expand All @@ -176,26 +176,18 @@ func consumeEvents(
groupId = groupIdAttr.GetCeString()
}
if groupId == "" {
err = fmt.Errorf("interest %s creation: missing group id in the event", interestId)
log.Error(fmt.Sprintf("interest %s event type %d: empty group id, skipping", interestId, typ))
continue
}
if err == nil && len(queries) > 0 {
if len(queries) > 0 {
for _, q := range queries {
_, err = svc.SearchAndAdd(ctx, interestId, groupId, q, cfg.Api.Mastodon.Search.Limit, model.SearchTypeStatuses)
if err != nil {
break
}
_, _ = svc.SearchAndAdd(ctx, interestId, groupId, q, cfg.Api.Mastodon.Search.Limit, model.SearchTypeStatuses)
}
}
publicAttr, publicAttrPresent := evt.Attributes[ceKeyPublic]
if publicAttrPresent && publicAttr.GetCeBoolean() && typ == evtTypeInterestsCreated {
actor := interestId + "@" + cfg.Api.ActivityPub.Host
_, errFollow := svc.SearchAndAdd(ctx, interestId, groupId, actor, 1, model.SearchTypeAccounts)
if errFollow != nil {
err = errors.Join(err, errFollow)
}
}
if err != nil {
break
_, _ = svc.SearchAndAdd(ctx, interestId, groupId, actor, 1, model.SearchTypeAccounts)
}
}
return
Expand Down

0 comments on commit 9cadb53

Please sign in to comment.