# utility.py (forked from juju/juju)
from contextlib import contextmanager
from datetime import (
datetime,
timedelta,
)
import errno
import json
import logging
import os
import re
import subprocess
import socket
import sys
from time import (
sleep,
time,
)
import warnings
from jujupy.utility import (
ensure_deleted,
ensure_dir,
get_timeout_path,
is_ipv6_address,
print_now,
qualified_model_name,
quote,
scoped_environ,
skip_on_missing_file,
temp_dir,
temp_yaml_file,
until_timeout
)
# Imported for other call sites to use.
__all__ = [
'ensure_deleted',
'ensure_dir',
'get_timeout_path',
'qualified_model_name',
'quote',
'scoped_environ',
'skip_on_missing_file',
'temp_dir',
'temp_yaml_file',
]
# Equivalent of socket.EAI_NODATA when using windows sockets
# <https://msdn.microsoft.com/ms740668#WSANO_DATA>
WSANO_DATA = 11004
class PortTimeoutError(Exception):
pass
class LoggedException(BaseException):
"""Raised in place of an exception that has already been logged.
This is a wrapper to avoid double-printing real Exceptions while still
unwinding the stack appropriately.
"""
def __init__(self, exception):
self.exception = exception
class JujuAssertionError(AssertionError):
"""Exception for juju assertion failures."""
def _clean_dir(maybe_dir):
"""Pseudo-type that validates an argument to be a clean directory path.
For safety, this function will not attempt to remove existing directory
contents but will just report a warning.
"""
try:
contents = os.listdir(maybe_dir)
except OSError as e:
if e.errno == errno.ENOENT:
# we don't raise this error due to tests abusing /tmp/logs
warnings.warn('Not a directory {}'.format(maybe_dir))
if e.errno == errno.EEXIST:
warnings.warn('Directory {} already exists'.format(maybe_dir))
else:
if contents and contents != ["empty"]:
warnings.warn(
'Directory {!r} has existing contents.'.format(maybe_dir))
return maybe_dir
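# Illustrative use of _clean_dir as an argparse type (a sketch only; the path
# below is hypothetical). add_basic_testing_arguments wires it up the same way:
#
#     parser.add_argument('logs', nargs='?', type=_clean_dir, default=None)
#     args = parser.parse_args(['/tmp/mytest/logs'])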
def as_literal_address(address):
"""Returns address in form suitable for embedding in URL or similar.
In practice, this just puts square brackets round IPv6 addresses which
    avoids conflict with port separators and other uses of colons.
"""
if is_ipv6_address(address):
return address.join("[]")
return address
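# Example of the bracketing behaviour (addresses are illustrative):
#
#     as_literal_address('10.0.0.1')   # -> '10.0.0.1'
#     as_literal_address('fe80::1')    # -> '[fe80::1]'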
def wait_for_port(host, port, closed=False, timeout=30):
family = socket.AF_INET6 if is_ipv6_address(host) else socket.AF_INET
for remaining in until_timeout(timeout):
try:
addrinfo = socket.getaddrinfo(host, port, family,
socket.SOCK_STREAM)
except socket.error as e:
if e.errno not in (socket.EAI_NODATA, WSANO_DATA):
raise
if closed:
return
else:
continue
sockaddr = addrinfo[0][4]
# Treat Azure messed-up address lookup as a closed port.
if sockaddr[0] == '0.0.0.0':
if closed:
return
else:
continue
conn = socket.socket(*addrinfo[0][:3])
conn.settimeout(max(remaining or 0, 5))
try:
conn.connect(sockaddr)
except socket.timeout:
if closed:
return
except socket.error as e:
if e.errno not in (errno.ECONNREFUSED, errno.ENETUNREACH,
errno.ETIMEDOUT, errno.EHOSTUNREACH):
raise
if closed:
return
except socket.gaierror as e:
print_now(str(e))
except Exception as e:
            print_now('Unexpected {!r}: {}'.format(type(e), e))
raise
else:
conn.close()
if not closed:
return
sleep(1)
raise PortTimeoutError('Timed out waiting for port.')
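# Illustrative usage (host and port are hypothetical; the call blocks until
# the requested state is reached or PortTimeoutError is raised):
#
#     wait_for_port('10.0.0.1', 22, timeout=60)      # wait for the port to open
#     wait_for_port('10.0.0.1', 22, closed=True)     # wait for the port to close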
def get_revision_build(build_info):
for action in build_info['actions']:
if 'parameters' in action:
for parameter in action['parameters']:
if parameter['name'] == 'revision_build':
return parameter['value']
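# Sketch of the Jenkins build_info structure this walks (values are made up
# for illustration):
#
#     build_info = {'actions': [
#         {'parameters': [{'name': 'revision_build', 'value': '1234'}]}]}
#     get_revision_build(build_info)   # -> '1234'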
def get_winrm_certs():
""""Returns locations of key and cert files for winrm in cloud-city."""
home = os.environ['HOME']
return (
os.path.join(home, 'cloud-city/winrm_client_cert.key'),
os.path.join(home, 'cloud-city/winrm_client_cert.pem'),
)
def s3_cmd(params, drop_output=False):
s3cfg_path = os.path.join(
os.environ['HOME'], 'cloud-city/juju-qa.s3cfg')
command = ['s3cmd', '-c', s3cfg_path, '--no-progress'] + params
if drop_output:
return subprocess.check_call(
command, stdout=open('/dev/null', 'w'))
else:
return subprocess.check_output(command)
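# Illustrative calls (the bucket name is hypothetical; requires s3cmd and the
# cloud-city config file to exist):
#
#     s3_cmd(['ls', 's3://example-bucket/'])
#     s3_cmd(['put', 'local.log', 's3://example-bucket/logs/'], drop_output=True)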
def _get_test_name_from_filename():
try:
calling_file = sys._getframe(2).f_back.f_globals['__file__']
return os.path.splitext(os.path.basename(calling_file))[0]
    except (ValueError, AttributeError, KeyError):
return 'unknown_test'
def generate_default_clean_dir(temp_env_name):
"""Creates a new unique directory for logging and returns name"""
logging.debug('Environment {}'.format(temp_env_name))
test_name = temp_env_name.split('-')[0]
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
log_dir = os.path.join('/tmp', test_name, 'logs', timestamp)
try:
os.makedirs(log_dir)
logging.info('Created logging directory {}'.format(log_dir))
except OSError as e:
if e.errno == errno.EEXIST:
            logging.warning('Directory {} already exists'.format(log_dir))
else:
            raise Exception(
                'Failed to create logging directory: {}. '
                'Please specify an empty folder or try again.'.format(log_dir))
return log_dir
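# Illustrative result (the timestamp component will differ on each call):
#
#     generate_default_clean_dir('mytest-20230101-temp-env')
#     # -> '/tmp/mytest/logs/20230101120000'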
def _generate_default_temp_env_name():
"""Creates a new unique name for environment and returns the name"""
# we need to sanitize the name
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
test_name = re.sub('[^a-zA-Z]', '', _get_test_name_from_filename())
return '{}-{}-temp-env'.format(test_name, timestamp)
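# Illustrative result when the calling script is named assess_foo.py (the
# timestamp will differ):
#
#     _generate_default_temp_env_name()   # -> 'assessfoo-20230101120000-temp-env'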
def _to_deadline(timeout):
return datetime.utcnow() + timedelta(seconds=int(timeout))
def add_arg_juju_bin(parser):
parser.add_argument('juju_bin', nargs='?',
help='Full path to the Juju binary. By default, this'
' will use $PATH/juju',
default=None)
def add_basic_testing_arguments(
parser, using_jes=False, deadline=True, env=True, existing=True):
"""Returns the parser loaded with basic testing arguments.
    The basic testing arguments, used in conjunction with boot_context, ensure
a test can be run in any supported substrate in parallel.
    This helper adds 4 positional arguments that define the minimum needed
to run a test script.
    These arguments (env, juju_bin, logs, temp_env_name) let you specify the
    base environment, the juju binary, the logging directory and a temporary
    environment name for your test, respectively.
There are many optional args that either update the env's config or
manipulate the juju command line options to test in controlled situations
or in uncommon substrates: --debug, --verbose, --agent-url, --agent-stream,
--series, --bootstrap-host, --machine, --keep-env. If not using_jes, the
--upload-tools arg will also be added.
:param parser: an ArgumentParser.
:param using_jes: whether args should be tailored for JES testing.
:param deadline: If true, support the --timeout option and convert to a
deadline.
    :param existing: If true, supply the '--existing' argument to allow
running on an existing bootstrapped controller.
"""
    # Optional positional arguments
if env:
parser.add_argument(
'env', nargs='?',
help='The juju environment to base the temp test environment on.',
default='lxd')
add_arg_juju_bin(parser)
parser.add_argument('logs', nargs='?', type=_clean_dir,
help='A directory in which to store logs. By default,'
' this will use the current directory',
default=None)
parser.add_argument('temp_env_name', nargs='?',
help='A temporary test environment name. By default, '
                        ' this will generate an environment name using the '
' timestamp and testname. '
' test_name_timestamp_temp_env',
default=_generate_default_temp_env_name())
# Optional keyword arguments.
parser.add_argument('--debug', action='store_true',
help='Pass --debug to Juju.')
parser.add_argument('--verbose', action='store_const',
default=logging.INFO, const=logging.DEBUG,
help='Verbose test harness output.')
parser.add_argument('--region', help='Override environment region.')
parser.add_argument('--to', default=None,
help='Place the controller at a location.')
parser.add_argument('--agent-url', action='store', default=None,
help='URL for retrieving agent binaries.')
parser.add_argument('--agent-stream', action='store', default=None,
help='Stream for retrieving agent binaries.')
parser.add_argument('--series', action='store', default=None,
help='Name of the Ubuntu series to use.')
if not using_jes:
parser.add_argument('--upload-tools', action='store_true',
help='upload local version of tools to bootstrap.')
parser.add_argument('--bootstrap-host',
help='The host to use for bootstrap.')
parser.add_argument('--machine', help='A machine to add or when used with '
'KVM based MaaS, a KVM image to start.',
action='append', default=[])
parser.add_argument('--keep-env', action='store_true',
help='Keep the Juju environment after the test'
' completes.')
if existing:
parser.add_argument(
'--existing',
action='store',
default=None,
const='current',
nargs='?',
help='Test using an existing bootstrapped controller. '
'If no controller name is provided defaults to using the '
'current selected controller.')
if deadline:
parser.add_argument('--timeout', dest='deadline', type=_to_deadline,
help="The script timeout, in seconds.")
return parser
# suppress nosetests
add_basic_testing_arguments.__test__ = False
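# Minimal sketch of a test script using this parser (the description text is
# illustrative):
#
#     from argparse import ArgumentParser
#     parser = ArgumentParser(description='Assess something.')
#     args = add_basic_testing_arguments(parser).parse_args()
#     configure_logging(args.verbose)
#     # args.env, args.juju_bin, args.logs and args.temp_env_name are now set.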
def configure_logging(log_level):
logging.basicConfig(
level=log_level, format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
def get_candidates_path(root_dir):
return os.path.join(root_dir, 'candidate')
# GZ 2015-10-15: Paths returned in filesystem dependent order, may want sort?
def find_candidates(root_dir, find_all=False):
return (path for path, buildvars in _find_candidates(root_dir, find_all))
def find_latest_branch_candidates(root_dir):
"""Return a list of one candidate per branch.
:param root_dir: The root directory to find candidates from.
"""
candidates = []
for path, buildvars_path in _find_candidates(root_dir, find_all=False,
artifacts=True):
with open(buildvars_path) as buildvars_file:
buildvars = json.load(buildvars_file)
candidates.append(
(buildvars['branch'], int(buildvars['revision_build']), path))
latest = dict(
(branch, (path, build)) for branch, build, path in sorted(candidates))
return latest.values()
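# Sketch of the layout this expects (paths and values are illustrative): each
# candidate lives in <root_dir>/candidate/<name>-artifacts/ and contains a
# buildvars.json with at least 'branch' and 'revision_build' keys:
#
#     find_latest_branch_candidates('/var/lib/jenkins')
#     # -> [('/var/lib/jenkins/candidate/2.9-artifacts', 4321), ...]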
def _find_candidates(root_dir, find_all=False, artifacts=False):
candidates_path = get_candidates_path(root_dir)
a_week_ago = time() - timedelta(days=7).total_seconds()
for candidate_dir in os.listdir(candidates_path):
if candidate_dir.endswith('-artifacts') != artifacts:
continue
candidate_path = os.path.join(candidates_path, candidate_dir)
buildvars = os.path.join(candidate_path, 'buildvars.json')
try:
stat = os.stat(buildvars)
except OSError as e:
if e.errno in (errno.ENOENT, errno.ENOTDIR):
continue
raise
if not find_all and stat.st_mtime < a_week_ago:
continue
yield candidate_path, buildvars
def get_deb_arch():
"""Get the debian machine architecture."""
return subprocess.check_output(['dpkg', '--print-architecture']).strip()
def extract_deb(package_path, directory):
"""Extract a debian package to a specified directory."""
subprocess.check_call(['dpkg', '-x', package_path, directory])
def run_command(command, dry_run=False, verbose=False):
"""Optionally execute a command and maybe print the output."""
if verbose:
print_now('Executing: {}'.format(command))
if not dry_run:
output = subprocess.check_output(command)
if verbose:
print_now(output)
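# Illustrative call (the command is an example; output is only echoed when
# verbose is set and dry_run is not):
#
#     run_command(['juju', 'version'], dry_run=False, verbose=True)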
def get_unit_ipaddress(client, unit_name):
status = client.get_status()
return status.get_unit(unit_name)['public-address']
def log_and_wrap_exception(logger, exc):
"""Record exc details to logger and return wrapped in LoggedException."""
logger.exception(exc)
stdout = getattr(exc, 'output', None)
stderr = getattr(exc, 'stderr', None)
if stdout or stderr:
logger.info('Output from exception:\nstdout:\n%s\nstderr:\n%s',
stdout, stderr)
return LoggedException(exc)
@contextmanager
def logged_exception(logger):
"""\
Record exceptions in managed context to logger and reraise LoggedException.
Note that BaseException classes like SystemExit, GeneratorExit and
LoggedException itself are not wrapped, except for KeyboardInterrupt.
"""
try:
yield
except (Exception, KeyboardInterrupt) as e:
raise log_and_wrap_exception(logger, e)
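# Illustrative usage (do_risky_thing is a hypothetical callable; the
# LoggedException that escapes carries the original error as .exception):
#
#     with logged_exception(logging.getLogger('assess-something')):
#         do_risky_thing()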
def assert_dict_is_subset(sub_dict, super_dict):
"""Assert that every item in the sub_dict is in the super_dict.
:raises JujuAssertionError: when sub_dict items are missing.
    :return: True when sub_dict is a subset of super_dict
"""
if not all(item in super_dict.items() for item in sub_dict.items()):
raise JujuAssertionError(
'Found: {} \nExpected: {}'.format(super_dict, sub_dict))
return True
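# Examples (the dictionaries are illustrative):
#
#     assert_dict_is_subset({'a': 1}, {'a': 1, 'b': 2})   # -> True
#     assert_dict_is_subset({'a': 2}, {'a': 1, 'b': 2})   # raises JujuAssertionError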