Skip to content

Commit 72be0cc

Browse files
committed
feat: transport layer performance optimization
Signed-off-by: jyjiangkai <[email protected]>
1 parent 6e9856b commit 72be0cc

File tree

13 files changed

+1855
-347
lines changed

13 files changed

+1855
-347
lines changed

client/internal/vanus/store/block_store.go

Lines changed: 211 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,9 @@ import (
3838
"github.com/linkall-labs/vanus/client/internal/vanus/codec"
3939
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc"
4040
"github.com/linkall-labs/vanus/client/internal/vanus/net/rpc/bare"
41-
"github.com/linkall-labs/vanus/client/pkg/api"
4241
"github.com/linkall-labs/vanus/client/pkg/primitive"
4342
"github.com/linkall-labs/vanus/observability/log"
44-
)
45-
46-
const (
47-
defaultTaskChannelBuffer = 512
43+
"github.com/linkall-labs/vanus/pkg/errors"
4844
)
4945

5046
func newBlockStore(endpoint string) (*BlockStore, error) {
@@ -56,120 +52,124 @@ func newBlockStore(endpoint string) (*BlockStore, error) {
5652
})),
5753
tracer: tracing.NewTracer("internal.store.BlockStore", trace.SpanKindClient),
5854
}
59-
s.stream, err = s.connect(context.Background())
55+
s.appendStream, err = s.connectAppendStream(context.Background())
6056
if err != nil {
6157
// TODO: check error
6258
return nil, err
6359
}
64-
s.taskC = make(chan Task, defaultTaskChannelBuffer)
65-
s.run(context.Background())
66-
s.receive(context.Background(), s.stream)
60+
s.readStream, err = s.connectReadStream(context.Background())
61+
if err != nil {
62+
// TODO: check error
63+
return nil, err
64+
}
65+
s.runAppendStreamRecv(context.Background(), s.appendStream)
66+
s.runReadStreamRecv(context.Background(), s.readStream)
6767
return s, nil
6868
}
6969

7070
type BlockStore struct {
7171
primitive.RefCount
72-
client rpc.Client
73-
tracer *tracing.Tracer
74-
stream segpb.SegmentServer_AppendToBlockStreamClient
75-
taskC chan Task
76-
callbacks sync.Map
77-
mu sync.Mutex
72+
client rpc.Client
73+
tracer *tracing.Tracer
74+
appendStream segpb.SegmentServer_AppendToBlockStreamClient
75+
readStream segpb.SegmentServer_ReadFromBlockStreamClient
76+
appendCallbacks sync.Map
77+
readCallbacks sync.Map
78+
appendMu sync.Mutex
79+
readMu sync.Mutex
7880
}
7981

80-
type Task struct {
81-
request *segpb.AppendToBlockStreamRequest
82-
cb api.Callback
83-
}
82+
type appendCallback func(*segpb.AppendToBlockStreamResponse)
83+
type readCallback func(*segpb.ReadFromBlockStreamResponse)
8484

