#!/usr/bin/env python3
# assess_add_cloud.py (forked from juju/juju)
import logging
import re
import sys
from argparse import ArgumentParser
from collections import namedtuple
from copy import deepcopy
import yaml
from utility import (
add_arg_juju_bin,
JujuAssertionError,
temp_dir,
)
from jujupy import (
ModelClient,
JujuData,
)
from jujupy.exceptions import (
AuthNotAccepted,
InvalidEndpoint,
NameNotAccepted,
TypeNotAccepted,
)
# Length of the over-long endpoint text that iter_clouds() generates for its
# invalid-endpoint test cases.
# URLs are limited to 2083 bytes in many browsers; anything more is excessive.
# Juju has set 4096 as being excessive, but it needs to be lowered:
# https://bugs.launchpad.net/juju/+bug/1678833
EXCEEDED_LIMIT = 4096
class CloudMismatch(JujuAssertionError):
    """Raised when the stored cloud data differs from the expected data."""

    def __init__(self):
        # The message is fixed; callers construct this with no arguments.
        super().__init__('Cloud mismatch')
class NameMismatch(JujuAssertionError):
    """Raised when the stored cloud name differs from the requested name."""

    def __init__(self):
        # The message is fixed; callers construct this with no arguments.
        super().__init__('Name mismatch')
class NotRaised(Exception):
    """Raised when a test case did not produce its expected exception."""

    def __init__(self, cloud_spec):
        """Build the message from the spec's expected exception.

        :param cloud_spec: The spec under test; only its ``exception``
            attribute is read.
        """
        expected = cloud_spec.exception
        super().__init__(
            'Expected exception not raised: {}'.format(expected))
class CloudValidation:
    """Describe the level of add-cloud validation a juju version performs.

    The level is derived from the version string and stored in ``support``
    as one of the sentinel objects ``NONE``, ``BASIC`` or ``ENDPOINT``.
    """

    # Unique sentinels, compared by identity only.  Each one must be its own
    # instance; NONE was previously bound to the ``object`` type itself.
    NONE = object()
    BASIC = object()
    ENDPOINT = object()

    def __init__(self, version):
        """Initialize with the juju version.

        :param version: The juju version string.  The patterns require a
            non-digit character after the minor number (e.g. '2.0.1',
            '2.1-beta1'), so '2.10' is not mistaken for '2.1'.
        """
        self.version = version
        if re.match(r'2\.0[^\d]', version):
            self.support = self.NONE
        elif re.match(r'2\.1[^\d]', version):
            self.support = self.BASIC
        else:
            # Every other version, 2.2 included, lands here.
            # 2.2 retracted manual endpoint validation because it is entangled
            # with authentication.
            self.support = self.ENDPOINT

    @property
    def is_basic(self):
        """True when the version maps to the BASIC validation level."""
        return self.support is self.BASIC

    @property
    def is_endpoint(self):
        """True when the version maps to the ENDPOINT validation level."""
        return self.support is self.ENDPOINT

    def has_endpoint(self, provider):
        """Return True if the juju provider supports endpoint validation.

        :param provider: The cloud provider type.  It is currently not
            consulted; the answer depends only on the juju version.
        """
        return self.support is self.ENDPOINT
# One add-cloud test case: the label shown in results, the cloud name and
# config to add, the exception expected (if any) and the bug number when the
# case is an expected failure.
CloudSpec = namedtuple(
    'CloudSpec',
    ('label', 'name', 'config', 'exception', 'xfail_bug'),
)
def cloud_spec(label, name, config, exception=None, xfail_bug=None):
    """Build a CloudSpec, defaulting the optional fields to None.

    :param label: The label to display in test results.
    :param name: The name to use for the cloud.
    :param config: The cloud-config.
    :param exception: The exception that is expected to be raised (if any).
    :param xfail_bug: If this CloudSpec represents an expected failure, the
        bug number.
    :return: The populated CloudSpec.
    """
    return CloudSpec(
        label=label,
        name=name,
        config=config,
        exception=exception,
        xfail_bug=xfail_bug,
    )
