Skip to content

Commit

Permalink
apiserver: use a buffered database log writer
Browse files Browse the repository at this point in the history
Instead of maintaining a state.DbLogger for
each agent, maintain one for each model, and
buffer writes. This should limit the number
of sockets required and reduce the write
pressure on mongo.
  • Loading branch information
axw committed Jun 15, 2017
1 parent 2e4f340 commit 58e0cd9
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 54 deletions.
7 changes: 5 additions & 2 deletions apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type Server struct {
tlsConfig *tls.Config
allowModelAccess bool
logSinkWriter io.WriteCloser
dbloggers dbloggers

// mu guards the fields below it.
mu sync.Mutex
Expand Down Expand Up @@ -303,6 +304,7 @@ func newServer(s *state.State, lis net.Listener, cfg ServerConfig) (_ *Server, e
allowModelAccess: cfg.AllowModelAccess,
publicDNSName_: cfg.AutocertDNSName,
registerIntrospectionHandlers: cfg.RegisterIntrospectionHandlers,
dbloggers: dbloggers{clock: cfg.Clock},
}

srv.tlsConfig = srv.newTLSConfig(cfg)
Expand Down Expand Up @@ -460,6 +462,7 @@ func (srv *Server) run() {

srv.wg.Wait() // wait for any outstanding requests to complete.
srv.tomb.Done()
srv.dbloggers.dispose()
srv.statePool.Close()
srv.state.Close()
srv.logSinkWriter.Close()
Expand Down Expand Up @@ -556,14 +559,14 @@ func (srv *Server) endpoints() []apihttp.Endpoint {
add("/model/:modeluuid/log", debugLogHandler)

logSinkHandler := logsink.NewHTTPHandler(
newAgentLogWriteCloserFunc(httpCtxt, srv.logSinkWriter),
newAgentLogWriteCloserFunc(httpCtxt, srv.logSinkWriter, &srv.dbloggers),
httpCtxt.stop(),
)
add("/model/:modeluuid/logsink", srv.trackRequests(logSinkHandler))

// We don't need to save the migrated logs to a logfile as well as to the DB.
logTransferHandler := logsink.NewHTTPHandler(
newMigrationLogWriteCloserFunc(httpCtxt),
newMigrationLogWriteCloserFunc(httpCtxt, &srv.dbloggers),
httpCtxt.stop(),
)
add("/migrate/logtransfer", srv.trackRequests(logTransferHandler))
Expand Down
123 changes: 105 additions & 18 deletions apiserver/logsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,105 @@ import (
"io"
"net/http"
"strings"
"sync"
"time"

"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/utils/clock"
"github.com/juju/version"
"gopkg.in/juju/names.v2"

"github.com/juju/juju/apiserver/logsink"
"github.com/juju/juju/apiserver/params"
"github.com/juju/juju/state"
"github.com/juju/juju/state/logdb"
)

const (
// dbLoggerBufferSize is the capacity of the log buffer.
// When the buffer fills up, it will be flushed to the
// database.
dbLoggerBufferSize = 1024

// dbLoggerFlushInterval is the amount of time to allow
// a log record to sit in the buffer before being flushed
// to the database.
dbLoggerFlushInterval = 2 * time.Second
)

type agentLoggingStrategy struct {
dbloggers *dbloggers
fileLogger io.Writer

st *state.State
releaser state.StatePoolReleaser
dblogger recordLogger
releaser func()
version version.Number
entity names.Tag
filePrefix string
dbLogger *state.DbLogger
}

type recordLogger interface {
Log([]state.LogRecord) error
}

// dbloggers contains a map of buffered DB loggers. When one of the
// logging strategies requires a DB logger, it uses this to get it.
// When the State corresponding to the DB logger is removed from the
// state pool, the strategies must call the dbloggers.remove method.
type dbloggers struct {
clock clock.Clock
mu sync.Mutex
loggers map[*state.State]*bufferedDbLogger
}

func (d *dbloggers) get(st *state.State) recordLogger {
d.mu.Lock()
defer d.mu.Unlock()
if l, ok := d.loggers[st]; ok {
return l
}
if d.loggers == nil {
d.loggers = make(map[*state.State]*bufferedDbLogger)
}
dbl := state.NewDbLogger(st)
l := &bufferedDbLogger{dbl, logdb.NewBufferedLogger(
dbl,
dbLoggerBufferSize,
dbLoggerFlushInterval,
d.clock,
)}
d.loggers[st] = l
return l
}

func (d *dbloggers) remove(st *state.State) {
d.mu.Lock()
defer d.mu.Unlock()
if l, ok := d.loggers[st]; ok {
l.Close()
delete(d.loggers, st)
}
}

// dispose closes all dbloggers in the map, and clears the memory. This
// must not be called concurrently with any other dbloggers methods.
func (d *dbloggers) dispose() {
for _, l := range d.loggers {
l.Close()
}
d.loggers = nil
}

type bufferedDbLogger struct {
dbl *state.DbLogger
*logdb.BufferedLogger
}

func (b *bufferedDbLogger) Close() error {
err := errors.Trace(b.Flush())
b.dbl.Close()
return err
}

// newAgentLogWriteCloserFunc returns a function that will create a
Expand All @@ -36,9 +114,13 @@ type agentLoggingStrategy struct {
func newAgentLogWriteCloserFunc(
ctxt httpContext,
fileLogger io.Writer,
dbloggers *dbloggers,
) logsink.NewLogWriteCloserFunc {
return func(req *http.Request) (logsink.LogWriteCloser, error) {
strategy := &agentLoggingStrategy{fileLogger: fileLogger}
strategy := &agentLoggingStrategy{
dbloggers: dbloggers,
fileLogger: fileLogger,
}
if err := strategy.init(ctxt, req); err != nil {
return nil, errors.Annotate(err, "initialising agent logsink session")
}
Expand All @@ -47,7 +129,7 @@ func newAgentLogWriteCloserFunc(
}

func (s *agentLoggingStrategy) init(ctxt httpContext, req *http.Request) error {
st, releaser, entity, err := ctxt.stateForRequestAuthenticatedAgent(req)
st, releaseState, entity, err := ctxt.stateForRequestAuthenticatedAgent(req)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -61,21 +143,35 @@ func (s *agentLoggingStrategy) init(ctxt httpContext, req *http.Request) error {
// address this caveat appropriately.
ver, err := logsink.JujuClientVersionFromRequest(req)
if err != nil {
releaser()
releaseState()
return errors.Trace(err)
}
s.releaser = releaser
s.version = ver
s.entity = entity.Tag()
s.filePrefix = st.ModelUUID() + ":"
s.dbLogger = state.NewDbLogger(st)
s.dblogger = s.dbloggers.get(st)
s.releaser = func() {
if removed := releaseState(); removed {
s.dbloggers.remove(st)
}
}
return nil
}

// Close is part of the logsink.LogWriteCloser interface.
//
// Close releases the StatePool entry, closing the DB logger
// if the State is closed/removed. The file logger is owned
// by the apiserver, so it is not closed.
func (s *agentLoggingStrategy) Close() error {
s.releaser()
return nil
}

// WriteLog is part of the logsink.LogWriteCloser interface.
func (s *agentLoggingStrategy) WriteLog(m params.LogRecord) error {
level, _ := loggo.ParseLevel(m.Level)
dbErr := errors.Annotate(s.dbLogger.Log([]state.LogRecord{{
dbErr := errors.Annotate(s.dblogger.Log([]state.LogRecord{{
Time: m.Time,
Entity: s.entity,
Version: s.version,
Expand Down Expand Up @@ -112,12 +208,3 @@ func logToFile(writer io.Writer, prefix string, m params.LogRecord) error {
}, " ") + "\n"))
return err
}

// Close is part of the logsink.LogWriteCloser interface. Close closes
// the DB logger and releases the state. It doesn't close the file logger
// because that lives longer than one request.
func (s *agentLoggingStrategy) Close() error {
s.dbLogger.Close()
s.releaser()
return nil
}
55 changes: 33 additions & 22 deletions apiserver/logtransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ import (
)

type migrationLoggingStrategy struct {
st *state.State
releaser state.StatePoolReleaser
filePrefix string
dbLogger *state.DbLogger
tracker *logTracker
dbloggers *dbloggers

dblogger recordLogger
releaser func()
tracker *logTracker
}

// newMigrationLogWriteCloserFunc returns a function that will create a
// logsink.LoggingStrategy given an *http.Request, that writes log
// messages to the state database and tracks their migration.
func newMigrationLogWriteCloserFunc(ctxt httpContext) logsink.NewLogWriteCloserFunc {
func newMigrationLogWriteCloserFunc(ctxt httpContext, dbloggers *dbloggers) logsink.NewLogWriteCloserFunc {
return func(req *http.Request) (logsink.LogWriteCloser, error) {
strategy := &migrationLoggingStrategy{}
strategy := &migrationLoggingStrategy{dbloggers: dbloggers}
if err := strategy.init(ctxt, req); err != nil {
return nil, errors.Annotate(err, "initialising migration logsink session")
}
Expand All @@ -40,7 +40,7 @@ func newMigrationLogWriteCloserFunc(ctxt httpContext) logsink.NewLogWriteCloserF
func (s *migrationLoggingStrategy) init(ctxt httpContext, req *http.Request) error {
// Require MigrationModeNone because logtransfer happens after the
// model proper is completely imported.
st, releaser, err := ctxt.stateForMigration(req, state.MigrationModeNone)
st, releaseState, err := ctxt.stateForMigration(req, state.MigrationModeNone)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -53,17 +53,30 @@ func (s *migrationLoggingStrategy) init(ctxt httpContext, req *http.Request) err
// conversion of log messages from an old client.
_, err = logsink.JujuClientVersionFromRequest(req)
if err != nil {
releaser()
releaseState()
return errors.Trace(err)
}

s.releaser = releaser
s.filePrefix = st.ModelUUID() + ":"
s.dbLogger = state.NewDbLogger(st)
s.dblogger = s.dbloggers.get(st)
s.tracker = newLogTracker(st)
s.releaser = func() {
if removed := releaseState(); removed {
s.dbloggers.remove(st)
}
}
return nil
}

// Close is part of the logsink.LogWriteCloser interface.
func (s *migrationLoggingStrategy) Close() error {
err := errors.Annotate(
s.tracker.Close(),
"closing last-sent tracker",
)
s.releaser()
return err
}

// WriteLog is part of the logsink.LogWriteCloser interface.
func (s *migrationLoggingStrategy) WriteLog(m params.LogRecord) error {
level, _ := loggo.ParseLevel(m.Level)
Expand All @@ -75,7 +88,7 @@ func (s *migrationLoggingStrategy) WriteLog(m params.LogRecord) error {
return errors.Annotate(err, "parsing entity from log record")
}
}
err := s.dbLogger.Log([]state.LogRecord{{
err := s.dblogger.Log([]state.LogRecord{{
Time: m.Time,
Entity: entity,
Module: m.Module,
Expand All @@ -89,18 +102,16 @@ func (s *migrationLoggingStrategy) WriteLog(m params.LogRecord) error {
return errors.Annotate(err, "logging to DB failed")
}

// Close is part of the logsink.LogWriteCloser interface.
func (s *migrationLoggingStrategy) Close() error {
s.dbLogger.Close()
s.tracker.Close()
s.releaser()
return nil
}

// trackingPeriod is used to limit the number of database writes
// made in order to record the ID of the log record last persisted.
const trackingPeriod = 2 * time.Minute

func newLogTracker(st *state.State) *logTracker {
return &logTracker{tracker: state.NewLastSentLogTracker(st, st.ModelUUID(), "migration-logtransfer")}
return &logTracker{
tracker: state.NewLastSentLogTracker(
st, st.ModelUUID(), "migration-logtransfer",
),
}
}

// logTracker assumes that log messages are sent in time order (which
Expand Down
4 changes: 3 additions & 1 deletion state/logdb/buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (b *BufferedLogger) Log(in []state.LogRecord) error {
}
b.buf = append(b.buf, in[:n]...)
in = in[n:]
if len(b.buf) == cap(b.buf) {
if len(b.buf) >= cap(b.buf) {
if err := b.flush(); err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -92,6 +92,8 @@ func (b *BufferedLogger) flushOnTimer() {
b.flush()
}

// flush flushes any buffered log records to the underlying Logger, and stops
// the flush timer if there is one. The caller must be holding b.mu.
func (b *BufferedLogger) flush() error {
if b.flushTimer != nil {
b.flushTimer.Stop()
Expand Down
Loading

0 comments on commit 58e0cd9

Please sign in to comment.