Skip to content

Commit

Permalink
fix memory leack caused by time.timer
Browse files Browse the repository at this point in the history
  • Loading branch information
medcl committed Jan 8, 2021
1 parent e41616d commit f6c08fa
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 20 deletions.
15 changes: 13 additions & 2 deletions bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,19 @@ func (c *Migrator) NewBulkWorker(docCount *int, pb *pb.ProgressBar, wg *sync.Wai
docBuf := bytes.Buffer{}
docEnc := json.NewEncoder(&docBuf)

idleDuration := 5 * time.Second
idleTimeout := time.NewTimer(idleDuration)
defer idleTimeout.Stop()

taskTimeOutDuration := 5 * time.Minute
taskTimeout := time.NewTimer(taskTimeOutDuration)
defer taskTimeout.Stop()


READ_DOCS:
for {
idleTimeout.Reset(idleDuration)
taskTimeout.Reset(taskTimeOutDuration)
select {
case docI, open := <-c.DocChan:
var err error
Expand Down Expand Up @@ -138,10 +149,10 @@ READ_DOCS:
bulkItemSize++
docBuf.Reset()
(*docCount)++
case <-time.After(time.Second * 5):
case <-idleTimeout.C:
log.Debug("5s no message input")
goto CLEAN_BUFFER
case <-time.After(time.Minute * 5):
case <-taskTimeout.C:
log.Warn("5m no message input, close worker")
goto WORKER_DONE
}
Expand Down
3 changes: 3 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,12 @@ func Request(method string,r string,auth *Auth,body *bytes.Buffer,proxy string)(
}

func DecodeJson(jsonStream string, o interface{})(error) {


decoder := json.NewDecoder(strings.NewReader(jsonStream))
// UseNumber causes the Decoder to unmarshal a number into an interface{} as a Number instead of as a float64.
decoder.UseNumber()
decoder.

if err := decoder.Decode(o); err != nil {
fmt.Println("error:", err)
Expand Down
39 changes: 21 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/mattn/go-isatty"
"io"
"io/ioutil"
"net/http"
_ "net/http/pprof"
"os"
"runtime"
Expand All @@ -23,21 +24,21 @@ func main() {

runtime.GOMAXPROCS(runtime.NumCPU())

//go func() {
// //log.Infof("pprof listen at: http://%s/debug/pprof/", app.httpprof)
// mux := http.NewServeMux()
//
// // register pprof handler
// mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) {
// http.DefaultServeMux.ServeHTTP(w, r)
// })
//
// // register metrics handler
// //mux.HandleFunc("/debug/vars", app.metricsHandler)
//
// endpoint := http.ListenAndServe("0.0.0.0:6060", mux)
// log.Debug("stop pprof server: %v", endpoint)
//}()
go func() {
//log.Infof("pprof listen at: http://%s/debug/pprof/", app.httpprof)
mux := http.NewServeMux()

// register pprof handler
mux.HandleFunc("/debug/pprof/", func(w http.ResponseWriter, r *http.Request) {
http.DefaultServeMux.ServeHTTP(w, r)
})

// register metrics handler
//mux.HandleFunc("/debug/vars", app.metricsHandler)

endpoint := http.ListenAndServe("0.0.0.0:6060", mux)
log.Debug("stop pprof server: %v", endpoint)
}()

var err error
c := &Config{}
Expand Down Expand Up @@ -298,9 +299,12 @@ func main() {
}

// wait for cluster state to be okay before moving
timer := time.NewTimer(time.Second * 3)

idleDuration := 3 * time.Second
timer := time.NewTimer(idleDuration)
defer timer.Stop()
for {
timer.Reset(idleDuration)

if len(c.SourceEs) > 0 {
if status, ready := migrator.ClusterReady(migrator.SourceESAPI); !ready {
log.Infof("%s at %s is %s, delaying migration ", status.Name, c.SourceEs, status.Status)
Expand All @@ -316,7 +320,6 @@ func main() {
continue
}
}
timer.Stop()
break
}

Expand Down

0 comments on commit f6c08fa

Please sign in to comment.