Skip to content

Commit

Permalink
Plumbining in initial Grid/Distributed tests, grid-nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Raddon committed Jan 25, 2016
1 parent 808dbe3 commit 64ca6bf
Show file tree
Hide file tree
Showing 19 changed files with 338 additions and 224 deletions.
32 changes: 31 additions & 1 deletion backends/mongo/mgo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"gopkg.in/mgo.v2"

"github.com/dataux/dataux/frontends/mysqlfe/testmysql"
"github.com/dataux/dataux/planner"
"github.com/dataux/dataux/testutil"
)

Expand Down Expand Up @@ -40,6 +41,14 @@ func loadTestData() {
}
}

func RunDistributedNodes(t *testing.T) func() {
testmysql.RunTestServer(t)
planner.RunWorkerNodes(2)
return func() {

}
}

type QuerySpec struct {
Sql string
Cols []string
Expand Down Expand Up @@ -122,7 +131,7 @@ func TestInvalidQuery(t *testing.T) {
assert.T(t, err == nil)
// It is parsing the SQL on server side (proxy)
// not in client, so hence that is what this is testing, making sure
// proxy responds gracefully
// proxy responds gracefully with an error
rows, err := db.Query("select `stuff`, NOTAKEYWORD github_fork NOTWHERE `description` LIKE \"database\";")
assert.Tf(t, err != nil, "%v", err)
assert.Tf(t, rows == nil, "must not get rows")
Expand Down Expand Up @@ -305,6 +314,27 @@ func TestSelectCountStar(t *testing.T) {
RowData: &data,
})
}

func TestSelectDistributed(t *testing.T) {
data := struct {
Avg float64 `db:"title_avg"`
}{}

cleanup := RunDistributedNodes(t)
defer cleanup()

// We are going to use the WITH distributed=true to force distribution
validateQuerySpec(t, QuerySpec{
Sql: "select AVG(CHAR_LENGTH(CAST(`title` AS CHAR))) as title_avg from article WITH distributed=true, node_ct=2",
ExpectRowCt: 1,
ValidateRowData: func() {
u.Infof("%#v", data.Avg)
assert.Tf(t, data.Avg == 8.25, "Not avg right?? %v", data)
},
RowData: &data,
})
}

