Skip to content

Commit

Permalink
Cleanup close/handshaking to ensure distributed starts, finishes with…
Browse files Browse the repository at this point in the history
… flush
  • Loading branch information
Aaron Raddon committed Mar 26, 2016
1 parent f8384e9 commit 22580fb
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 133 deletions.
7 changes: 5 additions & 2 deletions backends/files/filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,12 @@ func (m *FileSource) loadSchema() {

for _, obj := range objs {
fi := m.fh.File(m.path, obj)
if fi != nil && fi.Name != "" {
if fi == nil {
continue
}
if fi.Name != "" {
if _, tableExists := m.files[fi.Table]; !tableExists {
//u.Debugf("Nice, found new table: %q", fi.Table)
u.Debugf("%p found new table: %q", m, fi.Table)
m.files[fi.Table] = make([]*FileInfo, 0)
m.tablenames = append(m.tablenames, fi.Table)
}
Expand Down
2 changes: 1 addition & 1 deletion backends/mongo/sql_to_mgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (m *SqlToMgo) WalkExecSource(p *plan.Source) (exec.Task, error) {
return nil, fmt.Errorf("Plan did not include Sql Select Statement?")
}
if m.p == nil {
u.Debugf("custom? %v", p.Custom)
//u.Debugf("custom? %v", p.Custom)
// If we are operating in distributed mode it hasn't
// been planned? WE probably should allow raw data to be
// passed via plan?
Expand Down
71 changes: 58 additions & 13 deletions frontends/mysqlfe/results_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mysqlfe

import (
"database/sql/driver"
"time"

u "github.com/araddon/gou"

Expand All @@ -23,10 +24,12 @@ var (

type MySqlResultWriter struct {
writer models.ResultWriter
msghandler exec.MessageHandler
schema *schema.Schema
proj *rel.Projection
Rs *mysql.Resultset
ctx *plan.Context
complete chan bool
isComplete bool
wroteHeaders bool
*exec.TaskBase
}
Expand All @@ -35,13 +38,12 @@ type MySqlExecResultWriter struct {
writer models.ResultWriter
schema *schema.Schema
Rs *mysql.Result
ctx *plan.Context
*exec.TaskBase
}

func NewMySqlResultWriter(writer models.ResultWriter, ctx *plan.Context) *MySqlResultWriter {

m := &MySqlResultWriter{writer: writer, ctx: ctx, schema: ctx.Schema}
m := &MySqlResultWriter{writer: writer, schema: ctx.Schema, complete: make(chan bool)}
if ctx.Projection != nil {
m.proj = ctx.Projection.Proj
} else {
Expand All @@ -50,31 +52,47 @@ func NewMySqlResultWriter(writer models.ResultWriter, ctx *plan.Context) *MySqlR

m.TaskBase = exec.NewTaskBase(ctx)
m.Rs = mysql.NewResultSet()
m.msghandler = resultWrite(m)

m.Handler = resultWrite(m)
return m
}

func NewMySqlSchemaWriter(writer models.ResultWriter, ctx *plan.Context) *MySqlResultWriter {

m := &MySqlResultWriter{writer: writer, ctx: ctx, schema: ctx.Schema}
m := &MySqlResultWriter{writer: writer, schema: ctx.Schema, complete: make(chan bool)}
m.proj = ctx.Projection.Proj
// for _, col := range m.proj.Columns {
// u.Infof("col in mysql riter %+v", col)
// }
m.TaskBase = exec.NewTaskBase(ctx)
m.Rs = mysql.NewResultSet()

m.Handler = schemaWrite(m)
m.msghandler = schemaWrite(m)
return m
}

func (m *MySqlResultWriter) Close() error {

ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

//u.Infof("%p mysql Close() waiting for complete", m)
select {
case <-ticker.C:
u.Warnf("timeout???? ")
case <-m.complete:
//u.Warnf("%p got mysql result complete", m)
}

if m.Rs == nil || len(m.Rs.Fields) == 0 {
m.Rs = NewEmptyResultset(m.ctx.Projection)
//u.Infof("nil resultwriter Close() has RS?%v rowct:%v", m.Rs == nil, len(m.Rs.RowDatas))
} else {
//u.Infof("in mysql resultwriter Close() has RS?%v rowct:%v", m.Rs == nil, len(m.Rs.RowDatas))
m.Rs = NewEmptyResultset(m.Ctx.Projection)
}
m.writer.WriteResult(m.Rs)

if err := m.TaskBase.Close(); err != nil {
return err
}

return nil
}
func schemaWrite(m *MySqlResultWriter) exec.MessageHandler {
Expand Down Expand Up @@ -127,6 +145,33 @@ func schemaWrite(m *MySqlResultWriter) exec.MessageHandler {
}
}

func (m *MySqlResultWriter) Run() error {
defer m.Ctx.Recover()
inCh := m.MessageIn()

for {

select {
case <-m.SigChan():
u.Debugf("got signal quit")
return nil
case msg, ok := <-inCh:
if !ok {
//u.Debugf("%p MYSQL INPUT CLOSED, got msg shutdown", m)
if !m.isComplete {
m.isComplete = true
close(m.complete)
}
return nil
}

if ok := m.msghandler(nil, msg); !ok {
u.Warnf("wat, not ok? %v", msg)
}
}
}
return nil
}
func resultWrite(m *MySqlResultWriter) exec.MessageHandler {

return func(_ *plan.Context, msg schema.Message) bool {
Expand All @@ -148,7 +193,7 @@ func resultWrite(m *MySqlResultWriter) exec.MessageHandler {

switch mt := msg.Body().(type) {
case *datasource.SqlDriverMessageMap:
//u.Infof("write: %#v", mt.Values())
// u.Infof("write: %#v", mt.Values())
// for _, v := range mt.Values() {
// u.Debugf("v = %T = %v", v, v)
// }
Expand All @@ -161,7 +206,7 @@ func resultWrite(m *MySqlResultWriter) exec.MessageHandler {
if val, ok := mt[col.As]; !ok {
u.Warnf("could not find result val: %v name=%s", col.As, col.Name)
} else {
u.Debugf("found col: %#v val=%#v", col, val)
//u.Debugf("found col: %#v val=%#v", col, val)
vals[col.ColPos] = val
}
}
Expand Down Expand Up @@ -273,7 +318,7 @@ func NewEmptyResultset(pp *plan.Projection) *mysql.Resultset {

func NewMySqlExecResultWriter(writer models.ResultWriter, ctx *plan.Context) *MySqlExecResultWriter {

m := &MySqlExecResultWriter{writer: writer, ctx: ctx, schema: ctx.Schema}
m := &MySqlExecResultWriter{writer: writer, schema: ctx.Schema}
m.TaskBase = exec.NewTaskBase(m.Ctx)
m.Rs = mysql.NewResult()
m.Handler = nilWriter(m)
Expand Down
3 changes: 3 additions & 0 deletions frontends/mysqlfe/testmysql/runtestserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ func NewTestServerForDb(t *testing.T, db string) {

ServerCtx = models.NewServerCtx(Conf)
ServerCtx.Init()
go func() {
ServerCtx.Grid.RunMaster()
}()

Schema, _ = ServerCtx.Schema(db)
//u.Infof("starting %q schema in test", db)
Expand Down
5 changes: 4 additions & 1 deletion models/serverctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,12 @@ func (m *ServerCtx) Init() error {
if err := m.loadConfig(); err != nil {
return err
}

planner.GridConf.NatsServers = m.Config.Nats
planner.GridConf.EtcdServers = m.Config.Etcd

// how many worker nodes?
m.Grid = planner.NewServerGrid(2, m.Reg)
go m.Grid.RunMaster()

return nil
}
Expand Down
8 changes: 4 additions & 4 deletions planner/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (m *ExecutorGrid) WalkSource(p *plan.Source) (exec.Task, error) {
return exec.NewSource(m.Ctx, p)
}
func (m *ExecutorGrid) WalkGroupBy(p *plan.GroupBy) (exec.Task, error) {
u.Warnf("partial groupby")
//u.Debugf("partial groupby")
p.Partial = true
return exec.NewGroupBy(m.Ctx, p), nil
}
Expand Down Expand Up @@ -115,9 +115,9 @@ func (m *ExecutorGrid) WalkSelect(p *plan.Select) (exec.Task, error) {
natsSource := NewSourceNats(m.Ctx, rx)
localTask.Add(natsSource)

u.Infof("isAgg? %v", p.Stmt.IsAggQuery())
//u.Infof("isAgg? %v", p.Stmt.IsAggQuery())
if p.Stmt.IsAggQuery() {
u.Debugf("Adding aggregate/group by?")
//u.Debugf("Adding aggregate/group by?")
gbplan := plan.NewGroupBy(p.Stmt)
gb := exec.NewGroupByFinal(m.Ctx, gbplan)
localTask.Add(gb)
Expand All @@ -131,7 +131,7 @@ func (m *ExecutorGrid) WalkSelect(p *plan.Select) (exec.Task, error) {
}
// need to send signal to quit
ch := natsSource.MessageOut()
u.Warnf("closing Source due to a task (first?) completing")
//u.Debugf("closing Source due to a task (first?) completing")
close(ch)
localTask.Close()
}()
Expand Down
13 changes: 5 additions & 8 deletions planner/gridrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,21 @@ func RunWorkerNodes(nodeCt int, r *datasource.Registry) {

for i := 0; i < nodeCt; i++ {
go func(nodeId int) {
s := serverStart(nodeCt, NodeName(uint64(nodeId)), r)
s := NewServerGrid(nodeCt, r)
s.Conf.Hostname = NodeName(uint64(nodeId))
s.RunWorker() // blocking
}(i)
}
time.Sleep(time.Millisecond * 80)
}

func NewServerGrid(nodeCt int, r *datasource.Registry) *Server {
serverId, _ := NextId()
return serverStart(nodeCt, NodeName(serverId), r)
}
nextId, _ := NextId()

func serverStart(nodeCt int, nodeName string, r *datasource.Registry) *Server {
conf := GridConf.Clone()
conf.NodeCt = nodeCt
conf.Hostname = nodeName
s := &Server{Conf: conf, reg: r}
return s
conf.Hostname = NodeName(nextId)
return &Server{Conf: conf, reg: r}
}

func NodeName(id uint64) string {
Expand Down
22 changes: 17 additions & 5 deletions planner/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *Server) startSqlActor(actorCt, actorId int, partition string, pb string
def.Settings["pb64"] = pb
def.Settings["partition"] = partition
def.Settings["actor_ct"] = strconv.Itoa(actorCt)
u.Debugf("%p submitting start actor %s nodeI=%d", m, def.ID(), actorId)
//u.Debugf("%p submitting start actor %s nodeI=%d", m, def.ID(), actorId)
err := m.Grid.StartActor(def)
//u.Debugf("%p after submit start actor", m)
if err != nil {
Expand Down Expand Up @@ -111,13 +111,22 @@ func (m *Server) SubmitTask(localTask exec.TaskRunner, flow Flow, p *plan.Select
u.Warnf("TODO: NOT Distributed, don't start tasks!")
}

w := condition.NewCountWatch(m.Grid.Etcd(), m.Grid.Name(), flow.Name(), "sqlcomplete")
_, err = m.Grid.Etcd().CreateDir(fmt.Sprintf("/%v/%v/%v", m.Grid.Name(), flow.Name(), "sqlcomplete"), 100000)
if err != nil {
u.Errorf("Could not initilize dir %v", err)
}
_, err = m.Grid.Etcd().CreateDir(fmt.Sprintf("/%v/%v/%v", m.Grid.Name(), flow.Name(), "finished"), 100000)
if err != nil {
u.Errorf("Could not initilize dir %v", err)
}

w := condition.NewCountWatch(m.Grid.Etcd(), m.Grid.Name(), flow.Name(), "finished")
defer w.Stop()

finished := w.WatchUntil(actorCt)

rp := ring.New(flow.NewContextualName("sqlactor"), actorCt)
u.Infof("%p master?? submitting distributed sql query with %d actors", m, actorCt)
u.Debugf("%p master?? submitting distributed sql query with %d actors", m, actorCt)
//u.WarnT(18)
for i, def := range rp.ActorDefs() {
go func(ad *grid.ActorDef, actorId int) {
Expand All @@ -127,10 +136,10 @@ func (m *Server) SubmitTask(localTask exec.TaskRunner, flow Flow, p *plan.Select
}(def, i)
}

//u.Infof("submitted actors, waiting for completion signal")
//u.Debugf("submitted actors, waiting for completion signal")
select {
case <-finished:
u.Warnf("%s got all! finished signal?", flow.Name())
//u.Debugf("%s got all! finished signal?", flow.Name())
return nil
case <-localTask.SigChan():
u.Warnf("%s YAAAAAY finished", flow.String())
Expand All @@ -141,6 +150,7 @@ func (m *Server) SubmitTask(localTask exec.TaskRunner, flow Flow, p *plan.Select
u.Warnf("what is going on?")
return nil
}

func (m *Server) RunWorker() error {
//u.Debugf("%p starting grid worker", m)
actor, err := newActorMaker(m.Conf)
Expand All @@ -150,10 +160,12 @@ func (m *Server) RunWorker() error {
}
return m.runMaker(actor)
}

func (m *Server) RunMaster() error {
//u.Debugf("%p start grid master", m)
return m.runMaker(&nilMaker{})
}

func (s *Server) runMaker(actorMaker grid.ActorMaker) error {

// We are going to start a "Grid" with specified maker
Expand Down
5 changes: 3 additions & 2 deletions planner/sink_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func NewSinkNats(ctx *plan.Context, destination string, tx grid.Sender) *SinkNat
}
}
func (m *SinkNats) Close() error {
u.Infof("SinkNats Close")
//inCh := m.MessageIn()
//u.Debugf("SinkNats close")
return nil
}
func (m *SinkNats) CloseFinal() error {
Expand Down Expand Up @@ -71,7 +72,7 @@ func (m *SinkNats) Run() error {
return nil
case msg, ok := <-inCh:
if !ok {
u.Debugf("NICE, got msg shutdown")
//u.Debugf("NICE, got msg shutdown")
// eofMsg := datasource.NewSqlDriverMessageMapEmpty()
// if err := m.tx.Send(m.destination, eofMsg); err != nil {
// u.Errorf("Could not send eof message? %v", err)
Expand Down
4 changes: 2 additions & 2 deletions planner/source_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ func NewSourceNats(ctx *plan.Context, rx grid.Receiver) *SourceNats {
}

func (m *SourceNats) Close() error {
u.Infof("SourceNats Close")
//time.Sleep(time.Millisecond * 200)
//u.Debugf("SourceNats Close")
return nil
}
func (m *SourceNats) CloseFinal() error {
Expand All @@ -60,6 +59,7 @@ func (m *SourceNats) Run() error {

defer func() {
m.Ctx.Recover()
u.Infof("closing SourceNats Out")
close(outCh)
m.rx.Close()
}()
Expand Down
Loading

0 comments on commit 22580fb

Please sign in to comment.