Skip to content

Commit

Permalink
Pulled observer and audit recorder into one type
Browse files Browse the repository at this point in the history
The observer and recorder are combined behind a Recorder - this is
preparation for passing a recorder factory into NewConn instead of an
observer factory.
  • Loading branch information
babbageclunk committed Dec 11, 2017
1 parent 576a95b commit c388ea9
Showing 1 changed file with 45 additions and 29 deletions.
74 changes: 45 additions & 29 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,19 +392,28 @@ func (conn *Conn) readBody(resp interface{}, isRequest bool) error {
return conn.codec.ReadBody(resp, isRequest)
}

func (conn *Conn) getRecorder() Recorder {
conn.mutex.Lock()
defer conn.mutex.Unlock()
return &combinedRecorder{
observer: conn.observerFactory.RPCObserver(),
recorder: conn.recorder,
}
}

func (conn *Conn) handleRequest(hdr *Header) error {
observer := conn.observerFactory.RPCObserver()
recorder := conn.getRecorder()
req, err := conn.bindRequest(hdr)
if err != nil {
if err := logRequest(req, observer, nil); err != nil {
if err := recorder.AddRequest(hdr, nil); err != nil {
return errors.Trace(err)
}
if err := conn.readBody(nil, true); err != nil {
return err
}
// We don't transform the error here. bindRequest will have
// already transformed it and returned a zero req.
return conn.writeErrorResponse(req, err, observer)
return conn.writeErrorResponse(hdr, err, recorder)
}
var argp interface{}
var arg reflect.Value
Expand All @@ -414,7 +423,7 @@ func (conn *Conn) handleRequest(hdr *Header) error {
argp = v.Interface()
}
if err := conn.readBody(argp, true); err != nil {
if err := logRequest(req, observer, nil); err != nil {
if err := recorder.AddRequest(hdr, nil); err != nil {
return errors.Trace(err)
}

Expand All @@ -431,43 +440,43 @@ func (conn *Conn) handleRequest(hdr *Header) error {
// the error is actually a framing or syntax
// problem, then the next ReadHeader should pick
// up the problem and abort.
return conn.writeErrorResponse(req, req.transformErrors(err), observer)
return conn.writeErrorResponse(hdr, req.transformErrors(err), recorder)
}
var body interface{} = struct{}{}
if req.ParamsType() != nil {
body = arg.Interface()
}
if err := logRequest(req, observer, body); err != nil {
if err := recorder.AddRequest(hdr, body); err != nil {
return errors.Trace(err)
}
conn.mutex.Lock()
closing := conn.closing
if !closing {
conn.srvPending.Add(1)
go conn.runRequest(req, arg, hdr.Version, observer)
go conn.runRequest(req, arg, hdr.Version, recorder)
}
conn.mutex.Unlock()
if closing {
// We're closing down - no new requests may be initiated.
return conn.writeErrorResponse(req, req.transformErrors(ErrShutdown), observer)
return conn.writeErrorResponse(hdr, req.transformErrors(ErrShutdown), recorder)
}
return nil
}

func (conn *Conn) writeErrorResponse(req boundRequest, err error, observer Observer) error {
func (conn *Conn) writeErrorResponse(reqHdr *Header, err error, recorder Recorder) error {
conn.sending.Lock()
defer conn.sending.Unlock()
hdr := &Header{
RequestId: req.hdr.RequestId,
Version: req.hdr.Version,
RequestId: reqHdr.RequestId,
Version: reqHdr.Version,
}
if err, ok := err.(ErrorCoder); ok {
hdr.ErrorCode = err.ErrorCode()
} else {
hdr.ErrorCode = ""
}
hdr.Error = err.Error()
if err := logReply(req, observer, hdr, struct{}{}); err != nil {
if err := recorder.AddReply(reqHdr.Request, hdr, struct{}{}); err != nil {
return errors.Trace(err)
}

Expand All @@ -479,7 +488,6 @@ func (conn *Conn) writeErrorResponse(req boundRequest, err error, observer Obser
type boundRequest struct {
rpcreflect.MethodCaller
transformErrors func(error) error
recorder *auditlog.Recorder
hdr Header
}

Expand All @@ -490,7 +498,6 @@ func (conn *Conn) bindRequest(hdr *Header) (boundRequest, error) {
conn.mutex.Lock()
root := conn.root
transformErrors := conn.transformErrors
recorder := conn.recorder
conn.mutex.Unlock()

if root == nil {
Expand All @@ -511,26 +518,25 @@ func (conn *Conn) bindRequest(hdr *Header) (boundRequest, error) {
return boundRequest{
MethodCaller: caller,
transformErrors: transformErrors,
recorder: recorder,
hdr: *hdr,
}, nil
}

// runRequest runs the given request and sends the reply.
func (conn *Conn) runRequest(req boundRequest, arg reflect.Value, version int, observer Observer) {
func (conn *Conn) runRequest(req boundRequest, arg reflect.Value, version int, recorder Recorder) {
// If the request causes a panic, ensure we log that before closing the connection.
defer func() {
if panicResult := recover(); panicResult != nil {
logger.Criticalf(
"panic running request %+v with arg %+v: %v\n%v", req, arg, panicResult, string(debug.Stack()))
conn.writeErrorResponse(req, errors.Errorf("%v", panicResult), observer)
conn.writeErrorResponse(&req.hdr, errors.Errorf("%v", panicResult), recorder)
}
}()
defer conn.srvPending.Done()

rv, err := req.Call(req.hdr.Request.Id, arg)
if err != nil {
err = conn.writeErrorResponse(req, req.transformErrors(err), observer)
err = conn.writeErrorResponse(&req.hdr, req.transformErrors(err), recorder)
} else {
hdr := &Header{
RequestId: req.hdr.RequestId,
Expand All @@ -542,7 +548,7 @@ func (conn *Conn) runRequest(req boundRequest, arg reflect.Value, version int, o
} else {
rvi = struct{}{}
}
if err := logReply(req, observer, hdr, rvi); err != nil {
if err := recorder.AddReply(req.hdr.Request, hdr, rvi); err != nil {
logger.Errorf("error logging response: %T %+v", err, err)
}
conn.sending.Lock()
Expand Down Expand Up @@ -583,24 +589,34 @@ func (nopObserver) ServerRequest(hdr *Header, body interface{}) {}

func (nopObserver) ServerReply(req Request, hdr *Header, body interface{}) {}

func logRequest(req boundRequest, observer Observer, body interface{}) error {
observer.ServerRequest(&req.hdr, body)
type Recorder interface {
AddRequest(hdr *Header, body interface{}) error
AddReply(req Request, replyHdr *Header, body interface{}) error
}

type combinedRecorder struct {
observer Observer
recorder *auditlog.Recorder
}

func (cr *combinedRecorder) AddRequest(hdr *Header, body interface{}) error {
cr.observer.ServerRequest(hdr, body)
// TODO(babbageclunk): make this configurable.
jsonArgs, err := json.Marshal(body)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(req.recorder.AddRequest(auditlog.RequestArgs{
RequestID: req.hdr.RequestId,
Facade: req.hdr.Request.Type,
Method: req.hdr.Request.Action,
Version: req.hdr.Request.Version,
return errors.Trace(cr.recorder.AddRequest(auditlog.RequestArgs{
RequestID: hdr.RequestId,
Facade: hdr.Request.Type,
Method: hdr.Request.Action,
Version: hdr.Request.Version,
Args: string(jsonArgs),
}))
}

func logReply(req boundRequest, observer Observer, replyHdr *Header, body interface{}) error {
observer.ServerReply(req.hdr.Request, replyHdr, body)
func (cr *combinedRecorder) AddReply(req Request, replyHdr *Header, body interface{}) error {
cr.observer.ServerReply(req, replyHdr, body)
var responseErrors []*auditlog.Error
if replyHdr.Error == "" {
var err error
Expand All @@ -614,7 +630,7 @@ func logReply(req boundRequest, observer Observer, replyHdr *Header, body interf
Code: replyHdr.ErrorCode,
}}
}
return errors.Trace(req.recorder.AddResponse(auditlog.ResponseErrorsArgs{
return errors.Trace(cr.recorder.AddResponse(auditlog.ResponseErrorsArgs{
RequestID: replyHdr.RequestId,
Errors: responseErrors,
}))
Expand Down

0 comments on commit c388ea9

Please sign in to comment.