Skip to content

Commit e2dc913

Browse files
author
Aaron Raddon
committed
Cassandra data source part 1
1 parent 63ed08b commit e2dc913

File tree

11 files changed

+1259
-55
lines changed

11 files changed

+1259
-55
lines changed

backends/cassandra/cass_test.go

Lines changed: 466 additions & 0 deletions
Large diffs are not rendered by default.

backends/cassandra/resultreader.go

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package cassandra
2+
3+
import (
4+
"database/sql/driver"
5+
"time"
6+
7+
u "github.com/araddon/gou"
8+
9+
"github.com/araddon/qlbridge/datasource"
10+
"github.com/araddon/qlbridge/exec"
11+
"github.com/araddon/qlbridge/rel"
12+
"github.com/araddon/qlbridge/value"
13+
)
14+
15+
var (
16+
// Ensure we implement TaskRunner
17+
_ exec.TaskRunner = (*ResultReader)(nil)
18+
)
19+
20+
// ResultReader implements result paging, reading
21+
type ResultReader struct {
22+
*exec.TaskBase
23+
exit <-chan bool
24+
finalized bool
25+
hasprojection bool
26+
cursor int
27+
proj *rel.Projection
28+
cols []string
29+
Total int
30+
Req *SqlToCql
31+
}
32+
33+
// A wrapper, allowing us to implement sql/driver Next() interface
34+
// which is different than qlbridge/datasource Next()
35+
type ResultReaderNext struct {
36+
*ResultReader
37+
}
38+
39+
func NewResultReader(req *SqlToCql) *ResultReader {
40+
m := &ResultReader{}
41+
m.TaskBase = exec.NewTaskBase(req.Ctx)
42+
m.Req = req
43+
return m
44+
}
45+
46+
func (m *ResultReader) Close() error { return nil }
47+
48+
func (m *ResultReader) buildProjection() {
49+
50+
if m.hasprojection {
51+
return
52+
}
53+
m.hasprojection = true
54+
m.proj = rel.NewProjection()
55+
cols := m.proj.Columns
56+
sql := m.Req.sel
57+
if sql.Star {
58+
// Select Each field, grab fields from Table Schema
59+
for _, fld := range m.Req.tbl.Fields {
60+
cols = append(cols, rel.NewResultColumn(fld.Name, len(cols), nil, fld.Type))
61+
}
62+
} else if sql.CountStar() {
63+
// Count *
64+
cols = append(cols, rel.NewResultColumn("count", len(cols), nil, value.IntType))
65+
} else {
66+
for _, col := range m.Req.sel.Columns {
67+
if fld, ok := m.Req.tbl.FieldMap[col.SourceField]; ok {
68+
//u.Debugf("column: %#v", col)
69+
cols = append(cols, rel.NewResultColumn(col.SourceField, len(cols), col, fld.Type))
70+
} else {
71+
u.Debugf("Could not find: '%v' in %#v", col.SourceField, m.Req.tbl.FieldMap)
72+
u.Warnf("%#v", col)
73+
}
74+
}
75+
}
76+
colNames := make([]string, len(cols))
77+
for i, col := range cols {
78+
colNames[i] = col.As
79+
}
80+
m.cols = colNames
81+
m.proj.Columns = cols
82+
//u.Debugf("leaving Columns: %v", len(m.proj.Columns))
83+
}
84+
85+
// Runs the Google Datastore properties into
86+
// [][]interface{} which is compabitble with sql/driver values
87+
// as well as making a projection, ie column selection
88+
//
89+
// func (m *ResultReader) Finalize() error {
90+
func (m *ResultReader) Run() error {
91+
92+
sigChan := m.SigChan()
93+
outCh := m.MessageOut()
94+
defer func() {
95+
close(outCh) // closing output channels is the signal to stop
96+
u.Debugf("nice, finalize ResultReader out: %p row ct %v", outCh, m.Total)
97+
}()
98+
m.finalized = true
99+
m.buildProjection()
100+
101+
sql := m.Req.sel
102+
103+
if sql.CountStar() {
104+
// Count(*) Do we really want to do push-down here?
105+
// possibly default to not allowing this and allow a setting?
106+
u.Warnf("Count(*) on Cassandra, your crazy!")
107+
}
108+
109+
cols := m.Req.p.Proj.Columns
110+
colNames := make(map[string]int, len(cols))
111+
for i, col := range cols {
112+
colNames[col.Name] = i
113+
//u.Debugf("col.name=%v col.as=%s", col.Name, col.As)
114+
}
115+
if len(cols) == 0 {
116+
u.Errorf("no cols? %v *?", cols)
117+
}
118+
119+
limit := DefaultLimit
120+
if sql.Limit > 0 {
121+
limit = sql.Limit
122+
}
123+
124+
queryStart := time.Now()
125+
cassQry := m.Req.s.session.Query(sql.String()).PageSize(limit)
126+
iter := cassQry.Iter()
127+
128+
for {
129+
row := make(map[string]interface{})
130+
if !iter.MapScan(row) {
131+
u.Infof("done with query")
132+
break
133+
}
134+
135+
vals := make([]driver.Value, len(cols))
136+
for k, v := range row {
137+
found := false
138+
for i, col := range cols {
139+
//u.Infof("prop.name=%s col.Name=%s", prop.Name, col.Name)
140+
if col.Name == k {
141+
vals[i] = v
142+
u.Debugf("%-2d col.name=%-10s prop.T %T\tprop.v%v", i, col.Name, v, v)
143+
found = true
144+
break
145+
}
146+
}
147+
if !found {
148+
u.Warnf("not found? %s=%v", k, v)
149+
}
150+
}
151+
152+
u.Debugf("new row ct: %v cols:%v vals:%v", m.Total, colNames, vals)
153+
//msg := &datasource.SqlDriverMessage{vals, len(m.Vals)}
154+
msg := datasource.NewSqlDriverMessageMap(uint64(m.Total), vals, colNames)
155+
m.Total++
156+
//u.Debugf("In gds source iter %#v", vals)
157+
select {
158+
case <-sigChan:
159+
return nil
160+
case outCh <- msg:
161+
// continue
162+
}
163+
//u.Debugf("vals: %v", row.Vals)
164+
}
165+
err := iter.Close()
166+
167+
if err != nil {
168+
u.Errorf("could not close iter err:%v", err)
169+
return err
170+
}
171+
u.Infof("finished query, took: %v for %v rows err:%v", time.Now().Sub(queryStart), m.Total, err)
172+
return err
173+
}

0 commit comments

Comments
 (0)