Skip to content

Commit

Permalink
Merge pull request #64 from dataux/qlbridge_schema_changes
Browse files Browse the repository at this point in the history
Qlbridge schema changes
  • Loading branch information
araddon authored Oct 29, 2017
2 parents 87b4982 + a328260 commit 0377a1f
Show file tree
Hide file tree
Showing 19 changed files with 246 additions and 218 deletions.
28 changes: 14 additions & 14 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions backends/bigquery/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@ import (
"golang.org/x/net/context"
"google.golang.org/api/iterator"

"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/rel"
"github.com/araddon/qlbridge/schema"
"github.com/araddon/qlbridge/value"
)

const (
// DataSourceLabel is public sourcetype for bigquery.
DataSourceLabel = "bigquery"
)

var (
// ErrNoSchema is an error that no schema could be found.
ErrNoSchema = fmt.Errorf("No schema or configuration exists")

// SchemaRefreshInterval is time between checking for schema changes
SchemaRefreshInterval = time.Duration(time.Minute * 5)

// Ensure our Google BigQuery implements schema.Source interface
Expand All @@ -37,7 +39,7 @@ var (

func init() {
// We need to register our DataSource provider here
datasource.Register(DataSourceLabel, &Source{})
schema.RegisterSourceType(DataSourceLabel, &Source{})
}

// Source is a BigQuery datasource, this provides Reads, Insert, Update, Delete
Expand All @@ -53,7 +55,7 @@ type Source struct {
tables []string // Lower cased
tablemap map[string]*schema.Table
conf *schema.ConfigSource
schema *schema.SchemaSource
schema *schema.Schema
lastSchemaUpdate time.Time
mu sync.Mutex
closed bool
Expand All @@ -68,7 +70,7 @@ type Mutator struct {

func (m *Source) Init() {}

func (m *Source) Setup(ss *schema.SchemaSource) error {
func (m *Source) Setup(ss *schema.Schema) error {

m.mu.Lock()
defer m.mu.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions backends/bigquery/sql_to_bq.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

var (
// Default page limit
// DefaultLimit ie page-size defaut
DefaultLimit = 5000

// Ensure we implment appropriate interfaces
Expand All @@ -49,7 +49,7 @@ type SqlToBQ struct {
original *rel.SqlSelect
whereIdents map[string]bool
stmt rel.SqlStatement
schema *schema.SchemaSource
schema *schema.Schema
s *Source
partition *schema.Partition // current partition for this request
needsPolyFill bool // polyfill?
Expand All @@ -62,7 +62,7 @@ type SqlToBQ struct {
func NewSqlToBQ(s *Source, t *schema.Table) *SqlToBQ {
m := &SqlToBQ{
tbl: t,
schema: t.SchemaSource,
schema: t.Schema,
s: s,
}
return m
Expand Down
7 changes: 3 additions & 4 deletions backends/bigtable/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"cloud.google.com/go/bigtable"
"golang.org/x/net/context"

"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/rel"
"github.com/araddon/qlbridge/schema"
"github.com/araddon/qlbridge/value"
Expand Down Expand Up @@ -41,7 +40,7 @@ var (

func init() {
// We need to register our DataSource provider here
datasource.Register(DataSourceLabel, &Source{})
schema.RegisterSourceType(DataSourceLabel, &Source{})
}

func getClient(project, instance string) (*bigtable.Client, error) {
Expand Down Expand Up @@ -73,7 +72,7 @@ type Source struct {
tables []string // Lower cased
tablemap map[string]*schema.Table
conf *schema.ConfigSource
schema *schema.SchemaSource
schema *schema.Schema
client *bigtable.Client
ac *bigtable.AdminClient
lastSchemaUpdate time.Time
Expand All @@ -90,7 +89,7 @@ type Mutator struct {

func (m *Source) Init() {}

func (m *Source) Setup(ss *schema.SchemaSource) error {
func (m *Source) Setup(ss *schema.Schema) error {

m.mu.Lock()
defer m.mu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions backends/bigtable/sql_to_bt.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type SqlToBT struct {
original *rel.SqlSelect
whereIdents map[string]bool
stmt rel.SqlStatement
schema *schema.SchemaSource
schema *schema.Schema
s *Source
partition *schema.Partition // current partition for this request
needsPolyFill bool // polyfill?
Expand All @@ -62,7 +62,7 @@ type SqlToBT struct {
func NewSqlToBT(s *Source, t *schema.Table) *SqlToBT {
m := &SqlToBT{
tbl: t,
schema: t.SchemaSource,
schema: t.Schema,
s: s,
}
return m
Expand Down
9 changes: 4 additions & 5 deletions backends/cassandra/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@ import (
"github.com/gocql/gocql"
"github.com/hailocab/go-hostpool"

"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/rel"
"github.com/araddon/qlbridge/schema"
"github.com/araddon/qlbridge/value"
)

const (
DataSourceLabel = "cassandra"
SourceType = "cassandra"
)

var (
Expand All @@ -32,7 +31,7 @@ var (

func init() {
// We need to register our DataSource provider here
datasource.Register(DataSourceLabel, &Source{})
schema.RegisterSourceType(SourceType, &Source{})
}

// Create a gocql session
Expand Down Expand Up @@ -92,7 +91,7 @@ type Source struct {
tables []string // Lower cased
tablemap map[string]*schema.Table
conf *schema.ConfigSource
schema *schema.SchemaSource
schema *schema.Schema
session *gocql.Session
lastSchemaUpdate time.Time
mu sync.Mutex
Expand All @@ -108,7 +107,7 @@ type Mutator struct {

func (m *Source) Init() {}

func (m *Source) Setup(ss *schema.SchemaSource) error {
func (m *Source) Setup(ss *schema.Schema) error {

m.mu.Lock()
defer m.mu.Unlock()
Expand Down
6 changes: 3 additions & 3 deletions backends/cassandra/sql_to_cql.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

var (
// Default page limit
// DefaultLimit is page limit
DefaultLimit = 5000

// Ensure we implment appropriate interfaces
Expand All @@ -46,7 +46,7 @@ type SqlToCql struct {
original *rel.SqlSelect
whereIdents map[string]bool
stmt rel.SqlStatement
schema *schema.SchemaSource
schema *schema.Schema
s *Source
q *gocql.Query
cf *gocql.TableMetadata
Expand All @@ -61,7 +61,7 @@ type SqlToCql struct {
func NewSqlToCql(s *Source, t *schema.Table) *SqlToCql {
m := &SqlToCql{
tbl: t,
schema: t.SchemaSource,
schema: t.Schema,
s: s,
}
return m
Expand Down
21 changes: 12 additions & 9 deletions backends/datastore/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
"google.golang.org/api/iterator"
"google.golang.org/api/option"

"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/rel"
"github.com/araddon/qlbridge/schema"
"github.com/araddon/qlbridge/value"
)

const (
SourceLabel = "google-datastore"
// SourceType is the Type label for global source type registery.
SourceType = "google-datastore"
)

var (
Expand All @@ -40,11 +40,11 @@ var (

func init() {
// We need to register our Source into Datasource provider here
datasource.Register(SourceLabel, &Source{})
schema.RegisterSourceType(SourceType, &Source{})
}

// Source Google Datastore Data Source, is a singleton, non-threadsafe source
// to a create connections/clients to datastore
// to a create connections/clients to datastore
type Source struct {
db string
namespace string
Expand All @@ -58,20 +58,23 @@ type Source struct {
dsCtx context.Context
dsClient *datastore.Client
conf *schema.ConfigSource
schema *schema.SchemaSource
schema *schema.Schema
mu sync.Mutex
closed bool
}

// DatastoreMutator is a connection for mutation
type DatastoreMutator struct {
tbl *schema.Table
sql rel.SqlStatement
ds *Source
}

// Init init the source
func (m *Source) Init() {}

func (m *Source) Setup(ss *schema.SchemaSource) error {
// Setup the source.
func (m *Source) Setup(ss *schema.Schema) error {

if m.schema != nil {
return nil
Expand Down Expand Up @@ -185,7 +188,7 @@ func (m *Source) Open(tableName string) (schema.Conn, error) {
return nil, fmt.Errorf("Could not find '%v'.'%v' schema", m.schema.Name, tableName)
}

gdsSource := NewSqlToDatstore(tbl, m.dsClient, m.dsCtx)
gdsSource := NewSQLToDatstore(tbl, m.dsClient, m.dsCtx)
return gdsSource, nil
}

Expand All @@ -203,7 +206,7 @@ func (m *Source) selectQuery(stmt *rel.SqlSelect) (*ResultReader, error) {
return nil, fmt.Errorf("Could not find '%v'.'%v' schema", m.schema.Name, tblName)
}

sqlDs := NewSqlToDatstore(tbl, m.dsClient, m.dsCtx)
sqlDs := NewSQLToDatstore(tbl, m.dsClient, m.dsCtx)
//u.Debugf("SqlToDatstore: %#v", sqlDs)
resp, err := sqlDs.query(stmt)
if err != nil {
Expand All @@ -213,8 +216,8 @@ func (m *Source) selectQuery(stmt *rel.SqlSelect) (*ResultReader, error) {
return resp, nil
}

// Table get table schema.
func (m *Source) Table(table string) (*schema.Table, error) {
//u.Debugf("get table for %s", table)
return m.loadTableSchema(table, "")
}

Expand Down
Loading

0 comments on commit 0377a1f

Please sign in to comment.