def xfail(spec, bug, xfail_exception):
    """Derive an expected-failure CloudSpec from an existing one.

    The label, name and config are copied from ``spec``; the expected
    exception and bug number are replaced.  Wrapping the original spec keeps
    maintenance simple: dropping the xfail wrapper restores the original
    expectation.

    :param spec: The CloudSpec to derive from.
    :param bug: The bug number that explains the expected failure.
    :param xfail_exception: The exception expected while the bug is open.
    :return: A new CloudSpec.
    """
    return CloudSpec(
        label=spec.label,
        name=spec.name,
        config=spec.config,
        exception=xfail_exception,
        xfail_bug=bug,
    )
def assess_cloud(client, cloud_name, example_cloud):
    """Interactively add a cloud and check what was recorded for it.

    The client's clouds must be empty beforehand.  After the interactive
    add-cloud, the clouds are read back and compared with the example.

    :param client: The client used to add the cloud and read clouds back.
    :param cloud_name: The name to give the cloud.
    :param example_cloud: The cloud data expected to be recorded.
    :raises AssertionError: If clouds are already present before the add.
    :raises JujuAssertionError: If no clouds are present after the add.
    :raises NameMismatch: If the recorded cloud names are not exactly
        ``[cloud_name]``.
    :raises CloudMismatch: If the recorded cloud data differs from
        ``example_cloud``; both are dumped to stderr first.
    """
    before = client.env.read_clouds()['clouds']
    if len(before) > 0:
        raise AssertionError('Clouds already present!')

    client.add_cloud_interactive(cloud_name, example_cloud)

    after = client.env.read_clouds()['clouds']
    if len(after) == 0:
        raise JujuAssertionError('Clouds missing!')
    if list(after.keys()) != [cloud_name]:
        raise NameMismatch()

    actual_cloud = after[cloud_name]
    # Accept the creation of a default region even though there's not one in
    # the example cloud.
    if 'regions' not in example_cloud:
        if actual_cloud.get('regions') == {'default': {}}:
            del actual_cloud['regions']

    if actual_cloud == example_cloud:
        return

    # Show both sides of the comparison before reporting the failure.
    err_stream = sys.stderr
    err_stream.write("\nMissmatch for cloud: {}\n".format(cloud_name))
    err_stream.write('\nExpected:\n')
    yaml.dump(example_cloud, err_stream)
    err_stream.write('\nActual:\n')
    yaml.dump(actual_cloud, err_stream)
    raise CloudMismatch()
def iter_clouds(clouds, cloud_validation):
    """Generate the CloudSpecs to test for the supplied clouds.

    A single bogus-type spec is produced first.  Then, for every cloud: the
    cloud itself, two invalid-name variants, a bogus auth-type variant
    (skipped for maas, manual and vsphere), and, except for vsphere,
    over-long endpoint variants for the cloud as a whole and for each region
    on its own.

    :param clouds: cloud data as defined in $JUJU_DATA/clouds.yaml
    :param cloud_validation: an instance of CloudValidation.
    """
    yield cloud_spec('bogus-type', 'bogus-type', {'type': 'bogus'},
                     exception=TypeNotAccepted)
    oversized = 'A' * EXCEEDED_LIMIT

    invalid_names = (
        ('slash-in-name', 'invalid/name'),
        ('numeral-prefix', '99invalid/name'),
    )
    for cloud_name, cloud in clouds.items():
        yield cloud_spec(cloud_name, cloud_name, cloud)
        for label_prefix, bad_name in invalid_names:
            yield cloud_spec('{}-{}'.format(label_prefix, cloud_name),
                             bad_name, cloud, NameNotAccepted, 1641981)

        cloud_type = cloud['type']
        if cloud_type not in ('maas', 'manual', 'vsphere'):
            bogus_auth = deepcopy(cloud)
            bogus_auth['auth-types'] = ['asdf']
            yield cloud_spec('bogus-auth-{}'.format(cloud_name), cloud_name,
                             bogus_auth, AuthNotAccepted, 1641970)

        if cloud_type == 'vsphere':
            continue

        # juju saves at least one region for each cloud, even if the cloud
        # does not support them.  An empty 'default' region is treated as
        # "no regions", so no per-region invalid endpoints are generated for
        # it.
        region_map = cloud.get('regions', {})
        if region_map.get("default") == {}:
            region_names = []
        else:
            region_names = list(region_map)

        if cloud_validation.has_endpoint(cloud_type):
            expected_exception = InvalidEndpoint
        else:
            expected_exception = CloudMismatch

        # Over-long endpoint on the cloud and on all of its regions at once.
        all_long = deepcopy(cloud)
        all_long['endpoint'] = oversized
        for rg_name in region_names:
            all_long['regions'][rg_name]['endpoint'] = oversized
        yield cloud_spec('long-endpoint-{}'.format(cloud_name), cloud_name,
                         all_long, expected_exception, 1641970)

        # Over-long endpoint on each region, tested independently of others.
        for region_name in region_names:
            one_region_long = deepcopy(cloud)
            one_region_long['regions'] = {
                region_name: {'endpoint': oversized}}
            yield cloud_spec(
                'long-endpoint-{}-{}'.format(cloud_name, region_name),
                cloud_name, one_region_long, expected_exception, 1641970)
