Skip to content

Commit

Permalink
Make it easier to load a default config
Browse files Browse the repository at this point in the history
  • Loading branch information
araddon committed Dec 11, 2016
1 parent 29d6cb6 commit a472b32
Show file tree
Hide file tree
Showing 14 changed files with 100 additions and 35 deletions.
8 changes: 4 additions & 4 deletions backends/bigtable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
Google BigTable Data source
--------------------------------------

Provides SQL Access to Google BigTable
Provides SQL Layer on top of [Google BigTable](https://cloud.google.com/bigtable/) storage.

![Dataux BigTable](https://cloud.githubusercontent.com/assets/7269/20776318/a5711ad4-b714-11e6-96bf-408506158cbf.png)

* https://cloud.google.com/bigtable/docs/go/cbt-reference
* https://cloud.google.com/bigtable/docs/emulator
* https://github.com/spotify/docker-bigtable
**Links**
* BigTable CLI utility https://cloud.google.com/bigtable/docs/go/cbt-reference
* BigTable emulator for local testing/usage https://cloud.google.com/bigtable/docs/emulator

SQL -> BigTable
----------------------------------
Expand Down
2 changes: 2 additions & 0 deletions backends/bigtable/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type Mutator struct {
ds *Source
}

func (m *Source) Init() {}

func (m *Source) Setup(ss *schema.SchemaSource) error {

m.mu.Lock()
Expand Down
2 changes: 2 additions & 0 deletions backends/cassandra/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ type Mutator struct {
ds *Source
}

func (m *Source) Init() {}

func (m *Source) Setup(ss *schema.SchemaSource) error {

m.mu.Lock()
Expand Down
2 changes: 2 additions & 0 deletions backends/datastore/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type DatastoreMutator struct {
ds *Source
}

func (m *Source) Init() {}

func (m *Source) Setup(ss *schema.SchemaSource) error {

if m.schema != nil {
Expand Down
2 changes: 2 additions & 0 deletions backends/elasticsearch/es_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type ElasticsearchDataSource struct {
tablemap map[string]*schema.Table
}

func (m *ElasticsearchDataSource) Init() {}

func (m *ElasticsearchDataSource) Setup(ss *schema.SchemaSource) error {

if m.srcschema != nil {
Expand Down
2 changes: 2 additions & 0 deletions backends/files/filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func NewFileSource() schema.Source {
return &m
}

func (m *FileSource) Init() {}

// Setup the filesource with schema info
func (m *FileSource) Setup(ss *schema.SchemaSource) error {
m.ss = ss
Expand Down
6 changes: 5 additions & 1 deletion backends/kubernetes/dataux.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ sources : [
# since we have no connection info it is going
# to assume kubectl .kube config info
settings {
# locaction of kube config
# defaults to $HOME/.kube/config
#kube_conf "/etc/kube.conf"

# kubernetes namespace to use
namespace default
hosts ["localhost"]
}
}
]
Expand Down
54 changes: 48 additions & 6 deletions backends/kubernetes/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ var (

// Ensure our Kubernetes source implements schema.Source interface
_ schema.Source = (*Source)(nil)

// tracing
trace = false
)

func init() {
Expand All @@ -46,7 +49,6 @@ func init() {
// - provides schema info about the apis
type Source struct {
db string
cluster string
tables []string // Lower cased
tablemap map[string]*schema.Table
conf *schema.ConfigSource
Expand All @@ -64,6 +66,39 @@ type Mutator struct {
ds *Source
}

func (m *Source) Init() {

u.Debugf("kube init()")

if m.schema != nil {
return
}

u.Warnf("kube Init() No Schema!!!!!!")
ss := schema.NewSchemaSource("kubernetes", DataSourceLabel)

ss.Conf = &schema.ConfigSource{Name: "kubernetes"}

u.Debug("init 2")
reg := datasource.DataSourcesRegistry()
sch := schema.NewSchema("kubernetes")
u.Debug("init 2.5")
reg.SchemaAdd(sch)
u.Debug("init 3")
//sch.AddSourceSchema(ss)
u.Debug("init 4")

ss.DS = m
reg.SourceSchemaAdd(sch.Name, ss)
u.Debug("init 5")
sch.RefreshSchema()
u.Debug("init 6")

// if err := m.Setup(ss); err != nil {
// u.Warnf("could not sniff kube config %v", err)
// }
}

// Setup accepts the schema source config
func (m *Source) Setup(ss *schema.SchemaSource) error {

Expand All @@ -73,22 +108,29 @@ func (m *Source) Setup(ss *schema.SchemaSource) error {
if m.schema != nil {
return nil
}
u.Infof("kube Setup()")

m.schema = ss
m.conf = ss.Conf
m.db = strings.ToLower(ss.Name)
m.tablemap = make(map[string]*schema.Table)

kubeConf := ""
if len(ss.Conf.Settings) > 0 {
kubeConf = ss.Conf.Settings.String("kube_conf")
}
if kubeConf == "" {
kubeConf = os.Getenv("HOME") + "/.kube/config"
}

//u.Debugf("Kube Source Init: %#v", m.schema.Conf)
if m.schema.Conf == nil {
return fmt.Errorf("Schema conf not found for kubernetes")
}

m.cluster = m.conf.Settings.String("cluster")

// uses the current context in kubeconfig
// TODO: allow this to be specified
config, err := clientcmd.BuildConfigFromFlags("", os.Getenv("HOME")+"/.kube/config")
config, err := clientcmd.BuildConfigFromFlags("", kubeConf)
if err != nil {
u.Errorf("could not read kube config %v", err)
return err
Expand Down Expand Up @@ -193,7 +235,7 @@ func (m *Source) describeServices(c *kubernetes.Clientset) error {
u.Infof("There are %d services in the cluster", len(services.Items))
for _, svc := range services.Items {
svcJson, _ := json.MarshalIndent(svc, "", " ")
if true == true {
if trace == true {
u.Debugf("\n%s", string(svcJson))
}
}
Expand Down Expand Up @@ -241,7 +283,7 @@ func (m *Source) describeNodes(c *kubernetes.Clientset) error {
u.Infof("There are %d nodes in the cluster", len(nodes.Items))
for _, node := range nodes.Items {
nodeJson, _ := json.MarshalIndent(node, "", " ")
if true == true {
if trace == true {
u.Debugf("\n%s", string(nodeJson))
}
}
Expand Down
1 change: 1 addition & 0 deletions backends/mongo/mgo_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func NewMongoDataSource() schema.Source {
return &MongoDataSource{tablesNotFound: make(map[string]string)}
}

func (m *MongoDataSource) Init() {}
func (m *MongoDataSource) Setup(ss *schema.SchemaSource) error {

if m.srcschema != nil {
Expand Down
10 changes: 6 additions & 4 deletions dataux.conf
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,13 @@ sources : [
{
name : minikube
type : kubernetes
# since we have no connection info it is going
# to assume kubectl .kube config info
settings {
namespace default
hosts ["localhost"]
# locaction of kube config
# defaults to $HOME/.kube/config
#kube_conf "/etc/kube.conf"

# kubernetes namespace to use
#namespace default
}
}

Expand Down
1 change: 1 addition & 0 deletions frontends/mysqlfe/mysql_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (m *mySqlHandler) handleQuery(writer models.ResultWriter, sql string) (err
if m.schema == nil {
s, err := m.svr.InfoSchema()
if err != nil {
u.Warnf("no infoschema? %v", err)
return err
}
m.schema = s.InfoSchema
Expand Down
5 changes: 3 additions & 2 deletions models/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package models

import (
"io/ioutil"
"os"

"github.com/araddon/qlbridge/schema"
"github.com/lytics/confl"
Expand All @@ -14,7 +15,7 @@ func LoadConfigFromFile(filename string) (*Config, error) {
if err != nil {
return nil, err
}
if _, err = confl.Decode(string(confBytes), &c); err != nil {
if _, err = confl.Decode(os.ExpandEnv(string(confBytes)), &c); err != nil {
return nil, err
}
return &c, nil
Expand All @@ -24,7 +25,7 @@ func LoadConfigFromFile(filename string) (*Config, error) {
// from file or passed in
func LoadConfig(conf string) (*Config, error) {
var c Config
if _, err := confl.Decode(conf, &c); err != nil {
if _, err := confl.Decode(os.ExpandEnv(conf), &c); err != nil {
return nil, err
}
return &c, nil
Expand Down
4 changes: 4 additions & 0 deletions models/serverctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func (m *ServerCtx) Init() error {
return err
}

m.Reg.Init()

// Copy over the nats, etcd info from config to
// Planner grid
planner.GridConf.NatsServers = m.Config.Nats
Expand Down Expand Up @@ -103,6 +105,8 @@ func (m *ServerCtx) loadConfig() error {

m.schemas = make(map[string]*schema.Schema)

u.Debugf("server load config schema ct=%d", len(m.schemas))

for _, schemaConf := range m.Config.Schemas {

//u.Debugf("parse schemas: %v", schemaConf)
Expand Down
36 changes: 18 additions & 18 deletions proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,21 @@ func RunDaemon(listener bool, workerCt int) {
return
}

sc := make(chan os.Signal, 1)
quit := make(chan bool)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

go func() {
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

sig := <-sc
close(quit) // This should signal worker nodes, master node to quit
u.Infof("Got signal [%d] to exit.", sig)
time.Sleep(time.Millisecond * 50)
svr.Shutdown(Reason{Reason: "signal", Message: fmt.Sprintf("%v", sig)})

}()

// Gratuitous Loading Banner
Expand All @@ -102,19 +102,19 @@ func RunDaemon(listener bool, workerCt int) {
}
}

// If this is a front end listener servers (optional) then we will
// AND its a distributed mode then we need to prepare the master planner
if listener && Conf.DistributedMode() {
go func() {
// PlanGrid is the master that coordinates
// with etcd, nats, etc, submit tasks to worker nodes
// Only needed on listener nodes
svrCtx.PlanGrid.Run(quit)
}()
}

// If listener, run tcp listeners
if listener {

// If distributed mode then we need to prepare the master planner
if Conf.DistributedMode() {
go func() {
// PlanGrid is the master that coordinates
// with etcd, nats, etc, submit tasks to worker nodes
// Only needed on listener nodes
svrCtx.PlanGrid.Run(quit)
}()
}

// Blocking
svr.RunListeners()
}
Expand Down

0 comments on commit a472b32

Please sign in to comment.