Skip to content

Commit

Permalink
refactor: simplify exponential backoff and refactor env (#1185)
Browse files Browse the repository at this point in the history
Co-authored-by: Kévin Dunglas <[email protected]>
  • Loading branch information
withinboredom and dunglas authored Nov 23, 2024
1 parent 449a0e7 commit 1e279bc
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 130 deletions.
51 changes: 51 additions & 0 deletions backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package frankenphp

import (
"sync"
"time"
)

// exponentialBackoff tracks consecutive failures and the delay to apply
// before the next attempt.
//
// minBackoff, maxBackoff and maxConsecutiveFailures are configuration and are
// only read by the methods below; backoff and failureCount are mutable state
// guarded by mu. The struct contains a mutex, so it must be used through a
// pointer and never copied.
type exponentialBackoff struct {
	backoff                time.Duration
	failureCount           int
	mu                     sync.RWMutex
	maxBackoff             time.Duration
	minBackoff             time.Duration
	maxConsecutiveFailures int
}

// recordSuccess resets the backoff to minBackoff and failureCount to zero.
func (e *exponentialBackoff) recordSuccess() {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.failureCount = 0
	e.backoff = e.minBackoff
}

// recordFailure increments the failure count and doubles the backoff (capped
// at maxBackoff). It returns true if maxConsecutiveFailures has been reached.
func (e *exponentialBackoff) recordFailure() bool {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.failureCount++

	// A zero-valued struct starts below minBackoff; raise it first so the
	// doubling below always starts from at least minBackoff.
	if e.backoff < e.minBackoff {
		e.backoff = e.minBackoff
	}
	e.backoff = min(e.backoff*2, e.maxBackoff)

	// Evaluated while the lock is still held (the deferred Unlock runs after
	// the return value is computed), so a concurrent recordSuccess or
	// recordFailure cannot race with this read of failureCount.
	return e.failureCount >= e.maxConsecutiveFailures
}

// wait sleeps for the backoff duration if failureCount is non-zero.
// NOTE: this is not tested and should be kept 'obviously correct' (i.e., simple)
func (e *exponentialBackoff) wait() {
	// Snapshot both guarded fields under the read lock; the sleep itself
	// happens after the lock is released so writers are never blocked by it.
	e.mu.RLock()
	failures, delay := e.failureCount, e.backoff
	e.mu.RUnlock()

	if failures == 0 {
		return
	}

	time.Sleep(delay)
}
41 changes: 41 additions & 0 deletions backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package frankenphp

import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)

// TestExponentialBackoff_Reset verifies that recordSuccess clears the failure
// count and restores the backoff to minBackoff after earlier failures.
func TestExponentialBackoff_Reset(t *testing.T) {
	e := &exponentialBackoff{
		maxBackoff:             5 * time.Second,
		minBackoff:             500 * time.Millisecond,
		maxConsecutiveFailures: 3,
	}

	// Two failures stay below the limit of three.
	assert.False(t, e.recordFailure())
	assert.False(t, e.recordFailure())
	e.recordSuccess()

	e.mu.RLock()
	defer e.mu.RUnlock()
	// testify's Equal takes (expected, actual); keeping that order makes the
	// failure output label the values correctly.
	assert.Equal(t, 0, e.failureCount, "expected failureCount to be reset to 0")
	assert.Equal(t, e.minBackoff, e.backoff, "expected backoff to be reset to minBackoff")
}

// TestExponentialBackoff_Trigger verifies that recordFailure reports true once
// maxConsecutiveFailures is reached and that the backoff is capped at
// maxBackoff.
func TestExponentialBackoff_Trigger(t *testing.T) {
	e := &exponentialBackoff{
		maxBackoff:             500 * 3 * time.Millisecond,
		minBackoff:             500 * time.Millisecond,
		maxConsecutiveFailures: 3,
	}

	// Only the third consecutive failure hits the limit.
	assert.False(t, e.recordFailure())
	assert.False(t, e.recordFailure())
	assert.True(t, e.recordFailure())

	e.mu.RLock()
	defer e.mu.RUnlock()
	// testify's Equal takes (expected, actual); keeping that order makes the
	// failure output label the values correctly.
	assert.Equal(t, e.maxConsecutiveFailures, e.failureCount, "expected failureCount to be maxConsecutiveFailures")
	assert.Equal(t, e.maxBackoff, e.backoff, "expected backoff to be maxBackoff")
}
74 changes: 74 additions & 0 deletions env.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package frankenphp

