Skip to content

Commit

Permalink
Cassandra data source wip
Browse files Browse the repository at this point in the history
  • Loading branch information
araddon committed Jul 10, 2016
1 parent e2dc913 commit da4855e
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 23 deletions.
44 changes: 27 additions & 17 deletions backends/cassandra/cass_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,17 @@ func jobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
func RunTestServer(t *testing.T) func() {
if !testServicesRunning {
testServicesRunning = true

loadTestData(t)

planner.GridConf.JobMaker = jobMaker
planner.GridConf.SchemaLoader = testmysql.SchemaLoader
planner.GridConf.SupressRecover = testmysql.Conf.SupressRecover
testmysql.RunTestServer(t)
quit := make(chan bool)
planner.RunWorkerNodes(quit, 2, testmysql.ServerCtx.Reg)

loadTestData(t)
}
return func() {
// placeholder
}
return func() {}
}

func validateQuerySpec(t *testing.T, testSpec tu.QuerySpec) {
Expand All @@ -102,16 +101,27 @@ func validateQuerySpec(t *testing.T, testSpec tu.QuerySpec) {
func loadTestData(t *testing.T) {
loadTestDataOnce.Do(func() {
u.Debugf("loading cassandra test data")
preKeyspace := gocql.NewCluster(*cassHost)
// no keyspace
s1, err := preKeyspace.CreateSession()
assert.Tf(t, err == nil, "Must create cassandra session got err=%v", err)
cqlKeyspace := fmt.Sprintf(`
CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`, cassKeyspace)
err = s1.Query(cqlKeyspace).Exec()
assert.Tf(t, err == nil, "Must create cassandra keyspace got err=%v", err)

cluster := gocql.NewCluster(*cassHost)
cluster.Keyspace = cassKeyspace
// Error querying table schema: Undefined name key_aliases in selection clause
cluster.ProtoVersion = 4
cluster.CQLVersion = "3.1.0"
// cluster.Timeout = time.Second * 10
// cluster.NumConns = 10
// cluster.SocketKeepalive = time.Duration(5 * time.Minute)
// cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: 5}
sess, err := cluster.CreateSession()
assert.Tf(t, err == nil, "Must create cassandra session")
session = sess
cqlKeyspace := fmt.Sprintf(`
CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`, cassKeyspace)
err = sess.Query(cqlKeyspace).Exec()
time.Sleep(time.Millisecond * 30)
assert.T(t, err == nil, "must create keyspace", err)
assert.Tf(t, err == nil, "Must create cassandra session got err=%v", err)

for _, table := range testTables {
err = sess.Query(table).Consistency(gocql.All).Exec()
time.Sleep(time.Millisecond * 30)
Expand Down Expand Up @@ -246,22 +256,22 @@ func TestDescribeTable(t *testing.T) {
assert.Tf(t, data.Field != "", "%v", data)
switch data.Field {
case "embedded":
assert.Tf(t, data.Type == "binary", "%#v", data)
assert.Tf(t, data.Type == "binary" || data.Type == "text", "%#v", data)
describedCt++
case "author":
assert.Tf(t, data.Type == "string", "data: %#v", data)
assert.Tf(t, data.Type == "varchar(255)", "data: %#v", data)
describedCt++
case "created":
assert.Tf(t, data.Type == "datetime", "data: %#v", data)
describedCt++
case "category":
assert.Tf(t, data.Type == "binary", "data: %#v", data)
assert.Tf(t, data.Type == "text", "data: %#v", data)
describedCt++
case "body":
assert.Tf(t, data.Type == "binary", "data: %#v", data)
assert.Tf(t, data.Type == "text", "data: %#v", data)
describedCt++
case "deleted":
assert.Tf(t, data.Type == "bool", "data: %#v", data)
assert.Tf(t, data.Type == "bool" || data.Type == "tinyint", "data: %#v", data)
describedCt++
}
},
Expand Down
27 changes: 23 additions & 4 deletions backends/cassandra/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

u "github.com/araddon/gou"
"github.com/gocql/gocql"
"github.com/hailocab/go-hostpool"

"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/rel"
Expand Down Expand Up @@ -42,18 +43,31 @@ func createCassSession(conf *schema.ConfigSource, keyspace string) (*gocql.Sessi
}
cluster := gocql.NewCluster(servers...)

// Error querying table schema: Undefined name key_aliases in selection clause
// if on cass 2.1 use 3.0.0 for cqlversion
cluster.ProtoVersion = 4
cluster.CQLVersion = "3.1.0"

cluster.Keyspace = keyspace
cluster.NumConns = 10

if numconns := conf.Settings.Int("numconns"); numconns > 0 {
cluster.NumConns = numconns
}

cluster.Timeout = time.Second * 10
cluster.PoolConfig.HostSelectionPolicy = gocql.HostPoolHostPolicy(
hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
)

// load-balancers often kill idel conns so lets heart beat to keep alive
// see https://cloud.google.com/compute/docs/troubleshooting#communicatewithinternet
cluster.SocketKeepalive = time.Duration(5 * time.Minute)

if retries := conf.Settings.Int("retries"); retries > 0 {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: retries}
} else {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: 3}
}

return cluster.CreateSession()
Expand Down Expand Up @@ -96,7 +110,7 @@ func (m *Source) Setup(ss *schema.SchemaSource) error {
m.conf = ss.Conf
m.db = strings.ToLower(ss.Name)

u.Infof("Init: %#v", m.schema.Conf)
//u.Infof("Init: %#v", m.schema.Conf)
if m.schema.Conf == nil {
return fmt.Errorf("Schema conf not found")
}
Expand All @@ -109,6 +123,7 @@ func (m *Source) Setup(ss *schema.SchemaSource) error {

sess, err := createCassSession(m.conf, m.keyspace)
if err != nil {
u.Errorf("Could not create cass conn %v", err)
return err
}
m.session = sess
Expand All @@ -120,6 +135,7 @@ func (m *Source) loadSchema() error {

kmd, err := m.session.KeyspaceMetadata(m.keyspace)
if err != nil {
u.Warnf("ks:%v change protocol version if 2.1 or earlier %v", m.keyspace, err)
return err
}
m.kmd = kmd
Expand All @@ -146,12 +162,15 @@ func (m *Source) loadSchema() error {
colName := strings.ToLower(col.Name)
colNames = append(colNames, colName)

u.Debugf("%s cass col %v ", colName, col.Type.Type())
var f *schema.Field
switch col.Type.Type() {
case gocql.TypeBlob:
f = schema.NewFieldBase(colName, value.StringType, 24, "blob")
case gocql.TypeVarchar, gocql.TypeText:
f = schema.NewFieldBase(colName, value.ByteSliceType, 2000, "blob")
case gocql.TypeVarchar:
f = schema.NewFieldBase(colName, value.StringType, 256, "string")
case gocql.TypeText:
f = schema.NewFieldBase(colName, value.StringType, 2000, "string")
case gocql.TypeInt, gocql.TypeTinyInt:
f = schema.NewFieldBase(colName, value.IntType, 32, "int")
case gocql.TypeBigInt:
Expand All @@ -171,7 +190,7 @@ func (m *Source) loadSchema() error {
case gocql.TypeInt, gocql.TypeBigInt, gocql.TypeTinyInt:
f.NativeType = value.IntType
}
u.Warnf("SET TYPE CASSANDRA Not handled very well?!")
u.Warnf("SET TYPE CASSANDRA Not handled very well?! %v", col.Type.Type())
case gocql.TypeMap:

switch col.Type.Type() {
Expand Down
2 changes: 1 addition & 1 deletion frontends/mysqlfe/testmysql/runtestserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ sources : [
type : cassandra
settings {
keyspace "datauxtest"
hosts ["localhost"]
hosts ["localhost:9042"]
}
}
Expand Down
1 change: 0 additions & 1 deletion updatepackages.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ cd $GOPATH/src/github.com/mb0/glob && git checkout master && git pull
cd $GOPATH/src/github.com/nats-io/nats && git checkout master && git pull
cd $GOPATH/src/github.com/pborman/uuid && git checkout master && git pull
cd $GOPATH/src/github.com/rcrowley/go-metrics && git checkout master && git pull
cd $GOPATH/src/github.com/relops/cqlc && git checkout master && git pull
cd $GOPATH/src/github.com/sony/sonyflake && git checkout master && git pull

# this one if updated breaks older versions of etcd
Expand Down

0 comments on commit da4855e

Please sign in to comment.