Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra data source part 1 #21

Merged
merged 12 commits into from
Aug 14, 2016
Prev Previous commit
Next Next commit
Cassandra wip
  • Loading branch information
araddon committed Jul 25, 2016
commit 375b92d527f20e7a8da41e025a599bbc108b99f2
5 changes: 5 additions & 0 deletions backends/cassandra/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@


```
# log onto container to get a cqlsh
docker run -it --rm --net container:lioenv_cass_1 cassandra:2 cqlsh

INSERT INTO user (id, name, deleted, created, updated) VALUES ('user814', 'test_name', false, '2016-07-24','2016-07-24 23:46:01')

cqlsh:datauxtest> select * from system.schema_columnfamilies WHERE keyspace_name = "datauxtest";
SyntaxException: <ErrorMessage code=2000 [Syntax error in CQL query] message="line 1:65 no viable alternative at input 'datauxtest' (...system.schema_columnfamilies WHERE keyspace_name = ["datauxtes]...)">

Expand Down
32 changes: 28 additions & 4 deletions backends/cassandra/cass_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ CREATE TABLE IF NOT EXISTS event (
jsondata text,
PRIMARY KEY ((date, url), ts)
);
`}
`,
`truncate event`,
`truncate user`,
`truncate article`,
}

func jobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
ctx.Schema = testmysql.Schema
Expand Down Expand Up @@ -459,26 +463,46 @@ func TestSelectOrderBy(t *testing.T) {
}

func TestMutationInsertSimple(t *testing.T) {
validateQuerySpec(t, tu.QuerySpec{
Sql: "select id, name from user;",
ExpectRowCt: 3,
ValidateRowData: func() {},
})
validateQuerySpec(t, tu.QuerySpec{
Exec: `INSERT INTO user (id, name, deleted, created, updated) VALUES ("user814", "test_name",false, now(), now());`,
ValidateRowData: func() {},
ExpectRowCt: 1,
})
validateQuerySpec(t, tu.QuerySpec{
Exec: `
INSERT INTO user (id, name, deleted, created, updated)
VALUES
("user815", "test_name2",false, now(), now()),
("user816", "test_name3",false, now(), now());
`,
ValidateRowData: func() {},
ExpectRowCt: 2,
})
validateQuerySpec(t, tu.QuerySpec{
Sql: "select id, name from user;",
ExpectRowCt: 6,
ValidateRowData: func() {},
})
}

func TestMutationDeleteSimple(t *testing.T) {
validateQuerySpec(t, tu.QuerySpec{
Exec: `INSERT INTO user (id, name, deleted, created, updated) VALUES ("user814", "test_name",false, now(), now());`,
Exec: `INSERT INTO user (id, name, deleted, created, updated) VALUES ("user817", "test_name",false, now(), now());`,
ValidateRowData: func() {},
ExpectRowCt: 1,
})
validateQuerySpec(t, tu.QuerySpec{
Exec: `DELETE FROM user WHERE id = "user814"`,
Exec: `DELETE FROM user WHERE id = "user817"`,
ValidateRowData: func() {},
ExpectRowCt: 1,
})
validateQuerySpec(t, tu.QuerySpec{
Exec: `SELECT * FROM user WHERE id = "user814"`,
Exec: `SELECT * FROM user WHERE id = "user817"`,
ValidateRowData: func() {},
ExpectRowCt: 0,
})
Expand Down
49 changes: 25 additions & 24 deletions backends/cassandra/sql_to_cql.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Cassandra implements a data source (backend) to allow
// package Cassandra implements a data source (backend) to allow
// dataux to query cassandra via sql
package cassandra

Expand Down Expand Up @@ -255,26 +255,23 @@ func (m *SqlToCql) Put(ctx context.Context, key schema.Key, val interface{}) (sc
return nil, fmt.Errorf("Must have schema for updates in cassandra")
}

// TODO:
// 1) rewrite statement
// 2) move to ?? some place more like a prepared statement

cols := m.tbl.Columns()
if m.stmt == nil {
return nil, fmt.Errorf("Must have stmts to infer columns ")
}
upsertCql := ""
switch q := m.stmt.(type) {
case *rel.SqlInsert:
cols = q.ColumnNames()
u.Debugf("nice, columns %v", cols)
upsertCql = q.RewriteAsPrepareable(1, '?')
//u.Debugf("prepared: \n%s", upsertCql)
default:
return nil, fmt.Errorf("%T not yet supported ", q)
}

var row []driver.Value
colNames := make(map[string]int, len(cols))

// TODO: rewrite insert/update"
upsertCql := m.stmt.String()

for i, colName := range cols {
colNames[colName] = i
}
Expand All @@ -283,29 +280,25 @@ func (m *SqlToCql) Put(ctx context.Context, key schema.Key, val interface{}) (sc
switch valT := val.(type) {
case []driver.Value:
row = valT
u.Infof("row len=%v fieldlen=%v col len=%v", len(row), len(m.tbl.Fields), len(cols))
//u.Debugf("row: %v", row)
//u.Debugf("row len=%v fieldlen=%v col len=%v", len(row), len(m.tbl.Fields), len(cols))
for _, f := range m.tbl.Fields {
for i, colName := range cols {
if f.Name == colName {
if len(row) <= i-1 {
u.Errorf("bad column count? %d vs %d col: %+v", len(row), i, f)
} else {
u.Debugf("col info? %d vs %d col: %+v", len(row), i, f)
switch val := row[i].(type) {
case string, []byte, int, int64, bool, time.Time:
u.Debugf("PUT field: i=%d col=%s row[i]=%v T:%T", i, colName, row[i], row[i])
//props = append(props, datastore.Property{Name: f.Name, Value: val})
curRow[i] = val
case []value.Value:
by, err := json.Marshal(val)
if err != nil {
u.Errorf("Error converting field %v err=%v", val, err)
}
u.Debugf("PUT field: i=%d col=%s row[i]=%v T:%T", i, colName, string(by), by)
//props = append(props, datastore.Property{Name: f.Name, Value: by})
//u.Debugf("PUT field: i=%d col=%s row[i]=%v T:%T", i, colName, string(by), by)
default:
u.Warnf("unsupported conversion: %T %v", val, val)
//props = append(props, datastore.Property{Name: f.Name, Value: val})
}
}
break
Expand All @@ -317,7 +310,6 @@ func (m *SqlToCql) Put(ctx context.Context, key schema.Key, val interface{}) (sc
for i, f := range m.tbl.Fields {
for colName, driverVal := range valT {
if f.Name == colName {
//u.Debugf("PUT field: i=%d col=%s val=%v T:%T cur:%v", i, colName, driverVal, driverVal, curRow[i])
switch val := driverVal.(type) {
case string, []byte, int, int64, bool:
curRow[i] = val
Expand All @@ -336,23 +328,18 @@ func (m *SqlToCql) Put(ctx context.Context, key schema.Key, val interface{}) (sc
break
}
}
//u.Infof(" %v curRow? %d %#v", f.Name, len(curRow), curRow)
//u.Debugf("%d writing %-10s %T\t%v", i, f.Name, curRow[i], curRow[i])
//props = append(props, datastore.Property{Name: f.Name, Value: curRow[i]})
}

default:
u.Warnf("unsupported type: %T %#v", val, val)
return nil, fmt.Errorf("Was not []driver.Value? %T", val)
}

u.Debugf("%s", upsertCql)
err := m.s.session.Query(upsertCql, curRow...).Exec()
if err != nil {
u.Errorf("could not insert: %v", err)
return nil, err
}
u.Infof("err %v", err)
newKey := datasource.NewKeyCol("id", "fixme")
return newKey, nil
}
Expand All @@ -363,14 +350,28 @@ func (m *SqlToCql) PutMulti(ctx context.Context, keys []schema.Key, src interfac

// Delete delete by row
func (m *SqlToCql) Delete(key driver.Value) (int, error) {
u.Warnf("hm, in delete? %v", key)
return 0, schema.ErrNotImplemented
}

// DeleteExpression - delete by expression (where clause)
// - For where columns contained in Partition Keys we can push to cassandra
// - for others we might have to do a select -> delete
func (m *SqlToCql) DeleteExpression(where expr.Node) (int, error) {
return 0, schema.ErrNotImplemented
func (m *SqlToCql) DeleteExpression(p interface{}, where expr.Node) (int, error) {
u.Warnf("hm, in delete? %v %T", where, p)
pd, ok := p.(*plan.Delete)
if !ok {
return 0, plan.ErrNoPlan
}
cql := fmt.Sprintf("DELETE FROM %s WHERE %s", pd.Stmt.Table, where)
err := m.s.session.Query(cql).Exec()
if err != nil {
u.Errorf("could not delete: %v", err)
return 0, err
}
// Wow, we have serious problems here because cql/cassandra doesn't
// tell us how many were deleted. hm
return 1, nil
}

func (m *SqlToCql) isCassKey(name string) bool {
Expand Down
38 changes: 32 additions & 6 deletions frontends/mysqlfe/results_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mysqlfe

import (
"database/sql/driver"
"errors"
"time"

u "github.com/araddon/gou"
Expand Down Expand Up @@ -42,6 +43,8 @@ type MySqlExecResultWriter struct {
writer models.ResultWriter
schema *schema.Schema
Rs *mysql.Result
ct int64
err error
}

func NewMySqlResultWriter(writer models.ResultWriter, ctx *plan.Context) *MySqlResultWriter {
Expand Down Expand Up @@ -353,7 +356,7 @@ func NewMySqlExecResultWriter(writer models.ResultWriter, ctx *plan.Context) *My
m := &MySqlExecResultWriter{writer: writer, schema: ctx.Schema}
m.TaskBase = exec.NewTaskBase(ctx)
m.Rs = mysql.NewResult()
m.Handler = nilWriter(m)
m.Handler = m.ResultWriter()
return m
}
func (m *MySqlExecResultWriter) Close() error {
Expand All @@ -364,18 +367,41 @@ func (m *MySqlExecResultWriter) Close() error {
if m.writer == nil {
u.Warnf("wat? nil writer? ")
}
//u.Debugf("rs?%#v writer?%#v", m.Rs, m.writer)
// TODO: we need to send a message to get this count
m.Rs.AffectedRows = 1
if m.err != nil {
m.Rs.Status = mysql.ER_UNKNOWN_ERROR
} else {
m.Rs.AffectedRows = uint64(m.ct)
}
m.writer.WriteResult(m.Rs)
return m.TaskBase.Close()
}
func (m *MySqlExecResultWriter) Finalize() error {
return nil
}
func nilWriter(m *MySqlExecResultWriter) exec.MessageHandler {
func (m *MySqlExecResultWriter) ResultWriter() exec.MessageHandler {
return func(_ *plan.Context, msg schema.Message) bool {
u.Debugf("in nilWriter: %#v", msg)

var vals []driver.Value
switch mt := msg.Body().(type) {
case *datasource.SqlDriverMessageMap:
vals = mt.Values()
case []driver.Value:
vals = mt
default:
u.Warnf("%T not supported", mt)
return false
}
if len(vals) == 2 {
switch rt := vals[0].(type) {
case string: // error
m.err = errors.New(rt)
default:
if affectedCt, isInt := vals[1].(int64); isInt {
m.ct = affectedCt
}
}
return true
}
return false
}
}
2 changes: 1 addition & 1 deletion testutil/testsetup.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func ValidateQuerySpec(t *testing.T, testSpec QuerySpec) {
case len(testSpec.Exec) > 0:
result, err := dbx.Exec(testSpec.Exec)
assert.Tf(t, err == nil, "%v", err)
//u.Infof("result: %Ev", result)
u.Infof("result: %#v", result)
if testSpec.ExpectRowCt > -1 {
affected, err := result.RowsAffected()
assert.Tf(t, err == nil, "%v", err)
Expand Down
1 change: 1 addition & 0 deletions vendored/mixer/proxy/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (c *Conn) Run() {
}

if err != mysql.ErrBadConn {
u.Warnf("writing error %v", err)
c.WriteError(err)
}
}
Expand Down