// #include "frankenphp.h"
import "C"
import (
"os"
"strings"
"unsafe"
)

// go_putenv applies a putenv-style setting to the process environment.
//
// The first length bytes at str are read as the setting. "NAME=value" sets
// NAME to value; a setting without '=' unsets the variable of that name.
// It returns true when the underlying os call reports no error.
//
//export go_putenv
func go_putenv(str *C.char, length C.int) C.bool {
	setting := C.GoStringN(str, length)

	name, value, hasValue := strings.Cut(setting, "=")
	if !hasValue {
		// No '=' present: the whole string is the name to remove.
		return os.Unsetenv(setting) == nil
	}

	return os.Setenv(name, value) == nil
}

// go_getfullenv exposes the whole process environment as a flat array of
// go_string values laid out as name, value, name, value, ...
//
// It returns a pointer to the first element of that array together with the
// number of environment variables (i.e. pairs, half the number of elements).
// Every string and the array itself are pinned through the thread found at
// phpThreads[threadIndex].
//
//export go_getfullenv
func go_getfullenv(threadIndex C.uintptr_t) (*C.go_string, C.size_t) {
	thread := phpThreads[threadIndex]

	environ := os.Environ()
	pairs := make([]C.go_string, 2*len(environ))

	for idx, entry := range environ {
		// Entries without '=' yield an empty value.
		name, value, _ := strings.Cut(entry, "=")
		slot := idx * 2
		pairs[slot] = C.go_string{C.size_t(len(name)), thread.pinString(name)}
		pairs[slot+1] = C.go_string{C.size_t(len(value)), thread.pinString(value)}
	}

	first := unsafe.SliceData(pairs)
	thread.Pin(first)

	return first, C.size_t(len(environ))
}

// go_getenv looks up a single environment variable by name.
//
// name carries the variable name as a pointer plus length. When the variable
// is set, it returns true and a go_string describing the value; both the
// value string and the go_string are pinned through the thread found at
// phpThreads[threadIndex]. When the variable is not set, it returns false
// and nil.
//
//export go_getenv
func go_getenv(threadIndex C.uintptr_t, name *C.go_string) (C.bool, *C.go_string) {
	thread := phpThreads[threadIndex]

	// Copy the name into a Go string of exactly name.len bytes.
	key := C.GoStringN(name.data, C.int(name.len))

	val, found := os.LookupEnv(key)
	if !found {
		return false, nil
	}

	// Wrap the pinned value in a go_string and pin the wrapper as well.
	result := &C.go_string{C.size_t(len(val)), thread.pinString(val)}
	thread.Pin(result)

	return true, result
}

// go_sapi_getenv looks up the environment variable named by name (pointer
// plus length) and returns its value as the result of pinCString on the
// thread found at phpThreads[threadIndex]. It returns nil when the variable
// is not set.
//
//export go_sapi_getenv
func go_sapi_getenv(threadIndex C.uintptr_t, name *C.go_string) *C.char {
	key := C.GoStringN(name.data, C.int(name.len))

	if val, found := os.LookupEnv(key); found {
		return phpThreads[threadIndex].pinCString(val)
	}

	return nil
}
82 changes: 0 additions & 82 deletions frankenphp.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,88 +505,6 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error
return nil
}

//export go_putenv
func go_putenv(str *C.char, length C.int) C.bool {
// Create a byte slice from C string with a specified length
s := C.GoBytes(unsafe.Pointer(str), length)

// Convert byte slice to string
envString := string(s)

// Check if '=' is present in the string
if key, val, found := strings.Cut(envString, "="); found {
if os.Setenv(key, val) != nil {
return false // Failure
}
} else {
// No '=', unset the environment variable
if os.Unsetenv(envString) != nil {
return false // Failure
}
}

return true // Success
}

//export go_getfullenv
func go_getfullenv(threadIndex C.uintptr_t) (*C.go_string, C.size_t) {
thread := phpThreads[threadIndex]

env := os.Environ()
goStrings := make([]C.go_string, len(env)*2)

for i, envVar := range env {
key, val, _ := strings.Cut(envVar, "=")
k := unsafe.StringData(key)
v := unsafe.StringData(val)
thread.Pin(k)
thread.Pin(v)

goStrings[i*2] = C.go_string{C.size_t(len(key)), (*C.char)(unsafe.Pointer(k))}
goStrings[i*2+1] = C.go_string{C.size_t(len(val)), (*C.char)(unsafe.Pointer(v))}
}

value := unsafe.SliceData(goStrings)
thread.Pin(value)

return value, C.size_t(len(env))
}

