Skip to content

Commit 3717448

Browse files
authored
Merge pull request etcd-io#15479 from serathius/test-cmux
Test connection multiplexing.
2 parents c6d8b65 + dfc2c6d commit 3717448

File tree

11 files changed

+349
-112
lines changed

11 files changed

+349
-112
lines changed

bill-of-materials.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,15 @@
656656
}
657657
]
658658
},
659+
{
660+
"project": "golang.org/x/sync/errgroup",
661+
"licenses": [
662+
{
663+
"type": "BSD 3-clause \"New\" or \"Revised\" License",
664+
"confidence": 0.9663865546218487
665+
}
666+
]
667+
},
659668
{
660669
"project": "golang.org/x/sys/unix",
661670
"licenses": [

tests/e2e/cluster_downgrade_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func compareMemberVersion(expect version.Versions, target version.Versions) erro
232232
}
233233

234234
func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) (version.Versions, error) {
235-
args := e2e.CURLPrefixArgs(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
235+
args := e2e.CURLPrefixArgsCluster(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
236236
lines, err := e2e.RunUtilCompletion(args, nil)
237237
if err != nil {
238238
return version.Versions{}, err

tests/e2e/cmux_test.go

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Copyright 2023 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// These tests are directly validating etcd connection multiplexing.
16+
//go:build !cluster_proxy
17+
18+
package e2e
19+
20+
import (
21+
"context"
22+
"encoding/json"
23+
"fmt"
24+
"strings"
25+
"testing"
26+
27+
"github.com/prometheus/common/expfmt"
28+
"github.com/stretchr/testify/assert"
29+
"github.com/stretchr/testify/require"
30+
31+
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
32+
"go.etcd.io/etcd/api/v3/mvccpb"
33+
"go.etcd.io/etcd/api/v3/version"
34+
"go.etcd.io/etcd/server/v3/etcdserver/api/etcdhttp"
35+
"go.etcd.io/etcd/tests/v3/framework/config"
36+
"go.etcd.io/etcd/tests/v3/framework/e2e"
37+
)
38+
39+
func TestConnectionMultiplexing(t *testing.T) {
40+
e2e.BeforeTest(t)
41+
for _, tc := range []struct {
42+
name string
43+
serverTLS e2e.ClientConnType
44+
}{
45+
{
46+
name: "ServerTLS",
47+
serverTLS: e2e.ClientTLS,
48+
},
49+
{
50+
name: "ServerNonTLS",
51+
serverTLS: e2e.ClientNonTLS,
52+
},
53+
{
54+
name: "ServerTLSAndNonTLS",
55+
serverTLS: e2e.ClientTLSAndNonTLS,
56+
},
57+
} {
58+
t.Run(tc.name, func(t *testing.T) {
59+
ctx := context.Background()
60+
cfg := e2e.EtcdProcessClusterConfig{ClusterSize: 1, Client: e2e.ClientConfig{ConnectionType: tc.serverTLS}}
61+
clus, err := e2e.NewEtcdProcessCluster(ctx, t, e2e.WithConfig(&cfg))
62+
require.NoError(t, err)
63+
defer clus.Close()
64+
65+
var clientScenarios []e2e.ClientConnType
66+
switch tc.serverTLS {
67+
case e2e.ClientTLS:
68+
clientScenarios = []e2e.ClientConnType{e2e.ClientTLS}
69+
case e2e.ClientNonTLS:
70+
clientScenarios = []e2e.ClientConnType{e2e.ClientNonTLS}
71+
case e2e.ClientTLSAndNonTLS:
72+
clientScenarios = []e2e.ClientConnType{e2e.ClientTLS, e2e.ClientNonTLS}
73+
}
74+
75+
for _, clientTLS := range clientScenarios {
76+
name := "ClientNonTLS"
77+
if clientTLS == e2e.ClientTLS {
78+
name = "ClientTLS"
79+
}
80+
t.Run(name, func(t *testing.T) {
81+
testConnectionMultiplexing(t, ctx, clus.EndpointsV3()[0], clientTLS)
82+
})
83+
}
84+
})
85+
}
86+
87+
}
88+
89+
func testConnectionMultiplexing(t *testing.T, ctx context.Context, endpoint string, connType e2e.ClientConnType) {
90+
switch connType {
91+
case e2e.ClientTLS:
92+
endpoint = e2e.ToTLS(endpoint)
93+
case e2e.ClientNonTLS:
94+
default:
95+
panic(fmt.Sprintf("Unsupported conn type %v", connType))
96+
}
97+
t.Run("etcdctl", func(t *testing.T) {
98+
etcdctl, err := e2e.NewEtcdctl(e2e.ClientConfig{ConnectionType: connType}, []string{endpoint})
99+
require.NoError(t, err)
100+
_, err = etcdctl.Get(ctx, "a", config.GetOptions{})
101+
assert.NoError(t, err)
102+
})
103+
t.Run("clientv3", func(t *testing.T) {
104+
c := newClient(t, []string{endpoint}, e2e.ClientConfig{ConnectionType: connType})
105+
_, err := c.Get(ctx, "a")
106+
assert.NoError(t, err)
107+
})
108+
t.Run("curl", func(t *testing.T) {
109+
for _, httpVersion := range []string{"2", "1.1", "1.0", ""} {
110+
tname := "http" + httpVersion
111+
if httpVersion == "" {
112+
tname = "default"
113+
}
114+
t.Run(tname, func(t *testing.T) {
115+
assert.NoError(t, fetchGrpcGateway(endpoint, httpVersion, connType))
116+
assert.NoError(t, fetchMetrics(endpoint, httpVersion, connType))
117+
assert.NoError(t, fetchVersion(endpoint, httpVersion, connType))
118+
assert.NoError(t, fetchHealth(endpoint, httpVersion, connType))
119+
assert.NoError(t, fetchDebugVars(endpoint, httpVersion, connType))
120+
})
121+
}
122+
})
123+
}
124+
125+
func fetchGrpcGateway(endpoint string, httpVersion string, connType e2e.ClientConnType) error {
126+
rangeData, err := json.Marshal(&pb.RangeRequest{
127+
Key: []byte("a"),
128+
})
129+
if err != nil {
130+
return err
131+
}
132+
req := e2e.CURLReq{Endpoint: "/v3/kv/range", Value: string(rangeData), Timeout: 5, HttpVersion: httpVersion}
133+
respData, err := curl(endpoint, "POST", req, connType)
134+
return validateGrpcgatewayRangeReponse([]byte(respData))
135+
}
136+
137+
func validateGrpcgatewayRangeReponse(respData []byte) error {
138+
// Modified json annotation so ResponseHeader fields are stored in string.
139+
type responseHeader struct {
140+
ClusterId uint64 `json:"cluster_id,string,omitempty"`
141+
MemberId uint64 `json:"member_id,string,omitempty"`
142+
Revision int64 `json:"revision,string,omitempty"`
143+
RaftTerm uint64 `json:"raft_term,string,omitempty"`
144+
}
145+
type rangeResponse struct {
146+
Header *responseHeader `json:"header,omitempty"`
147+
Kvs []*mvccpb.KeyValue `json:"kvs,omitempty"`
148+
More bool `json:"more,omitempty"`
149+
Count int64 `json:"count,omitempty"`
150+
}
151+
var resp rangeResponse
152+
return json.Unmarshal(respData, &resp)
153+
}
154+
155+
func fetchMetrics(endpoint string, httpVersion string, connType e2e.ClientConnType) error {
156+
req := e2e.CURLReq{Endpoint: "/metrics", Timeout: 5, HttpVersion: httpVersion}
157+
respData, err := curl(endpoint, "GET", req, connType)
158+
if err != nil {
159+
return err
160+
}
161+
var parser expfmt.TextParser
162+
_, err = parser.TextToMetricFamilies(strings.NewReader(strings.ReplaceAll(respData, "\r\n", "\n")))
163+
return err
164+
}
165+
166+
func fetchVersion(endpoint string, httpVersion string, connType e2e.ClientConnType) error {
167+
req := e2e.CURLReq{Endpoint: "/version", Timeout: 5, HttpVersion: httpVersion}
168+
respData, err := curl(endpoint, "GET", req, connType)
169+
if err != nil {
170+
return err
171+
}
172+
var resp version.Versions
173+
return json.Unmarshal([]byte(respData), &resp)
174+
}
175+
176+
func fetchHealth(endpoint string, httpVersion string, connType e2e.ClientConnType) error {
177+
req := e2e.CURLReq{Endpoint: "/health", Timeout: 5, HttpVersion: httpVersion}
178+
respData, err := curl(endpoint, "GET", req, connType)
179+
if err != nil {
180+
return err
181+
}
182+
var resp etcdhttp.Health
183+
return json.Unmarshal([]byte(respData), &resp)
184+
}
185+
186+
func fetchDebugVars(endpoint string, httpVersion string, connType e2e.ClientConnType) error {
187+
req := e2e.CURLReq{Endpoint: "/debug/vars", Timeout: 5, HttpVersion: httpVersion}
188+
respData, err := curl(endpoint, "GET", req, connType)
189+
if err != nil {
190+
return err
191+
}
192+
var resp map[string]interface{}
193+
return json.Unmarshal([]byte(respData), &resp)
194+
}
195+
196+
func curl(endpoint string, method string, curlReq e2e.CURLReq, connType e2e.ClientConnType) (string, error) {
197+
args := e2e.CURLPrefixArgs(endpoint, e2e.ClientConfig{ConnectionType: connType}, false, method, curlReq)
198+
lines, err := e2e.RunUtilCompletion(args, nil)
199+
if err != nil {
200+
return "", err
201+
}
202+
return strings.Join(lines, "\n"), nil
203+
}

tests/e2e/metrics_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func metricsTest(cx ctlCtx) {
6363
if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil {
6464
cx.t.Fatal(err)
6565
}
66-
if err := e2e.CURLGet(cx.epc, e2e.CURLReq{Endpoint: test.endpoint, Expected: test.expected, MetricsURLScheme: cx.cfg.MetricsURLScheme}); err != nil {
66+
if err := e2e.CURLGet(cx.epc, e2e.CURLReq{Endpoint: test.endpoint, Expected: test.expected}); err != nil {
6767
cx.t.Fatalf("failed get with curl (%v)", err)
6868
}
6969
}

tests/e2e/utils.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright 2023 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package e2e
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"testing"
21+
"time"
22+
23+
"go.uber.org/zap"
24+
"golang.org/x/sync/errgroup"
25+
"google.golang.org/grpc"
26+
27+
"go.etcd.io/etcd/client/pkg/v3/transport"
28+
clientv3 "go.etcd.io/etcd/client/v3"
29+
"go.etcd.io/etcd/pkg/v3/stringutil"
30+
"go.etcd.io/etcd/tests/v3/framework/e2e"
31+
"go.etcd.io/etcd/tests/v3/framework/integration"
32+
)
33+
34+
func newClient(t *testing.T, entpoints []string, cfg e2e.ClientConfig) *clientv3.Client {
35+
tlscfg, err := tlsInfo(t, cfg)
36+
if err != nil {
37+
t.Fatal(err)
38+
}
39+
ccfg := clientv3.Config{
40+
Endpoints: entpoints,
41+
DialTimeout: 5 * time.Second,
42+
DialOptions: []grpc.DialOption{grpc.WithBlock()},
43+
}
44+
if tlscfg != nil {
45+
tls, err := tlscfg.ClientConfig()
46+
if err != nil {
47+
t.Fatal(err)
48+
}
49+
ccfg.TLS = tls
50+
}
51+
c, err := clientv3.New(ccfg)
52+
if err != nil {
53+
t.Fatal(err)
54+
}
55+
t.Cleanup(func() {
56+
c.Close()
57+
})
58+
return c
59+
}
60+
61+
func tlsInfo(t testing.TB, cfg e2e.ClientConfig) (*transport.TLSInfo, error) {
62+
switch cfg.ConnectionType {
63+
case e2e.ClientNonTLS, e2e.ClientTLSAndNonTLS:
64+
return nil, nil
65+
case e2e.ClientTLS:
66+
if cfg.AutoTLS {
67+
tls, err := transport.SelfCert(zap.NewNop(), t.TempDir(), []string{"localhost"}, 1)
68+
if err != nil {
69+
return nil, fmt.Errorf("failed to generate cert: %s", err)
70+
}
71+
return &tls, nil
72+
} else {
73+
return &integration.TestTLSInfo, nil
74+
}
75+
default:
76+
return nil, fmt.Errorf("config %v not supported", cfg)
77+
}
78+
}
79+
80+
func fillEtcdWithData(ctx context.Context, c *clientv3.Client, keyCount int, valueSize uint) error {
81+
g := errgroup.Group{}
82+
concurrency := 10
83+
keysPerRoutine := keyCount / concurrency
84+
for i := 0; i < concurrency; i++ {
85+
i := i
86+
g.Go(func() error {
87+
for j := 0; j < keysPerRoutine; j++ {
88+
_, err := c.Put(ctx, fmt.Sprintf("%d", i*keysPerRoutine+j), stringutil.RandString(valueSize))
89+
if err != nil {
90+
return err
91+
}
92+
}
93+
return nil
94+
})
95+
}
96+
return g.Wait()
97+
}

tests/e2e/v3_cipher_suite_test.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,19 @@ func testV3CurlCipherSuites(t *testing.T, valid bool) {
4848

4949
func cipherSuiteTestValid(cx ctlCtx) {
5050
if err := e2e.CURLGet(cx.epc, e2e.CURLReq{
51-
Endpoint: "/metrics",
52-
Expected: fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version),
53-
MetricsURLScheme: cx.cfg.MetricsURLScheme,
54-
Ciphers: "ECDHE-RSA-AES128-GCM-SHA256", // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
51+
Endpoint: "/metrics",
52+
Expected: fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version),
53+
Ciphers: "ECDHE-RSA-AES128-GCM-SHA256", // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
5554
}); err != nil {
5655
require.ErrorContains(cx.t, err, fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version))
5756
}
5857
}
5958

6059
func cipherSuiteTestMismatch(cx ctlCtx) {
6160
err := e2e.CURLGet(cx.epc, e2e.CURLReq{
62-
Endpoint: "/metrics",
63-
Expected: "failed setting cipher list",
64-
MetricsURLScheme: cx.cfg.MetricsURLScheme,
65-
Ciphers: "ECDHE-RSA-DES-CBC3-SHA", // TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
61+
Endpoint: "/metrics",
62+
Expected: "failed setting cipher list",
63+
Ciphers: "ECDHE-RSA-DES-CBC3-SHA", // TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
6664
})
6765
require.ErrorContains(cx.t, err, "curl: (59) failed setting cipher list")
6866
}

tests/e2e/v3_curl_maxstream_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, closeC
157157
member := cluster.Procs[rand.Intn(cluster.Cfg.ClusterSize)]
158158
curlReq := e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData)}
159159

