# Source code for HookTest.capitains_units.cts
import re
import subprocess
import warnings
from threading import Timer
from collections import defaultdict
from os import makedirs
import os.path
from hashlib import md5
import time
import requests
import shutil
from lxml.etree import parse
import validators
import MyCapytain.common
from MyCapytain.common.constants import Mimetypes
from MyCapytain.errors import DuplicateReference, EmptyReference, MissingRefsDecl
from MyCapytain.resources.collections.cts import XmlCtsTextgroupMetadata, XmlCtsWorkMetadata
from MyCapytain.resources.texts.local.capitains.cts import CapitainsCtsText
from HookTest.units import TESTUnit
class CTSMetadata_TestUnit(TESTUnit):
    """ CTS metadata testing object

    :param path: Path to the file
    :type path: basestring

    :cvar tests: Names of the methods to be run against the metadata file, in order
    :type tests: [str]
    :cvar readable: Human friendly string associated to object methods
    :type readable: dict
    :ivar urns: List of URN retrieved in the file.
    :type urns: [str]
    :ivar type: Type of metadata (textgroup or work)
    :type type: str
    :ivar urn: URN of the textgroup or work described by the file (set by ``check_urns``)
    :type urn: str

    Shared variables with parent class:

    :ivar path: Path for the resource
    :type path: str
    :ivar xml: XML resource, parsed in python. Used to do general checking
    :type xml: lxml._etree.Element

    .. note:: All method in CTSMetadata_TestUnit.tests ("parsable", "capitain", "metadata", "check_urns",
        "filename") yield at least one boolean (might be more) which represents the success of it.
    """

    tests = ["parsable", "capitain", "metadata", "check_urns", "filename"]
    readable = {
        "parsable": "File parsing",
        "capitain": "MyCapytain parsing",
        "metadata": "Metadata availability",
        "check_urns": "URNs testing",
        "filename": "Naming Convention"
    }

    def __init__(self, *args, **kwargs):
        super(CTSMetadata_TestUnit, self).__init__(*args, **kwargs)
        self.urns = []
        self.type = None
        # ``filename`` reads ``self.urn`` even when ``check_urns`` could not set it;
        # make sure the attribute exists without clobbering a value set by the parent.
        if not hasattr(self, "urn"):
            self.urn = None

    def capitain(self):
        """ Load the file in MyCapytain

        Detects from the root tag whether the file describes a textgroup or a work, stores the
        result in ``self.type`` and parses the root into ``self.Text``.

        :returns: Yields exactly one status: True if the metadata could be parsed
        :rtype: iterator(bool)
        """
        if not self.xml:
            # Nothing was parsed: report a failure like the other checks do
            yield False
            return

        root = self.xml.getroot()
        if "textgroup" in root.tag:
            self.type = "textgroup"
            self.log("TextGroup detected")
            trait = XmlCtsTextgroupMetadata
        elif "work" in root.tag:
            self.type = "work"
            self.log("Work detected")
            trait = XmlCtsWorkMetadata
        else:
            self.log("No metadata type detected (neither work nor textgroup)")
            self.log("Inventory can't be read through Capitains standards")
            yield False
            return

        try:
            self.Text = trait.parse(root)
        except AttributeError as E:
            self.log("Missing URN attribute")
            self.error(E)
            # ``False`` is the sentinel checked by ``metadata`` for "no usable metadata"
            self.Text = False
        except Exception as E:
            self.error(E)
            self.Text = False

        yield self.Text is not False

    def metadata(self):
        """ Check the presence of all metadata

        For a textgroup, at least one groupname is required. For a work, the work node needs exactly
        one xml:lang, every translation and commentary needs an xml:lang, and at least one title as
        well as a label and a description for every text are required.

        :returns: Yields one status
        :rtype: iterator(bool)
        """
        status = False
        if self.xml is not None and self.Text is not False:
            if self.type == "textgroup":
                groups = len(self.Text.get_cts_property("groupname"))
                self.log("{0} groupname found".format(str(groups)))
                status = groups > 0
            elif self.type == "work":
                status = True

                # Check that the work has a language
                workLang = self.xml.xpath("//ti:work/@xml:lang", namespaces=TESTUnit.NS)
                if len(workLang) != 1:
                    status = False
                    self.log("Work node is missing its lang attribute")

                # Every translation / commentary node must carry its own xml:lang
                lang_checks = (
                    ("translation", "Translation(s) are missing lang attribute"),
                    ("commentary", "Some Commentaries are missing lang attribute"),
                )
                for tag, message in lang_checks:
                    langs = self.xml.xpath("//ti:{}/@xml:lang".format(tag), namespaces=TESTUnit.NS)
                    nodes = self.xml.xpath("//ti:{}".format(tag), namespaces=TESTUnit.NS)
                    if len(langs) != len(nodes):
                        status = False
                        self.log(message)

                titles = len(self.Text.get_cts_property("title"))
                self.log("{0} titles found".format(titles))
                status = status and titles > 0

                children = list(self.Text.texts.values())
                texts = len(self.Text.texts)

                labels = sum(1 for text in children if len(text.get_cts_property("label")) > 0)
                self.log("{0}/{1} file(s) with labels".format(labels, texts))
                status = status and labels == texts

                descs = sum(1 for text in children if len(text.get_cts_property("description")) > 0)
                self.log("{0}/{1} file(s) with descs".format(descs, texts))
                # Every text needs a description (same outcome as comparing with the label
                # count, since labels == texts is already required at this point)
                status = status and descs == texts
        yield status

    def check_urns(self):
        """ Check the validity and presence of urns in the text

        .. note:: Populates self.urns and self.urn

        :returns: Yields one status
        :rtype: iterator(bool)
        """
        URN = MyCapytain.common.reference.URN
        status = False
        if self.xml:
            if self.type == "textgroup":
                found = self.xml.xpath("//ti:textgroup/@urn", namespaces=TESTUnit.NS)
                urns = [urn for urn in found if urn and len(URN(urn)) == 3]
                self.log("Group urn :" + "".join(found))
                status = len(urns) == 1
                if status:
                    self.urn = urns[0]
            elif self.type == "work":
                matches = True
                onlyOneWork = True
                allMembers = True

                worksUrns = [
                    urn
                    for urn in self.xml.xpath("//ti:work/@urn", namespaces=TESTUnit.NS)
                    if urn and len(URN(urn)) == 4
                ]
                groupUrns = [
                    urn
                    for urn in self.xml.xpath("//ti:work/@groupUrn", namespaces=TESTUnit.NS)
                    if urn and len(URN(urn)) == 3
                ]

                self.urn = None
                urn = None
                if len(worksUrns) == 1:
                    self.urn = worksUrns[0]
                    urn = URN(self.urn)
                    if len(groupUrns) == len(worksUrns):
                        missing = [
                            key for key in ['namespace', 'work', 'textgroup']
                            if getattr(urn, key) is None or len(getattr(urn, key)) == 0
                        ]
                        if missing:
                            self.log("Work URN is missing: {}".format(", ".join(missing)))
                            allMembers = False
                        elif groupUrns[0] != urn.upTo(URN.TEXTGROUP):
                            matches = False
                            self.log("The Work URN is not a child of the Textgroup URN")
                elif len(worksUrns) == 0:
                    self.log("The Work URN on the <ti:work> element is incorrectly formatted or missing.")

                self.log("Group urn : " + "".join(groupUrns))
                self.log("Work urn : " + "".join(worksUrns))

                texts = self.xml.xpath("//ti:edition|//ti:translation|//ti:commentary", namespaces=TESTUnit.NS)
                for text in texts:
                    t_urn = text.get("urn")
                    if t_urn and t_urn.startswith("urn:cts:"):
                        t_urn = URN(t_urn)
                        missing = [
                            key for key in ['namespace', 'work', 'version', 'textgroup']
                            if getattr(t_urn, key) is None or len(getattr(t_urn, key)) == 0
                        ]
                        if missing:
                            self.log("Text {} URN is missing: {}".format(str(t_urn), ", ".join(missing)))
                            allMembers = False
                        elif t_urn.upTo(URN.WORK) != str(urn):
                            matches = False
                            self.log("Text {} does not match parent URN".format(str(t_urn)))
                    # NOTE(review): the page this file was recovered from lost its indentation;
                    # both appends are kept at loop level because the final count below expects
                    # one URN and one workUrn per text.
                    self.urns.append(t_urn)
                    worksUrns.append(text.get("workUrn"))

                if len(set(worksUrns)) > 1:
                    onlyOneWork = False
                    self.log("There is different workUrns in the metadata")

                # Only complete (5 members) URNs are kept
                self.urns = [str(urn) for urn in self.urns if urn and len(urn) == 5]
                self.log("Edition, translation, and commentary urns : " + " ".join(self.urns))

                # One URN and one workUrn per text, plus the URN of the work itself
                expected = len(texts) * 2 + 1
                status = allMembers and matches and onlyOneWork and bool(self.urn) and \
                    len(groupUrns) == 1 and \
                    expected == len(self.urns + worksUrns)
        # Always report a real boolean, never None or a URN string
        yield bool(status)

    def filename(self):
        """ Check the filename and the path correctly represent the URN

        :returns: Yields one status
        :rtype: iterator(bool)
        """
        status = False
        if self.urn:
            urn = MyCapytain.common.reference.URN(self.urn)
            if self.type == "textgroup":
                status = self.path.endswith("data/{textgroup}/__cts__.xml".format(textgroup=urn.textgroup))
            elif self.type == "work":
                self.log(str(urn))
                status = self.path.endswith("data/{textgroup}/{work}/__cts__.xml".format(
                    textgroup=urn.textgroup, work=urn.work
                ))
            if not status:
                self.log("URN and path does not match")
        yield status

    def test(self):
        """ Test a file with various checks

        :returns: Iterator containing human readable test name, status and logs
        :rtype: iterator(str, bool, list(str))
        """
        self.urns = []
        for name in CTSMetadata_TestUnit.tests:
            # Show the logs and return the status
            for status in getattr(self, name)():
                yield (CTSMetadata_TestUnit.readable[name], status, self.logs)
            self.flush()
