Skip to content
This repository has been archived by the owner on Oct 19, 2023. It is now read-only.

Commit

Permalink
rework
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrei Kurilov committed May 23, 2023
1 parent e9cd692 commit 992f29d
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 10 deletions.
2 changes: 1 addition & 1 deletion converter.go → converter/converter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package converter

import (
"github.com/SlyMarbo/rss"
Expand Down
2 changes: 1 addition & 1 deletion converter_logging.go → converter/converter_logging.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package converter

import (
"fmt"
Expand Down
11 changes: 7 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"net/http"
"os"
"producer-rss/config"
"producer-rss/converter"
"producer-rss/feeds"
"producer-rss/producer"
"time"
)

Expand Down Expand Up @@ -77,11 +79,12 @@ func main() {
if err != nil {
log.Error(fmt.Sprintf("failed to fetch the feed @ %s:", cfg.Feed.Url), err)
}
log.Info(fmt.Sprintf("feed contains %d items to process", len(feed.Items)))
//
conv := NewConverter(cfg.Message)
conv = NewConverterLogging(conv, log)
prod := NewProducer(feed, feedUpdTime, conv, ws, cfg.Api.Writer.Backoff)
prod = NewProducerLogging(prod, log)
conv := converter.NewConverter(cfg.Message)
conv = converter.NewConverterLogging(conv, log)
prod := producer.NewProducer(feed, feedUpdTime, conv, ws, cfg.Api.Writer.Backoff)
prod = producer.NewProducerLogging(prod, log)
//
var newFeedUpdTime time.Time
if err == nil {
Expand Down
10 changes: 7 additions & 3 deletions producer.go → producer/producer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package main
package producer

import (
"context"
"errors"
"fmt"
"github.com/SlyMarbo/rss"
"github.com/awakari/client-sdk-go/model"
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
"producer-rss/converter"
"time"
)

Expand All @@ -16,12 +18,12 @@ type Producer interface {
type producer struct {
feed *rss.Feed
timeMin time.Time
conv Converter
conv converter.Converter
output model.WriteStream[*pb.CloudEvent]
outputBackoff time.Duration
}

func NewProducer(feed *rss.Feed, timeMin time.Time, conv Converter, output model.WriteStream[*pb.CloudEvent], outputBackoff time.Duration) Producer {
func NewProducer(feed *rss.Feed, timeMin time.Time, conv converter.Converter, output model.WriteStream[*pb.CloudEvent], outputBackoff time.Duration) Producer {
return producer{
feed: feed,
timeMin: timeMin,
Expand All @@ -48,6 +50,8 @@ func (p producer) getNewMessages() (msgs []*pb.CloudEvent, nextTime time.Time, e
msg, itemErr = p.conv.Convert(p.feed, item)
msgs = append(msgs, msg)
err = errors.Join(err, itemErr)
} else {
fmt.Printf("item date %s is before the min time %s\n", item.Date.Format(time.RFC3339), p.timeMin.Format(time.RFC3339))
}
if nextTime.Before(item.Date) {
nextTime = item.Date
Expand Down
2 changes: 1 addition & 1 deletion producer_logging.go → producer/producer_logging.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package producer

import (
"context"
Expand Down

0 comments on commit 992f29d

Please sign in to comment.