Skip to content

Commit e309e88

Browse files
committed
Use a key-based mutex lock to download resources/agent bins once.
1 parent 3fb99c5 commit e309e88

14 files changed

+431
-87
lines changed

apiserver/tools.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ import (
1616
"strconv"
1717
"strings"
1818

19+
"github.com/im7mortal/kmutex"
1920
"github.com/juju/errors"
2021
jujuhttp "github.com/juju/http/v2"
2122
"github.com/juju/version/v2"
22-
"golang.org/x/sync/singleflight"
2323

2424
"github.com/juju/juju/apiserver/common"
2525
"github.com/juju/juju/apiserver/httpcontext"
@@ -67,13 +67,14 @@ type toolsUploadHandler struct {
6767

6868
// toolsHandler handles tool download through HTTPS in the API server.
6969
type toolsDownloadHandler struct {
70-
ctxt httpContext
71-
fetchOnce singleflight.Group
70+
ctxt httpContext
71+
fetchMutex *kmutex.Kmutex
7272
}
7373

7474
func newToolsDownloadHandler(httpCtxt httpContext) *toolsDownloadHandler {
7575
return &toolsDownloadHandler{
76-
ctxt: httpCtxt,
76+
ctxt: httpCtxt,
77+
fetchMutex: kmutex.New(),
7778
}
7879
}
7980

@@ -215,6 +216,10 @@ func (h *toolsDownloadHandler) getToolsForRequest(r *http.Request, st *state.Sta
215216
}
216217
}
217218

219+
locker := h.fetchMutex.Locker(storageVers.String())
220+
locker.Lock()
221+
defer locker.Unlock()
222+
218223
md, reader, err := storage.Open(storageVers.String())
219224
if errors.IsNotFound(err) {
220225
// Tools could not be found in tools storage,
@@ -224,11 +229,8 @@ func (h *toolsDownloadHandler) getToolsForRequest(r *http.Request, st *state.Sta
224229
if osTypeName != "" {
225230
storageVers.Release = osTypeName
226231
}
227-
_, err, _ = h.fetchOnce.Do(storageVers.String(), func() (interface{}, error) {
228-
return nil, h.fetchAndCacheTools(vers, storageVers)
229-
})
232+
err = h.fetchAndCacheTools(vers, storageVers)
230233
if err != nil {
231-
h.fetchOnce.Forget(storageVers.String())
232234
err = errors.Annotate(err, "error fetching agent binaries")
233235
} else {
234236
md, reader, err = storage.Open(storageVers.String())

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ require (
4444
github.com/hashicorp/golang-lru v0.5.4 // indirect
4545
github.com/hashicorp/raft v2.0.0-20200420012049-88ad3b3f0a54+incompatible
4646
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
47+
github.com/im7mortal/kmutex v1.0.1
4748
github.com/imdario/mergo v0.3.10 // indirect
4849
github.com/juju/ansiterm v0.0.0-20180109212912-720a0952cc2a
4950
github.com/juju/blobstore/v2 v2.0.0-20210302045357-edd2b24570b7

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,8 @@ github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uG
361361
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
362362
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
363363
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
364+
github.com/im7mortal/kmutex v1.0.1 h1:zAACzjwD+OEknDqnLdvRa/BhzFM872EBwKijviGLc9Q=
365+
github.com/im7mortal/kmutex v1.0.1/go.mod h1:f71c/Ugk/+58OHRAgvgzPP3QEiWGUjK13fd8ozfKWdo=
364366
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
365367
github.com/imdario/mergo v0.3.10 h1:6q5mVkdH/vYmqngx7kZQTjJ5HRsx+ImorDIEQ+beJgc=
366368
github.com/imdario/mergo v0.3.10/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=

resource/repositories/mocks/mocks.go

Lines changed: 139 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

resource/repositories/operations.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ func GetResource(args GetResourceArgs) (resource.Resource, io.ReadCloser, error)
9999
repo: args.Repository,
100100
}
101101

102+
locker := opRepo.resourceLocker(args.Name)
103+
locker.Lock()
104+
defer locker.Unlock()
105+
102106
res, reader, err := opRepo.get(args.Name)
103107
if err != nil {
104108
return resource.Resource{}, nil, errors.Trace(err)
@@ -132,7 +136,6 @@ func GetResource(args GetResourceArgs) (resource.Resource, io.ReadCloser, error)
132136
if err != nil {
133137
return resource.Resource{}, nil, errors.Trace(err)
134138
}
135-
136139
res, reader, err = opRepo.set(data.Resource, data, state.DoNotIncrementCharmModifiedVersion)
137140
if err != nil {
138141
return resource.Resource{}, nil, errors.Trace(err)
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Copyright 2021 Canonical Ltd.
2+
// Licensed under the AGPLv3, see LICENCE file for details.
3+
4+
package repositories_test
5+
6+
import (
7+
"bytes"
8+
"io"
9+
"io/ioutil"
10+
"sync"
11+
"time"
12+
13+
"github.com/golang/mock/gomock"
14+
"github.com/juju/charm/v8"
15+
charmresource "github.com/juju/charm/v8/resource"
16+
jc "github.com/juju/testing/checkers"
17+
gc "gopkg.in/check.v1"
18+
19+
"github.com/juju/errors"
20+
"github.com/juju/juju/charmstore"
21+
"github.com/juju/juju/resource"
22+
"github.com/juju/juju/resource/repositories"
23+
"github.com/juju/juju/resource/repositories/mocks"
24+
"github.com/juju/juju/state"
25+
)
26+
27+
type OperationsSuite struct{}
28+
29+
var _ = gc.Suite(&OperationsSuite{})
30+
31+
func (s *OperationsSuite) TestConcurrentGetResource(c *gc.C) {
32+
ctrl := gomock.NewController(c)
33+
defer ctrl.Finish()
34+
er := mocks.NewMockEntityRepository(ctrl)
35+
rg := mocks.NewMockResourceGetter(ctrl)
36+
37+
stateLock := sync.Mutex{}
38+
fetchMut := &sync.Mutex{}
39+
fetchMut.Lock()
40+
41+
er.EXPECT().FetchLock(gomock.Any()).AnyTimes().Return(fetchMut)
42+
43+
openState := struct {
44+
res resource.Resource
45+
buffer []byte
46+
err error
47+
}{
48+
err: errors.NotFoundf("resource"),
49+
}
50+
er.EXPECT().OpenResource(gomock.Any()).AnyTimes().DoAndReturn(func(name string) (resource.Resource, io.ReadCloser, error) {
51+
stateLock.Lock()
52+
defer stateLock.Unlock()
53+
reader := io.ReadCloser(nil)
54+
if openState.err == nil && openState.buffer != nil {
55+
reader = ioutil.NopCloser(bytes.NewBuffer(openState.buffer))
56+
}
57+
return openState.res, reader, openState.err
58+
})
59+
60+
getState := struct {
61+
res resource.Resource
62+
}{
63+
res: resource.Resource{
64+
ApplicationID: "gitlab",
65+
Username: "gitlab-0",
66+
Resource: charmresource.Resource{
67+
Meta: charmresource.Meta{
68+
Name: "company-icon",
69+
},
70+
Origin: charmresource.OriginStore,
71+
},
72+
},
73+
}
74+
er.EXPECT().GetResource(gomock.Any()).AnyTimes().DoAndReturn(func(name string) (resource.Resource, error) {
75+
stateLock.Lock()
76+
defer stateLock.Unlock()
77+
return getState.res, nil
78+
})
79+
80+
gomock.InOrder(
81+
rg.EXPECT().GetResource(repositories.ResourceRequest{
82+
CharmID: repositories.CharmID{URL: charm.MustParseURL("cs:gitlab")},
83+
Name: "company-icon",
84+
}).Times(1).Return(charmstore.ResourceData{
85+
ReadCloser: ioutil.NopCloser(bytes.NewBufferString("data")),
86+
Resource: charmresource.Resource{
87+
Meta: charmresource.Meta{
88+
Name: "company-icon",
89+
},
90+
Origin: charmresource.OriginStore,
91+
},
92+
}, nil),
93+
er.EXPECT().SetResource(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).DoAndReturn(func(res charmresource.Resource, reader io.Reader, arg state.IncrementCharmModifiedVersionType) (charmresource.Resource, error) {
94+
stateLock.Lock()
95+
defer stateLock.Unlock()
96+
// Make sure this takes a while.
97+
time.Sleep(10 * time.Millisecond)
98+
buf, err := ioutil.ReadAll(reader)
99+
if err != nil {
100+
return charmresource.Resource{}, errors.Trace(err)
101+
}
102+
res.Size = int64(len(buf))
103+
openState.buffer = buf
104+
openState.err = nil
105+
getState.res.Resource = res
106+
openState.res = getState.res
107+
return res, nil
108+
}),
109+
)
110+
111+
args := repositories.GetResourceArgs{
112+
Client: rg,
113+
Repository: er,
114+
Name: "company-icon",
115+
CharmID: repositories.CharmID{URL: charm.MustParseURL("cs:gitlab")},
116+
}
117+
118+
start := sync.WaitGroup{}
119+
done := sync.WaitGroup{}
120+
for i := 0; i < 100; i++ {
121+
start.Add(1)
122+
done.Add(1)
123+
go func() {
124+
defer done.Done()
125+
start.Done()
126+
rsc, reader, err := repositories.GetResource(args)
127+
c.Check(err, jc.ErrorIsNil)
128+
c.Check(reader, gc.NotNil)
129+
c.Check(rsc, gc.DeepEquals, getState.res)
130+
}()
131+
}
132+
133+
start.Wait()
134+
// This synchronises all the threads to start at the same time.
135+
fetchMut.Unlock()
136+
done.Wait()
137+
}

resource/repositories/package_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright 2021 Canonical Ltd.
2+
// Licensed under the AGPLv3, see LICENCE file for details.
3+
4+
package repositories_test
5+
6+
import (
7+
"testing"
8+
9+
gc "gopkg.in/check.v1"
10+
)
11+
12+
//go:generate go run github.com/golang/mock/mockgen -package mocks -destination mocks/mocks.go github.com/juju/juju/resource/repositories EntityRepository,ResourceGetter
13+
14+
func Test(t *testing.T) {
15+
gc.TestingT(t)
16+
}

0 commit comments

Comments
 (0)