Skip to content

Commit

Permalink
File-Scanner work for custom file parsers
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaron Raddon committed Mar 20, 2016
1 parent a75c73f commit 7c5e87f
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 56 deletions.
27 changes: 20 additions & 7 deletions backends/files/filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ func (m *FileSource) Setup(ss *schema.SourceSchema) error {

func (m *FileSource) Open(tableName string) (schema.SourceConn, error) {

u.Warnf("files open %q", tableName)
//u.Debugf("files open %q", tableName)
return m.loadScanner(tableName)
}
func (m *FileSource) Close() error { return nil }
func (m *FileSource) Close() error {
u.Warnf("in filesource close")
return nil
}
func (m *FileSource) Tables() []string { return m.tablenames }
func (m *FileSource) init() error {
if m.store == nil {
Expand Down Expand Up @@ -190,6 +193,7 @@ func (m *FileSource) Table(tableName string) (*schema.Table, error) {

u.Debugf("Table(%q)", tableName)
if t, ok := m.tables[tableName]; ok {
u.Warnf("%p table? %#v", t)
return t, nil
}

Expand All @@ -203,9 +207,9 @@ func (m *FileSource) Table(tableName string) (*schema.Table, error) {

if schemaSource, hasSchema := scanner.(schema.SchemaProvider); hasSchema {

t, err := schemaSource.Table(tableName)
t, err = schemaSource.Table(tableName)
if err != nil {
u.Errorf("could not get table %v", err)
u.Errorf("could not get %T P:%p table %q %v", schemaSource, schemaSource, tableName, err)
return nil, err
}

Expand All @@ -222,14 +226,22 @@ func (m *FileSource) Table(tableName string) (*schema.Table, error) {
return nil, err
}
}

if t == nil {
u.Warnf("wtf? nil")
}
m.tables[tableName] = t
return t, nil
}

func (m *FileSource) loadScanner(tableName string) (schema.Scanner, error) {

u.Debugf("loadScanner(%q)", tableName)
// t, ok := m.tables[tableName]
// if !ok {
// u.Warnf("%p no table? %v", m, tableName)
// //return t, nil
// }

//u.Debugf("loadScanner(%q) %#v", tableName, t)

// Read the object from cloud storage
files := m.files[tableName]
Expand All @@ -251,10 +263,11 @@ func (m *FileSource) loadScanner(tableName string) (schema.Scanner, error) {
u.Errorf("could not read %q table %v", tableName, err)
return nil, err
}
u.Infof("found file: %s %p", obj.Name(), f)
//u.Infof("found file: %s %p", obj.Name(), f)

fi := &FileInfo{
F: f,
Name: obj.Name(),
Exit: make(chan bool),
Table: tableName,
}
Expand Down
21 changes: 4 additions & 17 deletions backends/files/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
var (
// the global file-scanners registry mutex
registryMu sync.Mutex
registry = newScannerRegistry()
scanners = make(map[string]FileHandler)

_ FileHandler = (*csvFiles)(nil)
)
Expand All @@ -38,30 +38,17 @@ func RegisterFileScanner(scannerType string, fh FileHandler) {
u.Debugf("global FileHandler register: %v %T FileHandler:%p", scannerType, fh, fh)
registryMu.Lock()
defer registryMu.Unlock()
if _, dupe := registry.scanners[scannerType]; dupe {
if _, dupe := scanners[scannerType]; dupe {
panic("Register called twice for FileHandler type " + scannerType)
}
registry.scanners[scannerType] = fh
}

// Our internal map of different types of datasources that are registered
// for our runtime system to use
type scannerRegistry struct {
// Map of scanner name, to maker
scanners map[string]FileHandler
}

func newScannerRegistry() *scannerRegistry {
return &scannerRegistry{
scanners: make(map[string]FileHandler),
}
scanners[scannerType] = fh
}

func scannerGet(scannerType string) (FileHandler, bool) {
registryMu.Lock()
defer registryMu.Unlock()
scannerType = strings.ToLower(scannerType)
scanner, ok := registry.scanners[scannerType]
scanner, ok := scanners[scannerType]
return scanner, ok
}

Expand Down
28 changes: 0 additions & 28 deletions backends/mongo/mgo_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ func (m *MongoDataSource) loadTableNames() error {

func (m *MongoDataSource) loadTableSchema(table string) (*schema.Table, error) {

//u.WarnT(7)
if m.srcschema == nil {
return nil, fmt.Errorf("no schema in use")
}
Expand Down Expand Up @@ -277,50 +276,28 @@ func (m *MongoDataSource) loadTableSchema(table string) (*schema.Table, error) {
//NewField(name string, valType value.ValueType, size int, nulls bool, defaultVal driver.Value, key, collation, description string)
switch val := iVal.(type) {
case bson.ObjectId:
//u.Debugf("found bson.ObjectId: %v='%v'", colName, val)
tbl.AddField(schema.NewField(colName, value.StringType, 16, schema.NoNulls, nil, "PRI", "", "bson.ObjectID AUTOGEN"))
//tbl.DescribeColumn([]driver.Value{colName, "char(24)", "NO", "PRI", "AUTOGEN", ""})
case bson.M:
//u.Debugf("found bson.M: %v='%v'", colName, val)
tbl.AddField(schema.NewFieldBase(colName, value.MapValueType, 24, "bson.M"))
//tbl.DescribeColumn([]driver.Value{colName, "text", "NO", "", "", "Nested Map Type, json object"})
case map[string]interface{}:
//u.Debugf("found map[string]interface{}: %v='%v'", colName, val)
tbl.AddField(schema.NewFieldBase(colName, value.MapValueType, 24, "map[string]interface{}"))
//tbl.DescribeColumn([]driver.Value{colName, "text", "NO", "", "", "Nested Map Type, json object"})
case int:
//u.Debugf("found int: %v='%v'", colName, val)
tbl.AddField(schema.NewFieldBase(colName, value.IntType, 32, "int"))
//tbl.DescribeColumn([]driver.Value{colName, "int(8)", "NO", "", "", "int"})
case int64:
//u.Debugf("found int64: %v='%v'", colName, val)
tbl.AddField(schema.NewFieldBase(colName, value.IntType, 64, "long"))
//tbl.DescribeColumn([]driver.Value{colName, "bigint", "NO", "", "", "long"})
case float64:
//u.Debugf("found float64: %v='%v'", colName, val)
tbl.AddField(schema.NewFieldBase(colName, value.NumberType, 32, "float64"))
//tbl.DescribeColumn([]driver.Value{colName, "float", "NO", "", "", "float64"})
case string:
//u.Debugf("found string: %v='%v'", colName, val)
tbl.AddField(schema.NewFieldBase(colName, value.StringType, 255, "string"))
//tbl.DescribeColumn([]driver.Value{colName, "varchar(255)", "NO", "", "", "string"})
case bool:
//u.Debugf("found string: %v='%v'", colName, val)
tbl.AddField(schema.NewFieldBase(colName, value.BoolType, 1, "bool"))
//tbl.DescribeColumn([]driver.Value{colName, "bool", "NO", "", "", "bool"})
case time.Time:
//u.Debugf("found time.Time: %v='%v'", colName, val)
tbl.AddField(schema.NewFieldBase(colName, value.TimeType, 32, "datetime"))
//tbl.DescribeColumn([]driver.Value{colName, "datetime", "NO", "", "", "datetime"})
case *time.Time:
//u.Debugf("found time.Time: %v='%v'", colName, val)
tbl.AddField(schema.NewFieldBase(colName, value.TimeType, 32, "datetime"))
//tbl.DescribeColumn([]driver.Value{colName, "datetime", "NO", "", "", "datetime"})
case []uint8:
// This is most likely binary data, json.RawMessage, or []bytes
//u.Debugf("found []uint8: %v='%v'", colName, val)
tbl.AddField(schema.NewFieldBase(colName, value.ByteSliceType, 1000, "[]byte"))
//tbl.DescribeColumn([]driver.Value{colName, "binary", "NO", "", "", "Binary data: []byte"})
case []string:
u.Warnf("NOT IMPLEMENTED: found []string %v='%v'", colName, val)
case []interface{}:
Expand All @@ -334,13 +311,9 @@ func (m *MongoDataSource) loadTableSchema(table string) (*schema.Table, error) {
switch typ {
case value.StringType:
tbl.AddField(schema.NewFieldBase(colName, value.StringsType, 1000, "[]string"))
//tbl.AddValues([]driver.Value{colName, "[]string", "NO", "", "", "[]string"})
//tbl.DescribeColumn([]driver.Value{colName, "text", "NO", "", "", "json []string"})
default:
//u.Debugf("SEMI IMPLEMENTED: found []interface{}: col:%s T:%T type:%v", colName, val, typ.String())
tbl.AddField(schema.NewFieldBase(colName, value.SliceValueType, 1000, "[]value"))
//tbl.AddValues([]driver.Value{colName, "[]value", "NO", "", "", "json []value"})
//tbl.DescribeColumn([]driver.Value{colName, "text", "NO", "", "", "json []value"})
}
case nil:
// ??
Expand All @@ -360,7 +333,6 @@ func (m *MongoDataSource) loadTableSchema(table string) (*schema.Table, error) {
u.Warnf(errmsg)
}
}
// buildMongoFields(s, tbl, jh, "", 0)
tbl.SetColumns(colNames)
m.srcschema.AddTable(tbl)

Expand Down
2 changes: 1 addition & 1 deletion frontends/mysqlfe/mysql_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (m *MySqlConnCreator) Init(conf *models.ListenerConfig, svr *models.ServerC
// - it re-uses the HandlerShard with has schema, etc on it
func (m *MySqlConnCreator) Open(connI interface{}) models.StatementHandler {

u.Infof("Open: ")
//u.Infof("Open: ")
handler := mySqlHandler{svr: m.svr}
handler.sess = NewMySqlSessionVars()

Expand Down
5 changes: 4 additions & 1 deletion frontends/mysqlfe/results_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func schemaWrite(m *MySqlResultWriter) exec.MessageHandler {

return func(_ *plan.Context, msg schema.Message) bool {

//u.Debugf("in schemaWrite: %#v", msg)
u.Debugf("in schemaWrite: %#v", msg)
if !m.wroteHeaders {
m.WriteHeaders()
}
Expand Down Expand Up @@ -131,6 +131,9 @@ func resultWrite(m *MySqlResultWriter) exec.MessageHandler {

return func(_ *plan.Context, msg schema.Message) bool {

if msg == nil {
return false
}
//u.Debugf("in resultWrite: %#v", msg)
if !m.wroteHeaders {
m.WriteHeaders()
Expand Down
1 change: 1 addition & 0 deletions frontends/mysqlfe/testmysql/runtestserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func NewTestServerForDb(t *testing.T, db string) {
ServerCtx.Init()

Schema, _ = ServerCtx.Schema(db)
u.Infof("starting %q schema in test", db)

svr, err := proxy.NewServer(ServerCtx)
assert.T(t, err == nil, "must start without error ", err)
Expand Down
2 changes: 1 addition & 1 deletion proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (m *Server) loadFrontends() error {
return err
}
m.listeners = append(m.listeners, listener)
u.Infof("Loaded listener %s ", name)
//u.Infof("Loaded listener %s ", name)
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions testutil/testsetup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"flag"
"fmt"
"net/url"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -99,6 +100,17 @@ func (a *Article) Header() string {
func (a *Article) Row() string {
return fmt.Sprintf("%s,%s,%v,%v,%v,%v,%v", a.Title, a.Author, a.Count, a.Deleted, a.Created, a.Updated, a.F)
}
func (a *Article) UrlMsg() url.Values {
msg := url.Values{}
msg.Add("title", a.Title)
msg.Add("author", a.Author)
msg.Add("count", fmt.Sprintf("%v", a.Count))
msg.Add("deleted", fmt.Sprintf("%v", a.Deleted))
msg.Add("created", fmt.Sprintf("%v", a.Created))
msg.Add("updated", fmt.Sprintf("%v", a.Updated))
msg.Add("f", fmt.Sprintf("%v", a.F))
return msg
}

type User struct {
Id string
Expand Down
2 changes: 1 addition & 1 deletion vendored/mixer/proxy/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func newConn(m *mysqlListener, co net.Conn) *Conn {
c.pkg = mysql.NewPacketIO(co)

c.listener = m
u.Debugf("has sc? %#v", m.sc)
//u.Debugf("has sc? %#v", m.sc)
c.handler = m.sc.Open(c)

c.noRecover = c.listener.cfg.SupressRecover
Expand Down

0 comments on commit 7c5e87f

Please sign in to comment.