Skip to content

Commit

Permalink
feat: consume interests events from queues
Browse files Browse the repository at this point in the history
  • Loading branch information
akurilov committed Aug 2, 2024
1 parent 9cadb53 commit 6851a89
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 56 deletions.
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type MastodonConfig struct {
UserAgent string `envconfig:"API_MASTODON_USER_AGENT" default:"awakari" required:"true""`
}
Endpoint struct {
Accounts string `envconfig:"API_MASTODON_ENDPOINT_SEARCH" default:"https://mastodon.social/api/v1/accounts" required:"true"`
Accounts string `envconfig:"API_MASTODON_ENDPOINT_ACCOUNTS" default:"https://mastodon.social/api/v1/accounts" required:"true"`
Search string `envconfig:"API_MASTODON_ENDPOINT_SEARCH" default:"https://mastodon.social/api/v2/search" required:"true"`
Stream string `envconfig:"API_MASTODON_ENDPOINT_STREAM" default:"https://streaming.mastodon.social/api/v1/streaming/public?remote=false&only_media=false" required:"true"`
}
Expand All @@ -51,7 +51,8 @@ type MastodonConfig struct {
}

type QueueConfig struct {
Uri string `envconfig:"API_QUEUE_URI" default:"queue:50051" required:"true"`
BackoffError time.Duration `envconfig:"API_QUEUE_BACKOFF_ERROR" default:"1s" required:"true"`
Uri string `envconfig:"API_QUEUE_URI" default:"queue:50051" required:"true"`
InterestsCreated struct {
BatchSize uint32 `envconfig:"API_QUEUE_INTERESTS_CREATED_BATCH_SIZE" default:"1" required:"true"`
Name string `envconfig:"API_QUEUE_INTERESTS_CREATED_NAME" default:"source-search" required:"true"`
Expand Down
2 changes: 2 additions & 0 deletions helm/int-mastodon/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ spec:
value: "{{ .Values.api.event.type }}"
- name: LOG_LEVEL
value: "{{ .Values.log.level }}"
- name: API_MASTODON_ENDPOINT_ACCOUNTS
value: "{{ .Values.mastodon.endpoint.accounts }}"
- name: API_MASTODON_ENDPOINT_SEARCH
value: "{{ .Values.mastodon.endpoint.search }}"
- name: API_MASTODON_ENDPOINT_STREAM
Expand Down
11 changes: 10 additions & 1 deletion helm/int-mastodon/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,16 @@ nodeSelector: {}

tolerations: []

affinity: {}
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
preference:
matchExpressions:
- key: spot
operator: In
values:
- "true"

api:
activitypub:
Expand Down
35 changes: 24 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,13 @@ func consumeQueue(
log *slog.Logger,
) (err error) {
for {
_ = svcQueue.ReceiveMessages(ctx, name, subj, batchSize, func(evts []*pb.CloudEvent) (err error) {
err = svcQueue.ReceiveMessages(ctx, name, subj, batchSize, func(evts []*pb.CloudEvent) (err error) {
consumeEvents(ctx, svc, evts, typ, cfg, log)
return
})
if err != nil {
panic(err)
}
}
}

Expand All @@ -165,12 +168,10 @@ func consumeEvents(
cfg config.Config,
log *slog.Logger,
) {
log.Debug(fmt.Sprintf("consumeEvents(%d, typ=%d))\n", len(evts), typ))
for _, evt := range evts {

interestId := evt.GetTextData()
var queries []string
if queriesComplAttr, queriesComplPresent := evt.Attributes[ceKeyQueriesCompl]; queriesComplPresent {
queries = strings.Split(queriesComplAttr.GetCeString(), "\n")
}
var groupId string
if groupIdAttr, groupIdIdPresent := evt.Attributes[ceKeyGroupId]; groupIdIdPresent {
groupId = groupIdAttr.GetCeString()
Expand All @@ -179,15 +180,27 @@ func consumeEvents(
log.Error(fmt.Sprintf("interest %s event type %d: empty group id, skipping", interestId, typ))
continue
}
if len(queries) > 0 {
for _, q := range queries {
_, _ = svc.SearchAndAdd(ctx, interestId, groupId, q, cfg.Api.Mastodon.Search.Limit, model.SearchTypeStatuses)
}
}

publicAttr, publicAttrPresent := evt.Attributes[ceKeyPublic]
if publicAttrPresent && publicAttr.GetCeBoolean() && typ == evtTypeInterestsCreated {
switch publicAttrPresent && publicAttr.GetCeBoolean() && typ == evtTypeInterestsCreated {
case true:
actor := interestId + "@" + cfg.Api.ActivityPub.Host
_, _ = svc.SearchAndAdd(ctx, interestId, groupId, actor, 1, model.SearchTypeAccounts)
default:
log.Debug(fmt.Sprintf("interest %s event type %d: public: %t/%t", interestId, typ, publicAttrPresent, publicAttr.GetCeBoolean()))
}

var queries []string
if queriesComplAttr, queriesComplPresent := evt.Attributes[ceKeyQueriesCompl]; queriesComplPresent {
queries = strings.Split(queriesComplAttr.GetCeString(), "\n")
}
switch len(queries) {
case 0:
log.Debug(fmt.Sprintf("interest %s event type %d: no queries, skipping the sources discovery", interestId, typ))
default:
for _, q := range queries {
_, _ = svc.SearchAndAdd(ctx, interestId, groupId, q, cfg.Api.Mastodon.Search.Limit, model.SearchTypeStatuses)
}
}
}
return
Expand Down
80 changes: 38 additions & 42 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func NewService(
}
}

func (m mastodon) SearchAndAdd(ctx context.Context, interestId, groupId, q string, limit uint32, typ model.SearchType) (n uint32, err error) {
var offset int
func (m mastodon) SearchAndAdd(ctx context.Context, interestId, groupId, q string, limit uint32, typ model.SearchType) (n uint32, errs error) {
for n < limit {
reqQuery := "?q=" + url.QueryEscape(q) + "&type=" + typ.String() + "&resolve=true&offset=" + strconv.Itoa(offset) + "&limit=" + strconv.Itoa(int(limit))
reqQuery := "?q=" + url.QueryEscape(q) + "&type=" + typ.String() + "&resolve=true&offset=" + strconv.Itoa(int(n)) + "&limit=" + strconv.Itoa(int(limit-n))
var req *http.Request
var err error
req, err = http.NewRequestWithContext(ctx, http.MethodGet, m.cfg.Endpoint.Search+reqQuery, nil)
var resp *http.Response
if err == nil {
Expand All @@ -72,51 +72,40 @@ func (m mastodon) SearchAndAdd(ctx context.Context, interestId, groupId, q strin
resp, err = m.clientHttp.Do(req)
}
var data []byte
if err == nil {
if err == nil && resp != nil {
data, err = io.ReadAll(io.LimitReader(resp.Body, limitRespBodyLen))
_ = resp.Body.Close()
}
var results model.Results
if err == nil {
err = json.Unmarshal(data, &results)
}
if err == nil {
switch typ {
case model.SearchTypeStatuses:
countResults := len(results.Statuses)
if countResults == 0 {
break
}
offset += countResults
for _, st := range results.Statuses {
errSt := m.processFoundStatus(ctx, st, interestId, groupId, q)
switch errSt {
case nil:
n++
default:
err = errors.Join(err, errSt)
}
if n > limit {
break
}
}
case model.SearchTypeAccounts:
countResults := len(results.Accounts)
if countResults == 0 {
break
if err != nil {
errs = errors.Join(errs, err)
break
}
if typ == model.SearchTypeStatuses {
countResults := len(results.Statuses)
if countResults == 0 {
break
}
n += uint32(countResults)
for _, st := range results.Statuses {
err = m.processFoundStatus(ctx, st, interestId, groupId, q)
if err != nil {
err = errors.Join(errs, err)
}
offset += countResults
for _, acc := range results.Accounts {
errSt := m.processFoundAccount(ctx, acc, interestId, groupId, q, false)
switch errSt {
case nil:
n++
default:
err = errors.Join(err, errSt)
}
if n > limit {
break
}
}
} else if typ == model.SearchTypeAccounts {
countResults := len(results.Accounts)
if countResults == 0 {
break
}
n += uint32(countResults)
for _, acc := range results.Accounts {
err = m.processFoundAccount(ctx, acc, interestId, groupId, q, false)
if err != nil {
errs = errors.Join(errs, err)
}
}
}
Expand Down Expand Up @@ -170,7 +159,7 @@ func (m mastodon) processFoundAccount(ctx context.Context, acc model.Account, in

func (m mastodon) follow(ctx context.Context, acc model.Account) (err error) {
var req *http.Request
req, err = http.NewRequestWithContext(ctx, http.MethodPost, m.cfg.Endpoint.Search+"/"+acc.Id+"/follow", strings.NewReader(`{"reblogs":true}`))
req, err = http.NewRequestWithContext(ctx, http.MethodPost, m.cfg.Endpoint.Accounts+"/"+acc.Id+"/follow", nil)
var resp *http.Response
if err == nil {
req.Header.Add("Accept", "application/json")
Expand All @@ -182,7 +171,14 @@ func (m mastodon) follow(ctx context.Context, acc model.Account) (err error) {
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
data, _ := io.ReadAll(io.LimitReader(resp.Body, limitRespBodyLenErr))
err = fmt.Errorf("failed to follow the account %s: %s", acc.Acct, string(data))
err = fmt.Errorf(
"failed to follow the account %s: request_url=%s, request_headers=%+v, response=%d/%s",
acc.Acct,
req.URL,
req.Header,
resp.StatusCode,
string(data),
)
}
}
return
Expand Down

0 comments on commit 6851a89

Please sign in to comment.