Cassandra data source part 1 #21

Merged
merged 12 commits on Aug 14, 2016
Cassandra data source sql rewrite for cass rules
araddon committed Jul 11, 2016
commit 0af9e29d05bb786cb43494a802289be177240d97
9 changes: 4 additions & 5 deletions GLOCKFILE
@@ -10,7 +10,7 @@ github.com/go-sql-driver/mysql 3654d25ec346ee8ce71a68431025458d52a38ac0
github.com/gocql/gocql 261883b4ad61ed1b4820abf522f952d3e18b366c
github.com/gogo/protobuf 2752d97bbd91927dd1c43296dbf8700e50e2708c
github.com/golang/protobuf 3852dcfda249c2097355a6aabb199a28d97b30df
github.com/golang/snappy 723cc1e459b8eea2dea4583200fd60757d40097a
github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380
github.com/google/btree 7d79101e329e5a3adf994758c578dab82b90c017
github.com/hailocab/go-hostpool e80d13ce29ede4452c43dea11e79b9bc8a15b478
github.com/hashicorp/go-immutable-radix afc5a0dbb18abdf82c277a7bc01533e81fa1d6b8
@@ -29,17 +29,16 @@ github.com/lytics/metafora eb26f00432a98228e44de8e074b9627fc4c1e457
github.com/lytics/sereno f6128b640de0742892a439b3672ac88637df977d
github.com/mattbaird/elastigo 34c4c4d8425cbdcbc8e257943a2044d5e9f7dab5
github.com/mb0/glob 1eb79d2de6c448664e7272f8b9fe1938239e3aaa
github.com/nats-io/gnatsd 8e99354b8ca40ae921463cea076abf460dd12d62
github.com/nats-io/gnatsd d0c69c11181ae3707212aa681bbf665c07d7d79e
github.com/nats-io/nats 495efc04e982cdb69ffc655b142973f1f1a47323
github.com/nats-io/nuid 4f84f5f3b2786224e336af2e13dba0a0a80b76fa
github.com/nats-io/nuid a5152d67cf63cbfb5d992a395458722a45194715
github.com/pborman/uuid c55201b036063326c5b1b89ccfe45a184973d073
github.com/sony/sonyflake fa881fb1052b152e977c41023052c2f2a1c475e9
github.com/ugorji/go 45ce7596ace4534e47b69051a92aef7b64ec7b3f
golang.org/x/crypto 1f22c0103821b9390939b6776727195525381532
golang.org/x/net f841c39de738b1d0df95b5a7187744f0e03d8112
golang.org/x/oauth2 7357e96168422ba1e6e7f21075713b72d58764e7
google.golang.org/api c13a21ee847eca050f08db8373d8737494a1170e
google.golang.org/cloud 83ee71d0fec26977bad1662134fa1e8c91268025
google.golang.org/grpc 461dac99975b211ed3eda7eb45b997d82da4345a
gopkg.in/inf.v0 c85f1217d51339c0fa3a498cc8b2075de695dae6
gopkg.in/inf.v0 3887ee99ecf07df5b447e9b00d9c0b2adaa9f3e4
gopkg.in/mgo.v2 22287bab4379e1fbf6002fb4eb769888f3fb224c
50 changes: 49 additions & 1 deletion backends/cassandra/cass_test.go
@@ -341,6 +341,27 @@ func TestSelectLimit(t *testing.T) {
})
}

func TestSelectGroupBy(t *testing.T) {
data := struct {
Author string
Ct int
}{}
validateQuerySpec(t, tu.QuerySpec{
Sql: "select count(*) as ct, author from article GROUP BY author;",
ExpectRowCt: 3,
ValidateRowData: func() {
//u.Infof("%v", data)
switch data.Author {
case "aaron":
assert.Tf(t, data.Ct == 1, "Should have found 1? %v", data)
case "bjorn":
assert.Tf(t, data.Ct == 2, "Should have found 2? %v", data)
}
},
RowData: &data,
})
}

func TestSelectWhereLike(t *testing.T) {

// We are testing that the LIKE clause, which doesn't exist in Cassandra, is being polyfilled
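// e.g. (hypothetical flow) a predicate such as `title LIKE 'list%'` is
// stripped from the CQL sent to Cassandra and re-applied to the result
// stream by a WHERE polyfill task in the exec layer.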
@@ -386,19 +407,46 @@ func TestSelectProjectionRewrite(t *testing.T) {
}

func TestSelectOrderBy(t *testing.T) {
RunTestServer(t)

dbx, err := sqlx.Connect("mysql", DbConn)
assert.Tf(t, err == nil, "%v", err)
defer dbx.Close()

// This is an error BECAUSE Cassandra won't allow ORDER BY on a
// partition key that is not constrained by == or IN in the WHERE clause.
// - do we poly-fill? behind an option?
// - what about changing the partition keys to be more realistic?
// - a new artificial partition key? i.e. hash(url) assigns a row to 1 of ?? 1000 partitions?
//   how would we declare that hash(url) syntax? a materialized view?
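// For illustration (hypothetical schema): with
//   CREATE TABLE article (author text, title text, count64 bigint,
//     PRIMARY KEY ((author), title));
// Cassandra accepts
//   SELECT * FROM article WHERE author = 'aaron' ORDER BY title DESC;
// but rejects the same ORDER BY when the partition key is unconstrained.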
rows, err := dbx.Queryx("select title, count64 AS ct FROM article ORDER BY title DESC LIMIT 1;")
assert.Tf(t, err == nil, "Should not error: %v", err)
cols, err := rows.Columns()
assert.Tf(t, err == nil, "%v", err)
assert.Tf(t, len(cols) == 2, "has 2 cols")
assert.T(t, rows.Next() == false, "Should not have any rows")

return
// return
data := struct {
Title string
Ct int
}{}
// Try order by on primary partition key
validateQuerySpec(t, tu.QuerySpec{
Sql: "select title, count64 AS ct FROM article ORDER BY count64 DESC LIMIT 1;",
Sql: "select title, count64 AS ct FROM article ORDER BY title DESC LIMIT 1;",
ExpectRowCt: 1,
ValidateRowData: func() {
assert.Tf(t, data.Title == "zarticle3", "%v", data)
assert.Tf(t, data.Ct == 100, "%v", data)
},
RowData: &data,
})

// try order by on some other keys

// need to fix OrderBy for ints first
return
validateQuerySpec(t, tu.QuerySpec{
Sql: "select title, count64 AS ct FROM article ORDER BY count64 ASC LIMIT 1;",
ExpectRowCt: 1,
18 changes: 7 additions & 11 deletions backends/cassandra/resultreader.go
@@ -26,7 +26,6 @@ type ResultReader struct {
cursor int
proj *rel.Projection
cols []string
rewritenSql string
Total int
Req *SqlToCql
}
@@ -37,11 +36,10 @@ type ResultReaderNext struct {
*ResultReader
}

func NewResultReader(req *SqlToCql, rewritenSql string) *ResultReader {
func NewResultReader(req *SqlToCql) *ResultReader {
m := &ResultReader{}
m.TaskBase = exec.NewTaskBase(req.Ctx)
m.Req = req
m.rewritenSql = rewritenSql
return m
}

@@ -100,9 +98,7 @@ func (m *ResultReader) Run() error {
m.finalized = true
m.buildProjection()

sql := m.Req.sel

if sql.CountStar() {
if m.Req.sel.CountStar() {
// Count(*): do we really want to do push-down here?
// possibly default to not allowing this and allow a setting?
u.Warnf("Count(*) on Cassandra, you're crazy!")
@@ -119,12 +115,12 @@ }
}

limit := DefaultLimit
if sql.Limit > 0 {
limit = sql.Limit
if m.Req.sel.Limit > 0 {
limit = m.Req.sel.Limit
}

u.Debugf("%p cass limit: %d sel:%s", m.Req.sel, limit, m.Req.sel)
queryStart := time.Now()
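// The rewritten AST is re-serialized via String() and becomes the CQL
// source itself, replacing the old rewritenSql parameter.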
cassQry := m.Req.s.session.Query(m.rewritenSql).PageSize(limit)
cassQry := m.Req.s.session.Query(m.Req.sel.String()).PageSize(limit)
iter := cassQry.Iter()

for {
Expand All @@ -151,7 +147,7 @@ func (m *ResultReader) Run() error {
}
}

u.Debugf("new row ct: %v cols:%v vals:%v", m.Total, colNames, vals)
//u.Debugf("new row ct: %v cols:%v vals:%v", m.Total, colNames, vals)
//msg := &datasource.SqlDriverMessage{vals, len(m.Vals)}
msg := datasource.NewSqlDriverMessageMap(uint64(m.Total), vals, colNames)
m.Total++
2 changes: 1 addition & 1 deletion backends/cassandra/source.go
@@ -264,7 +264,7 @@ func (m *Source) Table(table string) (*schema.Table, error) {
}

func (m *Source) Open(tableName string) (schema.Conn, error) {
u.Debugf("Open(%v)", tableName)
//u.Debugf("Open(%v)", tableName)
if m.schema == nil {
u.Warnf("no schema?")
return nil, nil
103 changes: 86 additions & 17 deletions backends/cassandra/sql_to_cql.go
@@ -54,6 +54,7 @@ type SqlToCql struct {
needsPolyFill bool // polyfill?
needsWherePolyFill bool // do we request that features be polyfilled?
needsProjectPolyFill bool // do we request that features be polyfilled?
needsOrderByPolyFill bool // order-by must be applied in the exec layer as a polyfill
}

// NewSqlToCql create a SQL -> CQL ast converter
@@ -66,14 +67,14 @@ func NewSqlToCql(s *Source, t *schema.Table) *SqlToCql {
return m
}

func (m *SqlToCql) query(original *rel.SqlSelect) (*ResultReader, error) {
func (m *SqlToCql) queryRewrite(original *rel.SqlSelect) error {

//u.Debugf("%p %s query:%s", m, m.tbl.Name, m.sel)
m.original = original

cf, ok := m.tbl.Context["cass_table"].(*gocql.TableMetadata)
if !ok {
return nil, fmt.Errorf("What, expected gocql.TableMetadata but got %T", m.tbl.Context["cass_table"])
return fmt.Errorf("What, expected gocql.TableMetadata but got %T", m.tbl.Context["cass_table"])
}
m.cf = cf

@@ -84,13 +85,15 @@ func (m *SqlToCql) query(original *rel.SqlSelect) (*ResultReader, error) {
limit = DefaultLimit
}

req.RewriteAsRawSelect()

if req.Where != nil {
u.Debugf("original vs new: \n%s\n%s", original.Where, req.Where)
m.whereIdents = make(map[string]bool)
w, err := m.walkWhereNode(req.Where.Expr)
if err != nil {
u.Warnf("Could Not evaluate Where Node %s %v", req.Where.Expr.String(), err)
return nil, err
return err
}
req.Where = nil
if w != nil {
@@ -100,16 +103,31 @@ func (m *SqlToCql) query(original *rel.SqlSelect) (*ResultReader, error) {
}

// Obviously we aren't doing group-by
if req.GroupBy != nil {
if len(req.GroupBy) > 0 {
u.Debugf(" poly-filling groupby")
req.GroupBy = nil
}
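// The stripped GROUP BY is re-applied in WalkExecSource via a GroupByFinal
// task, so results still group correctly (see TestSelectGroupBy).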

u.Infof("%s", req)
if len(req.OrderBy) > 0 {
u.Debugf("orderby?")
ob := req.OrderBy
req.OrderBy = make(rel.Columns, 0, len(ob))
for _, c := range ob {
if c.Expr == nil {
u.Warnf("nil expr orderby %#v", c)
} else {
newOrderExpr, _ := m.walkOrderBy(c.Expr)
if newOrderExpr != nil {
c.Expr = newOrderExpr
u.Debugf("yay can orderby %s", c)
req.OrderBy = append(req.OrderBy, c)
}
}
}
}

resultReader := NewResultReader(m, req.String())
m.resp = resultReader
return resultReader, nil
u.Infof("%s", req)
return nil
}

// WalkSourceSelect An interface implemented by this connection allowing the planner
@@ -126,7 +144,7 @@ func (m *SqlToCql) WalkSourceSelect(planner plan.Planner, p *plan.Source) (plan.
func (m *SqlToCql) WalkExecSource(p *plan.Source) (exec.Task, error) {

//u.Debugf("%p WalkExecSource(): %T nil?%v %#v", m, p, p == nil, p)
//u.Debugf("m? %v", m == nil)
//u.Debugf("%p WalkExecSource() %s", m, p.Stmt)

if p.Stmt == nil {
return nil, fmt.Errorf("Plan did not include Sql Statement?")
@@ -154,43 +172,62 @@ func (m *SqlToCql) WalkExecSource(p *plan.Source) (exec.Task, error) {
if pt.Id == partitionId {
//u.Debugf("partition: %s %#v", partitionId, pt)
m.partition = pt
if pt.Left == "" {
//m.filter = bson.M{p.Tbl.Partition.Keys[0]: bson.M{"$lt": pt.Right}}
} else if pt.Right == "" {
//m.filter = bson.M{p.Tbl.Partition.Keys[0]: bson.M{"$gte": pt.Left}}
} else {

}
}
}
}
}
}

reader, err := m.query(p.Stmt.Source)
err := m.queryRewrite(p.Stmt.Source)
if err != nil {
return nil, err
}

reader := NewResultReader(m)
m.resp = reader

// For aggregations, group-by, or limit clauses we will need to do final
// aggregation here in master as the reduce step
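// e.g. `select count(*) as ct, author from article GROUP BY author` (see
// TestSelectGroupBy) runs as a raw scan in Cassandra; GroupByFinal then
// folds the rows into per-author counts here.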
if m.sel.IsAggQuery() {
u.Debugf("Adding aggregate/group by?")
gbplan := plan.NewGroupBy(m.sel)
gb := exec.NewGroupByFinal(m.Ctx, gbplan)
reader.Add(gb)
m.needsPolyFill = true
}

if m.needsWherePolyFill {
wp := plan.NewWhere(m.sel)
wt := exec.NewWhere(m.Ctx, wp)
reader.Add(wt)
m.needsPolyFill = true
}

// do we need poly fill Having?
if m.sel.Having != nil {
u.Infof("needs HAVING polyfill")
}

if m.needsOrderByPolyFill {
u.Infof("adding order by poly fill")
op := plan.NewOrder(m.sel)
ot := exec.NewOrder(m.Ctx, op)
reader.Add(ot)
m.needsPolyFill = true
}

u.Infof("%p needsPolyFill?%v limit:%d ", m.sel, m.needsPolyFill, m.sel.Limit)
if m.needsPolyFill {
if m.sel.Limit > 0 {
// Since we are poly-filling we need to over-read: group-by, order-by,
// and where poly-fills mean the Cassandra-side limit from the original
// statement is no longer valid.
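// e.g. a polyfilled `ORDER BY title DESC LIMIT 1` must read every
// candidate row, sort in the exec layer, and only then apply the LIMIT.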
m.sel.Limit = 0
reader.Req.sel.Limit = 0
u.Warnf("%p setting limit up!!!!!! %v", m.sel, m.sel.Limit)
}
}

return reader, nil
}

@@ -488,3 +525,35 @@ func (m *SqlToCql) walkFilterBinary(node *expr.BinaryNode) (expr.Node, error) {
}
return nil, nil
}

func (m *SqlToCql) walkOrderBy(node expr.Node) (expr.Node, error) {
switch n := node.(type) {
case *expr.IdentityNode:
if m.canOrder(n) {
return node, nil
}
default:
m.needsOrderByPolyFill = true
u.Warnf("un-handled order clause expression type? %T %#v", node, n)
//return fmt.Errorf("Not implemented node: %s", n)
}
return nil, nil
}

func (m *SqlToCql) canOrder(in *expr.IdentityNode) bool {

_, lhIdentityName, _ := in.LeftRight()
_, exists := m.tbl.FieldMap[lhIdentityName]
if !exists {
// Doesn't exist in cassandra? possibly json path?
return false
}
// if it is an identity, let's make sure it is part of the partition or clustering key
if !m.isCassKey(lhIdentityName) {
m.needsOrderByPolyFill = true
u.Warnf("cannot use [%s] in ORDER BY due to not being part of key", in)
return false
}

return true
}
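
Note: isCassKey, called by canOrder above, is not shown in this diff. A minimal sketch of the check it presumably performs, assuming gocql's TableMetadata exposes PartitionKey and ClusteringColumns as it did in this era of the driver (illustrative only, not the committed implementation):

func (m *SqlToCql) isCassKey(name string) bool {
	// ORDER BY can only be pushed down on key columns; check the
	// partition key first ...
	for _, col := range m.cf.PartitionKey {
		if col.Name == name {
			return true
		}
	}
	// ... then the clustering columns, which define Cassandra's sort order.
	for _, col := range m.cf.ClusteringColumns {
		if col.Name == name {
			return true
		}
	}
	return false
}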
4 changes: 2 additions & 2 deletions backends/datastore/datasource.go
@@ -200,7 +200,7 @@ func (m *GoogleDSDataSource) selectQuery(stmt *rel.SqlSelect) (*ResultReader, er

sqlDs := NewSqlToDatstore(tbl, m.dsClient, m.dsCtx)
//u.Debugf("SqlToDatstore: %#v", sqlDs)
resp, err := sqlDs.Query(stmt)
resp, err := sqlDs.query(stmt)
if err != nil {
u.Errorf("Google datastore query interpreter failed: %v", err)
return nil, err
@@ -300,7 +300,7 @@ func (m *GoogleDSDataSource) loadTableSchema(tableLower, tableOriginal string) (
case int:
f = schema.NewFieldBase(colName, value.IntType, 32, "int")
case int64:
tf = schema.NewFieldBase(colName, value.IntType, 64, "long")
f = schema.NewFieldBase(colName, value.IntType, 64, "long")
case float64:
f = schema.NewFieldBase(colName, value.NumberType, 64, "float64")
case bool:
4 changes: 2 additions & 2 deletions backends/datastore/resultreader.go
@@ -104,8 +104,8 @@ func (m *ResultReader) Run() error {
m.buildProjection()

sql := m.Req.sel
//u.Infof("query: %#v", m.Req.query)
q := m.Req.query
//u.Infof("query: %#v", m.Req.dsq)
q := m.Req.dsq

m.Vals = make([][]driver.Value, 0)
