Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions cmd/speaker/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,23 @@ package options
import (
"github.com/openelb/openelb/pkg/log"
"github.com/openelb/openelb/pkg/speaker/bgp"
"github.com/openelb/openelb/pkg/speaker/vip"
cliflag "k8s.io/component-base/cli/flag"
)

type OpenELBSpeakerOptions struct {
Bgp *bgp.BgpOptions
LogOptions *log.Options
MetricsAddr string
Bgp *bgp.BgpOptions
Vip *vip.VipOptions
LogOptions *log.Options
}

func NewOpenELBSpeakerOptions() *OpenELBSpeakerOptions {
return &OpenELBSpeakerOptions{
Bgp: bgp.NewBgpOptions(),
LogOptions: log.NewOptions(),
MetricsAddr: ":50053",
Bgp: bgp.NewBgpOptions(),
Vip: vip.NewVipOptions(),
LogOptions: log.NewOptions(),
}
}

Expand All @@ -27,7 +32,11 @@ func (s *OpenELBSpeakerOptions) Flags() cliflag.NamedFlagSets {
fss := cliflag.NamedFlagSets{}

s.Bgp.AddFlags(fss.FlagSet("bgp"))
s.Vip.AddFlags(fss.FlagSet("vip"))
s.LogOptions.AddFlags(fss.FlagSet("log"))

fs := fss.FlagSet("generic")
fs.StringVar(&s.MetricsAddr, "metrics-addr", s.MetricsAddr, "The address the metric endpoint binds to.")

return fss
}
45 changes: 37 additions & 8 deletions cmd/speaker/app/speaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
_ "github.com/openelb/openelb/pkg/metrics"
"github.com/openelb/openelb/pkg/speaker"
bgpd "github.com/openelb/openelb/pkg/speaker/bgp"
"github.com/openelb/openelb/pkg/speaker/vip"
"github.com/openelb/openelb/pkg/util"
"github.com/openelb/openelb/pkg/version"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
clientset "k8s.io/client-go/kubernetes"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/term"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -78,20 +81,21 @@ func NewOpenELBSpeakerCommand() *cobra.Command {
return cmd
}

func Run(c *options.OpenELBSpeakerOptions) error {
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&c.LogOptions.Options)))
func Run(opt *options.OpenELBSpeakerOptions) error {
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opt.LogOptions.Options)))
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
// MetricsBindAddress: c.MetricsAddr,
Scheme: scheme,
MetricsBindAddress: opt.MetricsAddr,
Scheme: scheme,
})

if err != nil {
setupLog.Error(err, "unable to new manager")
return err
}

spmanager := speaker.NewSpeakerManager(mgr.GetClient(), ctrl.Log.WithName("speakerManger"))

//For gobgp
bgpServer := bgpd.NewGoBgpd(c.Bgp)
bgpServer := bgpd.NewGoBgpd(opt.Bgp)
if err := bgp.SetupBgpConfReconciler(bgpServer, mgr); err != nil {
setupLog.Error(err, "unable to setup bgpconf")
return err
Expand All @@ -102,13 +106,38 @@ func Run(c *options.OpenELBSpeakerOptions) error {
return err
}

// TODO: for layer2 + vip mode
spmanager := speaker.NewSpeakerManager(mgr.GetClient(), ctrl.Log.WithName("speakerManger"))
if err := spmanager.RegisterSpeaker(constant.OpenELBProtocolBGP, bgpServer); err != nil {
setupLog.Error(err, "unable to register bgp speaker")
return err
}

//For keepalive
k8sClient := clientset.NewForConfigOrDie(ctrl.GetConfigOrDie())
if opt.Vip.EnableVIP {
ns := util.EnvNamespace()
config := constant.OpenELBVipConfigMap
if opt.Vip.ConfigNamespace != "" {
ns = opt.Vip.ConfigNamespace
}
if opt.Vip.ConfigName != "" {
config = opt.Vip.ConfigName
}
keepalive := vip.NewKeepAlived(k8sClient, &vip.KeepAlivedConfig{
Args: []string{
fmt.Sprintf("--services-configmap=%s/%s", ns, config),
fmt.Sprintf("--http-port=%d", opt.Vip.HealthPort)},
})

if err := spmanager.RegisterSpeaker(constant.OpenELBProtocolVip, keepalive); err != nil {
setupLog.Error(err, "unable to register keepalive speaker")
return err
}
} else {
vip.Clean(k8sClient)
}

// TODO: for layer2 mode

if err := (&speaker.LBReconciler{
Handler: spmanager.HandleService,
Client: mgr.GetClient(),
Expand Down
5 changes: 5 additions & 0 deletions config/workloads/speaker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ spec:
args:
- --api-hosts=:50051
- --zap-log-level=info
- --enable-keepalived-vip=false
image: speaker:latest
imagePullPolicy: IfNotPresent
name: openelb-speaker
Expand All @@ -47,6 +48,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: OPENELB_DSNAME
valueFrom:
fieldRef:
fieldPath: metadata.name
terminationGracePeriodSeconds: 10
hostNetwork: true

5 changes: 5 additions & 0 deletions deploy/openelb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,7 @@ spec:
- args:
- --api-hosts=:50051
- --zap-log-level=info
- --enable-keepalived-vip=false
command:
- openelb-speaker
env:
Expand All @@ -1259,6 +1260,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: OPENELB_DSNAME
valueFrom:
fieldRef:
fieldPath: metadata.name
image: kubesphere/openelb-speaker:v0.6.0
imagePullPolicy: IfNotPresent
name: openelb-speaker
Expand Down
2 changes: 2 additions & 0 deletions pkg/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ const (
EipRangeSeparator string = "-"

OpenELBControllerLocker = "openelb-controller"
OpenELBSpeakerName = "openelb-speaker"
OpenELBNamespace = "openelb-system"
OpenELBVipConfigMap = "openelb-vip-configmap"
OpenELBVipName = "openelb-keepalive-vip"
OpenELBBgpName = "gobgp.conf"
OpenELBServiceAccountName = "kube-keepalived-vip"
EnvOpenELBNamespace = "OPENELB_NAMESPACE"
EnvDaemonsetName = "OPENELB_DSNAME"
EnvNodeName = "NODE_NAME"

// default images and specify images
Expand Down
5 changes: 5 additions & 0 deletions pkg/speaker/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ func (m *Manager) HandleEIP(eip *v1alpha2.Eip) error {
return nil
}

if eip.GetProtocol() == constant.OpenELBProtocolVip && m.GetSpeaker(eip.GetProtocol()) == nil {
m.Info(fmt.Sprintf("no registered speaker:[%s] eip:[%s]", eip.GetProtocol(), eip.GetName()))
return nil
}

m.Lock()
defer m.Unlock()
_, exist := m.eips[eip.GetName()]
Expand Down
Loading