Skip to content

Commit

Permalink
speaker clear datas when stopped
Browse files Browse the repository at this point in the history
Signed-off-by: renyunkang <[email protected]>
  • Loading branch information
renyunkang committed May 23, 2024
1 parent a7f8963 commit 8e6318a
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 118 deletions.
11 changes: 6 additions & 5 deletions cmd/speaker/app/speaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ func Run(opt *options.OpenELBSpeakerOptions) error {
klog.Fatalf("unable to new manager: %v", err)
}

spmanager := speaker.NewSpeakerManager(mgr.GetClient(), mgr.GetEventRecorderFor("speakerManager"))
ctx := ctrl.SetupSignalHandler()
spmanager := speaker.NewSpeakerManager(mgr)

//For gobgp
bgpServer := bgpd.NewGoBgpd(opt.Bgp)
Expand All @@ -106,7 +107,7 @@ func Run(opt *options.OpenELBSpeakerOptions) error {
klog.Fatalf("unable to setup bgppeer: %v", err)
}

if err := spmanager.RegisterSpeaker(constant.OpenELBProtocolBGP, bgpServer); err != nil {
if err := spmanager.RegisterSpeaker(ctx, constant.OpenELBProtocolBGP, bgpServer); err != nil {
klog.Fatalf("unable to register bgp speaker: %v", err)
}

Expand All @@ -117,7 +118,7 @@ func Run(opt *options.OpenELBSpeakerOptions) error {
if err != nil {
klog.Fatalf("unable to new vip speaker: %v", err)
}
if err := spmanager.RegisterSpeaker(constant.OpenELBProtocolVip, keepalive); err != nil {
if err := spmanager.RegisterSpeaker(ctx, constant.OpenELBProtocolVip, keepalive); err != nil {
klog.Fatalf("unable to register vip speaker: %v", err)
}
}
Expand All @@ -130,7 +131,7 @@ func Run(opt *options.OpenELBSpeakerOptions) error {
klog.Fatalf("unable to new layer2 speaker: %v", err)
}

if err := spmanager.RegisterSpeaker(constant.OpenELBProtocolLayer2, layer2speaker); err != nil {
if err := spmanager.RegisterSpeaker(ctx, constant.OpenELBProtocolLayer2, layer2speaker); err != nil {
klog.Fatalf("unable to register layer2 speaker: %v", err)
}
}
Expand All @@ -153,7 +154,7 @@ func Run(opt *options.OpenELBSpeakerOptions) error {
klog.Fatalf("unable to setup eipcontroller: %v", err)
}

if err = mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err = spmanager.Start(ctx); err != nil {
klog.Fatalf("unable to run the manager: %v", err)
}

Expand Down
7 changes: 2 additions & 5 deletions pkg/speaker/bgp/bgp/gobgpd.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,16 @@ func NewGoBgpd(bgpOptions *BgpOptions) *Bgp {
}
}

func (b *Bgp) run(stopCh <-chan struct{}) {
func (b *Bgp) Start(stopCh <-chan struct{}) error {
klog.Info("gobgpd starting")
go b.bgpServer.Serve()

<-stopCh
klog.Info("gobgpd ending")
err := b.bgpServer.StopBgp(context.Background(), &api.StopBgpRequest{})
if err != nil {
klog.Errorf("failed to stop gobgpd: %v", err)
}
}

func (b *Bgp) Start(stopCh <-chan struct{}) error {
go b.run(stopCh)
return nil
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/speaker/bgp/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,12 @@ var _ = BeforeSuite(func(done Done) {

// Setup all Controllers
bgpServer = bgpd.NewGoBgpd(bgpd.NewBgpOptions())
bgpServer.Start(stopCh.Done())
go func() {
err := bgpServer.Start(stopCh.Done())
if err != nil {
klog.Errorf("failed to start manager: %v", err)
}
}()
err = SetupBgpPeerReconciler(bgpServer, mgr)
Expect(err).ToNot(HaveOccurred())
err = SetupBgpConfReconciler(bgpServer, mgr)
Expand Down
25 changes: 11 additions & 14 deletions pkg/speaker/layer2/memberlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,21 +136,18 @@ func (l *layer2Speaker) Start(stopCh <-chan struct{}) error {
return err
}

go func() {
for {
select {
case <-stopCh:
l.unregisterAllAnnouncers()
case <-l.eventCh:
evt := v1alpha2.Eip{}
evt.Name = constant.Layer2ReloadEIPName
evt.Namespace = constant.Layer2ReloadEIPNamespace
l.reloadChan <- event.GenericEvent{Object: &evt}
}
for {
select {
case <-stopCh:
l.unregisterAllAnnouncers()
return nil
case <-l.eventCh:
evt := v1alpha2.Eip{}
evt.Name = constant.Layer2ReloadEIPName
evt.Namespace = constant.Layer2ReloadEIPNamespace
l.reloadChan <- event.GenericEvent{Object: &evt}
}
}()

return nil
}
}

func (l *layer2Speaker) ConfigureWithEIP(config speaker.Config, deleted bool) error {
Expand Down
95 changes: 71 additions & 24 deletions pkg/speaker/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"sort"
"strings"
"sync"

"github.com/openelb/openelb/api/v1alpha2"
"github.com/openelb/openelb/pkg/constant"
Expand All @@ -17,59 +18,105 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

type speaker struct {
type speakerWithCancelFunc struct {
Speaker
ch chan struct{}
cancel context.CancelFunc
}

type Manager struct {
client.Client
record.EventRecorder

speakers map[string]speaker
pools map[string]*v1alpha2.Eip
mgr manager.Manager
speakers map[string]speakerWithCancelFunc
pools map[string]*v1alpha2.Eip
waitGroup sync.WaitGroup
errChan chan error
}

func NewSpeakerManager(c client.Client, record record.EventRecorder) *Manager {
func NewSpeakerManager(mgr manager.Manager) *Manager {
return &Manager{
Client: c,
EventRecorder: record,
speakers: make(map[string]speaker, 0),
mgr: mgr,
Client: mgr.GetClient(),
EventRecorder: mgr.GetEventRecorderFor("speakerManager"),
speakers: make(map[string]speakerWithCancelFunc, 0),
pools: make(map[string]*v1alpha2.Eip, 0),
errChan: make(chan error),
}
}

func (m *Manager) RegisterSpeaker(name string, s Speaker) error {
t := speaker{
Speaker: s,
ch: make(chan struct{}),
func (m *Manager) Start(ctx context.Context) (err error) {
ictx, cancelFunc := context.WithCancel(context.TODO())
errCh := make(chan error)
defer close(errCh)

m.waitGroup.Add(1)
go func() {
defer m.waitGroup.Done()
if err := m.mgr.Start(ictx); err != nil {
errCh <- err
}
}()

// The ctx (signals.SetupSignalHandler()) is to control the entire program life cycle,
// The ictx(internal context) is created here to control the life cycle of the controller-manager(all controllers, sharedInformer, webhook etc.)
// when config changed, stop server and renew context, start new server
for {
select {
case <-ctx.Done():
cancelFunc()
m.waitGroup.Wait()
return nil
case err = <-errCh:
case err = <-m.errChan:
cancelFunc()
for _, s := range m.speakers {
if s.cancel != nil {
s.cancel()
}
}
m.waitGroup.Wait()
return err
}
}
}

if err := s.Start(t.ch); err != nil {
return err
// TODO: Dynamically configure the speaker through configmap
func (m *Manager) RegisterSpeaker(ctx context.Context, name string, speaker Speaker) error {
if s, exist := m.speakers[name]; exist && s.cancel != nil {
s.cancel()
}

m.speakers[name] = t
m.waitGroup.Add(1)
ctxChild, cancel := context.WithCancel(ctx)
s := speakerWithCancelFunc{Speaker: speaker, cancel: cancel}

go func() {
defer m.waitGroup.Done()
if err := s.Start(ctxChild.Done()); err != nil {
s.cancel()
klog.Errorf("speaker %s start failed: %s", name, err.Error())
m.errChan <- err
}
}()

m.speakers[name] = s
return nil
}

func (m *Manager) UnRegisterSpeaker(name string) {
t, ok := m.speakers[name]
if ok {
close(t.ch)
if s, exist := m.speakers[name]; exist && s.cancel != nil {
s.cancel()
}

delete(m.speakers, name)
}

func (m *Manager) GetSpeaker(name string) Speaker {
t, ok := m.speakers[name]
if ok {
return t.Speaker
}

return nil
return m.speakers[name]
}

func (m *Manager) HandleEIP(ctx context.Context, eip *v1alpha2.Eip) error {
Expand Down
Loading

0 comments on commit 8e6318a

Please sign in to comment.