def assess_all_clouds(client, cloud_specs):
    """Run every supplied cloud spec and sort the labels by outcome.

    A spec passes when assess_cloud() succeeds (no expected exception) or
    raises the spec's expected exception.  Any other outcome is logged and
    counted as a failure.  Passing specs that carry a bug number are
    reported as expected failures rather than successes.

    :param client: The client handed to assess_cloud().
    :param cloud_specs: An iterable of CloudSpecs.
    :return: A tuple of (succeeded, expected_failed, failed).  succeeded and
        failed are sets of cloud labels; expected_failed is a dict mapping a
        bug number to the set of labels associated with it.
    """
    succeeded = set()
    failed = set()
    xfailed = {}
    client.env.load_yaml()
    for spec in cloud_specs:
        sys.stdout.write('\n Testing {}.\n'.format(spec.label))
        try:
            if spec.exception is not None:
                try:
                    assess_cloud(client, spec.name, spec.config)
                except spec.exception:
                    # The expected exception was raised: the spec passes.
                    pass
                else:
                    raise NotRaised(spec)
            else:
                assess_cloud(client, spec.name, spec.config)
        except Exception as err:
            logging.exception(err)
            failed.add(spec.label)
        else:
            bug = spec.xfail_bug
            if bug is None:
                succeeded.add(spec.label)
            else:
                xfailed.setdefault(bug, set()).add(spec.label)
        finally:
            # Empty the clouds again so the next spec starts with none.
            client.env.clouds = {'clouds': {}}
            client.env.dump_yaml(client.env.juju_home)
    return succeeded, xfailed, failed
def write_status(status, tests):
    """Write one result line of the form '<status>: <tests>' to stdout.

    :param status: The heading for the line.
    :param tests: A sized collection of test labels.  They are written
        sorted and comma-separated, or as 'none' when the collection is
        empty.
    """
    test_str = ', '.join(sorted(tests)) if len(tests) != 0 else 'none'
    sys.stdout.write('{}: {}\n'.format(status, test_str))
def parse_args():
    """Parse the command line.

    Accepts the positional ``example_clouds`` argument plus whatever
    add_arg_juju_bin() registers on the parser.

    :return: The parsed argument namespace.
    """
    arg_parser = ArgumentParser()
    arg_parser.add_argument(
        'example_clouds',
        help='A clouds.yaml file to use for testing.',
    )
    add_arg_juju_bin(arg_parser)
    parsed = arg_parser.parse_args()
    return parsed
def main():
    """Assess add-cloud for every cloud in the example clouds file.

    Loads the example clouds, generates the specs for the juju version in
    use, runs them against a client with a temporary juju home and writes a
    summary of the results.

    :return: 1 if any spec failed, otherwise 0.
    """
    args = parse_args()
    juju_bin = args.juju_bin
    version = ModelClient.get_version(juju_bin)
    with open(args.example_clouds) as clouds_file:
        clouds = yaml.safe_load(clouds_file)['clouds']
    validation = CloudValidation(version)
    # iter_clouds() is a generator; the specs are produced while they run.
    specs = iter_clouds(clouds, validation)
    with temp_dir() as juju_home:
        env = JujuData('foo', config=None, juju_home=juju_home)
        client = ModelClient(env, version, juju_bin)
        succeeded, xfailed, failed = assess_all_clouds(client, specs)

    write_status('Succeeded', succeeded)
    for bug, failures in sorted(xfailed.items()):
        write_status('Expected fail (bug #{})'.format(bug), failures)
    write_status('Failed', failed)
    return 1 if len(failed) > 0 else 0
# Run the assessment only when executed as a script; the value returned by
# main() becomes the process exit status.
if __name__ == '__main__':
    sys.exit(main())