#!/usr/local/bin/python """ $Id: notation3.py,v 1.204 2014/07/23 14:26:13 timbl Exp $ This module implements a Nptation3 parser, and the final part of a notation3 serializer. See also: Notation 3 http://www.w3.org/DesignIssues/Notation3 Closed World Machine - and RDF Processor http://www.w3.org/2000/10/swap/cwm To DO: See also "@@" in comments - Clean up interfaces ______________________________________________ Module originally by Dan Connolly, includeing notation3 parser and RDF generator. TimBL added RDF stream model and N3 generation, replaced stream model with use of common store/formula API. Yosi Scharf developped the module, including tests and test harness. 2013-11-07 Added naked dateZ and datetimeZ literal parsing """ # Python standard libraries import types, sys import string import codecs # python 2-ism; for writing utf-8 in RDF/xml output import urllib import re from warnings import warn from sax2rdf import XMLtoDOM # Incestuous.. would be nice to separate N3 and XML # SWAP http://www.w3.org/2000/10/swap from diag import verbosity, setVerbosity, progress from uripath import refTo, join import uripath import RDFSink from RDFSink import CONTEXT, PRED, SUBJ, OBJ, PARTS, ALL4 from RDFSink import LITERAL, XMLLITERAL, LITERAL_DT, LITERAL_LANG, ANONYMOUS, SYMBOL from RDFSink import Logic_NS import diag from xmlC14n import Canonicalize from why import BecauseOfData, becauseSubexpression N3_forSome_URI = RDFSink.forSomeSym N3_forAll_URI = RDFSink.forAllSym # Magic resources we know about from RDFSink import RDF_type_URI, RDF_NS_URI, DAML_sameAs_URI, parsesTo_URI from RDFSink import RDF_spec, List_NS, uniqueURI from local_decimal import Decimal ADDED_HASH = "#" # Stop where we use this in case we want to remove it! # This is the hash on namespace URIs RDF_type = ( SYMBOL , RDF_type_URI ) DAML_sameAs = ( SYMBOL, DAML_sameAs_URI ) from RDFSink import N3_first, N3_rest, N3_nil, N3_li, N3_List, N3_Empty LOG_implies_URI = "http://www.w3.org/2000/10/swap/log#implies" INTEGER_DATATYPE = "http://www.w3.org/2001/XMLSchema#integer" FLOAT_DATATYPE = "http://www.w3.org/2001/XMLSchema#double" DECIMAL_DATATYPE = "http://www.w3.org/2001/XMLSchema#decimal" BOOLEAN_DATATYPE = "http://www.w3.org/2001/XMLSchema#boolean" DATE_DATATYPE = "http://www.w3.org/2001/XMLSchema#date" DATETIME_DATATYPE = "http://www.w3.org/2001/XMLSchema#dateTime" option_noregen = 0 # If set, do not regenerate genids on output # @@ I18n - the notname chars need extending for well known unicode non-text # characters. The XML spec switched to assuming unknown things were name # characaters. # _namechars = string.lowercase + string.uppercase + string.digits + '_-' _notQNameChars = "\t\r\n !\"#$%&'()*.,+/;<=>?@[\\]^`{|}~" # else valid qname :-/ _notNameChars = _notQNameChars + ":" # Assume anything else valid name :-/ _rdfns = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#' N3CommentCharacter = "#" # For unix script #! compatabilty ########################################## Parse string to sink # # Regular expressions: eol = re.compile(r'[ \t]*(#[^\n]*)?\r?\n') # end of line, poss. w/comment eof = re.compile(r'[ \t]*(#[^\n]*)?$') # end of file, poss. w/comment ws = re.compile(r'[ \t]*') # Whitespace not including NL signed_integer = re.compile(r'[-+]?[0-9]+') # integer number_syntax = re.compile(r'(?P[-+]?[0-9]+)(?P\.[0-9]+)?(?Pe[-+]?[0-9]+)?') datetime_syntax = re.compile(r'[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9](T[0-9][0-9]:[0-9][0-9](:[0-9][0-9](\.[0-9]*)?)?)?Z?'); digitstring = re.compile(r'[0-9]+') # Unsigned integer interesting = re.compile(r'[\\\r\n\"]') langcode = re.compile(r'[a-zA-Z0-9]+(-[a-zA-Z0-9]+)?') #" class SinkParser: def __init__(self, store, openFormula=None, thisDoc="", baseURI=None, genPrefix = "", metaURI=None, flags="", why=None): """ note: namespace names should *not* end in #; the # will get added during qname processing """ self._bindings = {} self._flags = flags if thisDoc != "": assert ':' in thisDoc, "Document URI not absolute: <%s>" % thisDoc self._bindings[""] = thisDoc + "#" # default self._store = store if genPrefix: store.setGenPrefix(genPrefix) # pass it on self._thisDoc = thisDoc self.lines = 0 # for error handling self.startOfLine = 0 # For calculating character number self._genPrefix = genPrefix self.keywords = ['a', 'this', 'bind', 'has', 'is', 'of', 'true', 'false' ] self.keywordsSet = 0 # Then only can others be considerd qnames self._anonymousNodes = {} # Dict of anon nodes already declared ln: Term self._variables = {} self._parentVariables = {} self._reason = why # Why the parser was asked to parse this self._reason2 = None # Why these triples if diag.tracking: self._reason2 = BecauseOfData( store.newSymbol(thisDoc), because=self._reason) if baseURI: self._baseURI = baseURI else: if thisDoc: self._baseURI = thisDoc else: self._baseURI = None assert not self._baseURI or ':' in self._baseURI if not self._genPrefix: if self._thisDoc: self._genPrefix = self._thisDoc + "#_g" else: self._genPrefix = uniqueURI() if openFormula ==None: if self._thisDoc: self._formula = store.newFormula(thisDoc + "#_formula") else: self._formula = store.newFormula() else: self._formula = openFormula self._context = self._formula self._parentContext = None if metaURI: self.makeStatement((SYMBOL, metaURI), # relate doc to parse tree (SYMBOL, PARSES_TO_URI ), #pred (SYMBOL, thisDoc), #subj self._context) # obj self.makeStatement(((SYMBOL, metaURI), # quantifiers - use inverse? (SYMBOL, N3_forSome_URI), #pred self._context, #subj subj)) # obj def here(self, i): """String generated from position in file This is for repeatability when refering people to bnodes in a document. This has diagnostic uses less formally, as it should point one to which bnode the arbitrary identifier actually is. It gives the line and character number of the '[' charcacter or path character which introduced the blank node. The first blank node is boringly _L1C1. It used to be used only for tracking, but for tests in general it makes the canonical ordering of bnodes repeatable.""" return "%s_L%iC%i" % (self._genPrefix , self.lines, i - self.startOfLine + 1) def formula(self): return self._formula def loadStream(self, stream): return self.loadBuf(stream.read()) # Not ideal def loadBuf(self, buf): """Parses a buffer and returns its top level formula""" self.startDoc() self.feed(buf) return self.endDoc() # self._formula def feed(self, octets): """Feed an octet stream tothe parser if BadSyntax is raised, the string passed in the exception object is the remainder after any statements have been parsed. So if there is more data to feed to the parser, it should be straightforward to recover.""" str = octets.decode('utf-8') i = 0 while i >= 0: j = self.skipSpace(str, i) if j<0: return i = self.directiveOrStatement(str,j) if i<0: # print "# next char: ", `str[j]` raise BadSyntax(self._thisDoc, self.lines, str, j, "expected directive or statement") def directiveOrStatement(self, str,h): i = self.skipSpace(str, h) if i<0: return i # EOF j = self.directive(str, i) if j>=0: return self.checkDot(str,j) j = self.statement(str, i) if j>=0: return self.checkDot(str,j) return j #@@I18N global _notNameChars #_namechars = string.lowercase + string.uppercase + string.digits + '_-' def tok(self, tok, str, i): """Check for keyword. Space must have been stripped on entry and we must not be at end of file.""" assert tok[0] not in _notNameChars # not for punctuation # was: string.whitespace which is '\t\n\x0b\x0c\r \xa0' -- not ascii whitespace = '\t\n\x0b\x0c\r ' if str[i:i+1] == "@": i = i+1 else: if tok not in self.keywords: return -1 # No, this has neither keywords declaration nor "@" if (str[i:i+len(tok)] == tok and (str[i+len(tok)] in _notQNameChars )): i = i + len(tok) return i else: return -1 def directive(self, str, i): j = self.skipSpace(str, i) if j<0: return j # eof res = [] j = self.tok('bind', str, i) # implied "#". Obsolete. if j>0: raise BadSyntax(self._thisDoc, self.lines, str, i, "keyword bind is obsolete: use @prefix") j = self.tok('keywords', str, i) if j>0: i = self.commaSeparatedList(str, j, res, self.bareWord) if i < 0: raise BadSyntax(self._thisDoc, self.lines, str, i, "'@keywords' needs comma separated list of words") self.setKeywords(res[:]) if diag.chatty_flag > 80: progress("Keywords ", self.keywords) return i j = self.tok('forAll', str, i) if j > 0: i = self.commaSeparatedList(str, j, res, self.uri_ref2) if i <0: raise BadSyntax(self._thisDoc, self.lines, str, i, "Bad variable list after @forAll") for x in res: #self._context.declareUniversal(x) if x not in self._variables or x in self._parentVariables: self._variables[x] = self._context.newUniversal(x) return i j = self.tok('forSome', str, i) if j > 0: i = self. commaSeparatedList(str, j, res, self.uri_ref2) if i <0: raise BadSyntax(self._thisDoc, self.lines, str, i, "Bad variable list after @forSome") for x in res: self._context.declareExistential(x) return i j=self.tok('prefix', str, i) # no implied "#" if j>=0: t = [] i = self.qname(str, j, t) if i<0: raise BadSyntax(self._thisDoc, self.lines, str, j, "expected qname after @prefix") j = self.uri_ref2(str, i, t) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "expected after @prefix _qname_") ns = self.uriOf(t[1]) if self._baseURI: ns = join(self._baseURI, ns) elif ":" not in ns: raise BadSyntax(self._thisDoc, self.lines, str, j, "With no base URI, cannot use relative URI in @prefix <"+ns+">") assert ':' in ns # must be absolute self._bindings[t[0][0]] = ns self.bind(t[0][0], hexify(ns)) return j j=self.tok('base', str, i) # Added 2007/7/7 if j >= 0: t = [] i = self.uri_ref2(str, j, t) if i<0: raise BadSyntax(self._thisDoc, self.lines, str, j, "expected after @base ") ns = self.uriOf(t[0]) if self._baseURI: ns = join(self._baseURI, ns) elif ':' not in ns: raise BadSyntax(self._thisDoc, self.lines, str, j, "With no previous base URI, cannot use relative URI in @base <"+ns+">") assert ':' in ns # must be absolute self._baseURI = ns return i return -1 # Not a directive, could be something else. def bind(self, qn, uri): assert isinstance(uri, types.StringType), "Any unicode must be %x-encoded already" if qn == "": self._store.setDefaultNamespace(uri) else: self._store.bind(qn, uri) def setKeywords(self, k): "Takes a list of strings" if k == None: self.keywordsSet = 0 else: self.keywords = k self.keywordsSet = 1 def startDoc(self): self._store.startDoc() def endDoc(self): """Signal end of document and stop parsing. returns formula""" self._store.endDoc(self._formula) # don't canonicalize yet return self._formula def makeStatement(self, quadruple): #$$$$$$$$$$$$$$$$$$$$$ # print "# Parser output: ", `quadruple` self._store.makeStatement(quadruple, why=self._reason2) def statement(self, str, i): r = [] i = self.object(str, i, r) # Allow literal for subject - extends RDF if i<0: return i j = self.property_list(str, i, r[0]) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "expected propertylist") return j def subject(self, str, i, res): return self.item(str, i, res) def verb(self, str, i, res): """ has _prop_ is _prop_ of a = _prop_ >- prop -> <- prop -< _operator_""" j = self.skipSpace(str, i) if j<0:return j # eof r = [] j = self.tok('has', str, i) if j>=0: i = self.prop(str, j, r) if i < 0: raise BadSyntax(self._thisDoc, self.lines, str, j, "expected property after 'has'") res.append(('->', r[0])) return i j = self.tok('is', str, i) if j>=0: i = self.prop(str, j, r) if i < 0: raise BadSyntax(self._thisDoc, self.lines, str, j, "expected after 'is'") j = self.skipSpace(str, i) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "End of file found, expected property after 'is'") return j # eof i=j j = self.tok('of', str, i) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "expected 'of' after 'is' ") res.append(('<-', r[0])) return j j = self.tok('a', str, i) if j>=0: res.append(('->', RDF_type)) return j if str[i:i+2] == "<=": res.append(('<-', self._store.newSymbol(Logic_NS+"implies"))) return i+2 if str[i:i+1] == "=": if str[i+1:i+2] == ">": res.append(('->', self._store.newSymbol(Logic_NS+"implies"))) return i+2 res.append(('->', DAML_sameAs)) return i+1 if str[i:i+2] == ":=": # patch file relates two formulae, uses this @@ really? res.append(('->', Logic_NS+"becomes")) return i+2 j = self.prop(str, i, r) if j >= 0: res.append(('->', r[0])) return j if str[i:i+2] == ">-" or str[i:i+2] == "<-": raise BadSyntax(self._thisDoc, self.lines, str, j, ">- ... -> syntax is obsolete.") return -1 def prop(self, str, i, res): return self.item(str, i, res) def item(self, str, i, res): return self.path(str, i, res) def blankNode(self, uri=None): if "B" not in self._flags: return self._context.newBlankNode(uri, why=self._reason2) x = self._context.newSymbol(uri) self._context.declareExistential(x) return x def path(self, str, i, res): """Parse the path production. """ j = self.nodeOrLiteral(str, i, res) if j<0: return j # nope while str[j:j+1] in "!^.": # no spaces, must follow exactly (?) ch = str[j:j+1] # @@ Allow "." followed IMMEDIATELY by a node. if ch == ".": ahead = str[j+1:j+2] if not ahead or (ahead in _notNameChars and ahead not in ":?<[{("): break subj = res.pop() obj = self.blankNode(uri=self.here(j)) j = self.node(str, j+1, res) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, j, "EOF found in middle of path syntax") pred = res.pop() if ch == "^": # Reverse traverse self.makeStatement((self._context, pred, obj, subj)) else: self.makeStatement((self._context, pred, subj, obj)) res.append(obj) return j def anonymousNode(self, ln): """Remember or generate a term for one of these _: anonymous nodes""" term = self._anonymousNodes.get(ln, None) if term != None: return term term = self._store.newBlankNode(self._context, why=self._reason2) self._anonymousNodes[ln] = term return term def node(self, str, i, res, subjectAlready=None): """Parse the production. Space is now skipped once at the beginning instead of in multipe calls to self.skipSpace(). """ subj = subjectAlready j = self.skipSpace(str,i) if j<0: return j #eof i=j ch = str[i:i+1] # Quick 1-character checks first: if ch == "[": bnodeID = self.here(i) j=self.skipSpace(str,i+1) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "EOF after '['") if str[j:j+1] == "=": # Hack for "is" binding name to anon node i = j+1 objs = [] j = self.objectList(str, i, objs); if j>=0: subj = objs[0] if len(objs)>1: for obj in objs: self.makeStatement((self._context, DAML_sameAs, subj, obj)) j = self.skipSpace(str, j) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "EOF when objectList expected after [ = ") if str[j:j+1] == ";": j=j+1 else: raise BadSyntax(self._thisDoc, self.lines, str, i, "objectList expected after [= ") if subj is None: subj=self.blankNode(uri= bnodeID) i = self.property_list(str, j, subj) if i<0: raise BadSyntax(self._thisDoc, self.lines, str, j, "property_list expected") j = self.skipSpace(str, i) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "EOF when ']' expected after [ ") if str[j:j+1] != "]": raise BadSyntax(self._thisDoc, self.lines, str, j, "']' expected") res.append(subj) return j+1 if ch == "{": ch2 = str[i+1:i+2] if ch2 == '$': i += 1 j = i + 1 List = [] first_run = True while 1: i = self.skipSpace(str, j) if i<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "needed '$}', found end.") if str[i:i+2] == '$}': j = i+2 break if not first_run: if str[i:i+1] == ',': i+=1 else: raise BadSyntax(self._thisDoc, self.lines, str, i, "expected: ','") else: first_run = False item = [] j = self.item(str,i, item) #@@@@@ should be path, was object if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "expected item in set or '$}'") List.append(self._store.intern(item[0])) res.append(self._store.newSet(List, self._context)) return j else: j=i+1 oldParentContext = self._parentContext self._parentContext = self._context parentAnonymousNodes = self._anonymousNodes grandParentVariables = self._parentVariables self._parentVariables = self._variables self._anonymousNodes = {} self._variables = self._variables.copy() reason2 = self._reason2 self._reason2 = becauseSubexpression if subj is None: subj = self._store.newFormula() self._context = subj while 1: i = self.skipSpace(str, j) if i<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "needed '}', found end.") if str[i:i+1] == "}": j = i+1 break j = self.directiveOrStatement(str,i) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "expected statement or '}'") self._anonymousNodes = parentAnonymousNodes self._variables = self._parentVariables self._parentVariables = grandParentVariables self._context = self._parentContext self._reason2 = reason2 self._parentContext = oldParentContext res.append(subj.close()) # No use until closed return j if ch == "(": thing_type = self._store.newList ch2 = str[i+1:i+2] if ch2 == '$': thing_type = self._store.newSet i += 1 j=i+1 List = [] while 1: i = self.skipSpace(str, j) if i<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "needed ')', found end.") if str[i:i+2] == '$)': j = i+2 break if str[i:i+1] == ')': j = i+1 break item = [] j = self.item(str,i, item) #@@@@@ should be path, was object if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "expected item in list or ')'") List.append(self._store.intern(item[0])) res.append(thing_type(List, self._context)) return j j = self.tok('this', str, i) # This context if j>=0: warn(''.__class__(BadSyntax(self._thisDoc, self.lines, str, i, "Keyword 'this' was ancient N3. Now use @forSome and @forAll keywords."))) res.append(self._context) return j #booleans j = self.tok('true', str, i) if j>=0: res.append(True) return j j = self.tok('false', str, i) if j>=0: res.append(False) return j if subj is None: # If this can be a named node, then check for a name. j = self.uri_ref2(str, i, res) if j >= 0: return j return -1 def property_list(self, str, i, subj): """Parse property list Leaves the terminating punctuation in the buffer """ while 1: j = self.skipSpace(str, i) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "EOF found when expected verb in property list") return j #eof if str[j:j+2] ==":-": i = j + 2 res = [] j = self.node(str, i, res, subj) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "bad {} or () or [] node after :- ") i=j continue i=j v = [] j = self.verb(str, i, v) if j<=0: return i # void but valid objs = [] i = self.objectList(str, j, objs) if i<0: raise BadSyntax(self._thisDoc, self.lines, str, j, "objectList expected") for obj in objs: dir, sym = v[0] if dir == '->': self.makeStatement((self._context, sym, subj, obj)) else: self.makeStatement((self._context, sym, obj, subj)) j = self.skipSpace(str, i) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, j, "EOF found in list of objects") return j #eof if str[i:i+1] != ";": return i i = i+1 # skip semicolon and continue def commaSeparatedList(self, str, j, res, what): """return value: -1 bad syntax; >1 new position in str res has things found appended """ i = self.skipSpace(str, j) if i<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "EOF found expecting comma sep list") return i if str[i] == ".": return j # empty list is OK i = what(str, i, res) if i<0: return -1 while 1: j = self.skipSpace(str, i) if j<0: return j # eof ch = str[j:j+1] if ch != ",": if ch != ".": return -1 return j # Found but not swallowed "." i = what(str, j+1, res) if i<0: raise BadSyntax(self._thisDoc, self.lines, str, i, "bad list content") return i def objectList(self, str, i, res): i = self.object(str, i, res) if i<0: return -1 while 1: j = self.skipSpace(str, i) if j<0: raise BadSyntax(self._thisDoc, self.lines, str, j, "EOF found after object") return j #eof if str[j:j+1] != ",": return j # Found something else! i = self.object(str, j+1, res) if i<0: return i def checkDot(self, str, i): j = self.skipSpace(str, i) if j<0: return j #eof if str[j:j+1] == ".": return j+1 # skip if str[j:j+1] == "}": return j # don't skip it if str[j:j+1] == "]": return j raise BadSyntax(self._thisDoc, self.lines, str, j, "expected '.' or '}' or ']' at end of statement") return i def uri_ref2(self, str, i, res): """Generate uri from n3 representation. Note that the RDF convention of directly concatenating NS and local name is now used though I prefer inserting a '#' to make the namesapces look more like what XML folks expect. """ qn = [] j = self.qname(str, i, qn) if j>=0: pfx, ln = qn[0] if pfx is None: assert 0, "not used?" ns = self._baseURI + ADDED_HASH else: try: ns = self._bindings[pfx] except KeyError: if pfx == "_": # Magic prefix 2001/05/30, can be overridden res.append(self.anonymousNode(ln)) return j raise BadSyntax(self._thisDoc, self.lines, str, i, "Prefix \"%s:\" not bound" % (pfx)) symb = self._store.newSymbol(ns + ln) if symb in self._variables: res.append(self._variables[symb]) else: res.append(symb) # @@@ "#" CONVENTION if not string.find(ns, "#"):progress( "Warning: no # on namespace %s," % ns) return j i = self.skipSpace(str, i) if i<0: return -1 if str[i] == "?": v = [] j = self.variable(str,i,v) if j>0: #Forget varibles as a class, only in context. res.append(v[0]) return j return -1 elif str[i]=="<": i = i + 1 st = i while i < len(str): if str[i] == ">": uref = str[st:i] # the join should dealt with "": if self._baseURI: uref = uripath.join(self._baseURI, uref) else: assert ":" in uref, \ "With no base URI, cannot deal with relative URIs" if str[i-1:i]=="#" and not uref[-1:]=="#": uref = uref + "#" # She meant it! Weirdness in urlparse? symb = self._store.newSymbol(uref) if symb in self._variables: res.append(self._variables[symb]) else: res.append(symb) return i+1 i = i + 1 raise BadSyntax(self._thisDoc, self.lines, str, j, "unterminated URI reference") elif self.keywordsSet: v = [] j = self.bareWord(str,i,v) if j<0: return -1 #Forget varibles as a class, only in context. if v[0] in self.keywords: raise BadSyntax(self._thisDoc, self.lines, str, i, 'Keyword "%s" not allowed here.' % v[0]) res.append(self._store.newSymbol(self._bindings[""]+v[0])) return j else: return -1 def skipSpace(self, str, i): """Skip white space, newlines and comments. return -1 if EOF, else position of first non-ws character""" while 1: m = eol.match(str, i) if m == None: break self.lines = self.lines + 1 i = m.end() # Point to first character unmatched self.startOfLine = i m = ws.match(str, i) if m != None: i = m.end() m = eof.match(str, i) if m != None: return -1 return i def variable(self, str, i, res): """ ?abc -> variable(:abc) """ j = self.skipSpace(str, i) if j<0: return -1 if str[j:j+1] != "?": return -1 j=j+1 i = j if str[j] in "0123456789-": raise BadSyntax(self._thisDoc, self.lines, str, j, "Varible name can't start with '%s'" % str[j]) return -1 while i :abc """ j = self.skipSpace(str, i) if j<0: return -1 if str[j] in "0123456789-" or str[j] in _notNameChars: return -1 i = j while i ('xyz', 'def') If not in keywords and keywordsSet: def -> ('', 'def') :def -> ('', 'def') """ i = self.skipSpace(str, i) if i<0: return -1 c = str[i] if c in "0123456789-+": return -1 if c not in _notNameChars: ln = c i = i + 1 while i < len(str): c = str[i] if c not in _notNameChars: ln = ln + c i = i + 1 else: break else: # First character is non-alpha ln = '' # Was: None - TBL (why? useful?) if i= 0: return j else: j = self.skipSpace(str, i) if j<0: return -1 else: i=j if str[i]=='"': if str[i:i+3] == '"""': delim = '"""' else: delim = '"' i = i + len(delim) j, s = self.strconst(str, i, delim) res.append(self._store.newLiteral(s)) progress("New string const ", s, j) return j else: return -1 def nodeOrLiteral(self, str, i, res): j = self.node(str, i, res) if j>= 0: return j else: j = self.skipSpace(str, i) if j<0: return -1 else: i=j ch = str[i] if ch in "-+0987654321": m = datetime_syntax.match(str, i); if m != None: j = m.end(); # print "date time "+str[i:j] if 'T' in str[i:j]: res.append(self._store.newLiteral(str[i:j], self._store.newSymbol(DATETIME_DATATYPE))) return j else: res.append(self._store.newLiteral(str[i:j], self._store.newSymbol(DATE_DATATYPE))) return j m = number_syntax.match(str, i) if m != None: j = m.end() if m.group('exponent') != None: # includes decimal exponent res.append(float(str[i:j])) # res.append(self._store.newLiteral(str[i:j], # self._store.newSymbol(FLOAT_DATATYPE))) elif m.group('decimal') != None: res.append(Decimal(str[i:j])) else: res.append(long(str[i:j])) # res.append(self._store.newLiteral(str[i:j], # self._store.newSymbol(INTEGER_DATATYPE))) return j raise BadSyntax(self._thisDoc, self.lines, str, i, "Bad number or datetime syntax") if str[i]=='"': if str[i:i+3] == '"""': delim = '"""' else: delim = '"' i = i + len(delim) dt = None j, s = self.strconst(str, i, delim) lang = None if str[j:j+1] == "@": # Language? m = langcode.match(str, j+1) if m == None: raise BadSyntax(self._thisDoc, startline, str, i, "Bad language code syntax on string literal, after @") i = m.end() lang = str[j+1:i] j = i if str[j:j+2] == "^^": res2 = [] j = self.uri_ref2(str, j+2, res2) # Read datatype URI dt = res2[0] if dt.uriref() == "http://www.w3.org/1999/02/22-rdf-syntax-ns#XMLLiteral": try: dom = XMLtoDOM('' + s + '').firstChild except: raise ValueError('s="%s"' % s) res.append(self._store.newXMLLiteral(dom)) return j res.append(self._store.newLiteral(s, dt, lang)) return j else: return -1 def uriOf(self, sym): if isinstance(sym, types.TupleType): return sym[1] # old system for --pipe return sym.uriref() # cwm api def strconst(self, str, i, delim): """parse an N3 string constant delimited by delim. return index, val """ j = i ustr = u"" # Empty unicode string startline = self.lines # Remember where for error messages while j= 0: uch = '\a\b\f\r\t\v\n\\"'[k] ustr = ustr + uch j = j + 1 elif ch == "u": j, ch = self.uEscape(str, j+1, startline) ustr = ustr + ch elif ch == "U": j, ch = self.UEscape(str, j+1, startline) ustr = ustr + ch else: raise BadSyntax(self._thisDoc, self.lines, str, i, "bad escape") raise BadSyntax(self._thisDoc, self.lines, str, i, "unterminated string literal") def uEscape(self, str, i, startline): j = i count = 0 value = 0 while count < 4: # Get 4 more characters ch = str[j:j+1].lower() # sbp http://ilrt.org/discovery/chatlogs/rdfig/2002-07-05 j = j + 1 if ch == "": raise BadSyntax(self._thisDoc, startline, str, i, "unterminated string literal(3)") k = string.find("0123456789abcdef", ch) if k < 0: raise BadSyntax(self._thisDoc, startline, str, i, "bad string literal hex escape") value = value * 16 + k count = count + 1 uch = unichr(value) return j, uch def UEscape(self, str, i, startline): stringType = type('') j = i count = 0 value = '\\U' while count < 8: # Get 8 more characters ch = str[j:j+1].lower() # sbp http://ilrt.org/discovery/chatlogs/rdfig/2002-07-05 j = j + 1 if ch == "": raise BadSyntax(self._thisDoc, startline, str, i, "unterminated string literal(3)") k = string.find("0123456789abcdef", ch) if k < 0: raise BadSyntax(self._thisDoc, startline, str, i, "bad string literal hex escape") value = value + ch count = count + 1 uch = stringType(value).decode('unicode-escape') return j, uch wide_build = True try: unichr(0x10000) except ValueError: wide_build = False # If we are going to do operators then they should generate # [ is operator:plus of ( \1 \2 ) ] class BadSyntax(SyntaxError): def __init__(self, uri, lines, str, i, why): self._str = str.encode('utf-8') # Better go back to strings for errors self._i = i self._why = why self.lines = lines self._uri = uri def __str__(self): str = self._str i = self._i st = 0 if i>60: pre="..." st = i - 60 else: pre="" if len(str)-i > 60: post="..." else: post="" return 'at line %i of <%s>:\nBad syntax (%s) at ^ in:\n"%s%s^%s%s"' \ % (self.lines +1, self._uri, self._why, pre, str[st:i], str[i:i+60], post) def stripCR(str): res = "" for ch in str: if ch != "\r": res = res + ch return res def dummyWrite(x): pass ################################################################################ def toBool(s): if s == 'true' or s == 'True' or s == '1': return True if s == 'false' or s == 'False' or s == '0': return False raise ValueError(s) class ToN3(RDFSink.RDFSink): """Serializer output sink for N3 keeps track of most recent subject and predicate reuses them. Adapted from Dan's ToRDFParser(Parser); """ flagDocumentation = """Flags for N3 output are as follows:- a Anonymous nodes should be output using the _: convention (p flag or not). d Don't use default namespace (empty prefix) c Comments added at top about version and base URI used. e escape literals --- use \u notation g Suppress => shothand for log:implies i Use identifiers from store - don't regen on output l List syntax suppression. Don't use (..) n No numeric syntax - use strings typed with ^^ syntax p Prefix suppression - don't use them, always URIs in <> instead of qnames. r Relative URI suppression. Always use absolute URIs. s Subject must be explicit for every statement. Don't use ";" shorthand. t "=" and "()" special syntax should be suppresed. u Use \u for unicode escaping in URIs instead of utf-8 %XX v Use "this log:forAll" for @forAll, and "this log:forAll" for "@forSome". / If namespace has no # in it, assume it ends at the last slash if outputting. Flags for N3 input: B Turn any blank node into a existentially qualified explicitly named node. """ # " # A word about regenerated Ids. # # Within the program, the URI of a resource is kept the same, and in fact # tampering with it would leave risk of all kinds of inconsistencies. # Hwoever, on output, where there are URIs whose values are irrelevant, # such as variables and generated IDs from anonymous ndoes, it makes the # document very much more readable to regenerate the IDs. # We use here a convention that underscores at the start of fragment IDs # are reserved for generated Ids. The caller can change that. # # Now there is a new way of generating these, with the "_" prefix # for anonymous nodes. def __init__(self, write, base=None, genPrefix = None, noLists=0 , quiet=0, flags=""): gp = genPrefix if gp == None: gp = "#_g" if base!=None: try: gp = uripath.join(base, "#_g") except ValueError: pass # bogus: base eg RDFSink.RDFSink.__init__(self, gp) self._write = self.writeEncoded self._writeRaw = write self._quiet = quiet or "c" not in flags self._flags = flags self._subj = None self.prefixes = {} # Look up prefix conventions self.defaultNamespace = None self.indent = 1 # Level of nesting of output self.base = base # self.nextId = 0 # Regenerate Ids on output self.regen = {} # Mapping of regenerated Ids self.noLists = noLists # Suppress generation of lists? self._anodeName = {} # For "a" flag self._anodeId = {} # For "a" flag - reverse mapping self._needNL = 0 # Do we need to have a newline before a new element? if "l" in self._flags: self.noLists = 1 def dummyClone(self): "retun a version of myself which will only count occurrences" return ToN3(write=dummyWrite, base=self.base, genPrefix=self._genPrefix, noLists=self.noLists, quiet=self._quiet, flags=self._flags ) def writeEncoded(self, str): """Write a possibly unicode string out to the output""" try: return self._writeRaw(str.encode('utf-8')) except UnicodeDecodeError: raise UnicodeDecodeError(str, str.__class__) def setDefaultNamespace(self, uri): return self.bind("", uri) def bind(self, prefixString, uri): """ Just accepting a convention here """ assert ':' in uri # absolute URI references only if "p" in self._flags: return # Ignore the prefix system completely # if not prefixString: # raise RuntimError("Please use setDefaultNamespace instead") if (uri == self.defaultNamespace and "d" not in self._flags): return # don't duplicate ?? self._endStatement() self.prefixes[uri] = prefixString if 'r' in self._flags: self._write(u"@prefix %s: <%s> ."%(prefixString, uri)) else: self._write(u"@prefix %s: <%s> ."%(prefixString, refTo(self.base, uri))) self._newline() def setDefaultNamespace(self, uri): if "d" in self._flags or "p" in self._flags: return # Ignore the prefix system completely self._endStatement() self.defaultNamespace = uri if self.base: # Sometimes there is none, and now refTo is intolerant x = refTo(self.base, uri) else: x = uri self._write(u" @prefix : <%s> ." % x ) self._newline() def startDoc(self): if not self._quiet: # Suppress stuff which will confuse test diffs self._write(u"\n# Notation3 generation by\n") idstr = u"$Id: notation3.py,v 1.204 2014/07/23 14:26:13 timbl Exp $" # CVS CHANGES THE ABOVE LINE self._write(u"# " + idstr[5:-2] + u"\n\n") # Strip "$" in case the N3 file is checked in to CVS if self.base: self._write(u"# Base was: " + self.base + u"\n") self._write(u" " * self.indent) self._subj = None # self._nextId = 0 def endDoc(self, rootFormulaPair=None): self._endStatement() self._write(u"\n") if self.stayOpen: return # fo concatenation if not self._quiet: self._write(u"#ENDS\n") return # No formula returned - this is not a store def makeComment(self, str): for line in string.split(str, "\n"): self._write(u"#" + line + "\n") # Newline order??@@ self._write(u" " * self.indent + " ") def _newline(self, extra=0): self._write(u"\n"+ u" " * (self.indent+extra)) def makeStatement(self, triple, why=None, aIsPossible=1): # triple = tuple([a.asPair() for a in triple2]) if ("a" in self._flags and triple[PRED] == (SYMBOL, N3_forSome_URI) and triple[CONTEXT] == triple[SUBJ]) : # and # We assume the output is flat @@@ true, we should not try: aIsPossible = aIsPossible() except TypeError: aIsPossible = 1 if aIsPossible: ty, value = triple[OBJ] i = len(value) while i > 0 and value[i-1] not in _notNameChars+"_": i = i - 1 str2 = value[i:] if self._anodeName.get(str2, None) != None: j = 1 while 1: str3 = str2 + `j` if self._anodeName.get(str3, None) == None: break j = j +1 str2 = str3 if str2[0] in "0123456789": str2 = "a"+str2 if diag.chatty_flag > 60: progress( "Anode %s means %s" % (str2, value)) self._anodeName[str2] = value self._anodeId[value] = str2 return self._makeSubjPred(triple[CONTEXT], triple[SUBJ], triple[PRED]) self._write(self.representationOf(triple[CONTEXT], triple[OBJ])) self._needNL = 1 # Below is for writing an anonymous node # As object, with one incoming arc: def startAnonymous(self, triple): self._makeSubjPred(triple[CONTEXT], triple[SUBJ], triple[PRED]) self._write(u" [") self.indent = self.indent + 1 self._pred = None self._newline() self._subj = triple[OBJ] # The object is now the current subject def endAnonymous(self, subject, verb): # Remind me where we are self._write(u" ]") self.indent = self.indent - 1 self._subj = subject self._pred = verb # As subject: def startAnonymousNode(self, subj): if self._subj: self._write(u" .") self._newline() self.indent = self.indent + 1 self._write(u" [ ") self._subj = subj # The object is not the subject context self._pred = None def endAnonymousNode(self, subj=None): # Remove default subject self._write(u" ]") if not subj: self._write(u".") self.indent = self.indent - 1 self._newline() self._subj = subj self._pred = None # Below we print lists. A list expects to have lots of li links sent # As subject: def startListSubject(self, subj): if self._subj: self._write(u" .") self._newline() self.indent = self.indent + 1 self._write(u" ( ") self._needNL = 0 self._subj = subj # The object is not the subject context self._pred = N3_li # expect these until list ends def endListSubject(self, subj=None): # Remove default subject self._write(u" )") if not subj: self._write(u".") self.indent = self.indent - 1 self._newline() self._subj = subj self._pred = None # As Object: def startListObject(self, triple): self._makeSubjPred(triple[CONTEXT], triple[SUBJ], triple[PRED]) self._subj = triple[OBJ] self._write(u" (") self._needNL = 1 # Choice here of compactness self.indent = self.indent + 1 self._pred = N3_li # expect these until list ends self._subj = triple[OBJ] # The object is now the current subject def endListObject(self, subject, verb): # Remind me where we are self._write(u" )") self.indent = self.indent - 1 self._subj = subject self._pred = verb # Below we print a nested formula of statements def startFormulaSubject(self, context): if self._subj != context: self._endStatement() self.indent = self.indent + 1 self._write(u"{") self._newline() self._subj = None self._pred = None def endFormulaSubject(self, subj): # Remove context self._endStatement() # @@@@@@@@ remove in syntax change to implicit self._newline() self.indent = self.indent - 1 self._write(u"}") self._subj = subj self._pred = None def startFormulaObject(self, triple): self._makeSubjPred(triple[CONTEXT], triple[SUBJ], triple[PRED]) self.indent = self.indent + 1 self._write(u"{") self._subj = None self._pred = None def endFormulaObject(self, pred, subj): # Remove context self._endStatement() # @@@@@@@@ remove in syntax change to implicit self.indent = self.indent - 1 self._write(u"}") # self._newline() self._subj = subj self._pred = pred def _makeSubjPred(self, context, subj, pred): if pred == N3_li: if self._needNL: self._newline() return # If we are in list mode, don't need to. varDecl = (subj == context and "v" not in self._flags and ( pred == (SYMBOL, N3_forAll_URI) or pred == (SYMBOL, N3_forSome_URI))) if self._subj != subj or "s" in self._flags: self._endStatement() if self.indent == 1: # Top level only - extra newline self._newline() if "v" in self._flags or subj != context: self._write(self.representationOf(context, subj)) else: # "this" suppressed if (pred != (SYMBOL, N3_forAll_URI) and pred != (SYMBOL, N3_forSome_URI)): raise ValueError( "On N3 output, 'this' used with bad predicate: %s" % (pred, )) self._subj = subj self._pred = None if self._pred != pred: if self._pred: if "v" not in self._flags and ( self._pred== (SYMBOL, N3_forAll_URI) or self._pred == (SYMBOL, N3_forSome_URI)): self._write(u".") else: self._write(u";") self._newline(1) # Indent predicate from subject elif not varDecl: self._write(u" ") if varDecl: if pred == (SYMBOL, N3_forAll_URI): self._write( u" @forAll ") else: self._write( u" @forSome ") elif pred == (SYMBOL, DAML_sameAs_URI) and "t" not in self._flags: self._write(u" = ") elif pred == (SYMBOL, LOG_implies_URI) and "g" not in self._flags: self._write(u" => ") elif pred == (SYMBOL, RDF_type_URI) and "t" not in self._flags: self._write(u" a ") else : self._write( u" %s " % self.representationOf(context, pred)) self._pred = pred else: self._write(u",") self._newline(3) # Same subject and pred => object list def _endStatement(self): if self._subj: self._write(u" .") self._newline() self._subj = None def representationOf(self, context, pair): """ Representation of a thing in the output stream Regenerates genids if required. Uses prefix dictionary to use qname syntax if possible. """ if "t" not in self._flags: if pair == context: return u"this" if pair == N3_nil and not self.noLists: return u"()" ty, value = pair singleLine = "n" in self._flags if ty == LITERAL: return stringToN3(value, singleLine=singleLine, flags = self._flags) if ty == XMLLITERAL: st = u''.join([Canonicalize(x, None, unsuppressedPrefixes=['foo']) for x in value.childNodes]) st = stringToN3(st, singleLine=singleLine, flags=self._flags) return st + u"^^" + self.representationOf(context, (SYMBOL, u"http://www.w3.org/1999/02/22-rdf-syntax-ns#XMLLiteral")) if ty == LITERAL_DT: s, dt = value if "b" not in self._flags: if (dt == BOOLEAN_DATATYPE): return toBool(s) and u"true" or u"false" if "n" not in self._flags: dt_uri = dt if (dt_uri == INTEGER_DATATYPE): return unicode(long(s)) if (dt_uri == FLOAT_DATATYPE): retVal = unicode(float(s)) # numeric value python-normalized if 'e' not in retVal: retVal += 'e+00' return retVal if (dt_uri == DECIMAL_DATATYPE): retVal = unicode(Decimal(s)) if '.' not in retVal: retVal += '.0' return retVal st = stringToN3(s, singleLine= singleLine, flags=self._flags) return st + u"^^" + self.representationOf(context, (SYMBOL, dt)) if ty == LITERAL_LANG: s, lang = value return stringToN3(s, singleLine= singleLine, flags=self._flags)+ u"@" + lang aid = self._anodeId.get(pair[1], None) if aid != None: # "a" flag only return u"_:" + aid # Must start with alpha as per NTriples spec. if ((ty == ANONYMOUS) and not option_noregen and "i" not in self._flags ): x = self.regen.get(value, None) if x == None: x = self.genId() self.regen[value] = x value = x # return "<"+x+">" j = string.rfind(value, "#") if j<0: # and "/" in self._flags: Always allow 2010-10-24 TimBL j=string.rfind(value, "/") # Allow "/" namespaces as a second best if (j>=0 and "p" not in self._flags): # Suppress use of prefixes? for ch in value[j+1:]: # Examples: "." ";" we can't have in qname if ch in _notNameChars: if verbosity() > 20: progress("Cannot have character %i in local name for %s" % (ord(ch), `value`)) break else: namesp = value[:j+1] if (self.defaultNamespace and self.defaultNamespace == namesp and "d" not in self._flags): return u":"+value[j+1:] self.countNamespace(namesp) prefix = self.prefixes.get(namesp, None) # @@ #CONVENTION if prefix != None : return prefix + u":" + value[j+1:] if value[:j] == self.base: # If local to output stream, return u"<#" + value[j+1:] + u">" # use local frag id if "r" not in self._flags and self.base != None: value = hexify(refTo(self.base, value)) elif "u" in self._flags: value = backslashUify(value) else: value = hexify(value) return u"<" + value + u">" # Everything else def nothing(): pass import triple_maker as tm LIST = 10000 QUESTION = 10001 class tmToN3(RDFSink.RDFSink): """ """ def __init__(self, write, base=None, genPrefix = None, noLists=0 , quiet=0, flags=""): gp = genPrefix if gp == None: gp = "#_g" if base!=None: try: gp = uripath.join(base, "#_g") except ValueError: pass # bogus: base eg RDFSink.RDFSink.__init__(self, gp) self._write = self.writeEncoded self._writeRaw = write self._quiet = quiet or "c" in flags self._flags = flags self._subj = None self.prefixes = {} # Look up prefix conventions self.defaultNamespace = None self.indent = 1 # Level of nesting of output self.base = base # self.nextId = 0 # Regenerate Ids on output self.regen = {} # Mapping of regenerated Ids # self.genPrefix = genPrefix # Prefix for generated URIs on output self.noLists = noLists # Suppress generation of lists? self._anodeName = {} # For "a" flag self._anodeId = {} # For "a" flag - reverse mapping if "l" in self._flags: self.noLists = 1 def writeEncoded(self, str): """Write a possibly unicode string out to the output""" return self._writeRaw(str.encode('utf-8')) def _newline(self, extra=0): self._needNL = 0 self._write("\n"+ " " * (self.indent+extra)) def bind(self, prefixString, uri): """ Just accepting a convention here """ assert ':' in uri # absolute URI references only if "p" in self._flags: return # Ignore the prefix system completely if not prefixString: return self.setDefaultNamespace(uri) if (uri == self.defaultNamespace and "d" not in self._flags): return # don't duplicate ?? self.endStatement() self.prefixes[uri] = prefixString self._write(" @prefix %s: <%s> ." % (prefixString, refTo(self.base, uri)) ) self._newline() def setDefaultNamespace(self, uri): if "d" in self._flags or "p" in self._flags: return # no prefix system self.endStatement() self.defaultNamespace = uri if self.base: # Sometimes there is none, and now refTo is intolerant x = refTo(self.base, uri) else: x = uri self._write(" @prefix : <%s> ." % x ) self._newline() def start(self): pass self._parts = [0] self._types = [None] self._nodeEnded = False def end(self): self._write('\n\n#End') def addNode(self, node): self._parts[-1] += 1 if node is not None: self._realEnd() if self._types == LITERAL: lit, dt, lang = node singleLine = "n" in self._flags if dt != None and "n" not in self._flags: dt_uri = dt if (dt_uri == INTEGER_DATATYPE): self._write(str(long(lit))) return if (dt_uri == FLOAT_DATATYPE): self._write(str(float(lit))) # numeric python-normalized return if (dt_uri == DECIMAL_DATATYPE): self._write(str(Decimal(lit))) return st = stringToN3(lit, singleLine= singleLine, flags=self._flags) if lang != None: st = st + "@" + lang if dt != None: st = st + "^^" + self.symbolString(dt) self._write(st) elif self._types == SYMBOL: self._write(self.symbolString(node) + ' ') elif self._types == QUESTION: self._write('?' + node + ' ') def _realEnd(self): if self._nodeEnded: self._nodeEnded = False if self._parts[-1] == 1: self._write(' . \n') elif self._parts[-1] == 2: self._write(';\n') elif self._parts[-1] == 3: self._write(',\n') else: pass def symbolString(self, value): j = string.rfind(value, "#") if j<0: #and "/" in self._flags: j=string.rfind(value, "/") # Allow "/" namespaces as a second best if (j>=0 and "p" not in self._flags): # Suppress use of prefixes? for ch in value[j+1:]: # Examples: "." ";" we can't have in qname if ch in _notNameChars: if verbosity() > 0: progress("Cannot have character %i in local name." % ord(ch)) break else: namesp = value[:j+1] if (self.defaultNamespace and self.defaultNamespace == namesp and "d" not in self._flags): return ":"+value[j+1:] self.countNamespace(namesp) prefix = self.prefixes.get(namesp, None) # @@ #CONVENTION if prefix != None : return prefix + ":" + value[j+1:] if value[:j] == self.base: # If local to output stream, return "<#" + value[j+1:] + ">" # use local frag id if "r" not in self._flags and self.base != None: value = refTo(self.base, value) elif "u" in self._flags: value = backslashUify(value) else: value = hexify(value) return "<" + value + ">" # Everything else def IsOf(self): self._write('is ') self._predIsOfs[-1] = FRESH def checkIsOf(self): return self._predIsOfs[-1] def forewardPath(self): self._write('!') def backwardPath(self): self._write('^') def endStatement(self): self._parts[-1] = 0 self._nodeEnded = True def addLiteral(self, lit, dt=None, lang=None): self._types = LITERAL self.addNode((lit, dt, lang)) def addSymbol(self, sym): self._types = SYMBOL self.addNode(sym) def beginFormula(self): self._realEnd() self._parts.append(0) self._write('{') def endFormula(self): self._parts.pop() self._write('}') self._types = None self.addNode(None) def beginList(self): self._realEnd() self._parts.append(-1) self._write('(') def endList(self): self._parts.pop() self._types = LIST self._write(') ') self.addNode(None) def addAnonymous(self, Id): """If an anonymous shows up more than once, this is the function to call """ if Id not in bNodes: a = self.formulas[-1].newBlankNode() bNodes[Id] = a else: a = bNodes[Id] self.addNode(a) def beginAnonymous(self): self._realEnd() self._parts.append(0) self._write('[') def endAnonymous(self): self._parts.pop() self._write(']') self._types = None self.addNode(None) def declareExistential(self, sym): self._write('@forSome ' + sym + ' . ') def declareUniversal(self, sym): self._write('@forAll ' + sym + ' . ') def addQuestionMarkedSymbol(self, sym): self._types = QUESTION self.addNode(sym) ################################################### # # Utilities # Escapes = {'a': '\a', 'b': '\b', 'f': '\f', 'r': '\r', 't': '\t', 'v': '\v', 'n': '\n', '\\': '\\', '"': '"'} forbidden1 = re.compile(ur'[\\\"\a\b\f\r\v\u0080-\U0000ffff]') forbidden2 = re.compile(ur'[\\\"\a\b\f\r\v\t\n\u0080-\U0000ffff]') #" def stringToN3(str, singleLine=0, flags=""): res = u'' if (len(str) > 20 and str[-1] <> u'"' and not singleLine and (string.find(str, u"\n") >=0 or string.find(str, u'"') >=0)): delim= u'"""' forbidden = forbidden1 # (allow tabs too now) else: delim = u'"' forbidden = forbidden2 i = 0 while i < len(str): m = forbidden.search(str, i) if not m: break j = m.start() res = res + str[i:j] ch = m.group(0) if ch == u'"' and delim == u'"""' and str[j:j+3] != u'"""': #" res = res + ch else: k = string.find(u'\a\b\f\r\t\v\n\\"', ch) if k >= 0: res = res + u"\\" + u'abfrtvn\\"'[k] else: if 'e' in flags: # res = res + ('\\u%04x' % ord(ch)) res = res + (u'\\u%04X' % ord(ch)) # http://www.w3.org/TR/rdf-testcases/#ntriples else: res = res + ch i = j + 1 # The following code fixes things for really high range Unicode newstr = u"" for ch in res + str[i:]: if ord(ch)>65535: newstr = newstr + (u'\\U%08X' % ord(ch)) # http://www.w3.org/TR/rdf-testcases/#ntriples else: newstr = newstr + ch # return delim + newstr + delim def backslashUify(ustr): """Use URL encoding to return an ASCII string corresponding to the given unicode""" # progress("String is "+`ustr`) # s1=ustr.encode('utf-8') str = u"" for ch in ustr: # .encode('utf-8'): if ord(ch) > 65535: ch = u"\\U%08X" % ord(ch) elif ord(ch) > 126: ch = u"\\u%04X" % ord(ch) else: ch = u"%c" % ord(ch) str = str + ch return str def hexify(ustr): """Use URL encoding to return an ASCII string corresponding to the given UTF8 string >>> hexify("http://example/a b") 'http://example/a%20b' """ #" # progress("String is "+`ustr`) # s1=ustr.encode('utf-8') str = "" for ch in ustr: # .encode('utf-8'): if ord(ch) > 126 or ord(ch) < 33 : ch = "%%%02X" % ord(ch) else: ch = "%c" % ord(ch) str = str + ch return str def dummy(): res = "" if len(str) > 20 and (string.find(str, "\n") >=0 or string.find(str, '"') >=0): delim= '"""' forbidden = "\\\"\a\b\f\r\v" # (allow tabs too now) else: delim = '"' forbidden = "\\\"\a\b\f\r\v\t\n" for i in range(len(str)): ch = str[i] j = string.find(forbidden, ch) if ch == '"' and delim == '"""' \ and i+1 < len(str) and str[i+1] != '"': j=-1 # Single quotes don't need escaping in long format if j>=0: ch = "\\" + '\\"abfrvtn'[j] elif ch not in "\n\t" and (ch < " " or ch > "}"): ch = "[[" + `ch` + "]]" #[2:-1] # Use python res = res + ch return delim + res + delim def _test(): import doctest doctest.testmod() if __name__ == '__main__': _test() #ends