Skip to content

Commit

Permalink
refactor job builder
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Raddon committed Jan 19, 2016
1 parent 95bac72 commit ead7716
Show file tree
Hide file tree
Showing 59 changed files with 220 additions and 282 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package datastore

import (
"database/sql/driver"
"flag"
"fmt"
"io/ioutil"
Expand All @@ -12,17 +11,18 @@ import (
"time"

u "github.com/araddon/gou"

"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"golang.org/x/oauth2/jwt"
"google.golang.org/cloud"
"google.golang.org/cloud/datastore"

"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/expr"
"github.com/araddon/qlbridge/rel"
"github.com/araddon/qlbridge/schema"
"github.com/araddon/qlbridge/value"
"github.com/dataux/dataux/pkg/models"

"github.com/dataux/dataux/models"
)

const (
Expand All @@ -36,7 +36,7 @@ var (
ErrNoSchema = fmt.Errorf("No schema or configuration exists")

// Ensure our Google DataStore implements datasource.DataSource interface
_ datasource.DataSource = (*GoogleDSDataSource)(nil)
_ schema.DataSource = (*GoogleDSDataSource)(nil)
//_ datasource.SourceMutation = (*GoogleDSDataSource)(nil)
//_ datasource.Deletion = (*GoogleDSDataSource)(nil)
//_ datasource.Scanner = (*ResultReader)(nil)
Expand All @@ -62,24 +62,24 @@ type GoogleDSDataSource struct {
dsCtx context.Context
dsClient *datastore.Client
conf *models.Config
schema *datasource.SourceSchema
schema *schema.SourceSchema
mu sync.Mutex
closed bool
}

type DatastoreMutator struct {
tbl *datasource.Table
sql expr.SqlStatement
tbl *schema.Table
sql rel.SqlStatement
ds *GoogleDSDataSource
}

func NewGoogleDataStoreDataSource(schema *datasource.SourceSchema, conf *models.Config) models.DataSource {
func NewGoogleDataStoreDataSource(s *schema.SourceSchema, conf *models.Config) models.DataSource {
m := GoogleDSDataSource{}
m.schema = schema
m.schema = s
m.conf = conf
m.cloudProjectId = *GoogleProject

m.db = strings.ToLower(schema.Name)
m.db = strings.ToLower(s.Name)
// Register our datasource.Datasources in registry
m.Init()
u.Infof("datasource.Register: %v", DataSourceLabel)
Expand Down Expand Up @@ -162,14 +162,14 @@ func (m *GoogleDSDataSource) connect() error {
return nil
}

func (m *GoogleDSDataSource) DataSource() datasource.DataSource {
func (m *GoogleDSDataSource) DataSource() schema.DataSource {
return m
}
func (m *GoogleDSDataSource) Tables() []string {
return m.schema.Tables()
}

func (m *GoogleDSDataSource) Open(tableName string) (datasource.SourceConn, error) {
func (m *GoogleDSDataSource) Open(tableName string) (schema.SourceConn, error) {
u.Debugf("Open(%v)", tableName)
if m.schema == nil {
u.Warnf("no schema?")
Expand All @@ -189,7 +189,7 @@ func (m *GoogleDSDataSource) Open(tableName string) (datasource.SourceConn, erro
return gdsSource, nil
}

func (m *GoogleDSDataSource) selectQuery(stmt *expr.SqlSelect) (*ResultReader, error) {
func (m *GoogleDSDataSource) selectQuery(stmt *rel.SqlSelect) (*ResultReader, error) {

//u.Debugf("get sourceTask for %v", stmt)
tblName := strings.ToLower(stmt.From[0].Name)
Expand All @@ -213,7 +213,7 @@ func (m *GoogleDSDataSource) selectQuery(stmt *expr.SqlSelect) (*ResultReader, e
return resp, nil
}

func (m *GoogleDSDataSource) Table(table string) (*datasource.Table, error) {
func (m *GoogleDSDataSource) Table(table string) (*schema.Table, error) {
//u.Debugf("get table for %s", table)
return m.loadTableSchema(table, "")
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func (m *GoogleDSDataSource) loadTableNames() error {
return nil
}

func (m *GoogleDSDataSource) loadTableSchema(tableLower, tableOriginal string) (*datasource.Table, error) {
func (m *GoogleDSDataSource) loadTableSchema(tableLower, tableOriginal string) (*schema.Table, error) {

if tableOriginal == "" {
name, ok := m.tablesOriginal[tableLower]
Expand Down Expand Up @@ -287,7 +287,7 @@ func (m *GoogleDSDataSource) loadTableSchema(tableLower, tableOriginal string) (
- Need to recurse through enough records to get good idea of types
*/
u.Debugf("loadTableSchema lower:%q original:%q", tableLower, tableOriginal)
tbl = datasource.NewTable(tableOriginal, m.schema)
tbl = schema.NewTable(tableOriginal, m.schema)
colNames := make([]string, 0)

// We are going to scan this table, introspecting a few rows
Expand All @@ -306,39 +306,39 @@ func (m *GoogleDSDataSource) loadTableSchema(tableLower, tableOriginal string) (
switch val := p.Value.(type) {
case *datastore.Key:
//u.Debugf("found datastore.Key: %v='%#v'", colName, val)
tbl.AddField(datasource.NewField(p.Name, value.StringType, 24, "Key"))
tbl.AddValues([]driver.Value{p.Name, "string", "NO", "PRI", "Key", ""})
tbl.AddField(schema.NewFieldBase(p.Name, value.StringType, 24, "Key"))
//tbl.AddValues([]driver.Value{p.Name, "string", "NO", "PRI", "Key", ""})
case string:
//u.Debugf("found property.Value string: %v='%#v'", colName, val)
tbl.AddField(datasource.NewField(colName, value.StringType, 32, "string"))
tbl.AddValues([]driver.Value{colName, "string", "NO", "", "", "string"})
tbl.AddField(schema.NewFieldBase(colName, value.StringType, 32, "string"))
//tbl.AddValues([]driver.Value{colName, "string", "NO", "", "", "string"})
case int:
//u.Debugf("found int: %v='%v'", colName, val)
tbl.AddField(datasource.NewField(colName, value.IntType, 32, "int"))
tbl.AddValues([]driver.Value{colName, "int", "NO", "", "", "int"})
tbl.AddField(schema.NewFieldBase(colName, value.IntType, 32, "int"))
//tbl.AddValues([]driver.Value{colName, "int", "NO", "", "", "int"})
case int64:
//u.Debugf("found int64: %v='%v'", colName, val)
tbl.AddField(datasource.NewField(colName, value.IntType, 32, "long"))
tbl.AddValues([]driver.Value{colName, "long", "NO", "", "", "long"})
tbl.AddField(schema.NewFieldBase(colName, value.IntType, 32, "long"))
//tbl.AddValues([]driver.Value{colName, "long", "NO", "", "", "long"})
case float64:
//u.Debugf("found float64: %v='%v'", colName, val)
tbl.AddField(datasource.NewField(colName, value.NumberType, 32, "float64"))
tbl.AddValues([]driver.Value{colName, "float64", "NO", "", "", "float64"})
tbl.AddField(schema.NewFieldBase(colName, value.NumberType, 32, "float64"))
//tbl.AddValues([]driver.Value{colName, "float64", "NO", "", "", "float64"})
case bool:
//u.Debugf("found string: %v='%v'", colName, val)
tbl.AddField(datasource.NewField(colName, value.BoolType, 1, "bool"))
tbl.AddValues([]driver.Value{colName, "bool", "NO", "", "", "bool"})
tbl.AddField(schema.NewFieldBase(colName, value.BoolType, 1, "bool"))
//tbl.AddValues([]driver.Value{colName, "bool", "NO", "", "", "bool"})
case time.Time:
//u.Debugf("found time.Time: %v='%v'", colName, val)
tbl.AddField(datasource.NewField(colName, value.TimeType, 32, "datetime"))
tbl.AddValues([]driver.Value{colName, "datetime", "NO", "", "", "datetime"})
tbl.AddField(schema.NewFieldBase(colName, value.TimeType, 32, "datetime"))
//tbl.AddValues([]driver.Value{colName, "datetime", "NO", "", "", "datetime"})
// case *time.Time: // datastore doesn't allow pointers
// //u.Debugf("found time.Time: %v='%v'", colName, val)
// tbl.AddField(datasource.NewField(colName, value.TimeType, 32, "datetime"))
// tbl.AddField(schema.NewFieldBase(colName, value.TimeType, 32, "datetime"))
// tbl.AddValues([]driver.Value{colName, "datetime", "NO", "", "", "datetime"})
case []uint8:
tbl.AddField(datasource.NewField(colName, value.ByteSliceType, 256, "[]byte"))
tbl.AddValues([]driver.Value{colName, "binary", "NO", "", "", "[]byte"})
tbl.AddField(schema.NewFieldBase(colName, value.ByteSliceType, 256, "[]byte"))
//tbl.AddValues([]driver.Value{colName, "binary", "NO", "", "", "[]byte"})
default:
u.Warnf("gds unknown type %T %#v", val, p)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,20 @@ import (
"testing"
"time"

u "github.com/araddon/gou"
"github.com/bmizerany/assert"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"google.golang.org/cloud"
"google.golang.org/cloud/datastore"

u "github.com/araddon/gou"
"github.com/araddon/qlbridge/datasource"
"github.com/bmizerany/assert"
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"

gds "github.com/dataux/dataux/pkg/backends/datastore"
"github.com/dataux/dataux/pkg/frontends/testmysql"
tu "github.com/dataux/dataux/pkg/testutil"
gds "github.com/dataux/dataux/backends/datastore"
"github.com/dataux/dataux/frontends/mysqlfe/testmysql"
tu "github.com/dataux/dataux/testutil"
)

var (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"database/sql/driver"
"time"

u "github.com/araddon/gou"
"google.golang.org/cloud/datastore"

u "github.com/araddon/gou"
"github.com/araddon/qlbridge/datasource"
"github.com/araddon/qlbridge/exec"
"github.com/araddon/qlbridge/expr"
"github.com/araddon/qlbridge/rel"
"github.com/araddon/qlbridge/value"
)

Expand All @@ -26,7 +26,7 @@ type ResultReader struct {
finalized bool
hasprojection bool
cursor int
proj *expr.Projection
proj *rel.Projection
cols []string
Vals [][]driver.Value
Total int
Expand All @@ -41,7 +41,7 @@ type ResultReaderNext struct {

func NewResultReader(req *SqlToDatstore) *ResultReader {
m := &ResultReader{}
m.TaskBase = exec.NewTaskBase("gds-resultreader")
m.TaskBase = exec.NewTaskBase(req.ctx, "gds-resultreader")
m.Req = req
return m
}
Expand All @@ -54,22 +54,22 @@ func (m *ResultReader) buildProjection() {
return
}
m.hasprojection = true
m.proj = expr.NewProjection()
m.proj = rel.NewProjection()
cols := m.proj.Columns
sql := m.Req.sel
if sql.Star {
// Select Each field, grab fields from Table Schema
for _, fld := range m.Req.tbl.Fields {
cols = append(cols, expr.NewResultColumn(fld.Name, len(cols), nil, fld.Type))
cols = append(cols, rel.NewResultColumn(fld.Name, len(cols), nil, fld.Type))
}
} else if sql.CountStar() {
// Count *
cols = append(cols, expr.NewResultColumn("count", len(cols), nil, value.IntType))
cols = append(cols, rel.NewResultColumn("count", len(cols), nil, value.IntType))
} else {
for _, col := range m.Req.sel.Columns {
if fld, ok := m.Req.tbl.FieldMap[col.SourceField]; ok {
//u.Debugf("column: %#v", col)
cols = append(cols, expr.NewResultColumn(col.SourceField, len(cols), col, fld.Type))
cols = append(cols, rel.NewResultColumn(col.SourceField, len(cols), col, fld.Type))
} else {
u.Debugf("Could not find: '%v' in %#v", col.SourceField, m.Req.tbl.FieldMap)
u.Warnf("%#v", col)
Expand All @@ -90,7 +90,7 @@ func (m *ResultReader) buildProjection() {
// as well as making a projection, ie column selection
//
// func (m *ResultReader) Finalize() error {
func (m *ResultReader) Run(context *expr.Context) error {
func (m *ResultReader) Run() error {

sigChan := m.SigChan()
outCh := m.MessageOut()
Expand Down
Loading

0 comments on commit ead7716

Please sign in to comment.