-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathconstraints.py
More file actions
364 lines (300 loc) · 11.2 KB
/
constraints.py
File metadata and controls
364 lines (300 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011 Loic Jaquemet [email protected]
#
try:
import ConfigParser as configparser
except ImportError as e:
import configparser
import logging
import numbers
import os
import re
import sys
from haystack.abc import interfaces
"""
This module holds some basic constraint class for the Haystack model.
"""
log = logging.getLogger('constraints')
class ConstraintsConfigHandler(interfaces.IConstraintsConfigHandler):
"""
Read constraints config files, applies constraints to modules.
a list(), a [], a dict(), a set() (possibly containing RangeValue, PerfectMatch and NotNull)
IgnoreMember
RangeValue(low, high)
NotNull
PerfectMatch('bytestr')
"""
valid_names = ['IgnoreMember', 'NotNull', 'RangeValue', 'PerfectMatch', 'ListLimitDepthValidation']
_rv = re.compile(r'''(?P<fn>RangeValue\((?P<args>[^)]+)\))''')
_pm = re.compile(r'''(?P<fn>PerfectMatch\('(?P<args>[^']+)'\))''')
_nn = re.compile(r'''(?P<fn>NotNull)[,]*,?''')
_ld = re.compile(r'''(?P<fn>ListLimitDepthValidation\((?P<args>[^)]+)\))''')
def read(self, filename):
"""
Read the constraints list from a file
:param filename:
:return:
"""
if not os.access(filename, os.F_OK):
raise IOError("File not found")
# read the constraint file
parser = configparser.RawConfigParser()
parser.optionxform = str
parser.read(filename)
# prepare the return object
_constraints = ModuleConstraints()
# each section anem is the name of the target structure
for struct_name in parser.sections():
log.debug('handling structure %s', struct_name)
record_constraints = RecordConstraints()
# each config entry is a field and its IConstraint
for field, value in parser.items(struct_name):
log.debug('%s: field %s ::= %s', struct_name, field, value)
try:
value = self._parse(value)
except ValueError as e:
raise ValueError("%s: struct_name: %s Field: %s constraint: %s" % (
e.message, struct_name, field, value))
# each field can only have one IConstraint (which can be a list of)
if field not in record_constraints:
record_constraints[field] = []
if isinstance(value, list):
record_constraints[field].extend(value)
else:
record_constraints[field].append(value)
# we set it
_constraints.set_constraints(struct_name, record_constraints)
return _constraints
def _parse(self, value):
if IgnoreMember.__name__ == value:
return IgnoreMember
elif IgnoreMember.__name__ in value:
raise ValueError("IgnoreMember should be alone as a constraint")
# TODO use re and .start(groups)
if '[' == value[0]:
remnant = value[1:-1]
log.debug('list is %s', remnant)
_args = []
# check for functions in the list
for fn in [self._rv, self._pm, self._nn]:
res = []
# find all fn
for x in fn.finditer(remnant):
log.debug("Found fn %s", x.group('fn'))
res.append(x.group('fn'))
# now parse each fn
for match in res:
_args.append(self._parse_c(match))
# remove them from the parsing lefts
remnant = remnant.replace(match, "")
log.debug("remnant is %s", remnant)
# now handle other element in list, like integers and floats
_class_type = list
args = remnant.split(',')
for x in args:
if '' == x.strip():
continue
else:
_args.append(self._try_numbers(x))
return _class_type(_args)
else:
return self._parse_c(value)
def _parse_c(self, value):
"""
Parse the function and args for a known function.
:param value:
:return:
"""
if 'NotNull' in value:
return NotNull
# get the function name
_t = value.split('(')
_class_name = _t[0]
args = _t[1][:-1]
# else its a RangeValue or a PerfectMatch
log.debug('we have a IConstraint %s', _class_name)
if _class_name not in ['RangeValue', 'PerfectMatch', 'ListLimitDepthValidation']:
raise ValueError('invalid constraints near %s', _class_name)
# we know the constraint
_class_type = getattr(sys.modules[__name__], _class_name)
log.debug('args: %s', args)
# look at the args
_args = None
if _class_name == 'RangeValue':
_args = self._rv.search(value).group('args').split(',')
assert len(_args) == 2
_args = [self._try_numbers(x) for x in _args]
return _class_type(*_args)
elif _class_name == 'PerfectMatch':
_args = self._pm.search(value).group('args')
return _class_type(_args)
elif _class_name == 'ListLimitDepthValidation':
_args = self._ld.search(value).group('args')
assert ',' not in _args
_args = self._try_numbers(_args)
return _class_type(_args)
else:
raise RuntimeError('no such constraint %s',_class_name)
def _try_numbers(self, _arg):
ret = None
try:
if '0x' in _arg.lower():
# try an hex
ret = int(_arg, 16)
elif '.' in _arg:
# try a float
ret = float(_arg)
else:
# try an int
ret = int(_arg)
except ValueError as e:
ret = str(_arg)
return ret
class ModuleConstraints(interfaces.IModuleConstraints):
"""
Holds the constraints for all record types of a module.
"""
def __init__(self):
self.__constraints = {}
self.__dynamics = {}
def get_constraints(self):
"""
get the list of IConstraint for all fields of record_name
:return the list of IConstraint for that record
"""
return self.__constraints
def set_constraints(self, record_type_name, record_constraints):
"""
Add constraints for that record_type name
:param record_type_name:
:param record_constraints:
:return:
"""
self.__constraints[record_type_name] = record_constraints
def get_dynamic_constraints(self):
"""
get the IRecordTypeDynamicConstraintsValidator for record_type_name
:return the list of IRecordTypeDynamicConstraintsValidator
"""
return self.__dynamics
def set_dynamic_constraints(self, record_type_name, record_constraints):
"""
Add dynamic constraints validator for that record_type name
:param record_type_name: str
:param record_constraints: IRecordTypeDynamicConstraintsValidator
:return:
"""
assert isinstance(record_constraints, interfaces.IRecordTypeDynamicConstraintsValidator)
self.__dynamics[record_type_name] = record_constraints
class RecordConstraints(interfaces.IRecordConstraints, dict):
"""
Holds the constraints for fields of a specific record type.
"""
def get_fields(self):
"""get the list of field names."""
return self.keys()
def get_constraints_for_field(self, field_name):
"""get the list of IConstraint for a field
"""
return self[field_name]
class IgnoreMember(interfaces.IConstraint):
"""
Constraint class for the Haystack model.
If this constraints is applied on a Structure member,
the member will be ignored by the validation engine.
"""
def __contains__(self, obj):
return True
class ListLimitDepthValidation(interfaces.IConstraint):
"""
Constraint class for the Haystack model.
If this constraints is applied on a Record member,
the member will be ignored by the listmodel validation engine.
"""
def __init__(self, max_depth):
self.max_depth = max_depth
def __contains__(self, obj):
return True
class RangeValue(interfaces.IConstraint):
"""
Constraint class for the Haystack model.
If this constraints is applied on a Structure member,
the member has to be between 'low' and 'high' values to be
considered as Valid.
"""
def __init__(self, low, high):
self.low = low
self.high = high
def __contains__(self, obj):
return self.low <= obj <= self.high
def __eq__(self, obj):
if isinstance(obj, RangeValue):
return self.low == obj.low and self.high == obj.high
elif isinstance(obj, numbers.Number):
return self.low <= obj <= self.high
else:
return False
class NotValue(interfaces.IConstraint):
"""
Constraint class for the Haystack model.
If this constraints is applied on a Structure member,
the member has to NOT be the value listed
considered as Valid.
"""
def __init__(self, not_value):
self.not_value = not_value
def __contains__(self, obj):
return self.not_value != obj
def __eq__(self, obj):
if isinstance(obj, NotValue):
return self.not_value == obj.not_value
return self.not_value != obj
class NotNullComparable(interfaces.IConstraint):
"""
Constraint class for the Haystack model.
If this constraints is applied on a Structure member,
the member should not be null to be considered valid by the validation engine.
"""
def __contains__(self, obj):
return bool(obj)
def __eq__(self, obj):
return bool(obj)
"""
Constraint class for the Haystack model.
If this constraints is applied on a Structure member,
the member should not be null to be considered valid by the validation engine.
"""
NotNull = NotNullComparable()
class BytesComparable(interfaces.IConstraint):
"""
Constraint class for the Haystack model.
If this constraints is applied on a Structure member,
the member should have the same bytes value and length.
"""
def __init__(self, seq):
self.seq = seq
def __contains__(self, obj):
if cmp(self, obj) == 0:
return True
return False
def __cmp__(self, obj):
import ctypes
if isinstance(obj, type(ctypes.c_void_p)):
if ctypes.sizeof(obj) != len(self.seq):
return -1
bytes = ctypes.string_at(ctypes.addressof(obj), ctypes.sizeof(obj))
if bytes == self.seq:
return 0
else:
return -1
# check if its a ctypes
try:
ctypes.sizeof(obj)
return cmp(self.seq, ctypes.string_at(
ctypes.addressof(obj), ctypes.sizeof(obj)))
except TypeError:
return cmp(self.seq, obj)
PerfectMatch = BytesComparable