forked from globocom/GloboNetworkAPI
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjson_validate.py
More file actions
127 lines (108 loc) · 4.51 KB
/
json_validate.py
File metadata and controls
127 lines (108 loc) · 4.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# -*- coding: utf-8 -*-
import json
import logging
from functools import wraps
from jsonspec.reference import resolve
from jsonspec.validators import load
from jsonspec.validators.exceptions import ValidationError
from rest_framework import exceptions as exceptions_api
from networkapi.api_rest import exceptions as rest_exceptions
log = logging.getLogger(__name__)
def verify_ports(pools):
    """Reject server pools in which two members share an address and port.

    For every pool under ``pools['server_pools']`` the members listed in
    ``server_pool_members`` are scanned. Each member with a truthy ``ip``
    contributes an ``(ip id, port_real)`` pair and each member with a truthy
    ``ipv6`` contributes an ``(ipv6 id, port_real)`` pair.

    :param pools: dict holding a ``server_pools`` list.
    :raises ValidationError: when a pool contains a repeated IPv4 pair
        (checked first) or a repeated IPv6 pair. The second argument is a
        one-item list with a pointer-like string naming the offending pool.
    """
    for pool_index, server_pool in enumerate(pools['server_pools']):
        ipv4_pairs = []
        ipv6_pairs = []

        # Collect every pair before checking, so all members are resolved
        # even when a duplicate shows up early in the list.
        for pool_member in server_pool['server_pool_members']:
            if pool_member['ip']:
                ipv4_pair = (resolve(pool_member, '#/ip/id'),
                             resolve(pool_member, '#/port_real'))
                ipv4_pairs.append(ipv4_pair)
            if pool_member['ipv6']:
                ipv6_pair = (resolve(pool_member, '#/ipv6/id'),
                             resolve(pool_member, '#/port_real'))
                ipv6_pairs.append(ipv6_pair)

        # A set drops repeated pairs, so a smaller set means a duplicate.
        if len(set(ipv4_pairs)) < len(ipv4_pairs):
            ipv4_pointer = '#server_pools/%s/server_pool_members/*/ip/id - #server_pools/%s/server_pool_members/*/port_real' % (pool_index, pool_index)
            raise ValidationError('Ips have same ports', [ipv4_pointer])

        if len(set(ipv6_pairs)) < len(ipv6_pairs):
            ipv6_pointer = '#server_pools/%s/server_pool_members/*/ipv6/id - #server_pools/%s/server_pool_members/*/port_real' % (pool_index, pool_index)
            raise ValidationError('Ipv6 have same ports', [ipv6_pointer])
def verify_ports_vip(vips):
    """Reject VIPs that list the same port more than once.

    For every entry of ``vips['vips']`` the truthy ``port`` values found in
    its ``ports`` list are gathered and checked for repetition.

    :param vips: dict holding a ``vips`` list; each item has a ``ports`` list.
    :raises ValidationError: when a VIP repeats a port. The second argument
        is a one-item list with a pointer-like string naming the VIP index.
    """
    for vip_index, vip in enumerate(vips['vips']):
        # Entries whose ``port`` is falsy are ignored.
        used_ports = [
            resolve(port_entry, '#/port')
            for port_entry in vip['ports']
            if port_entry['port']
        ]

        # A set drops repeated ports, so a smaller set means a duplicate.
        if len(set(used_ports)) < len(used_ports):
            raise ValidationError(
                'Vip has same ports',
                ['#vips/%s/ports/*/port ' % (vip_index)])
def json_validate(json_file):
    """Build a validator from a JSON file.

    The file is parsed with ``json.load`` and the parsed content is handed
    to ``jsonspec.validators.load``.

    :param json_file: path of the JSON file to read.
    :return: whatever ``jsonspec.validators.load`` returns for that content.
    """
    with open(json_file) as spec_file:
        spec = json.load(spec_file)
    # The document is fully parsed, so the file can be closed before the
    # validator is built.
    return load(spec)
def raise_json_validate(info=None):
    """Return a decorator that normalises exceptions raised by a view method.

    The decorated callable is invoked as ``func(self, request, *args,
    **kwargs)`` and its return value is passed through untouched. Exceptions
    are handled as follows:

    * ``ValidationError`` is turned into a payload ``{'errors': [...]}`` with
      one item per failing pointer. The payload is logged and raised as
      ``rest_exceptions.ValidationExceptionJson``. When ``info`` is truthy, a
      ``spec`` URL (``<scheme>://<host>/api/v3/help/<info>/``) is added.
    * The listed REST framework / project exceptions are logged and
      re-raised unchanged.
    * Any other ``APIException`` or ``Exception`` is logged and wrapped in
      ``rest_exceptions.NetworkAPIException``.

    :param info: optional name interpolated into the ``spec`` help URL.
    :return: a decorator for methods with signature ``(self, request, ...)``.
    """
    def raise_json_validate_inner(func):

        @wraps(func)
        def inner(self, request, *args, **kwargs):
            try:
                return func(self, request, *args, **kwargs)

            except ValidationError as error:
                # ``error.args`` replaces ``error[i]``: indexing an exception
                # only works on Python 2, ``args`` works everywhere.
                flattened = error.flatten()
                msg = list()
                if flattened:
                    for pointer, reasons in flattened.items():
                        # The root pointer has no specific received value.
                        valor = resolve(
                            error.args[1], pointer) if pointer != '#/' else ''
                        msg.append({
                            'error_pointer': pointer,
                            'received_value': valor,
                            'error_reasons': list(reasons)
                        })
                else:
                    # Nothing to flatten: report the exception's own
                    # positional arguments instead.
                    msg.append({
                        'error_pointer': error.args[0],
                        'received_value': None,
                        'error_reasons': list(error.args[1])
                    })
                res = {
                    'errors': msg
                }

                if info:
                    protocol = 'https' if request.is_secure() else 'http'
                    res['spec'] = '%s://%s/api/v3/help/%s/' % (
                        protocol, request.get_host(), info)

                log.error(res)
                raise rest_exceptions.ValidationExceptionJson(res)

            except (exceptions_api.AuthenticationFailed,
                    exceptions_api.MethodNotAllowed,
                    exceptions_api.NotAcceptable,
                    exceptions_api.NotAuthenticated,
                    exceptions_api.ParseError,
                    exceptions_api.PermissionDenied,
                    rest_exceptions.ValidationAPIException,
                    rest_exceptions.ObjectDoesNotExistException) as error:
                # Already meaningful to the API layer: log and propagate
                # as-is. A bare ``raise`` keeps the original traceback.
                log.exception(error)
                raise

            except exceptions_api.APIException as error:
                log.exception(error)
                raise rest_exceptions.NetworkAPIException(error)

            except Exception as error:
                # Unexpected failure: log with traceback, like the handlers
                # above, then wrap it.
                log.exception(error)
                raise rest_exceptions.NetworkAPIException(error)

        return inner

    return raise_json_validate_inner