Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ We support S3 backend for caching.
You can connect to S3 backend by setting the following parameters:
- `GOCACHE_S3_BUCKET` - Name of S3 bucket
- `GOCACHE_AWS_REGION` - AWS Region of bucket
- `GOCACHE_AWS_ACCESS_KEY` + `GOCACHE_AWS_SECRET_KEY` / `GOCACHE_AWS_PROFILE` - Direct credentials or creds profile to use.
- `GOCACHE_AWS_ACCESS_KEY` + `GOCACHE_AWS_SECRET_KEY` / `GOCACHE_AWS_CREDS_PROFILE` - Direct credentials or creds profile to use.
- `GOCACHE_CACHE_KEY` - (Optional, default `v1`) Unique key

The cache would be stored to `s3://<bucket>/cache/<cache_key>/<architecture>/<os>/<go-version>`
84 changes: 30 additions & 54 deletions cachers/combined.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ type CombinedCache struct {
getsMetrics *timeKeeper
}

func (l *CombinedCache) Kind() string {
return "combined"
}
var _ LocalCache = &CombinedCache{}

func NewCombinedCache(localCache LocalCache, remoteCache RemoteCache, verbose bool) LocalCache {
cache := &CombinedCache{
Expand All @@ -39,6 +37,10 @@ func NewCombinedCache(localCache LocalCache, remoteCache RemoteCache, verbose bo
return cache
}

func (l *CombinedCache) Kind() string {
return "combined"
}

func (l *CombinedCache) Start() error {
err := l.localCache.Start()
if err != nil {
Expand All @@ -53,23 +55,6 @@ func (l *CombinedCache) Start() error {
return nil
}

func (l *CombinedCache) Close() error {
err := l.localCache.Close()
if err != nil {
err = fmt.Errorf("local cache stop failed: %w", err)
}
err = l.remoteCache.Close()
if err != nil {
err = errors.Join(fmt.Errorf("remote cache stop failed: %w", err))
}
l.putsMetrics.Stop()
l.getsMetrics.Stop()
if l.verbose {
log.Printf("[%s]\tDownloads: %s, Uploads %s", l.remoteCache.Kind(), l.getsMetrics.Summary(), l.putsMetrics.Summary())
}
return err
}

func (l *CombinedCache) Get(ctx context.Context, actionID string) (outputID, diskPath string, err error) {
outputID, diskPath, err = l.localCache.Get(ctx, actionID)
if err == nil && outputID != "" {
Expand All @@ -94,17 +79,18 @@ func (l *CombinedCache) Get(ctx context.Context, actionID string) (outputID, dis

func (l *CombinedCache) Put(ctx context.Context, actionID, outputID string, size int64, body io.Reader) (string, error) {
pr, pw := io.Pipe()
diskPutCh := make(chan any, 1)
diskPathCh := make(chan string, 1)
errCh := make(chan error, 1)
go func() {
var putBody io.Reader = pr
if size == 0 {
putBody = bytes.NewReader(nil)
}
diskPath, err := l.localCache.Put(ctx, actionID, outputID, size, putBody)
if err != nil {
diskPutCh <- err
errCh <- err
} else {
diskPutCh <- diskPath
diskPathCh <- diskPath
}
}()

Expand All @@ -124,38 +110,28 @@ func (l *CombinedCache) Put(ctx context.Context, actionID, outputID string, size
return "", e
})
pw.Close()
v := <-diskPutCh
if err, ok := v.(error); ok {
log.Printf("HTTPCache.Put local disk error: %v", err)
select {
case err := <-errCh:
log.Printf("[%s]\terror: %v", l.localCache.Kind(), err)
return "", err
case diskPath := <-diskPathCh:
return diskPath, nil
}
return v.(string), nil
}

// TODO: DELETEME
// func (l *CombinedCache) putOld(ctx context.Context, actionID, outputID string, size int64, body io.Reader) (diskPath string, err error) {
// var localError, remoteError error
// var bytesReaderForDisk io.Reader
// var bytesBufferRemote bytes.Buffer
// if size == 0 {
// bytesReaderForDisk = bytes.NewReader(nil)
// bytesBufferRemote = bytes.Buffer{}
// } else {
// bytesReaderForDisk = io.TeeReader(body, &bytesBufferRemote)
// }
// // TODO or-shachar: Can we stream the data in parallel to both caches?
// diskPath, localError = l.localCache.Put(ctx, actionID, outputID, size, bytesReaderForDisk)
// if localError != nil {
// return "", localError
// }
// _, remoteError = l.putsMetrics.DoWithMeasure(size, func() (string, error) {
// e := l.remoteCache.Put(ctx, actionID, outputID, size, &bytesBufferRemote)
// return "", e
// })
// if remoteError != nil {
// return "", remoteError
// }
// return diskPath, nil
// }

var _ LocalCache = &CombinedCache{}
func (l *CombinedCache) Close() error {
err := l.localCache.Close()
if err != nil {
err = fmt.Errorf("local cache stop failed: %w", err)
}
err = l.remoteCache.Close()
if err != nil {
err = errors.Join(fmt.Errorf("remote cache stop failed: %w", err))
}
l.putsMetrics.Stop()
l.getsMetrics.Stop()
if l.verbose {
log.Printf("[%s]\tDownloads: %s, Uploads %s", l.remoteCache.Kind(), l.getsMetrics.Summary(), l.putsMetrics.Summary())
}
return err
}
8 changes: 4 additions & 4 deletions cachers/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ func (dc *SimpleDiskCache) Start() error {
return nil
}

func (dc *SimpleDiskCache) Close() error {
return nil
}

func (dc *SimpleDiskCache) Get(ctx context.Context, actionID string) (outputID, diskPath string, err error) {
actionFile := filepath.Join(dc.dir, fmt.Sprintf("a-%s", actionID))
ij, err := os.ReadFile(actionFile)
Expand Down Expand Up @@ -109,6 +105,10 @@ func (dc *SimpleDiskCache) Put(ctx context.Context, actionID, objectID string, s
return file, nil
}

func (dc *SimpleDiskCache) Close() error {
return nil
}

func writeAtomic(dest string, r io.Reader) (int64, error) {
tf, err := os.CreateTemp(filepath.Dir(dest), filepath.Base(dest)+".*")
if err != nil {
Expand Down
31 changes: 19 additions & 12 deletions cachers/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@ type ActionValue struct {

// HTTPCache is a RemoteCache that talks to a cacher server over HTTP.
type HTTPCache struct {
// BaseURL is the base URL of the cacher server, like "http://localhost:31364".
BaseURL string
// baseURL is the base URL of the cacher server, like "http://localhost:31364".
baseURL string

// HTTPClient optionally specifies the http.Client to use.
// client optionally specifies the http.Client to use.
// If nil, http.DefaultClient is used.
HTTPClient *http.Client
client *http.Client

// Verbose optionally specifies whether to log verbose messages.
Verbose bool
// verbose optionally specifies whether to log verbose messages.
verbose bool
}

func NewHttpCache(baseURL string, verbose bool) *HTTPCache {
return &HTTPCache{
baseURL: baseURL,
verbose: verbose,
}
}

func (c *HTTPCache) Start() error {
log.Printf("[%s]\tconfigured to %s", c.Kind(), c.BaseURL)
log.Printf("[%s]\tconfigured to %s", c.Kind(), c.baseURL)
return nil
}

Expand All @@ -43,7 +50,7 @@ func (c *HTTPCache) Kind() string {
}

func (c *HTTPCache) Get(ctx context.Context, actionID string) (outputID string, size int64, output io.ReadCloser, err error) {
req, _ := http.NewRequestWithContext(ctx, "GET", c.BaseURL+"/action/"+actionID, nil)
req, _ := http.NewRequestWithContext(ctx, "GET", c.baseURL+"/action/"+actionID, nil)
res, err := c.httpClient().Do(req)
if err != nil {
return "", 0, nil, err
Expand All @@ -63,7 +70,7 @@ func (c *HTTPCache) Get(ctx context.Context, actionID string) (outputID string,
if av.Size == 0 {
return outputID, av.Size, io.NopCloser(bytes.NewReader(nil)), nil
}
req, _ = http.NewRequestWithContext(ctx, "GET", c.BaseURL+"/output/"+outputID, nil)
req, _ = http.NewRequestWithContext(ctx, "GET", c.baseURL+"/output/"+outputID, nil)
res, err = c.httpClient().Do(req)
if err != nil {
return "", 0, nil, err
Expand Down Expand Up @@ -91,7 +98,7 @@ func (c *HTTPCache) Put(ctx context.Context, actionID, outputID string, size int
} else {
putBody = body
}
req, _ := http.NewRequestWithContext(ctx, "PUT", c.BaseURL+"/"+actionID+"/"+outputID, putBody)
req, _ := http.NewRequestWithContext(ctx, "PUT", c.baseURL+"/"+actionID+"/"+outputID, putBody)
req.ContentLength = size
res, err := c.httpClient().Do(req)
if err != nil {
Expand All @@ -109,8 +116,8 @@ func (c *HTTPCache) Put(ctx context.Context, actionID, outputID string, size int
var _ RemoteCache = &HTTPCache{}

func (c *HTTPCache) httpClient() *http.Client {
if c.HTTPClient != nil {
return c.HTTPClient
if c.client != nil {
return c.client
}
return http.DefaultClient
}
10 changes: 5 additions & 5 deletions cachers/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type S3Cache struct {
s3Client s3Client
}

var _ RemoteCache = &S3Cache{}

func (s *S3Cache) Kind() string {
return "s3"
}
Expand All @@ -42,10 +44,6 @@ func (s *S3Cache) Start() error {
return nil
}

func (s *S3Cache) Close() error {
return nil
}

func (s *S3Cache) Get(ctx context.Context, actionID string) (outputID string, size int64, output io.ReadCloser, err error) {
actionKey := s.actionKey(actionID)
outputResult, getOutputErr := s.s3Client.GetObject(ctx, &s3.GetObjectInput{
Expand Down Expand Up @@ -91,7 +89,9 @@ func (s *S3Cache) Put(ctx context.Context, actionID, outputID string, size int64
return
}

var _ RemoteCache = &S3Cache{}
func (s *S3Cache) Close() error {
return nil
}

func NewS3Cache(client s3Client, bucketName string, cacheKey string, verbose bool) *S3Cache {
// get current architecture
Expand Down
68 changes: 49 additions & 19 deletions cmd/go-cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,36 @@ import (

const defaultCacheKey = "v1"

// All the following env variable names are optional
const (
// path to local disk directory. defaults to os.UserCacheDir()/go-cacher
envVarDiskCacheDir = "GOCACHE_DISK_DIR"

// S3 cache
envVarS3CacheRegion = "GOCACHE_AWS_REGION"
envVarS3AwsAccessKey = "GOCACHE_AWS_ACCESS_KEY"
envVarS3AwsSecretAccessKey = "GOCACHE_AWS_SECRET_ACCESS_KEY"
envVarS3AwsCredsProfile = "GOCACHE_AWS_CREDS_PROFILE"
envVarS3BucketName = "GOCACHE_S3_BUCKET"
envVarS3CacheKey = "GOCACHE_CACHE_KEY"

// HTTP cache - optional cache server HTTP prefix (scheme and authority only);
envVarHttpCacheServerBase = "GOCACHE_HTTP_SERVER_BASE"
)

var (
serverBase = flag.String("cache-server", "", "optional cache server HTTP prefix (scheme and authority only); should be low latency. empty means to not use one.")
verbose = flag.Bool("verbose", false, "be verbose")
verbose = flag.Bool("verbose", false, "be verbose")
)

func getAwsConfigFromEnv() (*aws.Config, error) {
// read from env
awsRegion, awsRegionOk := os.LookupEnv("GOCACHE_AWS_REGION")
if !awsRegionOk {
awsRegion := os.Getenv(envVarS3CacheRegion)
if awsRegion != "" {
return nil, nil
}
accessKey, accessKeyOk := os.LookupEnv("GOCACHE_AWS_ACCESS_KEY")
secretAccessKey, secretKeyOk := os.LookupEnv("GOCACHE_AWS_SECRET_ACCESS_KEY")
if accessKeyOk && secretKeyOk {
accessKey := os.Getenv(envVarS3AwsAccessKey)
secretAccessKey := os.Getenv(envVarS3AwsSecretAccessKey)
if accessKey != "" && secretAccessKey != "" {
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion(awsRegion),
config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
Expand All @@ -52,8 +68,8 @@ func getAwsConfigFromEnv() (*aws.Config, error) {
}
return &cfg, nil
}
credsProfile, credsProfileOk := os.LookupEnv("GOCACHE_CREDS_PROFILE")
if credsProfileOk {
credsProfile := os.Getenv(envVarS3AwsCredsProfile)
if credsProfile != "" {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(awsRegion), config.WithSharedConfigProfile(credsProfile))
if err != nil {
return nil, err
Expand All @@ -68,12 +84,12 @@ func maybeS3Cache() (cachers.RemoteCache, error) {
if err != nil {
return nil, err
}
bucket, ok := os.LookupEnv("GOCACHE_S3_BUCKET")
if !ok || awsConfig == nil {
bucket := os.Getenv(envVarS3BucketName)
if bucket == "" || awsConfig == nil {
// We need at least name of bucket and valid aws config
return nil, nil
}
cacheKey := os.Getenv("GOCACHE_CACHE_KEY")
cacheKey := os.Getenv(envVarS3CacheKey)
if cacheKey == "" {
cacheKey = defaultCacheKey
}
Expand All @@ -82,7 +98,17 @@ func maybeS3Cache() (cachers.RemoteCache, error) {
return s3Cache, nil
}

func getFinalCacher(local cachers.LocalCache, remote cachers.RemoteCache, verbose bool) cachers.LocalCache {
func getCache(local cachers.LocalCache, verbose bool) cachers.LocalCache {
remote, err := maybeS3Cache()
if err != nil {
log.Fatal(err)
}
if remote == nil {
remote, err = maybeHttpCache()
if err != nil {
log.Fatal(err)
}
}
if remote != nil {
return cachers.NewCombinedCache(local, remote, verbose)
}
Expand All @@ -92,9 +118,17 @@ func getFinalCacher(local cachers.LocalCache, remote cachers.RemoteCache, verbos
return local
}

func maybeHttpCache() (cachers.RemoteCache, error) {
serverBase := os.Getenv(envVarHttpCacheServerBase)
if serverBase == "" {
return nil, nil
}
return cachers.NewHttpCache(serverBase, *verbose), nil
}

func main() {
flag.Parse()
dir := os.Getenv("GOCACHE_DISK_DIR")
dir := os.Getenv(envVarDiskCacheDir)
if dir == "" {
d, err := os.UserCacheDir()
if err != nil {
Expand All @@ -107,11 +141,7 @@ func main() {
log.Fatal(err)
}
var localCache cachers.LocalCache = cachers.NewSimpleDiskCache(*verbose, dir)
s3Cache, err := maybeS3Cache()
if err != nil {
log.Fatal(err)
}
proc := cacheproc.NewCacheProc(getFinalCacher(localCache, s3Cache, *verbose))
proc := cacheproc.NewCacheProc(getCache(localCache, *verbose))
if err := proc.Run(); err != nil {
log.Fatal(err)
}
Expand Down