feat(bigquery/storage/managedwriter): support append retries (googleapis#6695)

This PR adds support for automatic retry of failed appends. Failures are evaluated both from the perspective of receive errors encountered while getting the response, and from any error status embedded in the response returned by the service. Previously, the library only re-attempted failures encountered when issuing the append.

This PR also adds a new WriterOption (`DisableWriteRetries(disable bool)`) to control this behavior (retries are enabled by default).
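
Opting out might look roughly like this (a minimal sketch; the project ID and table path are placeholders, and error handling is abbreviated):

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery/storage/managedwriter"
)

func main() {
	ctx := context.Background()
	client, err := managedwriter.NewClient(ctx, "my-project") // placeholder project
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Disable the library's automatic append retries for this writer.
	ms, err := client.NewManagedStream(ctx,
		managedwriter.WithDestinationTable("projects/my-project/datasets/d/tables/t"), // placeholder table
		managedwriter.DisableWriteRetries(true),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer ms.Close()
}
```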

For users sensitive to write duplication, this PR also exposes a new TotalAttempts method on the AppendResult, which indicates the total number of times the write was attempted.
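
A duplication-sensitive caller might use it like this (a sketch with imports as in the previous example; `appendAndCheck` is a hypothetical helper, and `ms`/`rows` come from elsewhere in the application):

```go
// appendAndCheck appends rows and flags writes that the library retried,
// since retried writes are the ones that can introduce duplicates.
func appendAndCheck(ctx context.Context, ms *managedwriter.ManagedStream, rows [][]byte) error {
	result, err := ms.AppendRows(ctx, rows)
	if err != nil {
		return err
	}
	// TotalAttempts blocks until the append result is ready or ctx expires.
	attempts, err := result.TotalAttempts(ctx)
	if err != nil {
		return err
	}
	if attempts > 1 {
		log.Printf("append succeeded after %d attempts; downstream dedupe may be needed", attempts)
	}
	return nil
}
```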

This also cleans up retries in general a bit more. The generated client already retries unary RPCs, subject to the [service config](https://github.com/googleapis/googleapis/blob/master/google/cloud/bigquery/storage/v1/bigquerystorage_grpc_service_config.json) that was present when the storage API client was generated.

Above that, we specifically clarify/introduce two additional kinds of retry: a unary retry and a stateless retry.

The unary retry is used to (re)open the underlying bidi network connection on which appends are sent, as we want to be resilient to reconnection. This retry is effectively stateful for the operation of reopening the connection, and thus uses a gax-based backoff whose pause grows with each successive attempt.
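
For illustration, such a retryer might look roughly like the following (a sketch only; the retried code and backoff configuration are assumptions, not the exact retryer in this PR):

```go
import (
	"time"

	"github.com/googleapis/gax-go/v2"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// reopenRetryer backs off with increasing intervals: each call to Pause()
// on a gax.Backoff returns a longer (jittered) delay than the last.
type reopenRetryer struct {
	bo gax.Backoff
}

func (r *reopenRetryer) Retry(err error) (time.Duration, bool) {
	// Only retry reconnects for errors that suggest a transient outage.
	if s, ok := status.FromError(err); ok && s.Code() == codes.Unavailable {
		return r.bo.Pause(), true
	}
	return 0, false
}
```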

The stateless retry is used when processing the responses returned from the backend on the bidi stream, where backing off with increasing intervals doesn't make sense. Instead, we use a base backoff plus jitter, and for cases where a more severe backoff is warranted (throughput exhaustion) we apply a multiplication factor. The intent is to provide backpressure that eventually saturates the append queue and causes blocking/rejection of writes until the backlog recovers.
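
The pause computation has roughly this shape (an illustrative sketch; the constants and the name `statelessPause` are assumptions, not the PR's actual values):

```go
import (
	"math/rand"
	"time"
)

// statelessPause computes a delay that does not grow with attempt count:
// a fixed base plus random jitter, scaled up when the error warrants a
// more severe backoff (e.g. throughput exhaustion).
func statelessPause(severe bool) time.Duration {
	const (
		base   = 50 * time.Millisecond
		jitter = 50 * time.Millisecond
	)
	pause := base + time.Duration(rand.Int63n(int64(jitter)))
	if severe {
		pause *= 10 // assumed severity multiplier
	}
	return pause
}
```
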
shollyman authored Sep 27, 2022
1 parent 8e09288 commit 6ae9c67
Showing 8 changed files with 427 additions and 61 deletions.
22 changes: 19 additions & 3 deletions bigquery/storage/managedwriter/appendresult.go
@@ -41,6 +41,9 @@ type AppendResult struct {

// retains the original response.
response *storagepb.AppendRowsResponse

// retains the number of times this individual write was enqueued.
totalAttempts int
}

func newAppendResult(data [][]byte) *AppendResult {
@@ -146,6 +149,18 @@ func (ar *AppendResult) UpdatedSchema(ctx context.Context) (*storagepb.TableSche
}
}

// TotalAttempts returns the number of times this write was attempted.
//
// This call blocks until the result is ready, or context is no longer valid.
func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error) {
select {
case <-ctx.Done():
return 0, fmt.Errorf("context done")
case <-ar.Ready():
return ar.totalAttempts, nil
}
}

// pendingWrite tracks state for a set of rows that are part of a single
// append request.
type pendingWrite struct {
@@ -180,9 +195,8 @@ func newPendingWrite(ctx context.Context, appends [][]byte) *pendingWrite {
},
},
},
result: newAppendResult(appends),
attemptCount: 1,
reqCtx: ctx,
result: newAppendResult(appends),
reqCtx: ctx,
}
// We compute the size now for flow controller purposes, though
// the actual request size may be slightly larger (e.g. the first
@@ -198,6 +212,8 @@ func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error,
pw.result.response = resp
}
pw.result.err = err
// Record the final attempt count in the result for the user.
pw.result.totalAttempts = pw.attemptCount

close(pw.result.ready)
// Clear the reference to the request.
2 changes: 2 additions & 0 deletions bigquery/storage/managedwriter/client.go
@@ -117,6 +117,8 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
},
open: createOpenF(ctx, streamFunc),
// We add the new retryer by default, and add a new option to disable it.
retry: newStatelessRetryer(),
}

// apply writer options
12 changes: 11 additions & 1 deletion bigquery/storage/managedwriter/instrumentation.go
@@ -75,6 +75,11 @@ var (
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)

// AppendRetryCount is a measure of the number of appends that were automatically retried by the library
// after receiving a non-successful response.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRetryCount = stats.Int64(statsPrefix+"append_retry_count", "Number of appends that were retried", stats.UnitDimensionless)

// FlushRequests is a measure of the number of FlushRows requests sent.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
@@ -114,6 +119,10 @@ var (
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendResponseErrorsView *view.View

// AppendRetryView is a cumulative sum of AppendRetryCount.
// It is EXPERIMENTAL and subject to change or removal without notice.
AppendRetryView *view.View

// FlushRequestsView is a cumulative sum of FlushRequests.
// It is EXPERIMENTAL and subject to change or removal without notice.
FlushRequestsView *view.View
@@ -130,7 +139,7 @@ func init() {

AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError)

AppendRetryView = createSumView(stats.Measure(AppendRetryCount), keyStream, keyDataOrigin)
FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)

DefaultOpenCensusViews = []*view.View{
@@ -144,6 +153,7 @@

AppendResponsesView,
AppendResponseErrorsView,
AppendRetryView,

FlushRequestsView,
}
97 changes: 73 additions & 24 deletions bigquery/storage/managedwriter/managed_stream.go
@@ -19,6 +19,7 @@ import (
"fmt"
"io"
"sync"
"time"

"cloud.google.com/go/bigquery/internal"
"github.com/googleapis/gax-go/v2"
@@ -76,6 +77,7 @@ type ManagedStream struct {
destinationTable string
c *Client
fc *flowController
retry *statelessRetryer

// aspects of the stream client
ctx context.Context // retained context for the stream
@@ -223,7 +225,7 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient
//
// Only getStream() should call this.
func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
r := defaultRetryer{}
r := &unaryRetryer{}
for {
recordStat(ms.ctx, AppendClientOpenCount, 1)
streamID := ""
@@ -250,7 +252,7 @@ }
}
}
ch := make(chan *pendingWrite, depth)
go recvProcessor(ms.ctx, arc, ms.fc, ch)
go recvProcessor(ms, arc, ch)
// Also, replace the sync.Once for setting up a new stream, as we need to do "special" work
// for every new connection.
ms.streamSetup = new(sync.Once)
@@ -315,6 +317,8 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error {
req = reqCopy
})

// Increment the attempt count.
pw.attemptCount = pw.attemptCount + 1
if req != nil {
// First append in a new connection needs properties like schema and stream name set.
err = (*arc).Send(req)
@@ -324,6 +328,8 @@
}
if err != nil {
if shouldReconnect(err) {
// Certain error responses indicate that this connection is no longer healthy.
// If we encounter them, force a reconnect so the next append has a healthy connection.
ms.reconnect = true
}
return err
@@ -345,16 +351,11 @@
// lived bidirectional network stream, with its own managed context (ms.ctx). requestCtx is checked
// for expiry to enable faster failures, it is not propagated more deeply.
func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error {

// Resolve retry settings.
var settings gax.CallSettings
for _, opt := range opts {
opt.Resolve(&settings)
}
var r gax.Retryer = &defaultRetryer{}
if settings.Retry != nil {
r = settings.Retry()
}

for {
appendErr := ms.lockingAppend(pw)
@@ -365,7 +366,7 @@
ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String()))
recordStat(ctx, AppendRequestErrors, 1)
}
bo, shouldRetry := r.Retry(appendErr)
bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount)
if shouldRetry {
if err := gax.Sleep(ms.ctx, bo); err != nil {
return err
@@ -466,44 +467,92 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...

// recvProcessor is used to propagate append responses back up with the originating write requests in a goroutine.
//
// The receive processor only deals with a single instance of a connection/channel, and thus should never interact
// with the mutex lock.
func recvProcessor(ctx context.Context, arc storagepb.BigQueryWrite_AppendRowsClient, fc *flowController, ch <-chan *pendingWrite) {
// TODO: We'd like to re-send requests that are in an ambiguous state due to channel errors. For now, we simply
// ensure that pending writes get acknowledged with a terminal state.
// The receive processor is only responsible for a single bidi connection/channel. As new connections are established,
// each gets its own instance of a processor.
//
// The ManagedStream reference is used for re-enqueuing failed writes.
func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
for {
select {
case <-ctx.Done():
// Context is done, so we're not going to get further updates. Mark all work failed with the context error.
case <-ms.ctx.Done():
// Context is done, so we're not going to get further updates. Mark all work left in the channel
// with the context error. We don't attempt to re-enqueue in this case.
for {
pw, ok := <-ch
if !ok {
return
}
pw.markDone(nil, ctx.Err(), fc)
pw.markDone(nil, ms.ctx.Err(), ms.fc)
}
case nextWrite, ok := <-ch:
if !ok {
// Channel closed, all elements processed.
return
}

// block until we get a corresponding response or err from stream.
resp, err := arc.Recv()
if err != nil {
nextWrite.markDone(nil, err, fc)
// Evaluate the error from the receive and possibly retry.
ms.processRetry(nextWrite, nil, err)
// We're done with the write regardless of outcome, continue onto the
// next element.
continue
}
recordStat(ctx, AppendResponses, 1)
// Record that we did in fact get a response from the backend.
recordStat(ms.ctx, AppendResponses, 1)

if status := resp.GetError(); status != nil {
tagCtx, _ := tag.New(ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String()))
if err != nil {
tagCtx = ctx
// The response from the backend embedded a status error. We record that the error
// occurred, and tag it based on the response code of the status.
if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil {
recordStat(tagCtx, AppendResponseErrors, 1)
}
respErr := grpcstatus.ErrorProto(status)
if _, shouldRetry := ms.statelessRetryer().Retry(respErr, nextWrite.attemptCount); shouldRetry {
// We use the status error to evaluate and possibly re-enqueue the write.
ms.processRetry(nextWrite, resp, respErr)
// We're done with the write regardless of outcome, continue on to the next
// element.
continue
}
recordStat(tagCtx, AppendResponseErrors, 1)
}
nextWrite.markDone(resp, nil, fc)
// We had no error in the receive or in the response. Mark the write done.
nextWrite.markDone(resp, nil, ms.fc)
}
}
}

// processRetry is responsible for evaluating and re-enqueuing an append.
// If the append is not retried, it is marked complete.
func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) {
err := initialErr
for {
pause, shouldRetry := ms.retry.Retry(err, pw.attemptCount)
if !shouldRetry {
// Should not attempt to re-append.
pw.markDone(appendResp, err, ms.fc)
return
}
time.Sleep(pause)
err = ms.appendWithRetry(pw)
if err != nil {
// Re-enqueue failed, send it through the loop again.
continue
}
// Break out of the loop, we were successful and the write has been
// re-inserted.
recordStat(ms.ctx, AppendRetryCount, 1)
break
}
}

// returns the stateless retryer. If one's not set (re-enqueue retries disabled),
// it returns a retryer that only permits single attempts.
func (ms *ManagedStream) statelessRetryer() *statelessRetryer {
if ms.retry != nil {
return ms.retry
}
return &statelessRetryer{
maxAttempts: 1,
}
}
