Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra data source part 1 #21

Merged
merged 12 commits into from
Aug 14, 2016
Next Next commit
Cassandra data source part 1
  • Loading branch information
Aaron Raddon committed Jul 9, 2016
commit e2dc913f031ce843734499f9e5a0d00670808def
466 changes: 466 additions & 0 deletions backends/cassandra/cass_test.go

Large diffs are not rendered by default.

173 changes: 173 additions & 0 deletions backends/cassandra/resultreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package cassandra

import (
"database/sql/driver"
"time"

u "github.com/araddon/gou"

"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/exec"
"github.com/araddon/qlbridge/rel"
"github.com/araddon/qlbridge/value"
)

// Compile-time assertion that *ResultReader satisfies exec.TaskRunner.
var _ exec.TaskRunner = (*ResultReader)(nil)

// ResultReader implements result paging and reading for a single
// SqlToCql request. Its Run method executes the query and forwards each
// row to the task's output channel.
type ResultReader struct {
// Embedded task base; created in NewResultReader from the request context
// and used by Run for the signal and message-out channels.
*exec.TaskBase
// exit is not referenced in this file.
exit <-chan bool
// finalized is set to true once Run has started.
finalized bool
// hasprojection guards buildProjection so the projection is built only once.
hasprojection bool
// cursor is not referenced in this file.
cursor int
// proj is the projection created by buildProjection.
proj *rel.Projection
// cols holds the `As` name of each projected column, set by buildProjection.
cols []string
// Total counts the rows Run has turned into messages so far.
Total int
// Req is the request this reader serves; it supplies the select
// statement, table schema and session that Run and buildProjection use.
Req *SqlToCql
}

// ResultReaderNext is a wrapper around ResultReader, allowing us to
// implement the sql/driver Next() interface, which is different from the
// qlbridge/datasource Next().
type ResultReaderNext struct {
*ResultReader
}

// NewResultReader creates a ResultReader for the given request, with a
// fresh exec.TaskBase built from the request's context.
func NewResultReader(req *SqlToCql) *ResultReader {
	return &ResultReader{
		TaskBase: exec.NewTaskBase(req.Ctx),
		Req:      req,
	}
}

// Close is a no-op; it always returns nil.
func (m *ResultReader) Close() error {
	return nil
}

// buildProjection creates the result projection for the request's select
// statement and records the projected column names on the reader.
//
// It runs at most once per reader; later calls return immediately.
// Three shapes of select are handled:
//   - star selects: one result column per field of the table schema
//   - count(*): a single integer "count" column
//   - explicit columns: one result column per selected column whose
//     source field exists in the table's field map; unknown fields are
//     logged and skipped
func (m *ResultReader) buildProjection() {
	if m.hasprojection {
		return
	}
	m.hasprojection = true
	m.proj = rel.NewProjection()

	resultCols := m.proj.Columns
	stmt := m.Req.sel

	switch {
	case stmt.Star:
		// Select each field, taking the fields from the table schema.
		for _, field := range m.Req.tbl.Fields {
			resultCols = append(resultCols, rel.NewResultColumn(field.Name, len(resultCols), nil, field.Type))
		}
	case stmt.CountStar():
		// Count(*)
		resultCols = append(resultCols, rel.NewResultColumn("count", len(resultCols), nil, value.IntType))
	default:
		for _, selCol := range m.Req.sel.Columns {
			field, known := m.Req.tbl.FieldMap[selCol.SourceField]
			if !known {
				u.Debugf("Could not find: '%v' in %#v", selCol.SourceField, m.Req.tbl.FieldMap)
				u.Warnf("%#v", selCol)
				continue
			}
			resultCols = append(resultCols, rel.NewResultColumn(selCol.SourceField, len(resultCols), selCol, field.Type))
		}
	}

	// Remember the output (alias) name of every projected column.
	names := make([]string, 0, len(resultCols))
	for _, rc := range resultCols {
		names = append(names, rc.As)
	}
	m.cols = names
	m.proj.Columns = resultCols
}

// Run executes the request's select statement against the Cassandra
// session and streams every returned row to the task's output channel.
//
// Each row is scanned into a map and converted into a []driver.Value
// ordered like the request's projection columns, which makes it
// compatible with sql/driver values. The row is then wrapped in a
// SqlDriverMessageMap and sent downstream.
//
// The output channel is always closed when Run returns; closing it is the
// signal for downstream tasks to stop. If a value arrives on the signal
// channel while a row is waiting to be sent, Run closes the iterator and
// returns nil. Otherwise it returns the error (if any) reported when the
// iterator is closed after the last row.
func (m *ResultReader) Run() error {

	sigChan := m.SigChan()
	outCh := m.MessageOut()
	defer func() {
		close(outCh) // closing output channels is the signal to stop
		u.Debugf("nice, finalize ResultReader out: %p row ct %v", outCh, m.Total)
	}()
	m.finalized = true
	m.buildProjection()

	sql := m.Req.sel

	if sql.CountStar() {
		// Count(*) Do we really want to do push-down here?
		// possibly default to not allowing this and allow a setting?
		u.Warnf("Count(*) on Cassandra, your crazy!")
	}

	// Map projection column name -> position; handed to every message so
	// consumers can look values up by name.
	cols := m.Req.p.Proj.Columns
	colNames := make(map[string]int, len(cols))
	for i, col := range cols {
		colNames[col.Name] = i
	}
	if len(cols) == 0 {
		u.Errorf("no cols? %v *?", cols)
	}

	// The limit is applied as the query's page size.
	limit := DefaultLimit
	if sql.Limit > 0 {
		limit = sql.Limit
	}

	queryStart := time.Now()
	cassQry := m.Req.s.session.Query(sql.String()).PageSize(limit)
	iter := cassQry.Iter()

	for {
		// A fresh map per row: MapScan fills it with column-name -> value.
		row := make(map[string]interface{})
		if !iter.MapScan(row) {
			u.Infof("done with query")
			break
		}

		// Place each scanned value at the index of its projection column;
		// columns missing from the row stay nil.
		vals := make([]driver.Value, len(cols))
		for k, v := range row {
			found := false
			for i, col := range cols {
				if col.Name == k {
					vals[i] = v
					u.Debugf("%-2d col.name=%-10s prop.T %T\tprop.v%v", i, col.Name, v, v)
					found = true
					break
				}
			}
			if !found {
				u.Warnf("not found? %s=%v", k, v)
			}
		}

		u.Debugf("new row ct: %v cols:%v vals:%v", m.Total, colNames, vals)
		msg := datasource.NewSqlDriverMessageMap(uint64(m.Total), vals, colNames)
		m.Total++

		select {
		case <-sigChan:
			// Asked to stop early: still close the iterator so its
			// resources are released instead of leaking. The stop itself
			// is not an error, so any close error is only logged.
			if err := iter.Close(); err != nil {
				u.Warnf("could not close iter after stop signal err:%v", err)
			}
			return nil
		case outCh <- msg:
			// continue
		}
	}
	err := iter.Close()

	if err != nil {
		u.Errorf("could not close iter err:%v", err)
		return err
	}
	u.Infof("finished query, took: %v for %v rows err:%v", time.Since(queryStart), m.Total, err)
	return err
}
Loading