Skip to content

Commit

Permalink
Feat: add watcher config compatibility in file source (loggie-io#589)
Browse files Browse the repository at this point in the history
  • Loading branch information
ethfoo authored Jul 10, 2023
1 parent fd73e56 commit 56aa85c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
24 changes: 20 additions & 4 deletions pkg/source/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ type WatchConfig struct {
EnableOsWatch bool `yaml:"enableOsWatch,omitempty" default:"true"`
ScanTimeInterval time.Duration `yaml:"scanTimeInterval,omitempty" default:"10s"`
MaintenanceInterval time.Duration `yaml:"maintenanceInterval,omitempty" default:"5m"`
CleanFiles *CleanFiles `yaml:"cleanFiles,omitempty"` // deprecated
FdHoldTimeoutWhenInactive time.Duration `yaml:"fdHoldTimeoutWhenInactive,omitempty" default:"5m"` // deprecated
FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"` // deprecated
CleanFiles *CleanFiles `yaml:"cleanFiles,omitempty"` // deprecated, moved to CollectConfig
FdHoldTimeoutWhenInactive time.Duration `yaml:"fdHoldTimeoutWhenInactive,omitempty" default:"5m"` // deprecated, moved to CollectConfig
FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"` // deprecated, moved to CollectConfig
MaxOpenFds int `yaml:"maxOpenFds,omitempty" default:"4096"`
MaxEofCount int `yaml:"maxEofCount,omitempty" default:"3"`
CleanWhenRemoved bool `yaml:"cleanWhenRemoved,omitempty" default:"true"`
ReadFromTail bool `yaml:"readFromTail,omitempty" default:"false"` // deprecated
ReadFromTail bool `yaml:"readFromTail,omitempty" default:"false"` // deprecated, moved to CollectConfig
TaskStopTimeout time.Duration `yaml:"taskStopTimeout,omitempty" default:"30s"`
CleanDataTimeout time.Duration `yaml:"cleanDataTimeout,omitempty" default:"5s"`
}
Expand Down Expand Up @@ -129,6 +129,22 @@ type MultiConfig struct {
Timeout time.Duration `yaml:"timeout,omitempty" default:"5s"` // default 2 * read.timeout
}

func (c *Config) SetDefaults() {
// for config compatibility
if c.WatchConfig.CleanFiles != nil {
c.CollectConfig.CleanFiles = c.WatchConfig.CleanFiles
}
if c.WatchConfig.FdHoldTimeoutWhenInactive > 0 {
c.CollectConfig.FdHoldTimeoutWhenInactive = c.WatchConfig.FdHoldTimeoutWhenInactive
}
if c.WatchConfig.FdHoldTimeoutWhenRemove > 0 {
c.CollectConfig.FdHoldTimeoutWhenRemove = c.WatchConfig.FdHoldTimeoutWhenRemove
}
if c.WatchConfig.ReadFromTail {
c.CollectConfig.ReadFromTail = c.WatchConfig.ReadFromTail
}
}

func (c *Config) Validate() error {
if c.ReaderConfig.MultiConfig.Active {
_, err := util.Compile(c.ReaderConfig.MultiConfig.Pattern)
Expand Down
16 changes: 5 additions & 11 deletions pkg/source/file/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,6 @@ func (w *Watcher) eventBus(e jobEvent) {
if existAckOffset == 0 {
if e.job.task.config.ReadFromTail {
existAckOffset = fileSize
} else if w.config.ReadFromTail {
// readFromTail is deprecated in watcher, keep it only for compatibility with older versions
existAckOffset = fileSize
}
w.preAllocationOffset(existAckOffset, job)
}
Expand Down Expand Up @@ -542,15 +539,15 @@ func (w *Watcher) scanActiveJob() {
continue
}
// check FdHoldTimeoutWhenRemove
if job.IsDeleteTimeout(job.task.config.FdHoldTimeoutWhenRemove) || job.IsDeleteTimeout(w.config.FdHoldTimeoutWhenRemove) {
if job.IsDeleteTimeout(job.task.config.FdHoldTimeoutWhenRemove) {
job.Stop()
log.Info("[pipeline(%s)-source(%s)]: job stop because file(%s) fdHoldTimeoutWhenRemove(%d second) reached", job.task.pipelineName, job.task.sourceName, job.filename, job.task.config.FdHoldTimeoutWhenRemove/time.Second)
continue
}
// check FdHoldTimeoutWhenInactive
if time.Since(job.LastActiveTime()) > job.task.config.FdHoldTimeoutWhenInactive || time.Since(job.LastActiveTime()) > w.config.FdHoldTimeoutWhenInactive {
if time.Since(job.LastActiveTime()) > job.task.config.FdHoldTimeoutWhenInactive {
job.Stop()
log.Info("[pipeline(%s)-source(%s)]: job stop because file(%s) fdHoldTimeoutWhenInactive(%d second) reached", job.task.pipelineName, job.task.sourceName, job.filename, w.config.FdHoldTimeoutWhenInactive/time.Second)
log.Info("[pipeline(%s)-source(%s)]: job stop because file(%s) fdHoldTimeoutWhenInactive(%d second) reached", job.task.pipelineName, job.task.sourceName, job.filename, job.task.config.FdHoldTimeoutWhenInactive/time.Second)
// more aggressive releasing of fd to prevent excessive memory usage
if job.Release() {
w.currentOpenFds--
Expand Down Expand Up @@ -632,7 +629,7 @@ func (w *Watcher) scanZombieJob() {
}
} else {
// release fd
if time.Since(job.LastActiveTime()) > job.task.config.FdHoldTimeoutWhenInactive || time.Since(job.LastActiveTime()) > w.config.FdHoldTimeoutWhenInactive {
if time.Since(job.LastActiveTime()) > job.task.config.FdHoldTimeoutWhenInactive {
if job.Release() {
w.currentOpenFds--
}
Expand Down Expand Up @@ -993,14 +990,11 @@ func (w *Watcher) cleanFiles(watchTask *WatchTask, infos []eventbus.FileInfo) []
if watchTask == nil {
return nil
}
if w.config.CleanFiles == nil && watchTask.config.CleanFiles == nil {
if watchTask.config.CleanFiles == nil {
return nil
}

var maxHistoryDays int
if w.config.CleanFiles != nil {
maxHistoryDays = w.config.CleanFiles.MaxHistoryDays
}
if watchTask.config.CleanFiles != nil {
maxHistoryDays = watchTask.config.CleanFiles.MaxHistoryDays
}
Expand Down

0 comments on commit 56aa85c

Please sign in to comment.