Skip to content

Commit

Permalink
fix grid multi node tests, fix for upsyream change
Browse files Browse the repository at this point in the history
  • Loading branch information
araddon committed Aug 29, 2016
1 parent 56c825e commit d3d976e
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 18 deletions.
2 changes: 1 addition & 1 deletion backends/mongo/mgo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func loadTestData() {
func jobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
ctx.Schema = testmysql.Schema
//u.Warnf("jobMaker, going to do a full plan?")
return planner.BuildExecutorUnPlanned(ctx, testmysql.ServerCtx.Grid)
return planner.BuildExecutorUnPlanned(ctx, testmysql.ServerCtx.PlanGrid)
}

func RunTestServer(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion frontends/mysqlfe/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func BuildMySqlJob(svr *models.ServerCtx, ctx *plan.Context) (*MySqlJob, error)
job := &planner.ExecutorGrid{JobExecutor: baseJob}
//baseJob.Executor = job // ??
job.Executor = job
job.GridServer = svr.Grid
job.GridServer = svr.PlanGrid
//u.Infof("executor: %T sub.executor: %T", job, job.Executor)
job.Ctx = ctx
task, err := exec.BuildSqlJobPlanned(job.Planner, job.Executor, ctx)
Expand Down
2 changes: 1 addition & 1 deletion frontends/mysqlfe/testmysql/runtestserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func NewTestServerForDb(t *testing.T, db string) {
//u.Infof("after init")
quit := make(chan bool)
go func() {
ServerCtx.Grid.RunMaster(quit)
ServerCtx.PlanGrid.Run(quit)
}()

Schema, _ = ServerCtx.Schema(db)
Expand Down
8 changes: 4 additions & 4 deletions models/serverctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type ServerCtx struct {

// The underlying qlbridge schema holds info about the available datasource's
Reg *datasource.Registry
// Grid is our real-time multi-node coordination and messaging system
Grid *planner.Server
// PlanGrid is our real-time multi-node coordination and messaging system
PlanGrid *planner.PlannerGrid

schemas map[string]*schema.Schema
}
Expand Down Expand Up @@ -52,7 +52,7 @@ func (m *ServerCtx) Init() error {
m.Config.WorkerCt = 2
}

m.Grid = planner.NewServerGrid(m.Config.WorkerCt, m.Reg)
m.PlanGrid = planner.NewServerPlanner(m.Config.WorkerCt, m.Reg)

return nil
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func (m *ServerCtx) InfoSchema() (*schema.Schema, error) {

func (m *ServerCtx) JobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
//u.Debugf("jobMaker, going to do a partial plan?")
return planner.BuildExecutorUnPlanned(ctx, m.Grid)
return planner.BuildExecutorUnPlanned(ctx, m.PlanGrid)
}

// Table Get by schema, name
Expand Down
18 changes: 9 additions & 9 deletions planner/plan_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,6 @@ func NextId() (uint64, error) {
return sf.NextID()
}

func NewServerPlanner(nodeCt int, r *datasource.Registry) *PlannerGrid {
nextId, _ := NextId()

conf := GridConf.Clone()
conf.NodeCt = nodeCt
conf.Hostname = NodeName(nextId)
return &PlannerGrid{Conf: conf, reg: r}
}

func NodeName(id uint64) string {
hostname, err := os.Hostname()
if err != nil {
Expand All @@ -97,6 +88,15 @@ type PlannerGrid struct {
lastTaskId uint64
}

func NewServerPlanner(nodeCt int, r *datasource.Registry) *PlannerGrid {
nextId, _ := NextId()

conf := GridConf.Clone()
conf.NodeCt = nodeCt
conf.Hostname = NodeName(nextId)
return &PlannerGrid{Conf: conf, reg: r}
}

// Submits a Sql Select statement task for planning across multiple nodes
func (m *PlannerGrid) RunSqlMaster(completionTask exec.TaskRunner, ns *SourceNats, flow Flow, p *plan.Select) error {

Expand Down
4 changes: 2 additions & 2 deletions proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ func RunDaemon(listener bool, workerCt int) {

if listener {
go func() {
// Master is the Grid master that coordinates
// PlanGrid is the master that coordinates
// with etcd, nats, etc, submit tasks to worker nodes
// Only needed on listener nodes
svrCtx.Grid.RunMaster(quit)
svrCtx.PlanGrid.Run(quit)
}()
// Listeners are the tcp-inbound connections
svr.RunListeners()
Expand Down

0 comments on commit d3d976e

Please sign in to comment.