Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keepalive speaker clear datas when stopped #420

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions cmd/speaker/app/speaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func Run(opt *options.OpenELBSpeakerOptions) error {
klog.Fatalf("unable to new manager: %v", err)
}

spmanager := speaker.NewSpeakerManager(mgr.GetClient(), mgr.GetEventRecorderFor("speakerManager"))
ctx := ctrl.SetupSignalHandler()
spmanager := speaker.NewSpeakerManager(mgr)

//For gobgp
bgpServer := bgpd.NewGoBgpd(opt.Bgp)
Expand All @@ -106,7 +107,7 @@ func Run(opt *options.OpenELBSpeakerOptions) error {
klog.Fatalf("unable to setup bgppeer: %v", err)
}

if err := spmanager.RegisterSpeaker(constant.OpenELBProtocolBGP, bgpServer); err != nil {
if err := spmanager.RegisterSpeaker(ctx, constant.OpenELBProtocolBGP, bgpServer); err != nil {
klog.Fatalf("unable to register bgp speaker: %v", err)
}

Expand All @@ -117,7 +118,7 @@ func Run(opt *options.OpenELBSpeakerOptions) error {
if err != nil {
klog.Fatalf("unable to new vip speaker: %v", err)
}
if err := spmanager.RegisterSpeaker(constant.OpenELBProtocolVip, keepalive); err != nil {
if err := spmanager.RegisterSpeaker(ctx, constant.OpenELBProtocolVip, keepalive); err != nil {
klog.Fatalf("unable to register vip speaker: %v", err)
}
}
Expand All @@ -130,7 +131,7 @@ func Run(opt *options.OpenELBSpeakerOptions) error {
klog.Fatalf("unable to new layer2 speaker: %v", err)
}

if err := spmanager.RegisterSpeaker(constant.OpenELBProtocolLayer2, layer2speaker); err != nil {
if err := spmanager.RegisterSpeaker(ctx, constant.OpenELBProtocolLayer2, layer2speaker); err != nil {
klog.Fatalf("unable to register layer2 speaker: %v", err)
}
}
Expand All @@ -153,7 +154,7 @@ func Run(opt *options.OpenELBSpeakerOptions) error {
klog.Fatalf("unable to setup eipcontroller: %v", err)
}

if err = mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err = spmanager.Start(ctx); err != nil {
klog.Fatalf("unable to run the manager: %v", err)
}

Expand Down
7 changes: 2 additions & 5 deletions pkg/speaker/bgp/bgp/gobgpd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,16 @@ func NewGoBgpd(bgpOptions *BgpOptions) *Bgp {
}
}

func (b *Bgp) run(stopCh <-chan struct{}) {
func (b *Bgp) Start(stopCh <-chan struct{}) error {
klog.Info("gobgpd starting")
go b.bgpServer.Serve()

<-stopCh
klog.Info("gobgpd ending")
err := b.bgpServer.StopBgp(context.Background(), &api.StopBgpRequest{})
if err != nil {
klog.Errorf("failed to stop gobgpd: %v", err)
}
}

func (b *Bgp) Start(stopCh <-chan struct{}) error {
go b.run(stopCh)
return nil
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/speaker/bgp/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,12 @@ var _ = BeforeSuite(func(done Done) {

// Setup all Controllers
bgpServer = bgpd.NewGoBgpd(bgpd.NewBgpOptions())
bgpServer.Start(stopCh.Done())
go func() {
err := bgpServer.Start(stopCh.Done())
if err != nil {
klog.Errorf("failed to start manager: %v", err)
}
}()
err = SetupBgpPeerReconciler(bgpServer, mgr)
Expect(err).ToNot(HaveOccurred())
err = SetupBgpConfReconciler(bgpServer, mgr)
Expand Down
25 changes: 11 additions & 14 deletions pkg/speaker/layer2/memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,18 @@ func (l *layer2Speaker) Start(stopCh <-chan struct{}) error {
return err
}

go func() {
for {
select {
case <-stopCh:
l.unregisterAllAnnouncers()
case <-l.eventCh:
evt := v1alpha2.Eip{}
evt.Name = constant.Layer2ReloadEIPName
evt.Namespace = constant.Layer2ReloadEIPNamespace
l.reloadChan <- event.GenericEvent{Object: &evt}
}
for {
select {
case <-stopCh:
l.unregisterAllAnnouncers()
return nil
case <-l.eventCh:
evt := v1alpha2.Eip{}
evt.Name = constant.Layer2ReloadEIPName
evt.Namespace = constant.Layer2ReloadEIPNamespace
l.reloadChan <- event.GenericEvent{Object: &evt}
}
}()

return nil
}
}

func (l *layer2Speaker) ConfigureWithEIP(config speaker.Config, deleted bool) error {
Expand Down
95 changes: 71 additions & 24 deletions pkg/speaker/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"sort"
"strings"
"sync"

"github.com/openelb/openelb/api/v1alpha2"
"github.com/openelb/openelb/pkg/constant"
Expand All @@ -17,59 +18,105 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

type speaker struct {
type speakerWithCancelFunc struct {
Speaker
ch chan struct{}
cancel context.CancelFunc
}

type Manager struct {
client.Client
record.EventRecorder

speakers map[string]speaker
pools map[string]*v1alpha2.Eip
mgr manager.Manager
speakers map[string]speakerWithCancelFunc
pools map[string]*v1alpha2.Eip
waitGroup sync.WaitGroup
errChan chan error
}

func NewSpeakerManager(c client.Client, record record.EventRecorder) *Manager {
func NewSpeakerManager(mgr manager.Manager) *Manager {
return &Manager{
Client: c,
EventRecorder: record,
speakers: make(map[string]speaker, 0),
mgr: mgr,
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor("speakerManager"),
speakers: make(map[string]speakerWithCancelFunc, 0),
pools: make(map[string]*v1alpha2.Eip, 0),
errChan: make(chan error),
}
}

func (m *Manager) RegisterSpeaker(name string, s Speaker) error {
t := speaker{
Speaker: s,
ch: make(chan struct{}),
func (m *Manager) Start(ctx context.Context) (err error) {
ictx, cancelFunc := context.WithCancel(context.TODO())
errCh := make(chan error)
defer close(errCh)

m.waitGroup.Add(1)
go func() {
defer m.waitGroup.Done()
if err := m.mgr.Start(ictx); err != nil {
errCh <- err
}
}()

// The ctx (signals.SetupSignalHandler()) is to control the entire program life cycle,
// The ictx(internal context) is created here to control the life cycle of the controller-manager(all controllers, sharedInformer, webhook etc.)
// when config changed, stop server and renew context, start new server
for {
select {
case <-ctx.Done():
cancelFunc()
m.waitGroup.Wait()
return nil
case err = <-errCh:
case err = <-m.errChan:
cancelFunc()
for _, s := range m.speakers {
if s.cancel != nil {
s.cancel()
}
}
m.waitGroup.Wait()
return err
}
}
}

if err := s.Start(t.ch); err != nil {
return err
// TODO: Dynamically configure the speaker through configmap
func (m *Manager) RegisterSpeaker(ctx context.Context, name string, speaker Speaker) error {
if s, exist := m.speakers[name]; exist && s.cancel != nil {
s.cancel()
}

m.speakers[name] = t
m.waitGroup.Add(1)
ctxChild, cancel := context.WithCancel(ctx)
s := speakerWithCancelFunc{Speaker: speaker, cancel: cancel}

go func() {
defer m.waitGroup.Done()
if err := s.Start(ctxChild.Done()); err != nil {
s.cancel()
klog.Errorf("speaker %s start failed: %s", name, err.Error())
m.errChan <- err
}
}()

m.speakers[name] = s
return nil
}

func (m *Manager) UnRegisterSpeaker(name string) {
t, ok := m.speakers[name]
if ok {
close(t.ch)
if s, exist := m.speakers[name]; exist && s.cancel != nil {
s.cancel()
}

delete(m.speakers, name)
}

func (m *Manager) GetSpeaker(name string) Speaker {
t, ok := m.speakers[name]
if ok {
return t.Speaker
}

return nil
return m.speakers[name]
}

func (m *Manager) HandleEIP(ctx context.Context, eip *v1alpha2.Eip) error {
Expand Down
Loading
Loading