Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter): define append retry predicate (#6650)
Browse files Browse the repository at this point in the history

* feat(bigquery/storage/managedwriter): define append retry predicate

This PR models the retry predicate we'll use for evaluating whether
appends should be retried automatically.
  • Loading branch information
shollyman authored Sep 16, 2022
1 parent 596d6e6 commit 478b8dd
Showing 1 changed file with 44 additions and 6 deletions.
50 changes: 44 additions & 6 deletions bigquery/storage/managedwriter/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,73 @@ import (
"context"
"errors"
"io"
"strings"
"time"

"github.com/googleapis/gax-go/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// defaultAppendRetries is the attempt-count ceiling consulted by RetryAppend:
// once the attempt count exceeds this value, no further retry is offered.
var defaultAppendRetries = 3

// newDefaultRetryer returns a defaultRetryer whose bigBo backoff is configured
// explicitly; the regular bo backoff is left at its zero value.
func newDefaultRetryer() *defaultRetryer {
	// Backoff reserved for throughput-quota errors: 2s initial pause,
	// multiplier of 5, capped at 5 minutes.
	quotaBackoff := gax.Backoff{
		Initial:    2 * time.Second,
		Multiplier: 5,
		Max:        5 * time.Minute,
	}
	return &defaultRetryer{bigBo: quotaBackoff}
}

// defaultRetryer carries the backoff state used by the retry predicates.
// It keeps two independent backoffs so that quota-related errors can be
// paused for longer than ordinary transient failures.
type defaultRetryer struct {
	bo    gax.Backoff // Regular backoff, used for most retryable errors.
	bigBo gax.Backoff // For more aggressive backoff, such as throughput quota
}

// Retry reports whether an operation that failed with err should be retried
// and, if so, how long to pause before the next attempt.
//
// This predicate evaluates errors for both enqueuing and reconnection.
// See RetryAppend for retry that bounds attempts to a fixed number.
//
// Errors that carry no gRPC status are retried unless they are context
// cancellation or deadline errors. Status errors are retried for a fixed set
// of codes. ResourceExhausted is retried only when the message identifies the
// AppendRows throughput quota, and then uses the longer bigBo backoff. Any
// other status yields (0, false).
func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
	s, ok := status.FromError(err)
	if !ok {
		// Treat context errors as non-retriable.
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			return r.bo.Pause(), false
		}
		// EOF can happen in the case of connection close.
		if errors.Is(err, io.EOF) {
			return r.bo.Pause(), true
		}
		// Any other non-status based errors treated as retryable.
		return r.bo.Pause(), true
	}
	switch s.Code() {
	case codes.Aborted,
		codes.Canceled,
		codes.DeadlineExceeded,
		codes.Internal,
		codes.Unavailable:
		return r.bo.Pause(), true
	case codes.ResourceExhausted:
		if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") {
			// Note: internal b/246031522 opened to give this a structured error
			// and avoid string parsing. Should be a QuotaFailure or similar.
			return r.bigBo.Pause(), true // more aggressive backoff
		}
	}
	// Every remaining status code (and ResourceExhausted for other reasons)
	// is not retried, and no backoff is consumed.
	return 0, false
}

// RetryAppend wraps Retry with an attempt budget: while attemptCount is at
// most defaultAppendRetries the decision is delegated to Retry, and beyond
// that the append is never retried.
func (r *defaultRetryer) RetryAppend(err error, attemptCount int) (pause time.Duration, shouldRetry bool) {
	if attemptCount <= defaultAppendRetries {
		return r.Retry(err)
	}
	// exceeded maximum retries.
	return 0, false
}

// shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force
Expand Down

0 comments on commit 478b8dd

Please sign in to comment.