Skip to content

Commit

Permalink
chore: CR fixes (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
or-shachar authored Nov 23, 2023
1 parent 0bc8aa9 commit b8df310
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 95 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ We support S3 backend for caching.
You can connect to S3 backend by setting the following parameters:
- `GOCACHE_S3_BUCKET` - Name of S3 bucket
- `GOCACHE_AWS_REGION` - AWS Region of bucket
- `GOCACHE_AWS_ACCESS_KEY` + `GOCACHE_AWS_SECRET_KEY` / `GOCACHE_AWS_PROFILE` - Direct credentials or creds profile to use.
- `GOCACHE_AWS_ACCESS_KEY` + `GOCACHE_AWS_SECRET_KEY` / `GOCACHE_AWS_CREDS_PROFILE` - Direct credentials or creds profile to use.
- `GOCACHE_CACHE_KEY` - (Optional, default `v1`) Unique key

The cache would be stored to `s3://<bucket>/cache/<cache_key>/<architecture>/<os>/<go-version>`
84 changes: 30 additions & 54 deletions cachers/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ type CombinedCache struct {
getsMetrics *timeKeeper
}

func (l *CombinedCache) Kind() string {
return "combined"
}
var _ LocalCache = &CombinedCache{}

func NewCombinedCache(localCache LocalCache, remoteCache RemoteCache, verbose bool) LocalCache {
cache := &CombinedCache{
Expand All @@ -39,6 +37,10 @@ func NewCombinedCache(localCache LocalCache, remoteCache RemoteCache, verbose bo
return cache
}

func (l *CombinedCache) Kind() string {
return "combined"
}

func (l *CombinedCache) Start() error {
err := l.localCache.Start()
if err != nil {
Expand All @@ -53,23 +55,6 @@ func (l *CombinedCache) Start() error {
return nil
}

func (l *CombinedCache) Close() error {
err := l.localCache.Close()
if err != nil {
err = fmt.Errorf("local cache stop failed: %w", err)
}
err = l.remoteCache.Close()
if err != nil {
err = errors.Join(fmt.Errorf("remote cache stop failed: %w", err))
}
l.putsMetrics.Stop()
l.getsMetrics.Stop()
if l.verbose {
log.Printf("[%s]\tDownloads: %s, Uploads %s", l.remoteCache.Kind(), l.getsMetrics.Summary(), l.putsMetrics.Summary())
}
return err
}

func (l *CombinedCache) Get(ctx context.Context, actionID string) (outputID, diskPath string, err error) {
outputID, diskPath, err = l.localCache.Get(ctx, actionID)
if err == nil && outputID != "" {
Expand All @@ -94,17 +79,18 @@ func (l *CombinedCache) Get(ctx context.Context, actionID string) (outputID, dis

func (l *CombinedCache) Put(ctx context.Context, actionID, outputID string, size int64, body io.Reader) (string, error) {
pr, pw := io.Pipe()
diskPutCh := make(chan any, 1)
diskPathCh := make(chan string, 1)
errCh := make(chan error, 1)
go func() {
var putBody io.Reader = pr
if size == 0 {
putBody = bytes.NewReader(nil)
}
diskPath, err := l.localCache.Put(ctx, actionID, outputID, size, putBody)
if err != nil {
diskPutCh <- err
errCh <- err
} else {
diskPutCh <- diskPath
diskPathCh <- diskPath
}
}()

Expand All @@ -124,38 +110,28 @@ func (l *CombinedCache) Put(ctx context.Context, actionID, outputID string, size
return "", e
})
pw.Close()
v := <-diskPutCh
if err, ok := v.(error); ok {
log.Printf("HTTPCache.Put local disk error: %v", err)
select {
case err := <-errCh:
log.Printf("[%s]\terror: %v", l.localCache.Kind(), err)
return "", err
case diskPath := <-diskPathCh:
return diskPath, nil
}
return v.(string), nil
}

// TODO: DELETEME
// func (l *CombinedCache) putOld(ctx context.Context, actionID, outputID string, size int64, body io.Reader) (diskPath string, err error) {
// var localError, remoteError error
// var bytesReaderForDisk io.Reader
// var bytesBufferRemote bytes.Buffer
// if size == 0 {
// bytesReaderForDisk = bytes.NewReader(nil)
// bytesBufferRemote = bytes.Buffer{}
// } else {
// bytesReaderForDisk = io.TeeReader(body, &bytesBufferRemote)
// }
// // TODO or-shachar: Can we stream the data in parallel to both caches?
// diskPath, localError = l.localCache.Put(ctx, actionID, outputID, size, bytesReaderForDisk)
// if localError != nil {
// return "", localError
// }
// _, remoteError = l.putsMetrics.DoWithMeasure(size, func() (string, error) {
// e := l.remoteCache.Put(ctx, actionID, outputID, size, &bytesBufferRemote)
// return "", e
// })
// if remoteError != nil {
// return "", remoteError
// }
// return diskPath, nil
// }

var _ LocalCache = &CombinedCache{}
func (l *CombinedCache) Close() error {
err := l.localCache.Close()
if err != nil {
err = fmt.Errorf("local cache stop failed: %w", err)
}
err = l.remoteCache.Close()
if err != nil {
err = errors.Join(fmt.Errorf("remote cache stop failed: %w", err))
}
l.putsMetrics.Stop()
l.getsMetrics.Stop()
if l.verbose {
log.Printf("[%s]\tDownloads: %s, Uploads %s", l.remoteCache.Kind(), l.getsMetrics.Summary(), l.putsMetrics.Summary())
}
return err
}
8 changes: 4 additions & 4 deletions cachers/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ func (dc *SimpleDiskCache) Start() error {
return nil
}

func (dc *SimpleDiskCache) Close() error {
return nil
}

func (dc *SimpleDiskCache) Get(ctx context.Context, actionID string) (outputID, diskPath string, err error) {
actionFile := filepath.Join(dc.dir, fmt.Sprintf("a-%s", actionID))
ij, err := os.ReadFile(actionFile)
Expand Down Expand Up @@ -109,6 +105,10 @@ func (dc *SimpleDiskCache) Put(ctx context.Context, actionID, objectID string, s
return file, nil
}

func (dc *SimpleDiskCache) Close() error {
return nil
}

func writeAtomic(dest string, r io.Reader) (int64, error) {
tf, err := os.CreateTemp(filepath.Dir(dest), filepath.Base(dest)+".*")
if err != nil {
Expand Down
31 changes: 19 additions & 12 deletions cachers/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@ type ActionValue struct {

// HTTPCache is a RemoteCache that talks to a cacher server over HTTP.
type HTTPCache struct {
// BaseURL is the base URL of the cacher server, like "http://localhost:31364".
BaseURL string
// baseURL is the base URL of the cacher server, like "http://localhost:31364".
baseURL string

// HTTPClient optionally specifies the http.Client to use.
// client optionally specifies the http.Client to use.
// If nil, http.DefaultClient is used.
HTTPClient *http.Client
client *http.Client

// Verbose optionally specifies whether to log verbose messages.
Verbose bool
// verbose optionally specifies whether to log verbose messages.
verbose bool
}

func NewHttpCache(baseURL string, verbose bool) *HTTPCache {
return &HTTPCache{
baseURL: baseURL,
verbose: verbose,
}
}

func (c *HTTPCache) Start() error {
log.Printf("[%s]\tconfigured to %s", c.Kind(), c.BaseURL)
log.Printf("[%s]\tconfigured to %s", c.Kind(), c.baseURL)
return nil
}

Expand All @@ -43,7 +50,7 @@ func (c *HTTPCache) Kind() string {
}

func (c *HTTPCache) Get(ctx context.Context, actionID string) (outputID string, size int64, output io.ReadCloser, err error) {
req, _ := http.NewRequestWithContext(ctx, "GET", c.BaseURL+"/action/"+actionID, nil)
req, _ := http.NewRequestWithContext(ctx, "GET", c.baseURL+"/action/"+actionID, nil)
res, err := c.httpClient().Do(req)
if err != nil {
return "", 0, nil, err
Expand All @@ -63,7 +70,7 @@ func (c *HTTPCache) Get(ctx context.Context, actionID string) (outputID string,
if av.Size == 0 {
return outputID, av.Size, io.NopCloser(bytes.NewReader(nil)), nil
}
req, _ = http.NewRequestWithContext(ctx, "GET", c.BaseURL+"/output/"+outputID, nil)
req, _ = http.NewRequestWithContext(ctx, "GET", c.baseURL+"/output/"+outputID, nil)
res, err = c.httpClient().Do(req)
if err != nil {
return "", 0, nil, err
Expand Down Expand Up @@ -91,7 +98,7 @@ func (c *HTTPCache) Put(ctx context.Context, actionID, outputID string, size int
} else {
putBody = body
}
req, _ := http.NewRequestWithContext(ctx, "PUT", c.BaseURL+"/"+actionID+"/"+outputID, putBody)
req, _ := http.NewRequestWithContext(ctx, "PUT", c.baseURL+"/"+actionID+"/"+outputID, putBody)
req.ContentLength = size
res, err := c.httpClient().Do(req)
if err != nil {
Expand All @@ -109,8 +116,8 @@ func (c *HTTPCache) Put(ctx context.Context, actionID, outputID string, size int
var _ RemoteCache = &HTTPCache{}

func (c *HTTPCache) httpClient() *http.Client {
if c.HTTPClient != nil {
return c.HTTPClient
if c.client != nil {
return c.client
}
return http.DefaultClient
}
10 changes: 5 additions & 5 deletions cachers/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type S3Cache struct {
s3Client s3Client
}

var _ RemoteCache = &S3Cache{}

func (s *S3Cache) Kind() string {
return "s3"
}
Expand All @@ -42,10 +44,6 @@ func (s *S3Cache) Start() error {
return nil
}

func (s *S3Cache) Close() error {
return nil
}

func (s *S3Cache) Get(ctx context.Context, actionID string) (outputID string, size int64, output io.ReadCloser, err error) {
actionKey := s.actionKey(actionID)
outputResult, getOutputErr := s.s3Client.GetObject(ctx, &s3.GetObjectInput{
Expand Down Expand Up @@ -91,7 +89,9 @@ func (s *S3Cache) Put(ctx context.Context, actionID, outputID string, size int64
return
}

var _ RemoteCache = &S3Cache{}
func (s *S3Cache) Close() error {
return nil
}

func NewS3Cache(client s3Client, bucketName string, cacheKey string, verbose bool) *S3Cache {
// get current architecture
Expand Down
68 changes: 49 additions & 19 deletions cmd/go-cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,36 @@ import (

const defaultCacheKey = "v1"

// All the following env variable names are optional
const (
// path to local disk directory. defaults to os.UserCacheDir()/go-cacher
envVarDiskCacheDir = "GOCACHE_DISK_DIR"

// S3 cache
envVarS3CacheRegion = "GOCACHE_AWS_REGION"
envVarS3AwsAccessKey = "GOCACHE_AWS_ACCESS_KEY"
envVarS3AwsSecretAccessKey = "GOCACHE_AWS_SECRET_ACCESS_KEY"
envVarS3AwsCredsProfile = "GOCACHE_AWS_CREDS_PROFILE"
envVarS3BucketName = "GOCACHE_S3_BUCKET"
envVarS3CacheKey = "GOCACHE_CACHE_KEY"

// HTTP cache - optional cache server HTTP prefix (scheme and authority only);
envVarHttpCacheServerBase = "GOCACHE_HTTP_SERVER_BASE"
)

var (
serverBase = flag.String("cache-server", "", "optional cache server HTTP prefix (scheme and authority only); should be low latency. empty means to not use one.")
verbose = flag.Bool("verbose", false, "be verbose")
verbose = flag.Bool("verbose", false, "be verbose")
)

func getAwsConfigFromEnv() (*aws.Config, error) {
// read from env
awsRegion, awsRegionOk := os.LookupEnv("GOCACHE_AWS_REGION")
if !awsRegionOk {
awsRegion := os.Getenv(envVarS3CacheRegion)
if awsRegion != "" {
return nil, nil
}
accessKey, accessKeyOk := os.LookupEnv("GOCACHE_AWS_ACCESS_KEY")
secretAccessKey, secretKeyOk := os.LookupEnv("GOCACHE_AWS_SECRET_ACCESS_KEY")
if accessKeyOk && secretKeyOk {
accessKey := os.Getenv(envVarS3AwsAccessKey)
secretAccessKey := os.Getenv(envVarS3AwsSecretAccessKey)
if accessKey != "" && secretAccessKey != "" {
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(awsRegion),
config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
Expand All @@ -52,8 +68,8 @@ func getAwsConfigFromEnv() (*aws.Config, error) {
}
return &cfg, nil
}
credsProfile, credsProfileOk := os.LookupEnv("GOCACHE_CREDS_PROFILE")
if credsProfileOk {
credsProfile := os.Getenv(envVarS3AwsCredsProfile)
if credsProfile != "" {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(awsRegion), config.WithSharedConfigProfile(credsProfile))
if err != nil {
return nil, err
Expand All @@ -68,12 +84,12 @@ func maybeS3Cache() (cachers.RemoteCache, error) {
if err != nil {
return nil, err
}
bucket, ok := os.LookupEnv("GOCACHE_S3_BUCKET")
if !ok || awsConfig == nil {
bucket := os.Getenv(envVarS3BucketName)
if bucket == "" || awsConfig == nil {
// We need at least name of bucket and valid aws config
return nil, nil
}
cacheKey := os.Getenv("GOCACHE_CACHE_KEY")
cacheKey := os.Getenv(envVarS3CacheKey)
if cacheKey == "" {
cacheKey = defaultCacheKey
}
Expand All @@ -82,7 +98,17 @@ func maybeS3Cache() (cachers.RemoteCache, error) {
return s3Cache, nil
}

func getFinalCacher(local cachers.LocalCache, remote cachers.RemoteCache, verbose bool) cachers.LocalCache {
func getCache(local cachers.LocalCache, verbose bool) cachers.LocalCache {
remote, err := maybeS3Cache()
if err != nil {
log.Fatal(err)
}
if remote == nil {
remote, err = maybeHttpCache()
if err != nil {
log.Fatal(err)
}
}
if remote != nil {
return cachers.NewCombinedCache(local, remote, verbose)
}
Expand All @@ -92,9 +118,17 @@ func getFinalCacher(local cachers.LocalCache, remote cachers.RemoteCache, verbos
return local
}

func maybeHttpCache() (cachers.RemoteCache, error) {
serverBase := os.Getenv(envVarHttpCacheServerBase)
if serverBase == "" {
return nil, nil
}
return cachers.NewHttpCache(serverBase, *verbose), nil
}

func main() {
flag.Parse()
dir := os.Getenv("GOCACHE_DISK_DIR")
dir := os.Getenv(envVarDiskCacheDir)
if dir == "" {
d, err := os.UserCacheDir()
if err != nil {
Expand All @@ -107,11 +141,7 @@ func main() {
log.Fatal(err)
}
var localCache cachers.LocalCache = cachers.NewSimpleDiskCache(*verbose, dir)
s3Cache, err := maybeS3Cache()
if err != nil {
log.Fatal(err)
}
proc := cacheproc.NewCacheProc(getFinalCacher(localCache, s3Cache, *verbose))
proc := cacheproc.NewCacheProc(getCache(localCache, *verbose))
if err := proc.Run(); err != nil {
log.Fatal(err)
}
Expand Down

0 comments on commit b8df310

Please sign in to comment.