Skip to content

Commit

Permalink
Merge pull request #26 from dataux/new_balancer2
Browse files Browse the repository at this point in the history
New balancer2
  • Loading branch information
araddon authored Sep 9, 2016
2 parents e71780b + 3d8868c commit 512fc19
Show file tree
Hide file tree
Showing 23 changed files with 303 additions and 233 deletions.
36 changes: 18 additions & 18 deletions GLOCKFILE
Original file line number Diff line number Diff line change
@@ -1,45 +1,45 @@
cloud.google.com/go f9495dc7d5d062d1b04f594f06347c9a38be96bb
cloud.google.com/go 4fa13efd3e95632b326090d097cec09cedda7507
github.com/araddon/dateparse 86f3852b68545a6c7db45021ba05484cd15a372b
github.com/araddon/gou 50a94aa4a3fb69e8fbde05df290fcb49fa685e07
github.com/araddon/qlbridge a256367c0a9113485a78a814a2fb66d455db9f15
github.com/araddon/gou 4060436435c0b91885cb45241edb764c632ed8ae
github.com/araddon/qlbridge 447cae3d3e74da9b8ee755319256a6fc49e619a9
github.com/bitly/go-hostpool d0e59c22a56e8dadfed24f74f452cea5a52722d2
github.com/bmizerany/assert b7ed37b82869576c289d7d97fb2bbd8b64a0cb28
github.com/coreos/etcd 207c92b62785d220edc74ed791fd8a22a18e378e
github.com/coreos/go-etcd de3514f25635bbfb024fdaf2a8d5f67378492675
github.com/dchest/siphash 6d8617816bb5d8268011ffbfb8720f17ce9af63c
github.com/go-sql-driver/mysql 0b58b37b664c21f3010e836f1b931e1d0b0b0685
github.com/gocql/gocql b2caded3d0f457e42515c06d4092c02055cebaa0
github.com/gogo/protobuf a4cceea7a401a73fefafd1a21fedbd4694124a82
github.com/golang/protobuf c3cefd437628a0b7d31b34fe44b3a7a540e98527
github.com/gocql/gocql 3ac1aabebaf2705c6f695d4ef2c25ab6239e88b3
github.com/gogo/protobuf 966a6f4b3274f2692aa2f30df2ea5c7172c832ca
github.com/golang/protobuf 888eb0692c857ec880338addf316bd662d5e630e
github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380
github.com/google/btree 7d79101e329e5a3adf994758c578dab82b90c017
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/hashicorp/go-immutable-radix afc5a0dbb18abdf82c277a7bc01533e81fa1d6b8
github.com/hashicorp/go-memdb 98f52f52d7a476958fa9da671354d270c50661a7
github.com/hashicorp/golang-lru a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4
github.com/hashicorp/golang-lru 0a025b7e63adc15a622f29b0b2c4c3848243bbf6
github.com/jmoiron/sqlx 7396209bbeada6a4fcc28aa9408f89b2e71cac39
github.com/kr/pretty 737b74a46c4bf788349f72cb256fed10aea4d0ac
github.com/kr/pretty cfb55aafdaf3ec08f0db22699ab822c50091b1c4
github.com/kr/text 7cafcd837844e784b526369c9bce262804aebc60
github.com/leekchan/timeutil 28917288c48df3d2c1cfe468c273e0b2adda0aa5
github.com/lytics/cloudstorage 2edc5b3d8e00f8080edfb7dccd7b7c1c837e30be
github.com/lytics/confl f12b6bace2ff295fc6730f6dbaf009289e2f6e80
github.com/lytics/datemath 988020f3ad34814005ab10b6c7863e31672b5f63
github.com/lytics/dfa 63e35f788f7fa5203fcd2dcd7e318da5a1b981e5
github.com/lytics/grid c3dbc9fec6398e4c6ce2f451709705774574e159
github.com/lytics/metafora eb26f00432a98228e44de8e074b9627fc4c1e457
github.com/lytics/grid fe6326a222d957ebf8c2d882fbb4711a1e16c39b
github.com/lytics/metafora 57cc27a66f10e0d6fcbaf89d98303e0b5ad889ad
github.com/lytics/sereno f6128b640de0742892a439b3672ac88637df977d
github.com/mattbaird/elastigo 34c4c4d8425cbdcbc8e257943a2044d5e9f7dab5
github.com/mb0/glob 1eb79d2de6c448664e7272f8b9fe1938239e3aaa
github.com/nats-io/gnatsd 5e68e38c8bc3cd3c1972682c91b798b6262584d4
github.com/nats-io/nats aea64801fab70c44603b524e0a0210c06727c972
github.com/nats-io/gnatsd e41d360e77711885108008ccb6735a5b7637dd8e
github.com/nats-io/nats ea8b4fd12ebb823073c0004b9f09ac8748f4f165
github.com/nats-io/nuid a5152d67cf63cbfb5d992a395458722a45194715
github.com/pborman/uuid c55201b036063326c5b1b89ccfe45a184973d073
github.com/pborman/uuid b984ec7fa9ff9e428bd0cf0abf429384dfbe3e37
github.com/sony/sonyflake fa881fb1052b152e977c41023052c2f2a1c475e9
github.com/ugorji/go 45ce7596ace4534e47b69051a92aef7b64ec7b3f
golang.org/x/net 075e191f18186a8ff2becaf64478e30f4545cdad
golang.org/x/oauth2 04e1573abc896e70388bd387a69753c378d46466
google.golang.org/api 3f131f305a2ae45080e71fdb780128cc92e8745e
google.golang.org/genproto 9f48fed629f137e474fb7422a911dfad49e2eaae
google.golang.org/grpc 35896af9ad39c1fb1b1cd925fe3621be361e3d81
golang.org/x/net 6250b412798208e6c90b03b7c4f226de5aa299e2
golang.org/x/oauth2 4d549c893be7d4011bab739cc585d091a7188d27
google.golang.org/api 93f710e7c7e7f084a7043ec18aab7359e7626811
google.golang.org/genproto 44808ed2d86e258615bb701d395cbbfe6686a3e6
google.golang.org/grpc 79b7c349179cdd6efd8bac4a1ce7f01b98c16e9b
gopkg.in/inf.v0 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4
gopkg.in/mgo.v2 22287bab4379e1fbf6002fb4eb769888f3fb224c
2 changes: 1 addition & 1 deletion backends/cassandra/cass_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ CREATE TABLE IF NOT EXISTS event (

func jobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
ctx.Schema = testmysql.Schema
return planner.BuildSqlJob(ctx, testmysql.ServerCtx.Grid)
return planner.BuildSqlJob(ctx, testmysql.ServerCtx.PlanGrid)
}

