This repository has been archived by the owner on Oct 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
110 lines (108 loc) · 3.1 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package main
import (
"context"
"crypto/tls"
"fmt"
"github.com/SlyMarbo/rss"
"github.com/awakari/client-sdk-go/api"
"golang.org/x/exp/slog"
"google.golang.org/grpc/metadata"
"net/http"
"os"
"producer-rss/config"
"producer-rss/converter"
"producer-rss/feeds"
"producer-rss/producer"
"time"
)
// main performs a single update pass for the configured RSS feed: it loads
// the configuration from the environment, reads the stored update time for the
// feed, fetches the feed, hands its items to the producer that writes them
// through the Awakari messages writer, and finally stores the new update time.
//
// Initialization failures (config, storage, API client, writer) panic.
// Failures to fetch or to process the feed are logged and the run continues
// to the update-time bookkeeping.
func main() {
	//
	cfg, err := config.NewConfigFromEnv()
	if err != nil {
		panic(fmt.Sprintf("failed to load the config from env: %s", err))
	}
	ctx := context.TODO()
	//
	opts := slog.HandlerOptions{
		Level: slog.Level(cfg.Log.Level),
	}
	log := slog.New(opts.NewTextHandler(os.Stdout))
	log.Info(fmt.Sprintf("starting the update for the feed @ %s", cfg.Feed.Url))
	//
	httpClient := http.Client{
		Timeout: cfg.Feed.UpdateTimeout,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				// Certificate verification is controlled by the feed config.
				InsecureSkipVerify: cfg.Feed.TlsSkipVerify,
			},
		},
	}
	feedsClient := feeds.NewClient(httpClient, cfg.Feed.UserAgent)
	feedsClient = feeds.NewLoggingMiddleware(feedsClient, log)
	log.Info("initialized the RSS client")
	//
	var stor feeds.Storage
	stor, err = feeds.NewStorage(ctx, cfg.Db)
	if err != nil {
		panic(fmt.Sprintf("failed to initialize the storage: %s", err))
	}
	defer stor.Close()
	//
	var feedUpdTime time.Time
	feedUpdTime, err = stor.GetUpdateTime(ctx, cfg.Feed.Url)
	if err != nil {
		panic(fmt.Sprintf("failed to read the feed update time: %s", err))
	}
	log.Info(fmt.Sprintf("feed %s: update time is %s", cfg.Feed.Url, feedUpdTime.Format(time.RFC3339)))
	//
	var awakariClient api.Client
	awakariClient, err = api.
		NewClientBuilder().
		WriterUri(cfg.Api.Writer.Uri).
		Build()
	if err != nil {
		panic(fmt.Sprintf("failed to initialize the Awakari API client: %s", err))
	}
	defer awakariClient.Close()
	log.Info("initialized the Awakari API client")
	//
	groupIdCtx := metadata.AppendToOutgoingContext(
		ctx,
		"x-awakari-group-id", "producer-rss",
		"x-awakari-user-id", "producer-rss",
	)
	ws, err := awakariClient.OpenMessagesWriter(groupIdCtx, "producer-rss")
	if err != nil {
		panic(fmt.Sprintf("failed to open the messages writer: %s", err))
	}
	defer ws.Close()
	log.Info("opened the messages writer")
	//
	var feed *rss.Feed
	var newFeedUpdTime time.Time
	feed, err = rss.FetchByFunc(feedsClient.Get, cfg.Feed.Url)
	if err != nil || feed == nil {
		// The feed must not be dereferenced when the fetch did not yield one;
		// skip producing and fall through to the update-time bookkeeping.
		log.Error(fmt.Sprintf("failed to fetch the feed: %s:", err))
	} else {
		log.Info(fmt.Sprintf("feed contains %d items to process", len(feed.Items)))
		//
		conv := converter.NewConverter(cfg.Message)
		conv = converter.NewConverterLogging(conv, log)
		prod := producer.NewProducer(feed, feedUpdTime, conv, ws, cfg.Api.Writer.Backoff, cfg.Api.Writer.BatchSize)
		prod = producer.NewProducerLogging(prod, log)
		//
		newFeedUpdTime, err = prod.Produce(ctx)
		if err != nil {
			log.Error(fmt.Sprintf("failed to process the feed: %s", err))
		}
	}
	// When no time later than the stored one was produced (including the
	// fetch/process failure paths above), fall back to the current UTC time.
	if !newFeedUpdTime.After(feedUpdTime) {
		newFeedUpdTime = time.Now().UTC()
	}
	log.Info(fmt.Sprintf("setting the new update time to %s", newFeedUpdTime.Format(time.RFC3339)))
	err = stor.SetUpdateTime(ctx, cfg.Feed.Url, newFeedUpdTime)
	if err != nil {
		log.Error(fmt.Sprintf("failed to set the new update time for the feed: %s:", err))
	}
}