class CTSText_TestUnit(TESTUnit):
    """ CTS testing object

    :param path: Path to the file
    :type path: basestring
    :param countwords: Count the number of words and log it if necessary
    :type countwords: bool
    :param timeout: Number of seconds given to RelaxNG validation and to RNG downloads
    :type timeout: int

    :cvar tests: Names of the methods to be run against the text, in order
    :type tests: [str]
    :cvar breaks: Names of the tests whose failure stops the remaining tests
    :type breaks: [str]
    :cvar readable: Human friendly string associated to object methods
    :type readable: dict
    :ivar inv: List of URN retrieved in metadata. Used to check the availability of metadata for the text
    :type inv: [str]
    :ivar scheme: Name of the validation test run before the others (set by ``test``)
    :type scheme: str
    :ivar Text: Text object according to MyCapytains parsing. Used to find passages
    :type Text: MyCapytain.resources.text.local.Text

    Shared variables with parent class:

    :ivar path: Path for the resource
    :type path: str
    :ivar xml: XML resource, parsed in python. Used to do general checking
    :type xml: lxml._etree.Element

    .. note:: All method in CTSText_TestUnit.tests ( "parsable", "has_urn", "naming_convention", "refsDecl",
        "passages", "unique_passage", "inventory" ) yield at least one boolean (might be more) which
        represents the success of it.
    """

    tests = [
        # Parsing the XML
        "parsable",
        # Retrieving the URN (requires parsable)
        "has_urn", 'language',
        # Requires has_urn
        "inventory", "naming_convention",
        # Requires parsable
        "refsDecl", "passages", "unique_passage", "duplicate", "forbidden", "empty"
    ]
    breaks = [
        "parsable",
        "refsDecl",
        "passages"
    ]
    readable = {
        "parsable": "File parsing",
        "refsDecl": "RefsDecl parsing",
        "passages": "Passage level parsing",
        "duplicate": "Duplicate passages",
        "forbidden": "Forbidden characters",
        "epidoc": "Epidoc DTD validation",
        "tei": "TEI DTD Validation",
        "auto_rng": "Automatic RNG validation",
        "local_file": "Custom local RNG validation",
        "has_urn": "URN informations",
        "naming_convention": "Naming conventions",
        "inventory": "Available in inventory",
        "unique_passage": "Unique nodes found by XPath",
        "count_words": "Word Counting",
        "language": "Correct xml:lang attribute",
        "empty": "Empty References"
    }
    splitter = re.compile(r'\S+', re.MULTILINE)

    def __init__(self, path, countwords=False, timeout=30, *args, **kwargs):
        self.inv = list()
        self.timeout = timeout
        self.scheme = None
        self.guidelines = None
        self.rng = None
        self.Text = None
        self.xml = None
        # Read by naming_convention / inventory / count_words even when has_urn found nothing
        self.urn = None
        self.count = 0
        self.countwords = countwords
        self.citation = list()
        self.duplicates = list()
        self.forbiddens = list()
        self.empties = list()
        self.capitains_errors = list()
        self.test_status = defaultdict(bool)
        self.lang = ''
        self.dtd_errors = list()
        super(CTSText_TestUnit, self).__init__(path, *args, **kwargs)

    def parsable(self):
        """ Check that the text is parsable (as XML) and ingest it through MyCapytain then.

        .. note:: Override super(parsable) and add CapiTainS Ingesting to it

        :returns: Yields the parent status, preceded by False when MyCapytain finds no refsDecl
        :rtype: iterator(bool)
        """
        status = next(
            super(CTSText_TestUnit, self).parsable()
        )
        if status is True:
            try:
                self.Text = CapitainsCtsText(resource=self.xml.getroot())
            except MissingRefsDecl as E:
                self.Text = None
                self.log(str(E))
                self.capitains_errors.append(str(E))
                # The XML status is still yielded below; callers treat any False as a failure
                yield False
        else:
            self.Text = None
        yield status

    def refsDecl(self):
        """ Check that the text contains refsDecl informations

        :returns: Yields one status
        :rtype: iterator(bool)
        """
        # In 1.0.1, MyCapytain actually create an empty citation by default
        if self.Text and not self.Text.citation.isEmpty():
            self.log(str(len(self.Text.citation)) + " citation's level found")
            yield True
        else:
            yield False

    def run_rng(self, rng_path):
        """ Run the RNG through JingTrang

        The java process is killed once ``self.timeout`` seconds have elapsed.

        :param rng_path: Path to the RelaxNG file to run against the XML to test
        :returns: Yields False on error and on timeout, then the validation status
        :rtype: iterator(bool)
        """
        test = subprocess.Popen(
            ["java", "-Duser.country=US", "-Duser.language=en", "-jar", TESTUnit.JING, rng_path, self.path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=False
        )
        # Bytes defaults so the decoding below still works when communicate() fails
        out = b""
        error = b""
        failed = False
        timer = Timer(self.timeout, test.kill)
        try:
            timer.start()
            out, error = test.communicate()
        except Exception as E:
            self.error(E)
            failed = True
        finally:
            # The timer thread is only finished at this point if it already fired (and killed
            # the process). Thread.isAlive() was removed in Python 3.9, hence is_alive().
            timed_out = not timer.is_alive()
            timer.cancel()

        # Statuses are yielded after the cleanup so the timer is cancelled in every case
        if failed:
            yield False
        if timed_out:
            self.log("Timeout on RelaxNG")
            yield False

        def without_java_options(stream):
            # This is to deal with Travis printing a message about the _JAVA_OPTIONS when a java
            # command is run. Travis printing this command resulted in this test not passing
            return '\n'.join(
                [line for line in stream.decode().split('\n') if '_JAVA_OPTIONS' not in line]
            ).encode()

        out = without_java_options(out)
        error = without_java_options(error)

        if len(out) > 0:
            for issue in TESTUnit.rng_logs(out):
                self.log(issue)
                self.dtd_errors.append(issue)
        yield len(out) == 0 and len(error) == 0

    def auto_rng(self):
        """ Check the file against every RelaxNG schema declared in its xml-model processing instructions

        Remote schemas are downloaded through ``get_remote_rng``; other hrefs are resolved
        relatively to the directory of the tested file.

        :returns: Yields the statuses of ``run_rng`` for each schema, False for a missing local schema
        :rtype: iterator(bool)
        """
        xml = parse(self.path)
        xml_dir = os.path.dirname(os.path.abspath(self.path))
        # A file can have multiple schema
        for rng in xml.xpath("/processing-instruction('xml-model')"):
            uri = rng.attrib["href"]
            rng_path = os.path.abspath(os.path.join(xml_dir, uri))
            if validators.url(uri):
                rng_path = self.get_remote_rng(uri)
            elif not os.path.isfile(rng_path):
                self.dtd_errors.append("No RNG was found at " + rng_path)
                yield False
                continue
            for status in self.run_rng(rng_path):
                yield status

    def get_remote_rng(self, url):
        """ Given a valid URL, downloads the RNG from the given URL and returns the filepath and name

        Files are cached in the ``.rngs`` directory under the md5 hash of their URL. While a download
        is running, a ``.rng-indownload`` marker file exists next to the final file.

        :param url: the URL of the RNG
        :return: filepath and name where the RNG was saved
        :raises EnvironmentError: if a download already in progress did not finish in time
        """
        # If the file is remote, have a file-system approved name
        # The md5 hash seems like a good option
        digest = md5(url.encode()).hexdigest()

        # We have a name for the rng file but also for the in-download marker
        makedirs(".rngs", exist_ok=True)
        stable_local = os.path.join(".rngs", digest + ".rng")
        stable_local_downloading = os.path.join(".rngs", digest + ".rng-indownload")

        # The RNG was already downloaded: reuse it
        if os.path.exists(stable_local):
            return stable_local

        # A download is in progress: until the final file shows up, we need to wait
        if os.path.exists(stable_local_downloading):
            waited = self.timeout
            while not os.path.exists(stable_local):
                time.sleep(1)
                waited -= 1
                if waited < 0:
                    # Maybe we can wait more ?
                    raise EnvironmentError("The download of the RNG took too long")
            return stable_local

        # Nobody is downloading it: set the marker and download
        with open(stable_local_downloading, "w") as f:
            f.write("Downloading...")
        try:
            data = requests.get(url, timeout=self.timeout)
            data.raise_for_status()
            with open(stable_local_downloading, "w") as f:
                f.write(data.text)
            shutil.move(stable_local_downloading, stable_local)
        except Exception:
            # Do not leave a stale marker behind: it would make every later run wait
            # for a download that will never finish
            if os.path.exists(stable_local_downloading):
                os.remove(stable_local_downloading)
            raise
        return stable_local

    def epidoc(self):
        """ Check the original file against Epidoc rng through a java pipe
        """
        for status in self.run_rng(TESTUnit.EPIDOC):
            yield status

    def tei(self):
        """ Check the original file against TEI rng through a java pipe
        """
        for status in self.run_rng(TESTUnit.TEI_ALL):
            yield status

    def local_file(self):
        """ Check the original file against the custom rng (``self.rng``) through a java pipe
        """
        for status in self.run_rng(self.rng):
            yield status

    def passages(self):
        """ Check that passages are available at each level. On top of that, it collects forbidden \
        characters, duplicates and empty references for the dedicated tests

        :returns: Yields one status per citation level, stopping at the first failing level
        :rtype: iterator(bool)
        """
        if self.Text and self.Text.citation.refsDecl:
            citations = [c.name for c in self.Text.citation]
            for i in range(len(self.Text.citation)):
                try:
                    with warnings.catch_warnings(record=True) as warning_record:
                        # Cause all warnings to always be triggered.
                        warnings.simplefilter("always")
                        passages = self.Text.getValidReff(level=i + 1, _debug=True)

                    # Identifier of each reference at the current level only
                    ids = [ref.split(".", i)[-1] for ref in passages]
                    space_in_passage = TESTUnit.FORBIDDEN_CHAR.search("".join(ids))
                    len_passage = len(passages)
                    status = len_passage > 0
                    self.log(str(len_passage) + " found")
                    self.citation.append((i, len_passage, citations[i]))

                    for record in warning_record:
                        if record.category == DuplicateReference:
                            self.duplicates += sorted(str(record.message).split(", "))
                        if record.category == EmptyReference:
                            self.empties += [str(record.message)]

                    if space_in_passage:
                        self.forbiddens += [
                            "'{}'".format(n)
                            for ref, n in zip(ids, passages)
                            if TESTUnit.FORBIDDEN_CHAR.search(ref)
                        ]

                    yield status
                    if status is False:
                        break
                except Exception as E:
                    self.error(E)
                    self.log("Error when searching passages at level {0}".format(i + 1))
                    yield False
                    break
        else:
            yield False

    def duplicate(self):
        """ Detects duplicate references (collected by ``passages``)
        """
        if len(self.duplicates) > 0:
            self.log("Duplicate references found : {0}".format(", ".join(self.duplicates)))
            yield False
        elif self.test_status['passages'] is False:
            yield False
        else:
            yield True

    def forbidden(self):
        """ Checks for forbidden characters in references (collected by ``passages``)
        """
        if len(self.forbiddens) > 0:
            self.log("Reference with forbidden characters found: {0}".format(", ".join(self.forbiddens)))
            yield False
        elif self.test_status['passages'] is False:
            yield False
        else:
            yield True

    def empty(self):
        """ Detects empty references (collected by ``passages``)
        """
        if len(self.empties) > 0:
            self.log("Empty references found : {0}".format(", ".join(self.empties)))
            yield False
        elif self.test_status['passages'] is False:
            yield False
        else:
            yield True

    def unique_passage(self):
        """ Check that citation scheme do not collide (eg. Where text:1 would be the same node as text:1.1)

        :returns: Yields one status, False as well if the XPaths could not be run
        :rtype: iterator(bool)
        """
        try:
            # Nodes matched by the XPath of each citation level
            xpaths = [
                self.Text.xml.xpath(
                    MyCapytain.common.reference.REFERENCE_REPLACER.sub(
                        r"\1",
                        citation.refsDecl
                    ),
                    namespaces=TESTUnit.NS
                )
                for citation in self.Text.citation
            ]
            nodes = [element for xpath in xpaths for element in xpath]
            # A node matched by two levels shows up twice in the list
            all_unique = len(nodes) == len(set(nodes))
            if not all_unique:
                self.log("Some node are found twice")
                yield False
            else:
                yield True
        except Exception:
            # Deliberately best-effort: any failure counts as a failed check
            yield False

    def has_urn(self):
        """ Test that a file has its urn according to CapiTainS Guidelines in its scheme

        .. note:: Sets self.urn as soon as a node holding a URN is found, even if this URN is invalid

        :returns: Yields one status
        :rtype: iterator(bool)
        """
        if self.xml is not None:
            if self.guidelines == "2.tei":
                urns = self.xml.xpath("//tei:text/tei:body[starts-with(@n, 'urn:cts:')]", namespaces=TESTUnit.NS) + \
                    self.xml.xpath("//tei:text[starts-with(@xml:base, 'urn:cts:')]", namespaces=TESTUnit.NS)
            else:
                urns = self.xml.xpath(
                    "//tei:body/tei:div[@type='edition' and starts-with(@n, 'urn:cts:')]",
                    namespaces=TESTUnit.NS
                )
                urns += self.xml.xpath(
                    "//tei:body/tei:div[@type='translation' and starts-with(@n, 'urn:cts:')]",
                    namespaces=TESTUnit.NS
                )
                urns += self.xml.xpath(
                    "//tei:body/tei:div[@type='commentary' and starts-with(@n, 'urn:cts:')]",
                    namespaces=TESTUnit.NS
                )

            status = len(urns) > 0
            if status:
                # The URN is held by @n, or by @xml:base when @n is absent
                value = urns[0].get("n")
                if not value:
                    value = urns[0].base
                urn = MyCapytain.common.reference.URN(value)
                missing_members = [
                    key for key in ['namespace', 'work', 'version', 'textgroup']
                    if getattr(urn, key) is None or len(getattr(urn, key)) == 0
                ]
                if len(urn) < 5:
                    status = False
                    self.log("Incomplete URN")
                elif urn.reference:
                    status = False
                    self.log("Reference not accepted in URN")
                elif len(missing_members) > 0:
                    status = False
                    self.log("Elements of URN are empty: {}".format(", ".join(sorted(missing_members))))
                self.urn = value
        else:
            status = False
        yield status

    def naming_convention(self):
        """ Check the naming convention of the file: the last member of the URN has to be in the path
        """
        if self.urn:
            yield self.urn.split(":")[-1] in self.path
        else:
            yield False

    def inventory(self):
        """ Check that the URN of the text is available in the inventory (``self.inv``)
        """
        if self.urn and self.inv:
            yield self.urn in self.inv
        else:
            yield False

    def count_words(self):
        """ Count words in a file

        The count (stored in ``self.count``) is only done when the passages test succeeded.
        """
        status = False
        if self.test_status["passages"]:
            text = self.Text.export(Mimetypes.PLAINTEXT, exclude=["tei:note", "tei:teiHeader"])
            self.count = len(type(self).splitter.findall(text))
            self.log("{} has {} words".format(self.urn, self.count))
            status = self.count > 0
        yield status

    def language(self):
        """ Tests to make sure an xml:lang element is on the correct node

        .. note:: Sets self.lang, using 'UNK' when no language is found

        :returns: Yields one status
        :rtype: iterator(bool)
        """
        # No candidate node for unknown guidelines or when no XML is available
        urns_holding_node = []
        if self.xml is not None:
            if self.guidelines == "2.epidoc":
                urns_holding_node = self.xml.xpath(
                    "//tei:text/tei:body/tei:div"
                    "[@type='edition' or @type='translation' or @type='commentary']"
                    "[starts-with(@n, 'urn:cts:')]",
                    namespaces=TESTUnit.NS
                )
            elif self.guidelines == "2.tei":
                urns_holding_node = self.xml.xpath(
                    "//tei:text/tei:body[starts-with(@n, 'urn:cts:')]", namespaces=TESTUnit.NS
                ) + self.xml.xpath(
                    "//tei:text[starts-with(@xml:base, 'urn:cts:')]", namespaces=TESTUnit.NS
                )

        if urns_holding_node:
            self.lang = urns_holding_node[0].get('{http://www.w3.org/XML/1998/namespace}lang')
        else:
            self.lang = ''

        # Both a missing attribute (None) and an empty one are failures
        if not self.lang:
            self.lang = 'UNK'
            yield False
        else:
            yield True

    def test(self, scheme, guidelines, rng=None, inventory=None):
        """ Test a file with various checks

        :param scheme: Name of the validation test to run first (e.g. "tei", "epidoc", "auto_rng", \
        "local_file"). With an "-ignore" suffix, the validation test is not run
        :type scheme: str
        :param guidelines: Guidelines followed by the file (e.g. "2.epidoc", "2.tei")
        :type guidelines: str
        :param rng: Path to the custom RelaxNG file used by the "local_file" test
        :type rng: str
        :param inventory: URNs to be matched against
        :type inventory: list
        :returns: Iterator containing human readable test name, boolean status and logs
        :rtype: iterator(str, bool, list(str))
        """
        if inventory is not None:
            self.inv = inventory

        # Work on a copy: the class level list must not be modified
        tests = list(CTSText_TestUnit.tests)
        if self.countwords:
            tests.append("count_words")

        if scheme.endswith("-ignore"):
            scheme = scheme.replace("-ignore", "")
        else:
            tests = [scheme] + tests

        self.scheme = scheme
        self.guidelines = guidelines
        self.rng = rng

        for position, name in enumerate(tests):
            # A test passes when none of the statuses it yields is False
            status = False not in [result for result in getattr(self, name)()]
            self.test_status[name] = status
            # Show the logs and return the status
            yield (CTSText_TestUnit.readable[name], status, self.logs)
            if name in self.breaks and not status:
                # Every remaining test is reported as failed without being run
                for skipped in tests[position + 1:]:
                    self.test_status[skipped] = False
                    yield (CTSText_TestUnit.readable[skipped], False, [])
                break
            self.flush()