@@ -38,13 +38,9 @@ import (
3838 "github.com/linkall-labs/vanus/client/internal/vanus/codec"
3939 "github.com/linkall-labs/vanus/client/internal/vanus/net/rpc"
4040 "github.com/linkall-labs/vanus/client/internal/vanus/net/rpc/bare"
41- "github.com/linkall-labs/vanus/client/pkg/api"
4241 "github.com/linkall-labs/vanus/client/pkg/primitive"
4342 "github.com/linkall-labs/vanus/observability/log"
44- )
45-
46- const (
47- defaultTaskChannelBuffer = 512
43+ "github.com/linkall-labs/vanus/pkg/errors"
4844)
4945
5046func newBlockStore (endpoint string ) (* BlockStore , error ) {
@@ -56,120 +52,124 @@ func newBlockStore(endpoint string) (*BlockStore, error) {
5652 })),
5753 tracer : tracing .NewTracer ("internal.store.BlockStore" , trace .SpanKindClient ),
5854 }
59- s .stream , err = s .connect (context .Background ())
55+ s .appendStream , err = s .connectAppendStream (context .Background ())
6056 if err != nil {
6157 // TODO: check error
6258 return nil , err
6359 }
64- s .taskC = make (chan Task , defaultTaskChannelBuffer )
65- s .run (context .Background ())
66- s .receive (context .Background (), s .stream )
60+ s .readStream , err = s .connectReadStream (context .Background ())
61+ if err != nil {
62+ // TODO: check error
63+ return nil , err
64+ }
65+ s .runAppendStreamRecv (context .Background (), s .appendStream )
66+ s .runReadStreamRecv (context .Background (), s .readStream )
6767 return s , nil
6868}
6969
7070type BlockStore struct {
7171 primitive.RefCount
72- client rpc.Client
73- tracer * tracing.Tracer
74- stream segpb.SegmentServer_AppendToBlockStreamClient
75- taskC chan Task
76- callbacks sync.Map
77- mu sync.Mutex
72+ client rpc.Client
73+ tracer * tracing.Tracer
74+ appendStream segpb.SegmentServer_AppendToBlockStreamClient
75+ readStream segpb.SegmentServer_ReadFromBlockStreamClient
76+ appendCallbacks sync.Map
77+ readCallbacks sync.Map
78+ appendMu sync.Mutex
79+ readMu sync.Mutex
7880}
7981
80- type Task struct {
81- request * segpb.AppendToBlockStreamRequest
82- cb api.Callback
83- }
82+ type appendCallback func (* segpb.AppendToBlockStreamResponse )
83+ type readCallback func (* segpb.ReadFromBlockStreamResponse )
8484
85- func (s * BlockStore ) run (ctx context.Context ) {
85+ func (s * BlockStore ) runAppendStreamRecv (ctx context.Context , stream segpb. SegmentServer_AppendToBlockStreamClient ) {
8686 go func () {
8787 for {
88- var err error
89- select {
90- case <- ctx .Done ():
91- s .stream .CloseSend ()
92- s .stream = nil
93- return
94- case task := <- s .taskC :
95- stream := s .stream
96- if stream == nil {
97- if stream , err = s .connect (ctx ); err != nil {
98- task .cb (err )
99- break
100- }
101- s .receive (ctx , stream )
102- s .stream = stream
103- }
104- if err = stream .Send (task .request ); err != nil {
105- log .Warning (ctx , "===Send failed===" , map [string ]interface {}{
106- log .KeyError : err ,
107- })
108- s .processSendError (task , err )
109- }
88+ res , err := stream .Recv ()
89+ if err != nil {
90+ log .Error (ctx , "append stream recv failed" , map [string ]interface {}{
91+ log .KeyError : err ,
92+ })
93+ break
94+ }
95+ c , _ := s .appendCallbacks .LoadAndDelete (res .ResponseId )
96+ if c != nil {
97+ c .(appendCallback )(res )
11098 }
11199 }
112100 }()
113101}
114102
115- func (s * BlockStore ) receive (ctx context.Context , stream segpb.SegmentServer_AppendToBlockStreamClient ) {
103+ func (s * BlockStore ) runReadStreamRecv (ctx context.Context , stream segpb.SegmentServer_ReadFromBlockStreamClient ) {
116104 go func () {
117105 for {
118106 res , err := stream .Recv ()
119107 if err != nil {
120- log .Warning (ctx , "===Recv failed=== " , map [string ]interface {}{
108+ log .Error (ctx , "read stream recv failed" , map [string ]interface {}{
121109 log .KeyError : err ,
122110 })
123111 break
124112 }
125- c , _ := s .callbacks .LoadAndDelete (res .ResponseId )
126- if c == nil {
127- // TODO(jiangkai): check err
128- continue
129- }
130- if res .ResponseCode != segpb .ResponseCode_SUCCESS {
131- c .(api.Callback )(stderr .New (res .ResponseCode .String ()))
113+ c , _ := s .readCallbacks .LoadAndDelete (res .ResponseId )
114+ if c != nil {
115+ c .(readCallback )(res )
132116 }
133117 }
134118 }()
135119}
136120
137- func (s * BlockStore ) connect (ctx context.Context ) (segpb.SegmentServer_AppendToBlockStreamClient , error ) {
138- if s .stream != nil {
139- return s .stream , nil
121+ func (s * BlockStore ) connectAppendStream (ctx context.Context ) (segpb.SegmentServer_AppendToBlockStreamClient , error ) {
122+ if s .appendStream != nil {
123+ return s .appendStream , nil
140124 }
141125
142- s .mu .Lock ()
143- defer s .mu .Unlock ()
126+ s .appendMu .Lock ()
127+ defer s .appendMu .Unlock ()
144128
145- if s .stream != nil { //double check
146- return s .stream , nil
129+ if s .appendStream != nil { //double check
130+ return s .appendStream , nil
147131 }
148132
149133 client , err := s .client .Get (ctx )
150134 if err != nil {
151135 return nil , err
152136 }
153137
154- stream , err := client .(segpb.SegmentServerClient ).AppendToBlockStream (context . Background () )
138+ stream , err := client .(segpb.SegmentServerClient ).AppendToBlockStream (ctx )
155139 if err != nil {
156- log .Warning (ctx , "===Get Stream failed=== " , map [string ]interface {}{
140+ log .Warning (ctx , "get append stream failed" , map [string ]interface {}{
157141 log .KeyError : err ,
158142 })
159143 return nil , err
160144 }
161145 return stream , nil
162146}
163147
164- func (s * BlockStore ) processSendError (t Task , err error ) {
165- cb , _ := s .callbacks .LoadAndDelete (t .request .RequestId )
166- if cb != nil {
167- cb .(api.Callback )(err )
148+ func (s * BlockStore ) connectReadStream (ctx context.Context ) (segpb.SegmentServer_ReadFromBlockStreamClient , error ) {
149+ if s .readStream != nil {
150+ return s .readStream , nil
168151 }
169- if stderr .Is (err , io .EOF ) {
170- s .stream .CloseSend ()
171- s .stream = nil
152+
153+ s .readMu .Lock ()
154+ defer s .readMu .Unlock ()
155+
156+ if s .readStream != nil { //double check
157+ return s .readStream , nil
158+ }
159+
160+ client , err := s .client .Get (ctx )
161+ if err != nil {
162+ return nil , err
163+ }
164+
165+ stream , err := client .(segpb.SegmentServerClient ).ReadFromBlockStream (ctx )
166+ if err != nil {
167+ log .Warning (ctx , "get read stream failed" , map [string ]interface {}{
168+ log .KeyError : err ,
169+ })
170+ return nil , err
172171 }
172+ return stream , nil
173173}
174174
175175func (s * BlockStore ) Endpoint () string {
@@ -207,34 +207,79 @@ func (s *BlockStore) Append(ctx context.Context, block uint64, event *ce.Event)
207207 return res .GetOffsets ()[0 ], nil
208208}
209209
210- func (s * BlockStore ) AppendStream (ctx context.Context , block uint64 , event * ce.Event , cb api. Callback ) {
211- _ , span := s .tracer .Start (ctx , "AppendStream " )
210+ func (s * BlockStore ) SyncAppendStream (ctx context.Context , block uint64 , event * ce.Event ) ( int64 , error ) {
211+ _ctx , span := s .tracer .Start (ctx , "SyncAppendStream " )
212212 defer span .End ()
213213
214214 var (
215- err error
215+ err error
216+ wg sync.WaitGroup
217+ resp * segpb.AppendToBlockStreamResponse
216218 )
217219
218- eventpb , err := codec .ToProto (event )
219- if err != nil {
220- cb (err )
221- return
220+ if s .appendStream == nil {
221+ s .appendStream , err = s .connectAppendStream (_ctx )
222+ if err != nil {
223+ return - 1 , err
224+ }
225+ s .runAppendStreamRecv (_ctx , s .appendStream )
222226 }
223227
224228 // generate unique RequestId
225229 requestID := rand .Uint64 ()
226- s .callbacks .Store (requestID , cb )
227- task := Task {
228- request : & segpb.AppendToBlockStreamRequest {
229- RequestId : requestID ,
230- BlockId : block ,
231- Events : & cepb.CloudEventBatch {
232- Events : []* cepb.CloudEvent {eventpb },
233- },
230+
231+ wg .Add (1 )
232+
233+ eventpb , err := codec .ToProto (event )
234+ if err != nil {
235+ return - 1 , err
236+ }
237+
238+ s .appendCallbacks .Store (requestID , appendCallback (func (res * segpb.AppendToBlockStreamResponse ) {
239+ resp = res
240+ wg .Done ()
241+ }))
242+
243+ req := & segpb.AppendToBlockStreamRequest {
244+ RequestId : requestID ,
245+ BlockId : block ,
246+ Events : & cepb.CloudEventBatch {
247+ Events : []* cepb.CloudEvent {eventpb },
234248 },
235- cb : cb ,
236249 }
237- s .taskC <- task
250+
251+ if err = s .appendStream .Send (req ); err != nil {
252+ log .Error (ctx , "append stream send failed" , map [string ]interface {}{
253+ log .KeyError : err ,
254+ })
255+ if stderr .Is (err , io .EOF ) {
256+ s .appendStream .CloseSend ()
257+ s .appendStream = nil
258+ c , _ := s .appendCallbacks .LoadAndDelete (requestID )
259+ if c != nil {
260+ c .(appendCallback )(& segpb.AppendToBlockStreamResponse {
261+ ResponseId : requestID ,
262+ ResponseCode : segpb .ResponseCode_UNKNOWN ,
263+ Offsets : []int64 {},
264+ })
265+ }
266+ }
267+ return - 1 , err
268+ }
269+
270+ wg .Wait ()
271+
272+ if resp .ResponseCode == segpb .ResponseCode_SegmentFull {
273+ log .Warning (ctx , "block append failed cause the segment is full" , nil )
274+ return - 1 , errors .ErrSegmentFull
275+ }
276+
277+ if resp .ResponseCode != segpb .ResponseCode_SUCCESS {
278+ log .Warning (ctx , "block append failed cause unknown error" , nil )
279+ return - 1 , errors .ErrUnknown
280+ }
281+
282+ return resp .Offsets [0 ], nil
238283}
239284
240285func (s * BlockStore ) Read (
@@ -278,6 +323,88 @@ func (s *BlockStore) Read(
278323 return []* ce.Event {}, err
279324}
280325
326+ func (s * BlockStore ) SyncReadStream (
327+ ctx context.Context , block uint64 , offset int64 , size int16 , pollingTimeout uint32 ,
328+ ) ([]* ce.Event , error ) {
329+ _ctx , span := s .tracer .Start (ctx , "SyncReadStream" )
330+ defer span .End ()
331+
332+ var (
333+ err error
334+ wg sync.WaitGroup
335+ resp * segpb.ReadFromBlockStreamResponse
336+ )
337+
338+ if s .readStream == nil {
339+ s .readStream , err = s .connectReadStream (_ctx )
340+ if err != nil {
341+ return []* ce.Event {}, err
342+ }
343+ s .runReadStreamRecv (_ctx , s .readStream )
344+ }
345+
346+ // generate unique RequestId
347+ requestID := rand .Uint64 ()
348+
349+ wg .Add (1 )
350+
351+ s .appendCallbacks .Store (requestID , readCallback (func (res * segpb.ReadFromBlockStreamResponse ) {
352+ resp = res
353+ wg .Done ()
354+ }))
355+
356+ req := & segpb.ReadFromBlockStreamRequest {
357+ BlockId : block ,
358+ Offset : offset ,
359+ Number : int64 (size ),
360+ PollingTimeout : pollingTimeout ,
361+ }
362+
363+ if err = s .readStream .Send (req ); err != nil {
364+ log .Error (ctx , "read stream send failed" , map [string ]interface {}{
365+ log .KeyError : err ,
366+ })
367+ if stderr .Is (err , io .EOF ) {
368+ s .readStream .CloseSend ()
369+ s .readStream = nil
370+ c , _ := s .readCallbacks .LoadAndDelete (requestID )
371+ if c != nil {
372+ c .(readCallback )(& segpb.ReadFromBlockStreamResponse {
373+ ResponseId : requestID ,
374+ ResponseCode : segpb .ResponseCode_UNKNOWN ,
375+ Events : & cepb.CloudEventBatch {
376+ Events : []* cepb.CloudEvent {},
377+ },
378+ })
379+ }
380+ }
381+ return []* ce.Event {}, err
382+ }
383+
384+ wg .Wait ()
385+
386+ if resp .ResponseCode != segpb .ResponseCode_SUCCESS {
387+ log .Warning (ctx , "block append failed cause unknown error" , nil )
388+ return []* ce.Event {}, errors .ErrUnknown
389+ }
390+
391+ if batch := resp .GetEvents (); batch != nil {
392+ if eventpbs := batch .GetEvents (); len (eventpbs ) > 0 {
393+ events := make ([]* ce.Event , 0 , len (eventpbs ))
394+ for _ , eventpb := range eventpbs {
395+ event , err2 := codec .FromProto (eventpb )
396+ if err2 != nil {
397+ // TODO: return events or error?
398+ return events , err2
399+ }
400+ events = append (events , event )
401+ }
402+ return events , nil
403+ }
404+ }
405+ return []* ce.Event {}, errors .ErrUnknown
406+ }
407+
281408func (s * BlockStore ) LookupOffset (ctx context.Context , blockID uint64 , t time.Time ) (int64 , error ) {
282409 ctx , span := s .tracer .Start (ctx , "LookupOffset" )
283410 defer span .End ()
0 commit comments