func TestSelectAggAvg(t *testing.T) {
data := struct {
Avg float64 `db:"title_avg"`
Expand Down
2 changes: 1 addition & 1 deletion backends/mongo/sql_to_mgo.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (m *SqlToMgo) VisitSourceSelect(sp *plan.SourcePlan) (rel.Task, rel.VisitSt
m.TaskBase = exec.NewTaskBase(sp.Ctx, "SqlToMgo")
var err error
m.sp = sp
req := sp.Source
req := sp.From.Source
//u.Infof("mongo.VisitSubSelect %v final:%v", req.String(), sp.Final)

m.sel = req
Expand Down
36 changes: 21 additions & 15 deletions frontends/mysqlfe/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/araddon/qlbridge/plan"
"github.com/araddon/qlbridge/rel"

"github.com/dataux/dataux/models"
"github.com/dataux/dataux/planner"
)

Expand All @@ -21,33 +22,38 @@ var (

// Mysql job that wraps the generic qlbridge job builder
type MySqlJob struct {
*exec.JobBuilder
runner exec.TaskRunners
*planner.SqlJob
}

// Create a MySql job: QlBridge jobs can be wrapper
// allowing per-method (VisitShow etc) to be replaced by a dialect
// specific handler.
// - mysql `SHOW CREATE TABLE name` is dialect specific so needs to be replaced
func BuildMySqlob(ctx *plan.Context) (*MySqlJob, error) {
// Create a MySql job that wraps underlying qlbridge generic implementation
// allowing per-method (VisitShow etc) to be replaced by a dialect specific handler.
// - mysql `SHOW CREATE TABLE name` for example is dialect specific so needs to be replaced
// - also allows a distributed planner from dataux
func BuildMySqlob(svr *models.ServerCtx, ctx *plan.Context) (*MySqlJob, error) {

b := exec.JobBuilder{}
b.Ctx = ctx
// We are going to swap out a new distributed planner
b.TaskMaker = planner.TaskRunnersMaker
job := &MySqlJob{JobBuilder: &b}
b.Visitor = job
// We are going to replace qlbridge planner with dataux distributed one
b.TaskMaker = planner.TaskRunnersMaker(ctx, svr.Grid)
job := &planner.SqlJob{JobBuilder: &b, Grid: svr.Grid}
mj := &MySqlJob{SqlJob: job}
job.Visitor = mj
b.Visitor = mj

task, err := exec.BuildSqlJobVisitor(job, ctx)
u.Debugf("SqlJob:%p exec.Job:%p about to build: %#v", job, &b, mj)
task, err := exec.BuildSqlJobVisitor(mj, ctx)
if err != nil {
return nil, err
}
taskRunner, ok := task.(exec.TaskRunner)
if !ok {
return nil, fmt.Errorf("Expected TaskRunner but was %T", task)
return nil, fmt.Errorf("Expected TaskRunner type root task but was %T", task)
}

job.RootTask = taskRunner

if job.Ctx.Projection != nil {
return job, nil
return mj, nil
}

if sqlSelect, ok := job.Ctx.Stmt.(*rel.SqlSelect); ok {
Expand All @@ -58,5 +64,5 @@ func BuildMySqlob(ctx *plan.Context) (*MySqlJob, error) {
}
}

return job, err
return mj, nil
}
12 changes: 7 additions & 5 deletions frontends/mysqlfe/build_show.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ func (m *MySqlJob) VisitShow(stmt *rel.SqlShow) (rel.Task, rel.VisitStatus, erro
// return exec.NewSequential(m.Ctx, "show-tables", tasks), rel.VisitContinue, nil

u.Debugf("%p has sqlJob? %+v", m, m)
planner := m.TaskMaker(m.Ctx)
sourceTask := exec.NewSource(m.Ctx, nil, source)
planner.Add(sourceTask)
return planner.Sequential("show-create-table"), rel.VisitContinue, nil
tasks := m.TaskMaker.Sequential("show-create-table")

sourcePlan := plan.NewSourceStaticPlan(m.Ctx)
sourceTask := exec.NewSource(sourcePlan, source)
tasks.Add(sourceTask)
return tasks, rel.VisitContinue, nil

}
return m.JobBuilder.VisitShow(stmt)
return m.SqlJob.VisitShow(stmt)
}
39 changes: 18 additions & 21 deletions frontends/mysqlfe/mysql_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,31 @@ var (
_ models.Handler = (*MySqlHandler)(nil)
)

// Handle request splitting, a single connection session
// not threadsafe, not shared
// MySqlHandler shared across connections, used to create
// connection specific connections
type MySqlHandlerShared struct {
svr *models.ServerCtx
conf *models.Config
nodes map[string]*schema.SourceConfig // List of servers
schema *schema.Schema
svr *models.ServerCtx
}

// Handle request splitting, a single connection session
// not threadsafe, not shared
// MySql connection handler, a single connection session
// not threadsafe, not shared
type MySqlHandler struct {
*MySqlHandlerShared
sess expr.ContextReader
conn *proxy.Conn
sess expr.ContextReader // session info
conn *proxy.Conn // Connection to client, inbound mysql conn
schema *schema.Schema
}

func NewMySqlHandler(svr *models.ServerCtx) (models.ConnectionHandle, error) {
sharedHandler := &MySqlHandlerShared{svr: svr, conf: svr.Config}
sharedHandler := &MySqlHandlerShared{svr: svr}
err := sharedHandler.Init()
connHandler := &MySqlHandler{MySqlHandlerShared: sharedHandler}
return connHandler, err
}

func (m *MySqlHandlerShared) Init() error { return nil }

// Clone this handler as each handler is a per-client/conn copy of handler
// Open/Clone this handler as each handler is a per-client/conn copy of handler
// - this occurs once when a new tcp-conn is established
// - it re-uses the HandlerShard with has schema, etc on it
func (m *MySqlHandler) Open(connI interface{}) models.Handler {
Expand All @@ -72,7 +70,7 @@ func (m *MySqlHandler) Open(connI interface{}) models.Handler {
handler.conn = conn
return &handler
}
panic("not cloneable")
panic(fmt.Sprintf("not proxy.Conn? %T", connI))
}

func (m *MySqlHandler) Close() error {
Expand Down Expand Up @@ -140,7 +138,7 @@ func (m *MySqlHandler) chooseCommand(writer models.ResultWriter, req *models.Req
func (m *MySqlHandler) handleQuery(writer models.ResultWriter, sql string) (err error) {

//u.Debugf("handleQuery: %v", sql)
if !m.conf.SupressRecover {
if !m.svr.Config.SupressRecover {
//u.Debugf("running recovery? ")
defer func() {
if e := recover(); e != nil {
Expand All @@ -161,17 +159,15 @@ func (m *MySqlHandler) handleQuery(writer models.ResultWriter, sql string) (err
ctx := plan.NewContext(sql)
ctx.Session = m.sess
ctx.Schema = m.schema
job, err := BuildMySqlob(ctx)
job, err := BuildMySqlob(m.svr, ctx)

if err != nil {
//u.Debugf("error? %v", err)
sql = strings.ToLower(sql)
switch {
case strings.Contains(sql, "set autocommit"):
return m.conn.WriteOK(nil)
case strings.Contains(sql, "set session transaction isolation"):
// SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
return m.conn.WriteOK(nil)
case strings.HasPrefix(sql, "set "):
// set autocommit
// SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ
return m.conn.WriteOK(nil)
}
u.Debugf("error on parse sql statement: %v", err)
Expand All @@ -181,8 +177,9 @@ func (m *MySqlHandler) handleQuery(writer models.ResultWriter, sql string) (err
// we are done, already wrote results
return nil
}

//u.Infof("job.Ctx %p Session %p", job.Ctx, job.Ctx.Session)
job.Ctx.Session = m.sess
//job.Ctx.Session = m.sess

switch stmt := job.Ctx.Stmt.(type) {
case *rel.SqlSelect:
Expand Down
18 changes: 15 additions & 3 deletions frontends/mysqlfe/testmysql/runtestserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (

u "github.com/araddon/gou"
"github.com/bmizerany/assert"
"github.com/lytics/sereno/embeddedetcd"

"github.com/dataux/dataux/frontends/mysqlfe"
"github.com/dataux/dataux/models"
"github.com/dataux/dataux/planner"
"github.com/dataux/dataux/vendored/mixer/client"
mysqlproxy "github.com/dataux/dataux/vendored/mixer/proxy"
)
Expand All @@ -21,6 +23,8 @@ var (
testDBOnce sync.Once
testDB *client.DB
Conf *models.Config
EtcdCluster *embeddedetcd.EtcdCluster
ServerCtx *models.ServerCtx
)

func init() {
Expand Down Expand Up @@ -109,10 +113,18 @@ func NewTestServer(t *testing.T) *TestListenerWraper {

assert.Tf(t, Conf != nil, "must load config without err: %v", Conf)

svr := models.NewServerCtx(Conf)
svr.Init()
EtcdCluster = embeddedetcd.TestClusterOf1()
EtcdCluster.Launch()
etcdServers := EtcdCluster.HTTPMembers()[0].ClientURLs
u.Infof("etcdServers: %#v", etcdServers)
Conf.Etcd = etcdServers

handler, err := mysqlfe.NewMySqlHandler(svr)
planner.GridConf.EtcdServers = etcdServers

ServerCtx = models.NewServerCtx(Conf)
ServerCtx.Init()

handler, err := mysqlfe.NewMySqlHandler(ServerCtx)
assert.Tf(t, err == nil, "must create es handler without err: %v", err)

// Load our Frontend Listener's
Expand Down
3 changes: 3 additions & 0 deletions models/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ func LoadConfig(conf string) (*Config, error) {
// 2) Sources (types of backends such as elasticsearch, mysql, mongo, ...)
// 3) Virtual Schemas
// 4) list of server/nodes for Sources
// 5) nats,etcd coordinators
type Config struct {
SupressRecover bool `json:"supress_recover"` // do we recover?
LogLevel string `json:"log_level"` // [debug,info,error,]
Etcd []string `json:"etcd"` // list of etcd servers http://127.0.0.1:2379,http://127.0.0.1:2380
Nats []string `json:"nats"` // list of nats servers http://127.0.0.1:4222,http://127.0.0.1:4223
Frontends []*ListenerConfig `json:"frontends"` // tcp listener configs
Sources []*schema.SourceConfig `json:"sources"` // backend servers/sources (es, mysql etc)
Schemas []*schema.SchemaConfig `json:"schemas"` // Schemas, each backend has 1 schema
Expand Down
2 changes: 0 additions & 2 deletions models/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ var (
// A listener is a protocol specific, and transport specific
// reader of requests which will be routed to a handler
type Listener interface {
// Blocking runner
Run(handle ConnectionHandle, stop chan bool) error
Close() error
}

//type func(*models.Config) (models.Listener, error)
type ListenerInit func(*ListenerConfig, *Config) (Listener, error)

func ListenerRegister(name string, fn ListenerInit, connHandle ConnectionHandle) {
Expand Down
6 changes: 6 additions & 0 deletions models/serverctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import (

"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/schema"

"github.com/dataux/dataux/planner"
"github.com/dataux/dataux/planner/gridrunner"
)

type ServerCtx struct {
Config *Config
schemas map[string]*schema.Schema
RtConf *datasource.RuntimeSchema
Grid *gridrunner.Server
}

func NewServerCtx(conf *Config) *ServerCtx {
Expand All @@ -31,6 +35,8 @@ func (m *ServerCtx) Init() error {
if err := m.loadConfig(); err != nil {
return err
}
m.Grid = planner.NewServerGrid(2)
go m.Grid.RunMaster()
return nil
}

Expand Down
31 changes: 31 additions & 0 deletions planner/build.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package planner

import (
u "github.com/araddon/gou"

"github.com/araddon/qlbridge/exec"
"github.com/araddon/qlbridge/rel"

"github.com/dataux/dataux/planner/gridrunner"
)

var (
// ensure it meets visitor
_ rel.Visitor = (*SqlJob)(nil)

_ = u.EMPTY
)

// Sql job that wraps the generic qlbridge job builder
type SqlJob struct {
*exec.JobBuilder
Grid *gridrunner.Server
Visitor rel.Visitor
runner exec.TaskRunners
}

// Many of the ShowMethods are MySql dialect specific so will be replaced here
func (m *SqlJob) VisitShow(stmt *rel.SqlShow) (rel.Task, rel.VisitStatus, error) {
u.Debugf("planner.VisitShow create?%v identity=%q raw=%s", stmt.Create, stmt.Identity, stmt.Raw)
return m.JobBuilder.VisitShow(stmt)
}
Loading

0 comments on commit 64ca6bf

Please sign in to comment.