Skip to content

Commit

Permalink
vendor updates, and fixes for lytics
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Raddon committed Apr 28, 2018
1 parent afd58d7 commit aceece8
Show file tree
Hide file tree
Showing 16 changed files with 150 additions and 126 deletions.
23 changes: 12 additions & 11 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@
name = "google.golang.org/grpc"
version = "1.11.3"

[[constraint]]
branch = "v2"
name = "gopkg.in/mgo.v2"

[prune]
go-tests = true
unused-packages = true
3 changes: 2 additions & 1 deletion backends/lytics/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ docker run --rm -e "LOGGING=debug" -p 4000:4000 --name dataux gcr.io/dataux-io/d
mysql -h 127.0.0.1 -P4000


-- Get api key from Account in Lytics
-- Create api key from Api-Tokens in Lytics

CREATE source lytics WITH {
"type":"lytics",
Expand All @@ -55,6 +55,7 @@ select * from user WHERE EXISTS email limit 10;

TODO
------------------------
* SELECT single entities.
* handle ending paging/limits (ie, don't open cursors)
* paging segment scans efficiently
* Select INTO (mytable)
9 changes: 4 additions & 5 deletions backends/lytics/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package lytics
import (
"fmt"

u "github.com/araddon/gou"

"github.com/araddon/qlbridge/exec"
"github.com/araddon/qlbridge/expr"
"github.com/araddon/qlbridge/plan"
Expand Down Expand Up @@ -42,7 +40,6 @@ type Generator struct {
tbl *schema.Table
sel *rel.SqlSelect
schema *schema.Schema
ctx *plan.Context
partition *schema.Partition // current partition for this request
needsPolyFill bool // do we request that features be polyfilled?
apiKey string
Expand Down Expand Up @@ -70,15 +67,17 @@ func (m *Generator) WalkSourceSelect(planner plan.Planner, p *plan.Source) (plan
// WalkExecSource allow this to do its own Exec planning.
func (m *Generator) WalkExecSource(p *plan.Source) (exec.Task, error) {

if p.Context() == nil {
return nil, fmt.Errorf("Missing plan context")
}

if p.Stmt == nil {
return nil, fmt.Errorf("Plan did not include Sql Statement?")
}
if p.Stmt.Source == nil {
return nil, fmt.Errorf("Plan did not include Sql Select Statement?")
}
if m.p == nil {
u.Debugf("custom? %v", p.Custom)

m.p = p
if p.Custom.Bool("poly_fill") {
m.needsPolyFill = true
Expand Down
34 changes: 31 additions & 3 deletions backends/lytics/lytics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package lytics_test

import (
"database/sql"
"encoding/json"
"fmt"
"os"
"testing"

Expand All @@ -11,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/araddon/qlbridge/plan"
"github.com/araddon/qlbridge/schema"
"github.com/dataux/dataux/frontends/mysqlfe/testmysql"
"github.com/dataux/dataux/planner"
"github.com/dataux/dataux/testutil"
Expand All @@ -37,6 +40,31 @@ func RunTestServer(t *testing.T) func() {
planner.GridConf.JobMaker = jobMaker
planner.GridConf.SchemaLoader = testmysql.SchemaLoader
planner.GridConf.SupressRecover = testmysql.Conf.SupressRecover

reg := schema.DefaultRegistry()

by := []byte(fmt.Sprintf(`{
"name": "lytics_test",
"schema":"lyticsx",
"type": "lytics",
"settings" :{
"apikey" : "%s"
}
}`, os.Getenv("LIOTESTKEY")))

sourceConf := &schema.ConfigSource{}
err := json.Unmarshal(by, sourceConf)
assert.Equal(t, nil, err)
err = reg.SchemaAddFromConfig(sourceConf)
assert.Equal(t, nil, err)

s, ok := reg.Schema("lyticsx")
assert.Equal(t, true, ok)
assert.NotEqual(t, nil, s)

// Setup test schema
testmysql.Schema = s

testmysql.RunTestServer(t)
}
return func() {}
Expand All @@ -61,14 +89,14 @@ func validateQuery(t *testing.T, querySql string, expectCols []string, expectCol

func validateQuerySpec(t *testing.T, testSpec QuerySpec) {

if os.Getenv("LIOKEY") == "" {
t.Skip("No LIOKEY to run tests")
if os.Getenv("LIOTESTKEY") == "" {
t.Skip("No LIOTESTKEY to run tests")
return
}
RunTestServer(t)

// This is a connection to RunTestServer, which starts on port 13307
dbx, err := sqlx.Connect("mysql", "root@tcp(127.0.0.1:13307)/datauxtest")
dbx, err := sqlx.Connect("mysql", "root@tcp(127.0.0.1:13307)/lyticsx")
assert.True(t, err == nil, "%v", err)
defer dbx.Close()
//u.Debugf("%v", testSpec.Sql)
Expand Down
8 changes: 1 addition & 7 deletions backends/lytics/resultreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type ResultReaderNext struct {

func NewResultReader(req *Generator) *ResultReader {
m := &ResultReader{}
m.TaskBase = exec.NewTaskBase(req.ctx)
m.TaskBase = exec.NewTaskBase(req.p.Context())
m.Req = req
return m
}
Expand All @@ -63,23 +63,18 @@ func (m *ResultReader) Run() error {

defer func() {
close(outCh) // closing output channels is the signal to stop
u.Debugf("nice, finalize ResultReader out: %p row ct %v", outCh, len(m.Vals))
}()

// create the scanner
scan := client.PageAdHocSegment(m.Req.ql.String())

rowCt := 0

// handle processing the entities
for {
e := scan.Next()
if e == nil {
break
}

//u.Debugf("%v\n\n", e.PrettyJson())

row := make([]driver.Value, len(colNames))
eh := u.JsonHelper(e)
for i, col := range cols {
Expand Down Expand Up @@ -112,7 +107,6 @@ func (m *ResultReader) Run() error {
u.Warnf("unhandled %s", col.Type)
row[i] = eh.PrettyJson()
}

//u.Debugf("%q %T %v", col.As, row[i], row[i])
}

Expand Down
25 changes: 10 additions & 15 deletions backends/lytics/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

var (
// implement interfaces
// ensure we implement schema.Source interfaces
_ schema.Source = (*Source)(nil)
)

Expand All @@ -27,9 +27,8 @@ func init() {
schema.RegisterSourceType(SourceType, &Source{})
}

// Source is the Lytics data source provider
// responsible for schema management, and connection
// management to Lytics
// Source is the Lytics data source provider responsible for
// schema management, and connection management to Lytics API's.
type Source struct {
schema *schema.Schema
conf *schema.ConfigSource
Expand All @@ -51,7 +50,7 @@ func (m *Source) Setup(ss *schema.Schema) error {
m.conf = ss.Conf
m.tablemap = make(map[string]*schema.Table)

u.Infof("%p Conf: %+v settings:%v", ss, ss.Conf, ss.Conf.Settings)
u.Debugf("%p Conf: %+v settings:%v", ss, ss.Conf, ss.Conf.Settings)
if ss.Conf != nil && len(ss.Conf.Settings) > 0 {
m.apiKey = ss.Conf.Settings.String("apikey")
}
Expand All @@ -63,20 +62,15 @@ func (m *Source) Setup(ss *schema.Schema) error {
}

m.client = lytics.NewLytics(m.apiKey, "", nil)
u.Debugf("Init() Lytics schema P=%p", m.schema)

if err := m.loadSchema(); err != nil {
u.Errorf("could not load es tables: %v", err)
return err
}
if m.schema != nil {
u.Debugf("Post Init() Lytics schema P=%p tblct=%d", m.schema, len(m.schema.Tables()))
}
return nil
}

func (m *Source) Open(schemaName string) (schema.Conn, error) {
u.Debugf("Open(%v)", schemaName)
//u.Debugf("Open(%v)", schemaName)
tbl, err := m.schema.Table(schemaName)
if err != nil {
return nil, err
Expand All @@ -91,7 +85,9 @@ func (m *Source) Open(schemaName string) (schema.Conn, error) {
}

// Close this source
func (m *Source) Close() error { return nil }
func (m *Source) Close() error {
return nil
}

//func (m *Source) DataSource() schema.Source { return m }

Expand Down Expand Up @@ -128,7 +124,7 @@ func (m *Source) loadSchema() error {
return err
}
}
u.Debugf("found tables: %v", m.tables)
//u.Debugf("found tables: %v", m.tables)
return nil
}

Expand All @@ -138,8 +134,6 @@ func (m *Source) loadTableSchema(s *lytics.Schema) error {

for _, col := range s.Columns {

u.Infof("%#v", col)

var fld *schema.Field
switch col.Type {
case "boolean", "bool":
Expand All @@ -158,6 +152,7 @@ func (m *Source) loadTableSchema(s *lytics.Schema) error {
"map[string]time", "map[string]string", "map[string]bool", "membership":
fld = schema.NewFieldBase(col.As, value.JsonType, 2000, col.ShortDesc)
case "[]timebucket", "dynamic":
// These types are not supported
continue
default:
u.Warnf("Unahndled type %v type=%v", col.As, col.Type)
Expand Down
Loading

0 comments on commit aceece8

Please sign in to comment.