Skip to content

Commit

Permalink
Cassandra test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
araddon committed Nov 6, 2017
1 parent 44bb8ce commit 33fe545
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 242 deletions.
38 changes: 28 additions & 10 deletions backends/cassandra/cass_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package cassandra_test
import (
"database/sql"
"encoding/json"
"flag"
"fmt"
"os"
"strings"
"sync"
"testing"
Expand All @@ -19,6 +17,7 @@ import (

"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/plan"
"github.com/araddon/qlbridge/schema"

_ "github.com/dataux/dataux/backends/cassandra"
"github.com/dataux/dataux/frontends/mysqlfe/testmysql"
Expand All @@ -32,17 +31,12 @@ var (
now = time.Now()
expectedRowCt = 105
testServicesRunning bool
cassHost *string = flag.String("casshost", "localhost:9042", "Cassandra Host")
cassHost string = "localhost:9042"
session *gocql.Session
cassKeyspace = "datauxtest"
_ = json.RawMessage(nil)
)

func init() {
cass := os.Getenv("CASSANDRA_HOST")
if len(cass) > 0 {
*cassHost = cass
}
tu.Setup()
}

Expand Down Expand Up @@ -97,6 +91,30 @@ func RunTestServer(t *testing.T) func() {

loadTestData(t)

reg := schema.DefaultRegistry()
by := []byte(`{
"name": "cass",
"schema":"datauxtest",
"type": "cassandra",
"hosts": ["localhost:9042"],
"settings": {
"keyspace": "datauxtest"
}
}`)

conf := &schema.ConfigSource{}
err := json.Unmarshal(by, conf)
assert.Equal(t, nil, err)
if len(conf.Hosts) > 0 {
cassHost = conf.Hosts[0]
}
err = reg.SchemaAddFromConfig(conf)
assert.Equal(t, nil, err)

s, ok := reg.Schema("datauxtest")
assert.Equal(t, true, ok)
assert.NotEqual(t, nil, s)

planner.GridConf.JobMaker = jobMaker
planner.GridConf.SchemaLoader = testmysql.SchemaLoader
planner.GridConf.SupressRecover = testmysql.Conf.SupressRecover
Expand All @@ -113,7 +131,7 @@ func validateQuerySpec(t *testing.T, testSpec tu.QuerySpec) {
func loadTestData(t *testing.T) {
loadTestDataOnce.Do(func() {
u.Debugf("loading cassandra test data")
preKeyspace := gocql.NewCluster(*cassHost)
preKeyspace := gocql.NewCluster(cassHost)
// no keyspace
s1, err := preKeyspace.CreateSession()
assert.True(t, err == nil, "Must create cassandra session got err=%v", err)
Expand All @@ -122,7 +140,7 @@ func loadTestData(t *testing.T) {
err = s1.Query(cqlKeyspace).Exec()
assert.True(t, err == nil, "Must create cassandra keyspace got err=%v", err)

cluster := gocql.NewCluster(*cassHost)
cluster := gocql.NewCluster(cassHost)
cluster.Keyspace = cassKeyspace
// Error querying table schema: Undefined name key_aliases in selection clause
// this assumes cassandra 2.2.x ??
Expand Down
8 changes: 6 additions & 2 deletions backends/cassandra/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ func init() {
// Create a gocql session
func createCassSession(conf *schema.ConfigSource, keyspace string) (*gocql.Session, error) {

servers := conf.Settings.Strings("hosts")
servers := conf.Hosts
if len(servers) == 0 {
servers = conf.Settings.Strings("hosts")
}

if len(servers) == 0 {
return nil, fmt.Errorf("No 'hosts' for cassandra found in config %v", conf.Settings)
}
Expand Down Expand Up @@ -121,7 +125,7 @@ func (m *Source) Setup(ss *schema.Schema) error {
m.db = strings.ToLower(ss.Name)
m.tablemap = make(map[string]*schema.Table)

//u.Infof("Init: %#v", m.schema.Conf)
u.Infof("Init: %#v", m.schema.Conf)
if m.schema.Conf == nil {
return fmt.Errorf("Schema conf not found")
}
Expand Down
27 changes: 24 additions & 3 deletions backends/cassandra/sql_to_cql.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,27 @@ func (m *SqlToCql) isCassKey(name string) bool {
return false
}

func eval(arg expr.Node) (value.Value, bool) {
switch arg := arg.(type) {
case *expr.NumberNode, *expr.StringNode:
return vm.Eval(nil, arg)
case *expr.IdentityNode:
return value.NewStringValue(arg.Text), true
case *expr.ArrayNode:
sv := value.NewSliceValues(nil)
for _, na := range arg.Args {
v, ok := eval(na)
if ok {
sv.Append(v)
}
}
return sv, true
default:
u.Debugf("%T %v", arg, arg)
}
return nil, false
}

// walkWhereNode() We are re-writing the sql select statement and need
// to walk the ast and see if we can push down this where clause
// completely or partially.
Expand Down Expand Up @@ -498,11 +519,11 @@ func (m *SqlToCql) walkFilterBinary(node *expr.BinaryNode) (expr.Node, error) {
return nil, nil
}

lhval, lhok := vm.Eval(nil, node.Args[0])
rhval, rhok := vm.Eval(nil, node.Args[1])
lhval, lhok := eval(node.Args[0])
rhval, rhok := eval(node.Args[1])
if !lhok || !rhok {
u.Warnf("not ok: %v l:%v r:%v", node, lhval, rhval)
return nil, fmt.Errorf("could not evaluate: %v", node.String())
//return nil, fmt.Errorf("could not evaluate: %v", node.String())
}
//u.Debugf("walkBinary: %v l:%v r:%v %T %T", node, lhval, rhval, lhval, rhval)
switch node.Operator.T {
Expand Down
217 changes: 0 additions & 217 deletions dataux.conf

This file was deleted.

10 changes: 0 additions & 10 deletions frontends/mysqlfe/testmysql/runtestserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,6 @@ frontends [

var oldConfig = `
sources : [
{
name : cass
type : cassandra
settings {
keyspace "datauxtest"
hosts ["localhost:9042"]
}
}
# csv-file "db" of data from http://seanlahman.com/baseball-archive/statistics/
# must have TESTINT=true integration test flag turned on
{
Expand Down

0 comments on commit 33fe545

Please sign in to comment.