//export go_getenv
func go_getenv(threadIndex C.uintptr_t, name *C.go_string) (C.bool, *C.go_string) {
thread := phpThreads[threadIndex]

// Create a byte slice from C string with a specified length
envName := C.GoStringN(name.data, C.int(name.len))

// Get the environment variable value
envValue, exists := os.LookupEnv(envName)
if !exists {
// Environment variable does not exist
return false, nil // Return 0 to indicate failure
}

// Convert Go string to C string
val := unsafe.StringData(envValue)
thread.Pin(val)
value := &C.go_string{C.size_t(len(envValue)), (*C.char)(unsafe.Pointer(val))}
thread.Pin(value)

return true, value // Return 1 to indicate success
}

//export go_sapi_getenv
func go_sapi_getenv(threadIndex C.uintptr_t, name *C.go_string) *C.char {
envName := C.GoStringN(name.data, C.int(name.len))

envValue, exists := os.LookupEnv(envName)
if !exists {
return nil
}

return phpThreads[threadIndex].pinCString(envValue)
}

//export go_handle_request
func go_handle_request(threadIndex C.uintptr_t) bool {
select {
Expand Down
60 changes: 12 additions & 48 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ type worker struct {
ready chan struct{}
}

const maxWorkerErrorBackoff = 1 * time.Second
const minWorkerErrorBackoff = 100 * time.Millisecond
const maxWorkerConsecutiveFailures = 6

var (
watcherIsEnabled bool
workerShutdownWG sync.WaitGroup
Expand Down Expand Up @@ -97,33 +93,15 @@ func newWorker(o workerOpt) (*worker, error) {
func (worker *worker) startNewWorkerThread() {
workerShutdownWG.Add(1)
defer workerShutdownWG.Done()

backoff := minWorkerErrorBackoff
failureCount := 0
backingOffLock := sync.RWMutex{}
backoff := &exponentialBackoff{
maxBackoff: 1 * time.Second,
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: 6,
}

for {
// if the worker can stay up longer than backoff*2, it is probably an application error
upFunc := sync.Once{}
go func() {
backingOffLock.RLock()
wait := backoff * 2
backingOffLock.RUnlock()
time.Sleep(wait)
upFunc.Do(func() {
backingOffLock.Lock()
defer backingOffLock.Unlock()
// if we come back to a stable state, reset the failure count
if backoff == minWorkerErrorBackoff {
failureCount = 0
}

// earn back the backoff over time
if failureCount > 0 {
backoff = max(backoff/2, 100*time.Millisecond)
}
})
}()
backoff.wait()

metrics.StartWorker(worker.fileName)

Expand Down Expand Up @@ -176,31 +154,17 @@ func (worker *worker) startNewWorkerThread() {
c.Write(zap.String("worker", worker.fileName))
}
metrics.StopWorker(worker.fileName, StopReasonRestart)
backoff.recordSuccess()
continue
}

// on exit status 1 we log the error and apply an exponential backoff when restarting
upFunc.Do(func() {
backingOffLock.Lock()
defer backingOffLock.Unlock()
// if we end up here, the worker has not been up for backoff*2
// this is probably due to a syntax error or another fatal error
if failureCount >= maxWorkerConsecutiveFailures {
if !watcherIsEnabled {
panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
}
logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", failureCount))
if backoff.recordFailure() {
if !watcherIsEnabled {
panic(fmt.Errorf("workers %q: too many consecutive failures", worker.fileName))
}
failureCount += 1
})
backingOffLock.RLock()
wait := backoff
backingOffLock.RUnlock()
time.Sleep(wait)
backingOffLock.Lock()
backoff *= 2
backoff = min(backoff, maxWorkerErrorBackoff)
backingOffLock.Unlock()
logger.Warn("many consecutive worker failures", zap.String("worker", worker.fileName), zap.Int("failures", backoff.failureCount))
}
metrics.StopWorker(worker.fileName, StopReasonCrash)
}

Expand Down

0 comments on commit 1e279bc

Please sign in to comment.