Skip to content

Commit

Permalink
Merge pull request juju#5787 from wallyworld/logfwd-cpu
Browse files Browse the repository at this point in the history
Do not open new state for each SetLastSend api call

Fixes: https://bugs.launchpad.net/bugs/1599779

There's a couple of fixes here:
1. the SetLastSent api call no longer opens a new state object
2. The log stream Next() api allows for batching of log records. The
underlying implementation (log tailer) still emits one record at a time, but
a followup can fix that.

(Review request: http://reviews.vapour.ws/r/5231/)
  • Loading branch information
jujubot authored Jul 13, 2016
2 parents e053372 + d891c35 commit 06005df
Show file tree
Hide file tree
Showing 17 changed files with 169 additions and 139 deletions.
4 changes: 2 additions & 2 deletions api/logfwd/lastsent.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func (c LastSentClient) GetList(ids []LastSentID) ([]LastSentResult, error) {
return results, nil
}

// SetList makes a "SetLastSent" call on the facade and returns the
// SetLastSent makes a "SetLastSent" call on the facade and returns the
// results in the same order.
func (c LastSentClient) SetList(reqs []LastSentInfo) ([]LastSentResult, error) {
func (c LastSentClient) SetLastSent(reqs []LastSentInfo) ([]LastSentResult, error) {
var args params.LogForwardingSetLastSentParams
args.Params = make([]params.LogForwardingSetLastSentParam, len(reqs))
for i, req := range reqs {
Expand Down
2 changes: 1 addition & 1 deletion api/logfwd/lastsent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *LastSentSuite) TestSetLastSent(c *gc.C) {
model := "deadbeef-2f18-4fd2-967d-db9663db7bea"
modelTag := names.NewModelTag(model)

results, err := client.SetList([]logfwd.LastSentInfo{{
results, err := client.SetLastSent([]logfwd.LastSentInfo{{
LastSentID: logfwd.LastSentID{
Model: modelTag,
Sink: "spam",
Expand Down
41 changes: 26 additions & 15 deletions api/logstream/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func Open(conn base.StreamConnector, cfg params.LogStreamConfig, controllerUUID
return ls, nil
}

// Next returns the next log record from the server. The records are
// coverted from the wire format into logfwd.Record. The first returned
// Next returns the next batch of log records from the server. The records are
// converted from the wire format into logfwd.Record. The first returned
// record will be the one after the last successfully sent record. If no
// records have been sent yet then it will be the oldest log record.
//
Expand All @@ -66,13 +66,12 @@ func Open(conn base.StreamConnector, cfg params.LogStreamConfig, controllerUUID
// before being stored in the DB or if the DB on-disk storage for the
// record becomes corrupted. Both scenarios are highly unlikely and
// the respective systems are managed such that neither should happen.
func (ls *LogStream) Next() (logfwd.Record, error) {
var record logfwd.Record
apiRecord, err := ls.next()
func (ls *LogStream) Next() ([]logfwd.Record, error) {
apiRecords, err := ls.next()
if err != nil {
return record, errors.Trace(err)
return nil, errors.Trace(err)
}
record, err = recordFromAPI(apiRecord, ls.controllerUUID)
records, err := recordsFromAPI(apiRecords, ls.controllerUUID)
if err != nil {
// This should only happen if the data got corrupted over the
// network. Any other cause should be addressed by fixing the
Expand All @@ -82,25 +81,25 @@ func (ls *LogStream) Next() (logfwd.Record, error) {
// block on a consistently invalid record or to throw away
// a record. The log stream needs to maintain a high level
// of reliable delivery.
return record, errors.Trace(err)
return nil, errors.Trace(err)
}
return record, nil
return records, nil
}

func (ls *LogStream) next() (params.LogStreamRecord, error) {
func (ls *LogStream) next() (params.LogStreamRecords, error) {
ls.mu.Lock()
defer ls.mu.Unlock()

var apiRec params.LogStreamRecord
var result params.LogStreamRecords
if ls.stream == nil {
return apiRec, errors.Errorf("cannot read from closed stream")
return result, errors.Errorf("cannot read from closed stream")
}

err := ls.stream.ReadJSON(&apiRec)
err := ls.stream.ReadJSON(&result)
if err != nil {
return apiRec, errors.Trace(err)
return result, errors.Trace(err)
}
return apiRec, nil
return result, nil
}

// Close closes the stream.
Expand All @@ -119,6 +118,18 @@ func (ls *LogStream) Close() error {
}

// See the counterpart in apiserver/logstream.go.
func recordsFromAPI(apiRecords params.LogStreamRecords, controllerUUID string) ([]logfwd.Record, error) {
result := make([]logfwd.Record, len(apiRecords.Records))
for i, apiRec := range apiRecords.Records {
rec, err := recordFromAPI(apiRec, controllerUUID)
if err != nil {
return nil, errors.Trace(err)
}
result[i] = rec
}
return result, nil
}

func recordFromAPI(apiRec params.LogStreamRecord, controllerUUID string) (logfwd.Record, error) {
rec := logfwd.Record{
ID: apiRec.ID,
Expand Down
22 changes: 13 additions & 9 deletions api/logstream/logstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,15 @@ func (s *LogReaderSuite) TestNextOneRecord(c *gc.C) {
Level: loggo.INFO.String(),
Message: "test message",
}
apiRecords := params.LogStreamRecords{
Records: []params.LogStreamRecord{apiRec},
}
cUUID := "feebdaed-2f18-4fd2-967d-db9663db7bea"
stub := &testing.Stub{}
conn := &mockConnector{stub: stub}
jsonReader := mockStream{stub: stub}
logsCh := make(chan params.LogStreamRecord, 1)
logsCh <- apiRec
logsCh := make(chan params.LogStreamRecords, 1)
logsCh <- apiRecords
jsonReader.ReturnReadJSON = logsCh
conn.ReturnConnectStream = jsonReader
var cfg params.LogStreamConfig
Expand All @@ -89,10 +92,10 @@ func (s *LogReaderSuite) TestNextOneRecord(c *gc.C) {
stub.ResetCalls()

// Check the record we injected into the stream.
var rec logfwd.Record
var records []logfwd.Record
done := make(chan struct{})
go func() {
rec, err = stream.Next()
records, err = stream.Next()
c.Assert(err, jc.ErrorIsNil)
close(done)
}()
Expand All @@ -101,7 +104,8 @@ func (s *LogReaderSuite) TestNextOneRecord(c *gc.C) {
case <-time.After(coretesting.LongWait):
c.Errorf("timed out waiting for record")
}
c.Check(rec, jc.DeepEquals, logfwd.Record{
c.Assert(records, gc.HasLen, 1)
c.Check(records[0], jc.DeepEquals, logfwd.Record{
Origin: logfwd.Origin{
ControllerUUID: cUUID,
ModelUUID: "deadbeef-2f18-4fd2-967d-db9663db7bea",
Expand All @@ -128,13 +132,13 @@ func (s *LogReaderSuite) TestNextOneRecord(c *gc.C) {
// Make sure we don't get extras.
done = make(chan struct{})
go func() {
rec, err = stream.Next()
records, err = stream.Next()
c.Assert(err, jc.ErrorIsNil)
close(done)
}()
select {
case <-done:
c.Errorf("got extra record: %#v", rec)
c.Errorf("got extra record: %#v", records)
case <-time.After(coretesting.ShortWait):
}
}
Expand Down Expand Up @@ -206,7 +210,7 @@ type mockStream struct {
base.Stream
stub *testing.Stub

ReturnReadJSON chan params.LogStreamRecord
ReturnReadJSON chan params.LogStreamRecords
}

func (s mockStream) ReadJSON(v interface{}) error {
Expand All @@ -216,7 +220,7 @@ func (s mockStream) ReadJSON(v interface{}) error {
}

switch vt := v.(type) {
case *params.LogStreamRecord:
case *params.LogStreamRecords:
*vt = <-s.ReturnReadJSON
return nil
default:
Expand Down
24 changes: 4 additions & 20 deletions apiserver/logfwd/lastsent.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type LastSentTracker interface {
type LogForwardingState interface {
// NewLastSentTracker creates a new tracker for the given model
// and log sink.
NewLastSentTracker(tag names.ModelTag, sink string) (LastSentTracker, error)
NewLastSentTracker(tag names.ModelTag, sink string) LastSentTracker
}

// LogForwardingAPI is the concrete implementation of the api end point.
Expand Down Expand Up @@ -116,10 +116,7 @@ func (api *LogForwardingAPI) newLastSentTracker(id params.LogForwardingID) (Last
if err != nil {
return nil, err
}
tracker, err := api.state.NewLastSentTracker(tag, id.Sink)
if err != nil {
return nil, err
}
tracker := api.state.NewLastSentTracker(tag, id.Sink)
return tracker, nil
}

Expand All @@ -128,19 +125,6 @@ type stateAdapter struct {
}

// NewLastSentTracker implements LogForwardingState.
func (st stateAdapter) NewLastSentTracker(tag names.ModelTag, sink string) (LastSentTracker, error) {
if _, err := st.GetModel(tag); err != nil {
return nil, err
}
loggingState, err := st.ForModel(tag)
if err != nil {
return nil, err
}
lastSent := state.NewLastSentLogTracker(loggingState, sink)
return &lastSentCloser{lastSent, loggingState}, nil
}

type lastSentCloser struct {
*state.LastSentLogTracker
io.Closer
func (st stateAdapter) NewLastSentTracker(tag names.ModelTag, sink string) LastSentTracker {
return state.NewLastSentLogTracker(st, tag.Id(), sink)
}
12 changes: 4 additions & 8 deletions apiserver/logfwd/lastsent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *LastSentSuite) TestGetLastSentBulk(c *gc.C) {
trackerEggs := s.state.addTracker()
trackerEggs.ReturnGet = 20
s.state.addTracker() // ham
s.stub.SetErrors(nil, nil, nil, nil, nil, nil, nil, state.ErrNeverForwarded)
s.stub.SetErrors(nil, nil, nil, nil, state.ErrNeverForwarded)
api, err := logfwd.NewLogForwardingAPI(s.state, s.authorizer)
c.Assert(err, jc.ErrorIsNil)
model := "deadbeef-2f18-4fd2-967d-db9663db7bea"
Expand Down Expand Up @@ -152,7 +152,7 @@ func (s *LastSentSuite) TestSetLastSentBulk(c *gc.C) {
s.state.addTracker() // eggs
s.state.addTracker() // ham
failure := errors.New("<failed>")
s.stub.SetErrors(nil, nil, nil, nil, failure)
s.stub.SetErrors(nil, nil, failure)
api, err := logfwd.NewLogForwardingAPI(s.state, s.authorizer)
c.Assert(err, jc.ErrorIsNil)
model := "deadbeef-2f18-4fd2-967d-db9663db7bea"
Expand Down Expand Up @@ -214,18 +214,14 @@ func (s *stubState) addTracker() *stubTracker {
return tracker
}

func (s *stubState) NewLastSentTracker(tag names.ModelTag, sink string) (logfwd.LastSentTracker, error) {
func (s *stubState) NewLastSentTracker(tag names.ModelTag, sink string) logfwd.LastSentTracker {
s.stub.AddCall("NewLastSentTracker", tag, sink)
if err := s.stub.NextErr(); err != nil {
return nil, err
}

if len(s.ReturnNewLastSentTracker) == 0 {
panic("ran out of trackers")
}
tracker := s.ReturnNewLastSentTracker[0]
s.ReturnNewLastSentTracker = s.ReturnNewLastSentTracker[1:]
return tracker, nil
return tracker
}

type stubTracker struct {
Expand Down
52 changes: 31 additions & 21 deletions apiserver/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,17 @@ type logStreamState struct {
}

func (st logStreamState) getStart(sink string, allModels bool) (int64, error) {
tracker := state.NewLastSentLogTracker(st, sink)
var tracker *state.LastSentLogTracker
if allModels {
allTracker, err := state.NewAllLastSentLogTracker(st, sink)
var err error
tracker, err = state.NewAllLastSentLogTracker(st, sink)
if err != nil {
return 0, errors.Trace(err)
}
tracker = allTracker
} else {
tracker = state.NewLastSentLogTracker(st, st.ModelUUID(), sink)
}
defer tracker.Close()

// Resume for the sink...
lastSent, err := tracker.Get()
Expand Down Expand Up @@ -166,6 +169,8 @@ type logStreamRequestHandler struct {
func (rh *logStreamRequestHandler) serveWebsocket(conn *websocket.Conn, stream *apiLogStream, stop <-chan struct{}) {
logger.Infof("log stream request handler starting")

// TODO(wallyworld) - we currently only send one record at a time, but the API allows for
// sending batches of records, so we need to batch up the output from tailer.Logs().
for {
select {
case <-stop:
Expand All @@ -175,7 +180,7 @@ func (rh *logStreamRequestHandler) serveWebsocket(conn *websocket.Conn, stream *
logger.Errorf("tailer stopped: %v", rh.tailer.Err())
return
}
if err := stream.sendRecord(rec, rh.sendModelUUID); err != nil {
if err := stream.sendRecords([]*state.LogRecord{rec}, rh.sendModelUUID); err != nil {
if isBrokenPipe(err) {
logger.Tracef("logstream handler stopped (client disconnected)")
} else {
Expand Down Expand Up @@ -221,31 +226,36 @@ func (als *apiLogStream) sendInitial(err error) error {
})
}

func (als *apiLogStream) sendRecord(rec *state.LogRecord, sendModelUUID bool) error {
apiRec := als.apiFromRec(rec, sendModelUUID)
func (als *apiLogStream) sendRecords(rec []*state.LogRecord, sendModelUUID bool) error {
apiRec := als.apiFromRecords(rec, sendModelUUID)
if err := als.send(apiRec); err != nil {
return errors.Trace(err)
}
return nil
}

func (als *apiLogStream) send(rec params.LogStreamRecord) error {
func (als *apiLogStream) send(rec params.LogStreamRecords) error {
return als.codec.Send(als.conn, rec)
}

func (als *apiLogStream) apiFromRec(rec *state.LogRecord, sendModelUUID bool) params.LogStreamRecord {
apiRec := params.LogStreamRecord{
ID: rec.ID,
Version: rec.Version.String(),
Entity: rec.Entity.String(),
Timestamp: rec.Time,
Module: rec.Module,
Location: rec.Location,
Level: rec.Level.String(),
Message: rec.Message,
}
if sendModelUUID {
apiRec.ModelUUID = rec.ModelUUID
func (als *apiLogStream) apiFromRecords(records []*state.LogRecord, sendModelUUID bool) params.LogStreamRecords {
var result params.LogStreamRecords
result.Records = make([]params.LogStreamRecord, len(records))
for i, rec := range records {
apiRec := params.LogStreamRecord{
ID: rec.ID,
Version: rec.Version.String(),
Entity: rec.Entity.String(),
Timestamp: rec.Time,
Module: rec.Module,
Location: rec.Location,
Level: rec.Level.String(),
Message: rec.Message,
}
if sendModelUUID {
apiRec.ModelUUID = rec.ModelUUID
}
result.Records[i] = apiRec
}
return apiRec
return result
}
Loading

0 comments on commit 06005df

Please sign in to comment.