160-
args := e2e.CURLPrefixArgs(cluster.Cfg, member, "POST", curlReq)
160+
args := e2e.CURLPrefixArgsCluster(cluster.Cfg, member, "POST", curlReq)
161161
proc, err := e2e.SpawnCmd(args, nil)
162162
if err != nil {
163163
return fmt.Errorf("failed to spawn: %w", err)

tests/e2e/v3_curl_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func testV3CurlAuth(cx ctlCtx) {
246246
lineFunc = func(txt string) bool { return true }
247247
)
248248

249-
cmdArgs = e2e.CURLPrefixArgs(cx.epc.Cfg, cx.epc.Procs[rand.Intn(cx.epc.Cfg.ClusterSize)], "POST", e2e.CURLReq{Endpoint: path.Join(p, "/auth/authenticate"), Value: string(authreq)})
249+
cmdArgs = e2e.CURLPrefixArgsCluster(cx.epc.Cfg, cx.epc.Procs[rand.Intn(cx.epc.Cfg.ClusterSize)], "POST", e2e.CURLReq{Endpoint: path.Join(p, "/auth/authenticate"), Value: string(authreq)})
250250
proc, err := e2e.SpawnCmd(cmdArgs, cx.envMap)
251251
testutil.AssertNil(cx.t, err)
252252
defer proc.Close()
@@ -285,7 +285,7 @@ func testV3CurlCampaign(cx ctlCtx) {
285285
if err != nil {
286286
cx.t.Fatal(err)
287287
}
288-
cargs := e2e.CURLPrefixArgs(cx.epc.Cfg, cx.epc.Procs[rand.Intn(cx.epc.Cfg.ClusterSize)], "POST", e2e.CURLReq{
288+
cargs := e2e.CURLPrefixArgsCluster(cx.epc.Cfg, cx.epc.Procs[rand.Intn(cx.epc.Cfg.ClusterSize)], "POST", e2e.CURLReq{
289289
Endpoint: path.Join(cx.apiPrefix, "/election/campaign"),
290290
Value: string(cdata),
291291
})

0 commit comments

Comments
 (0)