85-
func (s *BlockStore) run(ctx context.Context) {
85+
func (s *BlockStore) runAppendStreamRecv(ctx context.Context, stream segpb.SegmentServer_AppendToBlockStreamClient) {
8686
go func() {
8787
for {
88-
var err error
89-
select {
90-
case <-ctx.Done():
91-
s.stream.CloseSend()
92-
s.stream = nil
93-
return
94-
case task := <-s.taskC:
95-
stream := s.stream
96-
if stream == nil {
97-
if stream, err = s.connect(ctx); err != nil {
98-
task.cb(err)
99-
break
100-
}
101-
s.receive(ctx, stream)
102-
s.stream = stream
103-
}
104-
if err = stream.Send(task.request); err != nil {
105-
log.Warning(ctx, "===Send failed===", map[string]interface{}{
106-
log.KeyError: err,
107-
})
108-
s.processSendError(task, err)
109-
}
88+
res, err := stream.Recv()
89+
if err != nil {
90+
log.Error(ctx, "append stream recv failed", map[string]interface{}{
91+
log.KeyError: err,
92+
})
93+
break
94+
}
95+
c, _ := s.appendCallbacks.LoadAndDelete(res.ResponseId)
96+
if c != nil {
97+
c.(appendCallback)(res)
11098
}
11199
}
112100
}()
113101
}
114102

115-
func (s *BlockStore) receive(ctx context.Context, stream segpb.SegmentServer_AppendToBlockStreamClient) {
103+
func (s *BlockStore) runReadStreamRecv(ctx context.Context, stream segpb.SegmentServer_ReadFromBlockStreamClient) {
116104
go func() {
117105
for {
118106
res, err := stream.Recv()
119107
if err != nil {
120-
log.Warning(ctx, "===Recv failed===", map[string]interface{}{
108+
log.Error(ctx, "read stream recv failed", map[string]interface{}{
121109
log.KeyError: err,
122110
})
123111
break
124112
}
125-
c, _ := s.callbacks.LoadAndDelete(res.ResponseId)
126-
if c == nil {
127-
// TODO(jiangkai): check err
128-
continue
129-
}
130-
if res.ResponseCode != segpb.ResponseCode_SUCCESS {
131-
c.(api.Callback)(stderr.New(res.ResponseCode.String()))
113+
c, _ := s.readCallbacks.LoadAndDelete(res.ResponseId)
114+
if c != nil {
115+
c.(readCallback)(res)
132116
}
133117
}
134118
}()
135119
}
136120

137-
func (s *BlockStore) connect(ctx context.Context) (segpb.SegmentServer_AppendToBlockStreamClient, error) {
138-
if s.stream != nil {
139-
return s.stream, nil
121+
func (s *BlockStore) connectAppendStream(ctx context.Context) (segpb.SegmentServer_AppendToBlockStreamClient, error) {
122+
if s.appendStream != nil {
123+
return s.appendStream, nil
140124
}
141125

142-
s.mu.Lock()
143-
defer s.mu.Unlock()
126+
s.appendMu.Lock()
127+
defer s.appendMu.Unlock()
144128

145-
if s.stream != nil { //double check
146-
return s.stream, nil
129+
if s.appendStream != nil { //double check
130+
return s.appendStream, nil
147131
}
148132

149133
client, err := s.client.Get(ctx)
150134
if err != nil {
151135
return nil, err
152136
}
153137

154-
stream, err := client.(segpb.SegmentServerClient).AppendToBlockStream(context.Background())
138+
stream, err := client.(segpb.SegmentServerClient).AppendToBlockStream(ctx)
155139
if err != nil {
156-
log.Warning(ctx, "===Get Stream failed===", map[string]interface{}{
140+
log.Warning(ctx, "get append stream failed", map[string]interface{}{
157141
log.KeyError: err,
158142
})
159143
return nil, err
160144
}
161145
return stream, nil
162146
}
163147

164-
func (s *BlockStore) processSendError(t Task, err error) {
165-
cb, _ := s.callbacks.LoadAndDelete(t.request.RequestId)
166-
if cb != nil {
167-
cb.(api.Callback)(err)
148+
func (s *BlockStore) connectReadStream(ctx context.Context) (segpb.SegmentServer_ReadFromBlockStreamClient, error) {
149+
if s.readStream != nil {
150+
return s.readStream, nil
168151
}
169-
if stderr.Is(err, io.EOF) {
170-
s.stream.CloseSend()
171-
s.stream = nil
152+
153+
s.readMu.Lock()
154+
defer s.readMu.Unlock()
155+
156+
if s.readStream != nil { //double check
157+
return s.readStream, nil
158+
}
159+
160+
client, err := s.client.Get(ctx)
161+
if err != nil {
162+
return nil, err
163+
}
164+
165+
stream, err := client.(segpb.SegmentServerClient).ReadFromBlockStream(ctx)
166+
if err != nil {
167+
log.Warning(ctx, "get read stream failed", map[string]interface{}{
168+
log.KeyError: err,
169+
})
170+
return nil, err
172171
}
172+
return stream, nil
173173
}
174174

175175
func (s *BlockStore) Endpoint() string {
@@ -207,34 +207,79 @@ func (s *BlockStore) Append(ctx context.Context, block uint64, event *ce.Event)
207207
return res.GetOffsets()[0], nil
208208
}
209209

210-
func (s *BlockStore) AppendStream(ctx context.Context, block uint64, event *ce.Event, cb api.Callback) {
211-
_, span := s.tracer.Start(ctx, "AppendStream")
210+
func (s *BlockStore) SyncAppendStream(ctx context.Context, block uint64, event *ce.Event) (int64, error) {
211+
_ctx, span := s.tracer.Start(ctx, "SyncAppendStream")
212212
defer span.End()
213213

214214
var (
215-
err error
215+
err error
216+
wg sync.WaitGroup
217+
resp *segpb.AppendToBlockStreamResponse
216218
)
217219

218-
eventpb, err := codec.ToProto(event)
219-
if err != nil {
220-
cb(err)
221-
return
220+
if s.appendStream == nil {
221+
s.appendStream, err = s.connectAppendStream(_ctx)
222+
if err != nil {
223+
return -1, err
224+
}
225+
s.runAppendStreamRecv(_ctx, s.appendStream)
222226
}
223227

224228
// generate unique RequestId
225229
requestID := rand.Uint64()
226-
s.callbacks.Store(requestID, cb)
227-
task := Task{
228-
request: &segpb.AppendToBlockStreamRequest{
229-
RequestId: requestID,
230-
BlockId: block,
231-
Events: &cepb.CloudEventBatch{
232-
Events: []*cepb.CloudEvent{eventpb},
233-
},
230+
231+
wg.Add(1)
232+
233+
eventpb, err := codec.ToProto(event)
234+
if err != nil {
235+
return -1, err
236+
}
237+
238+
s.appendCallbacks.Store(requestID, appendCallback(func(res *segpb.AppendToBlockStreamResponse) {
239+
resp = res
240+
wg.Done()
241+
}))
242+
243+
req := &segpb.AppendToBlockStreamRequest{
244+
RequestId: requestID,
245+
BlockId: block,
246+
Events: &cepb.CloudEventBatch{
247+
Events: []*cepb.CloudEvent{eventpb},
234248
},
235-
cb: cb,
236249
}
237-
s.taskC <- task
250+
251+
if err = s.appendStream.Send(req); err != nil {
252+
log.Error(ctx, "append stream send failed", map[string]interface{}{
253+
log.KeyError: err,
254+
})
255+
if stderr.Is(err, io.EOF) {
256+
s.appendStream.CloseSend()
257+
s.appendStream = nil
258+
c, _ := s.appendCallbacks.LoadAndDelete(requestID)
259+
if c != nil {
260+
c.(appendCallback)(&segpb.AppendToBlockStreamResponse{
261+
ResponseId: requestID,
262+
ResponseCode: segpb.ResponseCode_UNKNOWN,
263+
Offsets: []int64{},
264+
})
265+
}
266+
}
267+
return -1, err
268+
}
269+
270+
wg.Wait()
271+
272+
if resp.ResponseCode == segpb.ResponseCode_SegmentFull {
273+
log.Warning(ctx, "block append failed cause the segment is full", nil)
274+
return -1, errors.ErrSegmentFull
275+
}
276+
277+
if resp.ResponseCode != segpb.ResponseCode_SUCCESS {
278+
log.Warning(ctx, "block append failed cause unknown error", nil)
279+
return -1, errors.ErrUnknown
280+
}
281+
282+
return resp.Offsets[0], nil
238283
}
239284

240285
func (s *BlockStore) Read(
@@ -278,6 +323,88 @@ func (s *BlockStore) Read(
278323
return []*ce.Event{}, err
279324
}
280325

326+
func (s *BlockStore) SyncReadStream(
327+
ctx context.Context, block uint64, offset int64, size int16, pollingTimeout uint32,
328+
) ([]*ce.Event, error) {
329+
_ctx, span := s.tracer.Start(ctx, "SyncReadStream")
330+
defer span.End()
331+
332+
var (
333+
err error
334+
wg sync.WaitGroup
335+
resp *segpb.ReadFromBlockStreamResponse
336+
)
337+
338+
if s.readStream == nil {
339+
s.readStream, err = s.connectReadStream(_ctx)
340+
if err != nil {
341+
return []*ce.Event{}, err
342+
}
343+
s.runReadStreamRecv(_ctx, s.readStream)
344+
}
345+
346+
// generate unique RequestId
347+
requestID := rand.Uint64()
348+
349+
wg.Add(1)
350+
351+
s.appendCallbacks.Store(requestID, readCallback(func(res *segpb.ReadFromBlockStreamResponse) {
352+
resp = res
353+
wg.Done()
354+
}))
355+
356+
req := &segpb.ReadFromBlockStreamRequest{
357+
BlockId: block,
358+
Offset: offset,
359+
Number: int64(size),
360+
PollingTimeout: pollingTimeout,
361+
}
362+
363+
if err = s.readStream.Send(req); err != nil {
364+
log.Error(ctx, "read stream send failed", map[string]interface{}{
365+
log.KeyError: err,
366+
})
367+
if stderr.Is(err, io.EOF) {
368+
s.readStream.CloseSend()
369+
s.readStream = nil
370+
c, _ := s.readCallbacks.LoadAndDelete(requestID)
371+
if c != nil {
372+
c.(readCallback)(&segpb.ReadFromBlockStreamResponse{
373+
ResponseId: requestID,
374+
ResponseCode: segpb.ResponseCode_UNKNOWN,
375+
Events: &cepb.CloudEventBatch{
376+
Events: []*cepb.CloudEvent{},
377+
},
378+
})
379+
}
380+
}
381+
return []*ce.Event{}, err
382+
}
383+
384+
wg.Wait()
385+
386+
if resp.ResponseCode != segpb.ResponseCode_SUCCESS {
387+
log.Warning(ctx, "block append failed cause unknown error", nil)
388+
return []*ce.Event{}, errors.ErrUnknown
389+
}
390+
391+
if batch := resp.GetEvents(); batch != nil {
392+
if eventpbs := batch.GetEvents(); len(eventpbs) > 0 {
393+
events := make([]*ce.Event, 0, len(eventpbs))
394+
for _, eventpb := range eventpbs {
395+
event, err2 := codec.FromProto(eventpb)
396+
if err2 != nil {
397+
// TODO: return events or error?
398+
return events, err2
399+
}
400+
events = append(events, event)
401+
}
402+
return events, nil
403+
}
404+
}
405+
return []*ce.Event{}, errors.ErrUnknown
406+
}
407+
281408
func (s *BlockStore) LookupOffset(ctx context.Context, blockID uint64, t time.Time) (int64, error) {
282409
ctx, span := s.tracer.Start(ctx, "LookupOffset")
283410
defer span.End()

0 commit comments

Comments
 (0)