Skip to content

Commit

Permalink
Review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
babbageclunk committed Nov 29, 2016
1 parent 1840e37 commit aa28bef
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 23 deletions.
4 changes: 2 additions & 2 deletions api/base/testing/apicaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func (APICallerFunc) HTTPClient() (*httprequest.Client, error) {
}

func (APICallerFunc) ConnectStream(path string, attrs url.Values) (base.Stream, error) {
return nil, errors.New("stream connection unimplemented")
return nil, errors.NotImplementedf("stream connection")
}

func (APICallerFunc) ConnectControllerStream(path string, attrs url.Values, headers http.Header) (base.Stream, error) {
return nil, errors.New("stream connection unimplemented")
return nil, errors.NotImplementedf("controller stream connection")
}

// CheckArgs holds the possible arguments to CheckingAPICaller(). Any
Expand Down
34 changes: 21 additions & 13 deletions worker/migrationmaster/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ const (
// phase.
maxMinionWait = 15 * time.Minute

// minionWaitLogInterval is the time between progress update
// messages, while the migrationmaster is waiting for reports from
// minions.
minionWaitLogInterval = 30 * time.Second
// progressUpdateInterval is the time between progress update
// messages. It's used while the migrationmaster is waiting for
// reports from minions and while it's transferring log messages
// to the newly-migrated model.
progressUpdateInterval = 30 * time.Second
)

