Skip to content

Commit

Permalink
Add feature test, fix all the things
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Jun 29, 2016
1 parent 7dc4652 commit 69607ce
Show file tree
Hide file tree
Showing 17 changed files with 466 additions and 74 deletions.
80 changes: 44 additions & 36 deletions apiserver/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
package apiserver

import (
"net/http"

"github.com/gorilla/schema"
"github.com/juju/errors"
"golang.org/x/net/websocket"
"net/http"

"github.com/juju/juju/apiserver/common"
"github.com/juju/juju/apiserver/params"
Expand Down Expand Up @@ -46,14 +47,22 @@ func newLogStreamEndpointHandler(ctxt httpContext) *logStreamEndpointHandler {
// sink -> string - the name of the the log forwarding target
func (eph *logStreamEndpointHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
logger.Infof("log stream request handler starting")
reqHandler, initial := eph.newLogStreamRequestHandler(req)
defer reqHandler.tailer.Stop()

server := websocket.Server{
Handler: func(conn *websocket.Conn) {
defer conn.Close()

reqHandler.serveWebsocket(conn, initial, eph.stopCh)
reqHandler, err := eph.newLogStreamRequestHandler(req)
if err == nil {
defer reqHandler.tailer.Stop()
}
stream, initErr := initStream(conn, err)
if initErr != nil {
logger.Debugf("failed to send initial error (%v): %v", err, initErr)
return
}
if err != nil {
return
}
reqHandler.serveWebsocket(conn, stream, eph.stopCh)
},
}
server.ServeHTTP(w, req)
Expand All @@ -69,13 +78,15 @@ func (eph *logStreamEndpointHandler) newLogStreamRequestHandler(req *http.Reques
}

var cfg params.LogStreamConfig
if err := schema.NewDecoder().Decode(&cfg, req.URL.Query()); err != nil {
return nil, errors.Trace(err)
query := req.URL.Query()
query.Del(":modeluuid")
if err := schema.NewDecoder().Decode(&cfg, query); err != nil {
return nil, errors.Annotate(err, "decoding schema")
}

tailer, err := eph.newTailer(source, cfg)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotate(err, "creating new tailer")
}

reqHandler := &logStreamRequestHandler{
Expand All @@ -89,15 +100,15 @@ func (eph *logStreamEndpointHandler) newLogStreamRequestHandler(req *http.Reques
func (eph logStreamEndpointHandler) newTailer(source logStreamSource, cfg params.LogStreamConfig) (state.LogTailer, error) {
start, err := source.getStart(cfg.Sink, cfg.AllModels)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotate(err, "getting log start position")
}
tailerArgs := &state.LogTailerParams{
StartID: start,
AllModels: cfg.AllModels,
}
tailer, err := source.newTailer(tailerArgs)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotate(err, "tailing logs")
}
return tailer, nil
}
Expand All @@ -119,9 +130,14 @@ func (st logStreamState) getStart(sink string, allModels bool) (int64, error) {

// Resume for the sink...
lastSent, err := tracker.Get()
if err != nil {
if errors.Cause(err) == state.ErrNeverForwarded {
// If we've never forwarded a message, we start from
// position zero.
lastSent = 0
} else if err != nil {
return 0, errors.Trace(err)
}

// Using the same timestamp will cause at least 1 duplicate
// entry, but that is better than dropping records.
// TODO(ericsnow) Add 1 to start once we track by sequential int ID
Expand All @@ -147,13 +163,9 @@ type logStreamRequestHandler struct {
stream *apiLogStream
}

func (rh *logStreamRequestHandler) serveWebsocket(conn *websocket.Conn, initial error, stop <-chan struct{}) {
func (rh *logStreamRequestHandler) serveWebsocket(conn *websocket.Conn, stream *apiLogStream, stop <-chan struct{}) {
logger.Infof("log stream request handler starting")

if ok := rh.initStream(conn, initial); !ok {
return
}

for {
select {
case <-stop:
Expand All @@ -163,8 +175,7 @@ func (rh *logStreamRequestHandler) serveWebsocket(conn *websocket.Conn, initial
logger.Errorf("tailer stopped: %v", rh.tailer.Err())
return
}

if err := rh.stream.sendRecord(rec); err != nil {
if err := stream.sendRecord(rec, rh.sendModelUUID); err != nil {
if isBrokenPipe(err) {
logger.Tracef("logstream handler stopped (client disconnected)")
} else {
Expand All @@ -175,26 +186,23 @@ func (rh *logStreamRequestHandler) serveWebsocket(conn *websocket.Conn, initial
}
}

func (rh *logStreamRequestHandler) initStream(conn *websocket.Conn, initial error) bool {
func initStream(conn *websocket.Conn, initial error) (*apiLogStream, error) {
stream := &apiLogStream{
conn: conn,
codec: websocket.JSON,
sendModelUUID: rh.sendModelUUID,
conn: conn,
codec: websocket.JSON,
}
stream.sendInitial(initial)

rh.stream = stream
return (initial == nil)
if err := stream.sendInitial(initial); err != nil {
return nil, errors.Trace(err)
}
return stream, nil
}

type apiLogStream struct {
conn *websocket.Conn
codec websocket.Codec

sendModelUUID bool
}

func (als *apiLogStream) sendInitial(initial error) {
func (als *apiLogStream) sendInitial(err error) error {
// The client is waiting for an indication that the stream
// is ready (or that it failed).
// See api/apiclient.go:readInitialStreamError().
Expand All @@ -208,13 +216,13 @@ func (als *apiLogStream) sendInitial(initial error) {
return append(data, '\n'), payloadType, nil
},
}
initialCodec.Send(als.conn, &params.ErrorResult{
Error: common.ServerError(initial),
return initialCodec.Send(als.conn, &params.ErrorResult{
Error: common.ServerError(err),
})
}

func (als *apiLogStream) sendRecord(rec *state.LogRecord) error {
apiRec := als.apiFromRec(rec)
func (als *apiLogStream) sendRecord(rec *state.LogRecord, sendModelUUID bool) error {
apiRec := als.apiFromRec(rec, sendModelUUID)
if err := als.send(apiRec); err != nil {
return errors.Trace(err)
}
Expand All @@ -225,7 +233,7 @@ func (als *apiLogStream) send(rec params.LogStreamRecord) error {
return als.codec.Send(als.conn, rec)
}

func (als *apiLogStream) apiFromRec(rec *state.LogRecord) params.LogStreamRecord {
func (als *apiLogStream) apiFromRec(rec *state.LogRecord, sendModelUUID bool) params.LogStreamRecord {
apiRec := params.LogStreamRecord{
ID: rec.ID,
Version: rec.Version.String(),
Expand All @@ -236,7 +244,7 @@ func (als *apiLogStream) apiFromRec(rec *state.LogRecord) params.LogStreamRecord
Level: rec.Level.String(),
Message: rec.Message,
}
if als.sendModelUUID {
if sendModelUUID {
apiRec.ModelUUID = rec.ModelUUID
}
return apiRec
Expand Down
7 changes: 5 additions & 2 deletions apiserver/logstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,11 @@ func (s *LogStreamIntSuite) TestFullRequest(c *gc.C) {
stop := make(chan struct{})
client := newWebsocketServer(c, func(conn *websocket.Conn) {
defer conn.Close()
initial := error(nil)
reqHandler.serveWebsocket(conn, initial, stop)
stream, err := initStream(conn, nil)
if err != nil {
panic(err)
}
reqHandler.serveWebsocket(conn, stream, stop)
})
defer client.Close()
defer close(stop)
Expand Down
98 changes: 98 additions & 0 deletions featuretests/syslog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2016 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package featuretests

import (
"crypto/tls"
"crypto/x509"
"time"

jc "github.com/juju/testing/checkers"
gc "gopkg.in/check.v1"

"github.com/juju/juju/agent"
"github.com/juju/juju/cert"
agentcmd "github.com/juju/juju/cmd/jujud/agent"
"github.com/juju/juju/cmd/jujud/agent/agenttest"
"github.com/juju/juju/standards/rfc5424/rfc5424test"
"github.com/juju/juju/state"
coretesting "github.com/juju/juju/testing"
"github.com/juju/juju/testing/factory"
"github.com/juju/juju/worker/logsender"
)

type syslogSuite struct {
agenttest.AgentSuite
server *rfc5424test.Server
received chan rfc5424test.Message
}

var _ = gc.Suite(&syslogSuite{})

func (s *syslogSuite) SetUpTest(c *gc.C) {
s.AgentSuite.SetUpTest(c)

s.received = make(chan rfc5424test.Message, 1)
s.server = rfc5424test.NewServer(rfc5424test.HandlerFunc(func(msg rfc5424test.Message) {
select {
case s.received <- msg:
default:
}
}))
s.AddCleanup(func(*gc.C) { s.server.Close() })

serverCert, err := tls.X509KeyPair(
[]byte(coretesting.ServerCert),
[]byte(coretesting.ServerKey),
)
c.Assert(err, jc.ErrorIsNil)
caCert, err := cert.ParseCert(coretesting.CACert)
c.Assert(err, jc.ErrorIsNil)
clientCAs := x509.NewCertPool()
clientCAs.AddCert(caCert)
s.server.TLS = &tls.Config{
Certificates: []tls.Certificate{serverCert},
ClientCAs: clientCAs,
}
s.server.StartTLS()

err = s.State.UpdateModelConfig(map[string]interface{}{
"syslog-host": s.server.Listener.Addr().String(),
"syslog-server-cert": coretesting.ServerCert,
"syslog-ca-cert": coretesting.CACert,
"syslog-client-cert": coretesting.ServerCert,
"syslog-client-key": coretesting.ServerKey,
}, nil, nil)
c.Assert(err, jc.ErrorIsNil)
}

func (s *syslogSuite) TestAuditLogForwarded(c *gc.C) {
// Create a machine and an agent for it.
m, password := s.Factory.MakeMachineReturningPassword(c, &factory.MachineParams{
Nonce: agent.BootstrapNonce,
Jobs: []state.MachineJob{state.JobManageModel},
})

s.PrimeAgent(c, m.Tag(), password)
agentConf := agentcmd.NewAgentConf(s.DataDir())
agentConf.ReadConfig(m.Tag().String())

logsCh, err := logsender.InstallBufferedLogWriter(1000)
c.Assert(err, jc.ErrorIsNil)
machineAgentFactory := agentcmd.MachineAgentFactoryFn(agentConf, logsCh, c.MkDir())
a := machineAgentFactory(m.Id())

// Ensure there's no logs to begin with.
// Start the agent.
go func() { c.Check(a.Run(nil), jc.ErrorIsNil) }()
defer a.Stop()

select {
case msg, ok := <-s.received:
c.Assert(ok, jc.IsTrue)
c.Logf("message: %+v", msg)
case <-time.After(coretesting.LongWait):
c.Fatal("timed out waiting for message to be forwarded")
}
}
4 changes: 2 additions & 2 deletions logfwd/syslog/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ func open(cfg RawConfig, opener SenderOpener) (Sender, error) {
var timeout time.Duration
dial, err := opener.DialFunc(tlsCfg, timeout)
if err != nil {
return nil, errors.Trace(err)
return nil, errors.Annotate(err, "obtaining dialer")
}

var clientCfg rfc5424.ClientConfig
client, err := opener.Open(cfg.Host, clientCfg, dial)
return client, errors.Trace(err)
return client, errors.Annotate(err, "opening client connection")
}

// Close closes the client's connection.
Expand Down
8 changes: 6 additions & 2 deletions standards/rfc5424/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ func TLSDialFunc(cfg tls.Config, timeout time.Duration) (DialFunc, error) {
if network != "tcp" {
return nil, errors.Errorf("unsupported network %q", network)
}
if _, _, err := net.SplitHostPort(address); err == nil {
if _, _, err := net.SplitHostPort(address); err != nil {
address = net.JoinHostPort(address, defaultSyslogTLSPort)
}
return tls.DialTCP(tls.DialOpts{
conn, err := tls.DialTCP(tls.DialOpts{
Address: address,
TLSConfig: cfg,
ConnectTimeout: timeout,
})
if err != nil {
return nil, errors.Annotate(err, "dialing TLS")
}
return conn, nil
}
return dial, nil
}
Expand Down
5 changes: 5 additions & 0 deletions standards/rfc5424/rfc5424test/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright 2016 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

// Package rc5424test provides utilities for testing RFC 5424.
package rfc5424test
14 changes: 14 additions & 0 deletions standards/rfc5424/rfc5424test/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2016 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package rfc5424test_test

import (
"testing"

gc "gopkg.in/check.v1"
)

func TestAll(t *testing.T) {
gc.TestingT(t)
}
Loading

0 comments on commit 69607ce

Please sign in to comment.