Skip to content

Commit

Permalink
Cleanup main to flag workers, pprof
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Raddon committed Aug 14, 2016
1 parent a56495a commit f154518
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 51 deletions.
37 changes: 22 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package main

import (
"flag"
"net"
"net/http"
_ "net/http/pprof"
"os"
"runtime"
"runtime/pprof"

// Backend Side-Effect imports
_ "github.com/dataux/dataux/backends/cassandra"
Expand All @@ -20,17 +22,17 @@ import (
)

var (
configFile string
cpuProfileFile string
memProfileFile string
logLevel = "info"
configFile string
pprofPort string
workerCt int
logLevel = "info"
)

func init() {
flag.StringVar(&configFile, "config", "dataux.conf", "dataux proxy config file")
flag.StringVar(&logLevel, "loglevel", "debug", "logging [ debug,info,warn,error ]")
flag.StringVar(&cpuProfileFile, "cpuprofile", "", "cpuprofile")
flag.StringVar(&memProfileFile, "memprofile", "", "memProfileFile")
flag.StringVar(&pprofPort, "pprof", ":18008", "pprof and metrics port")
flag.IntVar(&workerCt, "workerct", 3, "Number of worker nodes")
flag.Parse()
}
func main() {
Expand All @@ -44,16 +46,21 @@ func main() {
u.SetupLogging(logLevel)
u.SetColorIfTerminal()

if cpuProfileFile != "" {
f, err := os.Create(cpuProfileFile)
proxy.LoadConfig(configFile)

if pprofPort != "" {
conn, err := net.Listen("tcp", pprofPort)
if err != nil {
u.Errorf("could not care cpu file: %v", err)
u.Warnf("Error listening on %s: %v", pprofPort, err)
os.Exit(1)
}
pprof.StartCPUProfile(f)
defer pprof.StopCPUProfile()
go func() {
if err := http.Serve(conn, http.DefaultServeMux); err != nil {
u.Errorf("Error from profile HTTP server: %v", err)
}
conn.Close()
}()
}

proxy.LoadConfig(configFile)

proxy.RunDaemon(true, 2)
proxy.RunDaemon(true, workerCt)
}
6 changes: 4 additions & 2 deletions proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ func banner() string {
//
// @listener true/false do we run the listeneners (mysql)?
// if not then this is a worker only node
// @workerct 2 ? How many worker services do we start? Say, one per proc/core?
//
// @workerct over-ride # of workers
func RunDaemon(listener bool, workerCt int) {

svrCtx := models.NewServerCtx(Conf)
Expand Down Expand Up @@ -81,6 +80,9 @@ func RunDaemon(listener bool, workerCt int) {

fmt.Println(banner())

if workerCt == 0 && Conf.WorkerCt > 0 {
workerCt = Conf.WorkerCt
}
if workerCt > 0 {
go planner.RunWorkerNodes(quit, workerCt, svrCtx.Reg)
}
Expand Down
50 changes: 20 additions & 30 deletions vendored/mixer/proxy/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,9 @@ func newConn(m *mysqlListener, co net.Conn) *Conn {
func (c *Conn) Run() {

if !c.noRecover {
u.Debugf("running recovery? %v", !c.noRecover)
defer func() {
if r := recover(); r != nil {
u.LogTracef(u.ERROR, "conn.Run() recover:%v", r)
u.Errorf("conn.Run() recover:%v", r)
}
c.Close()
}()
Expand All @@ -122,28 +121,13 @@ func (c *Conn) Run() {
return
}

//u.Debugf("Run() -> handler.Handle(): %v", string(data))
if err := c.handler.Handle(c, &models.Request{Raw: data}); err != nil {

if se, ok := err.(*mysql.SqlError); ok {
if se.Code == mysql.ER_WARN_DEPRECATED_SYNTAX {
//u.Debugf("deprecated %v", err)
} else {
//u.Warnf("Handler() error %v", err)
}
} else {
//u.Warnf("Handler() error %v", err)
}
es := err.Error()
switch {
case strings.Contains(es, "COM_FIELD_LIST"):
// ignore
default:
// c.handler is the front-end handler
err = c.handler.Handle(c, &models.Request{Raw: data})
if err != nil {
if !ignoreableErr(err) {
u.Warnf("got error on handle %v", err)
}

if err != mysql.ErrBadConn {
u.Warnf("writing error %v", err)
c.WriteError(err)
}
}
Expand All @@ -156,23 +140,35 @@ func (c *Conn) Run() {
}
}

func ignoreableErr(err error) bool {
sqlErr, isMysqlError := err.(*mysql.SqlError)
es := strings.ToLower(err.Error())
switch {
case isMysqlError && sqlErr.Code == mysql.ER_WARN_DEPRECATED_SYNTAX:
return true
case strings.Contains(es, "deprecated"):
return true
}
return false
}

func (c *Conn) Handshake() error {

if err := c.writeInitialHandshake(); err != nil {
u.Errorf("send initial handshake error %s", err.Error())
u.Errorf("send initial handshake error %v", err)
return err
}

if err := c.readHandshakeResponse(); err != nil {
u.Errorf("recv handshake response error %s", err.Error())
u.Errorf("recv handshake response error %v", err)

c.WriteError(err)

return err
}

if err := c.WriteOK(nil); err != nil {
u.Errorf("write ok fail %s", err.Error())
u.Errorf("write ok fail %v", err)
return err
}

Expand All @@ -193,15 +189,9 @@ func (c *Conn) Close() error {
if c.closed {
return nil
}

//u.LogTracef(u.WARN, "mysql conn listener closing")

c.c.Close()

c.rollback()

c.closed = true

return nil
}

Expand Down
4 changes: 0 additions & 4 deletions vendored/mixer/proxy/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ type mysqlListener struct {
}

func (m *mysqlListener) Init(conf *models.ListenerConfig, svr *models.ServerCtx) error {
// m.svr = svr
// m.conf = conf
u.Warnf("has listener")
return nil
}

Expand Down Expand Up @@ -117,7 +114,6 @@ func (m *mysqlListener) OnConn(c net.Conn) {
conn.Close()
}()

//u.Infof("client connected")
if err := conn.Handshake(); err != nil {
u.Errorf("handshake error %s", err.Error())
c.Close()
Expand Down

0 comments on commit f154518

Please sign in to comment.