// Facade exposes controller functionality to a Worker.
Expand Down Expand Up @@ -448,10 +449,14 @@ func (w *Worker) doLOGTRANSFER(targetInfo coremigration.TargetInfo, modelUUID st

func (w *Worker) transferLogs(targetInfo coremigration.TargetInfo, modelUUID string) error {
sent := 0
reportProgress := func(sent int) {
w.setInfoStatus("successful, transferring logs to target controller (%d sent)", sent)
reportProgress := func(finished bool, sent int) {
verb := "transferring"
if finished {
verb = "transferred"
}
w.setInfoStatus("successful, %s logs to target controller (%d sent)", verb, sent)
}
reportProgress(sent)
reportProgress(false, sent)
logSource, err := w.config.Facade.StreamModelLog()
if err != nil {
return errors.Annotate(err, "opening source log stream")
Expand All @@ -469,14 +474,17 @@ func (w *Worker) transferLogs(targetInfo coremigration.TargetInfo, modelUUID str
}
defer logTarget.Close()

clk := w.config.Clock
logProgress := clk.After(progressUpdateInterval)

for {
select {
case <-w.catacomb.Dying():
return w.catacomb.ErrDying()
case msg, ok := <-logSource:
if !ok {
// The channel's been closed, we're finished!
reportProgress(sent)
reportProgress(true, sent)
return nil
}
err := logTarget.WriteJSON(params.LogRecord{
Expand All @@ -491,9 +499,9 @@ func (w *Worker) transferLogs(targetInfo coremigration.TargetInfo, modelUUID str
return errors.Trace(err)
}
sent++
if sent%1000 == 0 {
reportProgress(sent)
}
case <-logProgress:
reportProgress(false, sent)
logProgress = clk.After(progressUpdateInterval)
}
}
}
Expand Down Expand Up @@ -630,7 +638,7 @@ func (w *Worker) waitForMinions(
return false, errors.Trace(err)
}

logProgress := clk.After(minionWaitLogInterval)
logProgress := clk.After(progressUpdateInterval)

var reports coremigration.MinionReports
for {
Expand Down Expand Up @@ -674,7 +682,7 @@ func (w *Worker) waitForMinions(

case <-logProgress:
w.setInfoStatus("%s, %s", infoPrefix, formatMinionWaitUpdate(reports))
logProgress = clk.After(minionWaitLogInterval)
logProgress = clk.After(progressUpdateInterval)
}
}
}
Expand Down
64 changes: 56 additions & 8 deletions worker/migrationmaster/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/juju/errors"
"github.com/juju/loggo"
jujutesting "github.com/juju/testing"
jc "github.com/juju/testing/checkers"
"github.com/juju/utils"
Expand Down Expand Up @@ -757,8 +758,8 @@ func (s *Suite) TestLogTransferErrorOpeningLogDest(c *gc.C) {

func (s *Suite) TestLogTransferErrorWriting(c *gc.C) {
s.facade.queueStatus(s.makeStatus(coremigration.LOGTRANSFER))
s.facade.logMessages = []common.LogMessage{
{Message: "the go team"},
s.facade.logMessages = func(d chan<- common.LogMessage) {
safeSend(c, d, common.LogMessage{Message: "the go team"})
}
s.connection.logStream.writeErr = errors.New("bottle rocket")
s.checkWorkerReturns(c, s.connection.logStream.writeErr)
Expand All @@ -777,7 +778,7 @@ func (s *Suite) TestLogTransferSendsRecords(c *gc.C) {
t1, err := time.Parse("2006-01-02 15:04", "2016-11-28 16:11")
c.Assert(err, jc.ErrorIsNil)
s.facade.queueStatus(s.makeStatus(coremigration.LOGTRANSFER))
s.facade.logMessages = []common.LogMessage{
messages := []common.LogMessage{
{Message: "the go team"},
{Message: "joan as police woman"},
{
Expand All @@ -789,6 +790,11 @@ func (s *Suite) TestLogTransferSendsRecords(c *gc.C) {
Message: "ham shank",
},
}
s.facade.logMessages = func(d chan<- common.LogMessage) {
for _, message := range messages {
safeSend(c, d, message)
}
}

s.checkWorkerReturns(c, migrationmaster.ErrMigrated)
s.stub.CheckCalls(c, joinCalls(
Expand Down Expand Up @@ -817,6 +823,44 @@ func (s *Suite) TestLogTransferSendsRecords(c *gc.C) {
c.Assert(s.connection.logStream.closeCount, gc.Equals, 1)
}

func (s *Suite) TestLogTransferReportsProgress(c *gc.C) {
s.facade.queueStatus(s.makeStatus(coremigration.LOGTRANSFER))
messages := []common.LogMessage{
{Message: "captain beefheart"},
{Message: "super furry animals"},
{Message: "ezra furman"},
}
s.facade.logMessages = func(d chan<- common.LogMessage) {
for _, message := range messages {
safeSend(c, d, message)
s.clock.WaitAdvance(15*time.Second, coretesting.ShortWait, 1)
}
}

var logWriter loggo.TestWriter
c.Assert(loggo.RegisterWriter("migrationmaster-tests", &logWriter), jc.ErrorIsNil)
defer func() {
loggo.RemoveWriter("migrationmaster-tests")
logWriter.Clear()
}()

s.checkWorkerReturns(c, migrationmaster.ErrMigrated)

c.Assert(logWriter.Log()[:3], jc.LogMatches, []string{
"successful, transferring logs to target controller \\(0 sent\\)",
"successful, transferring logs to target controller \\(2 sent\\)",
"successful, transferred logs to target controller \\(3 sent\\)",
})
}

func safeSend(c *gc.C, d chan<- common.LogMessage, message common.LogMessage) {
select {
case d <- message:
case <-time.After(coretesting.ShortWait):
c.Fatalf("timed out sending log message")
}
}

func (s *Suite) checkWorkerReturns(c *gc.C, expected error) {
err := s.runWorker(c)
c.Check(errors.Cause(err), gc.Equals, expected)
Expand Down Expand Up @@ -915,7 +959,7 @@ type stubMasterFacade struct {
modelInfoErr error
exportErr error

logMessages []common.LogMessage
logMessages func(chan<- common.LogMessage)
streamErr error

minionReportsChanges chan struct{}
Expand Down Expand Up @@ -1044,11 +1088,15 @@ func (f *stubMasterFacade) StreamModelLog() (<-chan common.LogMessage, error) {
if f.streamErr != nil {
return nil, f.streamErr
}
result := make(chan common.LogMessage, len(f.logMessages))
for _, message := range f.logMessages {
result <- message
result := make(chan common.LogMessage)
messageFunc := f.logMessages
if messageFunc == nil {
messageFunc = func(chan<- common.LogMessage) {}
}
close(result)
go func() {
defer close(result)
messageFunc(result)
}()
return result, nil
}

Expand Down

0 comments on commit aa28bef

Please sign in to comment.