func RunTestServer(t *testing.T) func() {
Expand Down
2 changes: 1 addition & 1 deletion backends/datastore/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func init() {
func jobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
// func BuildSqlJob(ctx *plan.Context, gs *Server) (*ExecutorGrid, error) {
ctx.Schema = testmysql.Schema
return planner.BuildSqlJob(ctx, testmysql.ServerCtx.Grid)
return planner.BuildSqlJob(ctx, testmysql.ServerCtx.PlanGrid)
}

func RunTestServer(t *testing.T) func() {
Expand Down
2 changes: 1 addition & 1 deletion backends/elasticsearch/es_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func init() {
func jobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
// func BuildSqlJob(ctx *plan.Context, gs *Server) (*ExecutorGrid, error) {
ctx.Schema = testmysql.Schema
return planner.BuildSqlJob(ctx, testmysql.ServerCtx.Grid)
return planner.BuildSqlJob(ctx, testmysql.ServerCtx.PlanGrid)
}

func RunTestServer(t *testing.T) func() {
Expand Down
2 changes: 1 addition & 1 deletion backends/files/filesource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func init() {

func jobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
ctx.Schema = testmysql.Schema
return planner.BuildSqlJob(ctx, testmysql.ServerCtx.Grid)
return planner.BuildSqlJob(ctx, testmysql.ServerCtx.PlanGrid)
}

func RunTestServer(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion backends/mongo/mgo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func loadTestData() {
func jobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
ctx.Schema = testmysql.Schema
//u.Warnf("jobMaker, going to do a full plan?")
return planner.BuildExecutorUnPlanned(ctx, testmysql.ServerCtx.Grid)
return planner.BuildExecutorUnPlanned(ctx, testmysql.ServerCtx.PlanGrid)
}

func RunTestServer(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion frontends/mysqlfe/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func BuildMySqlJob(svr *models.ServerCtx, ctx *plan.Context) (*MySqlJob, error)
job := &planner.ExecutorGrid{JobExecutor: baseJob}
//baseJob.Executor = job // ??
job.Executor = job
job.GridServer = svr.Grid
job.GridServer = svr.PlanGrid
//u.Infof("executor: %T sub.executor: %T", job, job.Executor)
job.Ctx = ctx
task, err := exec.BuildSqlJobPlanned(job.Planner, job.Executor, ctx)
Expand Down
4 changes: 1 addition & 3 deletions frontends/mysqlfe/sql_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,7 @@ func TableCreate(tbl *schema.Table) (string, error) {
fmt.Fprint(w, "\n ")
writeField(w, fld)
}
fmt.Fprint(w, "\n) ENGINE=InnoDB DEFAULT CHARSET=utf8")
//tblStr := fmt.Sprintf("CREATE TABLE `%s` (\n\n);", tbl.Name, strings.Join(cols, ","))
//return tblStr, nil
fmt.Fprint(w, "\n) ENGINE=InnoDB DEFAULT CHARSET=utf8;")
return w.String(), nil
}
func writeField(w *bytes.Buffer, fld *schema.Field) {
Expand Down
16 changes: 9 additions & 7 deletions frontends/mysqlfe/sql_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ ADD "column_name" "Data Type";
func TestSqlCreate(t *testing.T) {
t.Parallel()

ss := schema.NewSourceSchema("test", "test")
tbl := schema.NewTable("equipment", ss)
tbl.AddField(schema.NewField("id", value.IntType, 64, "Id is auto-generated random uuid"))
tbl.AddField(schema.NewField("name", value.StringType, 20, ""))
tbl.AddField(schema.NewField("installed", value.TimeType, 0, "When was this installed?"))
tbl.AddField(schema.NewField("jsondata", value.ByteSliceType, 0, "Json Data"))
//ss := schema.NewSourceSchema("test", "test")
tbl := schema.NewTable("equipment")
tbl.AddField(schema.NewFieldBase("id", value.IntType, 64, "Id is auto-generated random uuid"))
tbl.AddField(schema.NewFieldBase("name", value.StringType, 20, ""))
tbl.AddField(schema.NewFieldBase("installed", value.TimeType, 0, "When was this installed?"))
tbl.AddField(schema.NewFieldBase("jsondata", value.ByteSliceType, 0, "Json Data"))

expected := "CREATE TABLE `equipment` (\n" +
" `id` bigint DEFAULT NULL COMMENT \"Id is auto-generated random uuid\",\n" +
Expand All @@ -96,7 +96,9 @@ func TestSqlCreate(t *testing.T) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8;"

createStmt, err := TableCreate(tbl)
assert.T(t, err == nil)
assert.Equal(t, err, nil)
u.Debugf("\n%s\n%s", expected, createStmt)
assert.Equal(t, expected, createStmt)
for i, _ := range expected {
if expected[i] != createStmt[i] {
end := i + 20
Expand Down
2 changes: 1 addition & 1 deletion frontends/mysqlfe/testmysql/runtestserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func NewTestServerForDb(t *testing.T, db string) {
//u.Infof("after init")
quit := make(chan bool)
go func() {
ServerCtx.Grid.RunMaster(quit)
ServerCtx.PlanGrid.Run(quit)
}()

Schema, _ = ServerCtx.Schema(db)
Expand Down
8 changes: 4 additions & 4 deletions models/serverctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type ServerCtx struct {

// The underlying qlbridge schema holds info about the available datasource's
Reg *datasource.Registry
// Grid is our real-time multi-node coordination and messaging system
Grid *planner.Server
// PlanGrid is our real-time multi-node coordination and messaging system
PlanGrid *planner.PlannerGrid

schemas map[string]*schema.Schema
}
Expand Down Expand Up @@ -52,7 +52,7 @@ func (m *ServerCtx) Init() error {
m.Config.WorkerCt = 2
}

m.Grid = planner.NewServerGrid(m.Config.WorkerCt, m.Reg)
m.PlanGrid = planner.NewServerPlanner(m.Config.WorkerCt, m.Reg)

return nil
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func (m *ServerCtx) InfoSchema() (*schema.Schema, error) {

func (m *ServerCtx) JobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
//u.Debugf("jobMaker, going to do a partial plan?")
return planner.BuildExecutorUnPlanned(ctx, m.Grid)
return planner.BuildExecutorUnPlanned(ctx, m.PlanGrid)
}

// Table Get by schema, name
Expand Down
8 changes: 0 additions & 8 deletions planner/actormaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

var (
_ grid.ActorMaker = (*maker)(nil)
_ grid.ActorMaker = (*nilMaker)(nil)
_ = u.EMPTY
)

Expand All @@ -31,10 +30,3 @@ func (m *maker) MakeActor(def *grid.ActorDef) (grid.Actor, error) {
return nil, fmt.Errorf("type does not map to any type of actor: %v", def.Type)
}
}

type nilMaker struct {
}

func (m *nilMaker) MakeActor(def *grid.ActorDef) (grid.Actor, error) {
return nil, fmt.Errorf("NilMaker does not run actors it is lazy: %v", def.Type)
}
20 changes: 10 additions & 10 deletions planner/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@ var (
)

// Build a Sql Job which may be a Grid/Distributed job
func BuildSqlJob(ctx *plan.Context, gs *Server) (*ExecutorGrid, error) {
func BuildSqlJob(ctx *plan.Context, pg *PlannerGrid) (*ExecutorGrid, error) {
sqlPlanner := plan.NewPlanner(ctx)
baseJob := exec.NewExecutor(ctx, sqlPlanner)

job := &ExecutorGrid{JobExecutor: baseJob}
baseJob.Executor = job
job.GridServer = gs
job.GridServer = pg
job.Ctx = ctx
u.Debugf("buildsqljob: %T p:%p %T p:%p", job, job, job.Executor, job.JobExecutor)
if gs == nil {
u.Warnf("Grid Server Doesn't exist %v", gs)
} else if gs.Grid == nil {
if pg == nil {
u.Warnf("Grid Server Doesn't exist %v", pg)
} else if pg.Grid == nil {
u.Warnf("Grid doens't exist? ")
} else if gs.Grid.Nats() == nil {
} else if pg.Grid.Nats() == nil {
u.Warnf("Grid.Nats() doesnt exist")
}
//u.Debugf("buildsqljob2: %T %T", baseJob, baseJob.Executor)
Expand All @@ -49,14 +49,14 @@ func BuildSqlJob(ctx *plan.Context, gs *Server) (*ExecutorGrid, error) {
}

// Build a Sql Job which has already been planned so this is just execution runner
func BuildExecutorUnPlanned(ctx *plan.Context, gs *Server) (*ExecutorGrid, error) {
func BuildExecutorUnPlanned(ctx *plan.Context, pg *PlannerGrid) (*ExecutorGrid, error) {

baseJob := exec.NewExecutor(ctx, nil)

job := &ExecutorGrid{JobExecutor: baseJob}
baseJob.Executor = job
job.GridServer = gs
if gs == nil {
job.GridServer = pg
if pg == nil {
u.Warnf("nope, need a grid server ")
//return nil, fmt.Errorf("no grid server")
}
Expand All @@ -72,7 +72,7 @@ type ExecutorGrid struct {
*exec.JobExecutor
distributed bool
sp *plan.Select
GridServer *Server
GridServer *PlannerGrid
}

// Finalize is after the Dag of Relational-algebra tasks have been assembled
Expand Down
79 changes: 0 additions & 79 deletions planner/gridrunner.go

This file was deleted.

Loading

0 comments on commit 512fc19

Please sign in to comment.