-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathstatement.py
More file actions
358 lines (265 loc) · 14.1 KB
/
statement.py
File metadata and controls
358 lines (265 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
from datetime import datetime
from .utils import NS, get_text
from .atom_objects import Category
from .deposit_receipt import Deposit_Receipt
from .sword2_logging import logging
from lxml import etree
# Module-level logger used by the statement classes defined in this file.
s_l = logging.getLogger(__name__)
class Sword_Statement(object):
    """Base class for a SWORD Statement held as an XML document.

    Construction parses ``xml_document`` (when one is supplied) into ``dom``
    and then calls ``_validate``, which does nothing in this base class.
    """

    def __init__(self, xml_document=None):
        """Reset all state, then parse and validate the supplied document.

        xml_document -- the serialised statement passed to
                        ``etree.fromstring``, or None for an empty statement
        """
        # result containers; nothing in this base class fills them
        self.resources = []
        self.states = []
        self.original_deposits = []

        # status flags; ``parsed`` is set by _parse_xml_document on success
        self.valid = False
        self.parsed = False

        self.dom = None
        self.xml_document = xml_document

        self._parse_xml_document()
        self._validate()

    def _parse_xml_document(self):
        """Parse ``self.xml_document`` into ``self.dom`` and set ``parsed``.

        Does nothing when no document was supplied. A parse failure is
        logged together with the first 300 characters of the document;
        ``dom`` and ``parsed`` are then left untouched.
        """
        if self.xml_document is None:
            return

        try:
            s_l.info("Attempting to parse the Statement XML document")
            self.dom = etree.fromstring(self.xml_document)
            self.parsed = True
        except Exception as err:
            s_l.error("Failed to parse document - %s" % err)
            s_l.error("XML document begins:\n %s" % self.xml_document[:300])

    def _validate(self):
        """Validation hook; intentionally a no-op in the base class."""
        pass
class Statement_Resource(object):
    """Plain record describing one resource listed in a statement."""

    def __init__(self, uri=None, is_original_deposit=False, deposited_on=None,
                 deposited_by=None, deposited_on_behalf_of=None):
        """Store each argument on the attribute of the same name.

        uri                    -- identifier of the resource
        is_original_deposit    -- flag marking the resource as an original deposit
        deposited_on           -- deposit date value
        deposited_by           -- depositor value
        deposited_on_behalf_of -- on-behalf-of value
        """
        # what the resource is
        self.uri, self.is_original_deposit = uri, is_original_deposit
        # how it got there
        self.deposited_on = deposited_on
        self.deposited_by, self.deposited_on_behalf_of = (
            deposited_by, deposited_on_behalf_of)
class Atom_Statement_Entry(Deposit_Receipt, Statement_Resource):
    """One atom:entry of an Atom statement feed.

    The entry element is handed to Deposit_Receipt for parsing, and the
    Statement_Resource attributes (``uri``, ``is_original_deposit``,
    ``deposited_on``, ``deposited_by``, ``deposited_on_behalf_of``) are then
    filled in from that same element.
    """

    def __init__(self, dom):
        """Build the entry from its atom:entry element ``dom``."""
        Deposit_Receipt.__init__(self, dom=dom)
        Statement_Resource.__init__(self)
        self.is_original_deposit = self._is_original_deposit()
        self._parse_depositors()

        # to provide a stable interface, use the content iri as the uri
        # NOTE(review): cont_iri is presumably set by Deposit_Receipt - confirm
        self.uri = self.cont_iri

    def _is_original_deposit(self):
        """Return True if any atom:category has the originalDeposit term."""
        return any(
            cat.get("term") == "http://purl.org/net/sword/terms/originalDeposit"
            for cat in self.dom.findall(NS['atom'] % 'category')
        )

    def _parse_depositors(self):
        """Read depositedOn / depositedBy / depositedOnBehalfOf from the entry.

        Each attribute is only set when its element exists and has
        non-blank text. An unparseable date is logged and ``deposited_on``
        is left as it was.
        """
        do = self.dom.find(NS['sword'] % "depositedOn")
        if do is not None and do.text is not None and do.text.strip() != "":
            try:
                self.deposited_on = datetime.strptime(do.text.strip(), "%Y-%m-%dT%H:%M:%SZ")  # e.g. 2011-03-02T20:50:06Z
            except Exception as e:
                s_l.error("Failed to parse date - %s" % e)
                s_l.error("Supplied date as string was: %s" % do.text.strip())

        db = self.dom.find(NS['sword'] % "depositedBy")
        if db is not None and db.text is not None and db.text.strip() != "":
            self.deposited_by = db.text.strip()

        dobo = self.dom.find(NS['sword'] % "depositedOnBehalfOf")
        # test dobo's own text: checking db here would raise AttributeError
        # when depositedBy is absent or empty, and would drop the value when
        # depositedBy is blank
        if dobo is not None and dobo.text is not None and dobo.text.strip() != "":
            self.deposited_on_behalf_of = dobo.text.strip()

    def validate(self):
        """Always report the entry as valid."""
        # don't validate statement entries
        return True
class Atom_Sword_Statement(Sword_Statement):
    """SWORD Statement serialised as an Atom feed.

    The base class parses the document and calls ``_validate``, which checks
    that the root element is a feed. Only a valid document has its state
    categories and entries read into ``states``, ``resources`` and
    ``original_deposits``.
    """

    def __init__(self, xml_document=None):
        """Parse ``xml_document`` and, if it validates, enumerate the feed."""
        Sword_Statement.__init__(self, xml_document)
        if self.valid:
            self._enumerate_feed()
        else:
            # Logger.warning is the supported spelling; warn is deprecated
            s_l.warning("Statement did not parse as valid, so the content will" +
                        " not be examined further; see the 'dom' attribute for the xml")

        # FIXME: this implementation assumes that the atom document is a single
        # page, but Ben's original implementation at least started to make some
        # overtures towards dealing with that. This is the left behind code ...
        #
        # self.first = None
        # self.next = None
        # self.previous = None
        # self.last = None
        # self.categories = []
        # self.entries = []
        # try:
        #     coll_l.info("Attempting to parse the Feed XML document")
        #     self.feed = etree.fromstring(xml_document)
        #     self.parsed = True
        # except Exception, e:
        #     coll_l.error("Failed to parse document - %s" % e)
        #     coll_l.error("XML document begins:\n %s" % xml_document[:300])
        # self.enumerate_feed()

    def _enumerate_feed(self):
        """Collect state categories and entries from the feed.

        Each state is stored as a ``(term, text)`` tuple, where ``text`` is
        None when the category element has no text content. Every entry is
        added to ``resources``; original deposits are also added to
        ``original_deposits``.
        """
        if self.dom is None:
            return

        # Handle Categories
        for cat in self.dom.findall(NS['atom'] % 'category'):
            if cat.get("scheme") == "http://purl.org/net/sword/terms/state":
                # an element without text content has text None, so guard
                # the strip() instead of raising AttributeError
                description = cat.text.strip() if cat.text is not None else None
                self.states.append((cat.get("term"), description))

        # Handle Entries
        for entry in self.dom.findall(NS['atom'] % 'entry'):
            ase = Atom_Statement_Entry(entry)
            if ase.is_original_deposit:
                self.original_deposits.append(ase)
            self.resources.append(ase)

    def _validate(self):
        """Set ``self.valid`` according to whether the root is an Atom feed.

        Leaves ``valid`` untouched when there is no parsed document.
        """
        if self.dom is None:
            return

        # MUST be an ATOM Feed document
        self.valid = self.dom.tag in (NS['atom'] % "feed", "feed")

        # The Feed MUST represent files contained in the item as an atom:entry element (this does not
        # mandate that all files in the item are listed, though)

        # Each atom:entry which is an original deposit file MUST have an atom:category element with
        # the term sword:originalDeposit (this does not mandate that all original deposits are listed as entries)

        # NOTE: neither of these requirements can easily be used to validate, since
        # a statement may have zero entries, and an entry may or may not contain
        # a category for an original deposit. So, we'll just settle for verifying
        # that this is a feed, and be done with it.
class Ore_Statement_Resource(Statement_Resource):
    """Statement_Resource for an ORE statement, adding packaging URIs."""

    def __init__(self, uri, is_original_deposit=False, packaging_uris=None,
                 deposited_on=None, deposited_by=None, deposited_on_behalf_of=None):
        """Store the resource details.

        uri            -- identifier of the aggregated resource
        packaging_uris -- list of packaging URIs; when omitted each instance
                          gets its own empty list

        The remaining arguments are passed straight to Statement_Resource.
        """
        Statement_Resource.__init__(self, uri, is_original_deposit, deposited_on,
                                    deposited_by, deposited_on_behalf_of)
        # a literal [] default would be one list shared by every instance
        # built without packaging_uris, so use a None sentinel instead
        self.packaging = [] if packaging_uris is None else packaging_uris

    def __str__(self):
        """Return a one-line summary of the resource."""
        return "URI: %s ; is_original_deposit: %s ; packaging_uris: %s ; deposited_on: %s" % (
            self.uri, self.is_original_deposit, self.packaging, self.deposited_on)
class Ore_Sword_Statement(Sword_Statement):
    """SWORD Statement serialised as an ORE resource map in RDF/XML.

    The base class parses the document and calls ``_validate``. Only a valid
    document has its rdf:Description elements read into ``states``,
    ``resources`` and ``original_deposits``.
    """

    def __init__(self, xml_document=None):
        """Parse ``xml_document`` and, if it validates, enumerate its descriptions."""
        Sword_Statement.__init__(self, xml_document)
        if self.valid:
            self._enumerate_descriptions()
        else:
            # Logger.warning is the supported spelling; warn is deprecated
            s_l.warning("Statement did not parse as valid, so the content will" +
                        " not be examined further; see the 'dom' attribute for the xml")

    def _enumerate_descriptions(self):
        """Populate states, resources and original deposits from the RDF.

        The first pass reads the URIs of aggregated resources, original
        deposits and states from the aggregation description. The second
        pass matches every rdf:Description against those URIs. Any URI left
        without a description is registered with minimal information.
        """
        if self.dom is None:
            return

        aggregated_resource_uris = []
        original_deposit_uris = []
        state_uris = []

        # first pass gets me the uris of all the things I care about
        for desc in self.dom.findall(NS['rdf'] % "Description"):
            # look for the aggregation: findall() returns a list and never
            # None, so test for emptiness to skip the other descriptions
            if not desc.findall(NS['ore'] % "isDescribedBy"):
                continue

            # we are looking at the aggregation Describes itself
            for agg_uri in desc.findall(NS['ore'] % "aggregates"):
                aggregated_resource_uris.append(agg_uri.get(NS['rdf'] % "resource"))

            for od_uri in desc.findall(NS['sword'] % "originalDeposit"):
                original_deposit_uris.append(od_uri.get(NS['rdf'] % "resource"))

            for state_uri in desc.findall(NS['sword'] % "state"):
                state_uris.append(state_uri.get(NS['rdf'] % "resource"))

        s_l.debug("First pass on ORE statement yielded the following Aggregated Resources: " + str(aggregated_resource_uris))
        s_l.debug("First pass on ORE statement yielded the following Original Deposits: " + str(original_deposit_uris))
        s_l.debug("First pass on ORE statement yielded the following States: " + str(state_uris))

        # second pass, sort out the different descriptions
        for desc in self.dom.findall(NS['rdf'] % "Description"):
            about = desc.get(NS['rdf'] % "about")
            s_l.debug("Examining Described Resource: " + str(about))

            if about in state_uris:
                s_l.debug(str(about) + " is a State URI")

                # read and store the state information
                description_text = None
                sdesc = desc.find(NS['sword'] % "stateDescription")
                if sdesc is not None and sdesc.text is not None and sdesc.text.strip() != "":
                    description_text = sdesc.text.strip()
                self.states.append((about, description_text))

                # remove this uri from the list of state_uris, so that we can
                # deal with any left over later
                state_uris.remove(about)

            elif about in aggregated_resource_uris:
                s_l.debug(str(about) + " is an Aggregated Resource")

                is_original_deposit = about in original_deposit_uris
                s_l.debug("Is Aggregated Resource an original deposit? " + str(is_original_deposit))

                packaging_uris = []
                for pack in desc.findall(NS['sword'] % "packaging"):
                    pack_uri = pack.get(NS['rdf'] % "resource")
                    packaging_uris.append(pack_uri)
                    # pack_uri is None when the attribute is missing
                    s_l.debug("Registering Packaging URI: " + str(pack_uri))

                deposited_on = None
                do = desc.find(NS['sword'] % "depositedOn")
                if do is not None and do.text is not None and do.text.strip() != "":
                    try:
                        deposited_on = datetime.strptime(do.text.strip(), "%Y-%m-%dT%H:%M:%SZ")  # e.g. 2011-03-02T20:50:06Z
                        s_l.debug("Registering Deposited On: " + do.text.strip())
                    except Exception as e:
                        s_l.error("Failed to parse date - %s" % e)
                        s_l.error("Supplied date as string was: %s" % do.text.strip())

                deposited_by = None
                db = desc.find(NS['sword'] % "depositedBy")
                if db is not None and db.text is not None and db.text.strip() != "":
                    deposited_by = db.text.strip()
                    s_l.debug("Registering Deposited By: " + deposited_by)

                deposited_on_behalf_of = None
                dobo = desc.find(NS['sword'] % "depositedOnBehalfOf")
                # test dobo's own text: checking db here would raise
                # AttributeError when depositedBy is absent or empty
                if dobo is not None and dobo.text is not None and dobo.text.strip() != "":
                    deposited_on_behalf_of = dobo.text.strip()
                    s_l.debug("Registering Deposited On Behalf Of: " + deposited_on_behalf_of)

                ose = Ore_Statement_Resource(about, is_original_deposit, packaging_uris,
                                             deposited_on, deposited_by, deposited_on_behalf_of)
                if is_original_deposit:
                    s_l.debug("Registering Aggregated Resource as an Original Deposit")
                    self.original_deposits.append(ose)
                self.resources.append(ose)

                # remove this uri from the list of resource_uris, so that we can
                # deal with any left over later
                aggregated_resource_uris.remove(about)

        # finally, we may have aggregated resources and states which did not
        # have rdf:Description elements associated with them. We do the minimum
        # possible here to accommodate them
        s_l.debug("Undescribed State URIs: " + str(state_uris))
        for state in state_uris:
            self.states.append((state, None))

        s_l.debug("Undescribed Aggregated Resource URIs: " + str(aggregated_resource_uris))
        for ar in aggregated_resource_uris:
            ose = Ore_Statement_Resource(ar)
            self.resources.append(ose)

    def _validate(self):
        """Set ``self.valid`` for an RDF/XML resource map.

        The root must be an RDF element, the ore:describes target must equal
        the URI of the description carrying ore:isDescribedBy, and the
        resource map URI must be one of the ore:isDescribedBy targets.
        Leaves ``valid`` untouched when there is no parsed document.
        """
        valid = True
        if self.dom is None:
            return

        # MUST be an RDF/XML resource map

        # is this rdf xml:
        if self.dom.tag.lower() != NS['rdf'] % "rdf" and self.dom.tag.lower() != "rdf":
            s_l.info("Validation of Ore Statement failed, as root tag is not RDF: " + self.dom.tag)
            valid = False

        # does it meet the basic requirements of being a resource map, which
        # is to have an ore:describes and an ore:isDescribedBy
        describes_uri = None
        rem_uri = None
        aggregation_uri = None
        is_described_by_uris = []
        for desc in self.dom.findall(NS['rdf'] % "Description"):
            # look for the describes tag
            ore_desc = desc.find(NS['ore'] % "describes")
            if ore_desc is not None:
                describes_uri = ore_desc.get(NS['rdf'] % "resource")
                rem_uri = desc.get(NS['rdf'] % "about")

            # look for the isDescribedBy tag
            ore_idb = desc.findall(NS['ore'] % "isDescribedBy")
            if len(ore_idb) > 0:
                aggregation_uri = desc.get(NS['rdf'] % "about")
                for idb in ore_idb:
                    is_described_by_uris.append(idb.get(NS['rdf'] % "resource"))

        # now check that all those uris tie up; any of them may still be
        # None here, so convert with str() before building the log message
        if describes_uri != aggregation_uri:
            s_l.info("Validation of Ore Statement failed; ore:describes URI does not match Aggregation URI: " +
                     str(describes_uri) + " != " + str(aggregation_uri))
            valid = False
        if rem_uri not in is_described_by_uris:
            s_l.info("Validation of Ore Statement failed; Resource Map URI does not match one of ore:isDescribedBy URIs: " +
                     str(rem_uri) + " not in " + str(is_described_by_uris))
            valid = False

        s_l.info("Statement validation; was it a success? " + str(valid))
        self.valid = valid