Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra data source part 1 #21

Merged
merged 12 commits into from
Aug 14, 2016
Prev Previous commit
Next Next commit
Cassandra data source wip
  • Loading branch information
araddon committed Jul 10, 2016
commit da4855e4ad60155cd711e5cc41ca1f7d3f62e767
44 changes: 27 additions & 17 deletions backends/cassandra/cass_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,17 @@ func jobMaker(ctx *plan.Context) (*planner.ExecutorGrid, error) {
func RunTestServer(t *testing.T) func() {
if !testServicesRunning {
testServicesRunning = true

loadTestData(t)

planner.GridConf.JobMaker = jobMaker
planner.GridConf.SchemaLoader = testmysql.SchemaLoader
planner.GridConf.SupressRecover = testmysql.Conf.SupressRecover
testmysql.RunTestServer(t)
quit := make(chan bool)
planner.RunWorkerNodes(quit, 2, testmysql.ServerCtx.Reg)

loadTestData(t)
}
return func() {
// placeholder
}
return func() {}
}

func validateQuerySpec(t *testing.T, testSpec tu.QuerySpec) {
Expand All @@ -102,16 +101,27 @@ func validateQuerySpec(t *testing.T, testSpec tu.QuerySpec) {
func loadTestData(t *testing.T) {
loadTestDataOnce.Do(func() {
u.Debugf("loading cassandra test data")
preKeyspace := gocql.NewCluster(*cassHost)
// no keyspace
s1, err := preKeyspace.CreateSession()
assert.Tf(t, err == nil, "Must create cassandra session got err=%v", err)
cqlKeyspace := fmt.Sprintf(`
CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`, cassKeyspace)
err = s1.Query(cqlKeyspace).Exec()
assert.Tf(t, err == nil, "Must create cassandra keyspace got err=%v", err)

cluster := gocql.NewCluster(*cassHost)
cluster.Keyspace = cassKeyspace
// Error querying table schema: Undefined name key_aliases in selection clause
cluster.ProtoVersion = 4
cluster.CQLVersion = "3.1.0"
// cluster.Timeout = time.Second * 10
// cluster.NumConns = 10
// cluster.SocketKeepalive = time.Duration(5 * time.Minute)
// cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: 5}
sess, err := cluster.CreateSession()
assert.Tf(t, err == nil, "Must create cassandra session")
session = sess
cqlKeyspace := fmt.Sprintf(`
CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`, cassKeyspace)
err = sess.Query(cqlKeyspace).Exec()
time.Sleep(time.Millisecond * 30)
assert.T(t, err == nil, "must create keyspace", err)
assert.Tf(t, err == nil, "Must create cassandra session got err=%v", err)

for _, table := range testTables {
err = sess.Query(table).Consistency(gocql.All).Exec()
time.Sleep(time.Millisecond * 30)
Expand Down Expand Up @@ -246,22 +256,22 @@ func TestDescribeTable(t *testing.T) {
assert.Tf(t, data.Field != "", "%v", data)
switch data.Field {
case "embedded":
assert.Tf(t, data.Type == "binary", "%#v", data)
assert.Tf(t, data.Type == "binary" || data.Type == "text", "%#v", data)
describedCt++
case "author":
assert.Tf(t, data.Type == "string", "data: %#v", data)
assert.Tf(t, data.Type == "varchar(255)", "data: %#v", data)
describedCt++
case "created":
assert.Tf(t, data.Type == "datetime", "data: %#v", data)
describedCt++
case "category":
assert.Tf(t, data.Type == "binary", "data: %#v", data)
assert.Tf(t, data.Type == "text", "data: %#v", data)
describedCt++
case "body":
assert.Tf(t, data.Type == "binary", "data: %#v", data)
assert.Tf(t, data.Type == "text", "data: %#v", data)
describedCt++
case "deleted":
assert.Tf(t, data.Type == "bool", "data: %#v", data)
assert.Tf(t, data.Type == "bool" || data.Type == "tinyint", "data: %#v", data)
describedCt++
}
},
Expand Down
27 changes: 23 additions & 4 deletions backends/cassandra/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

u "github.com/araddon/gou"
"github.com/gocql/gocql"
"github.com/hailocab/go-hostpool"

"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/rel"
Expand Down Expand Up @@ -42,18 +43,31 @@ func createCassSession(conf *schema.ConfigSource, keyspace string) (*gocql.Sessi
}
cluster := gocql.NewCluster(servers...)

// Error querying table schema: Undefined name key_aliases in selection clause
// if on cass 2.1 use 3.0.0 for cqlversion
cluster.ProtoVersion = 4
cluster.CQLVersion = "3.1.0"

cluster.Keyspace = keyspace
cluster.NumConns = 10

if numconns := conf.Settings.Int("numconns"); numconns > 0 {
cluster.NumConns = numconns
}

cluster.Timeout = time.Second * 10
cluster.PoolConfig.HostSelectionPolicy = gocql.HostPoolHostPolicy(
hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
)

// load-balancers often kill idel conns so lets heart beat to keep alive
// see https://cloud.google.com/compute/docs/troubleshooting#communicatewithinternet
cluster.SocketKeepalive = time.Duration(5 * time.Minute)

if retries := conf.Settings.Int("retries"); retries > 0 {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: retries}
} else {
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: 3}
}

return cluster.CreateSession()
Expand Down Expand Up @@ -96,7 +110,7 @@ func (m *Source) Setup(ss *schema.SchemaSource) error {
m.conf = ss.Conf
m.db = strings.ToLower(ss.Name)

u.Infof("Init: %#v", m.schema.Conf)
//u.Infof("Init: %#v", m.schema.Conf)
if m.schema.Conf == nil {
return fmt.Errorf("Schema conf not found")
}
Expand All @@ -109,6 +123,7 @@ func (m *Source) Setup(ss *schema.SchemaSource) error {

sess, err := createCassSession(m.conf, m.keyspace)
if err != nil {
u.Errorf("Could not create cass conn %v", err)
return err
}
m.session = sess
Expand All @@ -120,6 +135,7 @@ func (m *Source) loadSchema() error {

kmd, err := m.session.KeyspaceMetadata(m.keyspace)
if err != nil {
u.Warnf("ks:%v change protocol version if 2.1 or earlier %v", m.keyspace, err)
return err
}
m.kmd = kmd
Expand All @@ -146,12 +162,15 @@ func (m *Source) loadSchema() error {
colName := strings.ToLower(col.Name)
colNames = append(colNames, colName)

u.Debugf("%s cass col %v ", colName, col.Type.Type())
var f *schema.Field
switch col.Type.Type() {
case gocql.TypeBlob:
f = schema.NewFieldBase(colName, value.StringType, 24, "blob")
case gocql.TypeVarchar, gocql.TypeText:
f = schema.NewFieldBase(colName, value.ByteSliceType, 2000, "blob")
case gocql.TypeVarchar:
f = schema.NewFieldBase(colName, value.StringType, 256, "string")
case gocql.TypeText:
f = schema.NewFieldBase(colName, value.StringType, 2000, "string")
case gocql.TypeInt, gocql.TypeTinyInt:
f = schema.NewFieldBase(colName, value.IntType, 32, "int")
case gocql.TypeBigInt:
Expand All @@ -171,7 +190,7 @@ func (m *Source) loadSchema() error {
case gocql.TypeInt, gocql.TypeBigInt, gocql.TypeTinyInt:
f.NativeType = value.IntType
}
u.Warnf("SET TYPE CASSANDRA Not handled very well?!")
u.Warnf("SET TYPE CASSANDRA Not handled very well?! %v", col.Type.Type())
case gocql.TypeMap:

switch col.Type.Type() {
Expand Down
2 changes: 1 addition & 1 deletion frontends/mysqlfe/testmysql/runtestserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ sources : [
type : cassandra
settings {
keyspace "datauxtest"
hosts ["localhost"]
hosts ["localhost:9042"]
}
}

Expand Down
1 change: 0 additions & 1 deletion updatepackages.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ cd $GOPATH/src/github.com/mb0/glob && git checkout master && git pull
cd $GOPATH/src/github.com/nats-io/nats && git checkout master && git pull
cd $GOPATH/src/github.com/pborman/uuid && git checkout master && git pull
cd $GOPATH/src/github.com/rcrowley/go-metrics && git checkout master && git pull
cd $GOPATH/src/github.com/relops/cqlc && git checkout master && git pull
cd $GOPATH/src/github.com/sony/sonyflake && git checkout master && git pull

# this one if updated breaks older versions of etcd
Expand Down