Source code for HookTest.test

# -*- coding: utf-8 -*-

import os
import glob
import statistics
import sys
import traceback
import re

from collections import defaultdict, OrderedDict
from multiprocessing.pool import Pool
import json
import shutil
import requests
import hashlib
import hmac
import time
from prettytable import PrettyTable as PT
from prettytable import ALL as pt_all

import HookTest.capitains_units.cts
import HookTest.units
from colors import white, magenta, black
from operator import attrgetter


# Matches GitHub pull-request refs such as "pull/123/head", capturing the PR number.
# Raw string fixes the invalid "\/" escape (a DeprecationWarning on modern Python);
# "/" needs no escaping in a regex, so the matched language is unchanged.
pr_finder = re.compile(r"pull/([0-9]+)/head")


class DefaultFinder(object):
    """ Finders are objects used by Test to retrieve the target files of the tests """

    def __init__(self, **options):
        # No options are needed for the default behaviour; **options keeps the
        # constructor signature compatible with specialised finders.
        pass

    def find(self, directory):
        """ Return files to be tested under a CapiTainS-shaped directory

        :param directory: Root directory to search in
        :returns: Path of xml text files, Path of __cts__.xml files
        :rtype: (list, list)
        """
        candidates = glob.glob(os.path.join(directory, "data/*/*/*.xml")) \
            + glob.glob(os.path.join(directory, "data/*/*.xml"))
        texts, metadata = [], []
        for path in candidates:
            if "__cts__.xml" in path:
                metadata.append(path)
            else:
                texts.append(path)
        # Sorted for unit testing and human readable progression
        return sorted(texts), sorted(metadata)
class FilterFinder(DefaultFinder):
    """ FilterFinder provides a filtering capacity to DefaultFinder.

    It takes an include option which takes the form of the work urn (*ie.* in
    urn:cts:latinLit:phi1294.phi002.perseus-lat2 this would be
    phi1294.phi002.perseus-lat2, cut at any of the points : phi1294,
    phi1294.phi002, phi1294.phi002.perseus-lat2)

    :param include: Representation of the work urn component (might be from one member down to the version member)
    :type include: str
    """

    def __init__(self, include, **options):
        self.include = include.split(".")

    def find(self, directory):
        """ Return files to be tested, restricted to the ``include`` filter

        :param directory: Root directory to search in
        :returns: Path of xml text files, Path of __cts__.xml files
        :rtype: (list, list)
        """
        parts = self.include
        textgroup = parts[0] if len(parts) >= 1 else "*"
        work = parts[1] if len(parts) >= 2 else "*"
        # A full three-member urn pins the version file name exactly;
        # otherwise any version (three dotted members) matches.
        version = ".".join(parts) if len(parts) == 3 else "*.*.*"
        metadata = glob.glob(os.path.join(directory, "data/{textgroup}/__cts__.xml".format(
            textgroup=textgroup
        )))
        metadata += glob.glob(os.path.join(directory, "data/{textgroup}/{work}/__cts__.xml".format(
            textgroup=textgroup, work=work
        )))
        texts = glob.glob(os.path.join(directory, "data/{textgroup}/{work}/{version}.xml".format(
            textgroup=textgroup, work=work, version=version
        )))
        # Sorted for unit testing and human readable progression
        return sorted(texts), sorted(metadata)
class Test(object):
    """ Create a Test object

    :param path: Path where the test should happen
    :type path: str
    :param workers: Number of simultaneous workers to be used
    :type workers: str
    :param scheme: Name of the scheme
    :type scheme: str
    :param verbose: Log also rng and unit logs details
    :type verbose: int
    :param ping: URI to ping with data
    :type ping: str
    :param console: If set to true, print logs to the console
    :type console: bool
    :param finder: Test files retriever
    :type finder: DefaultFinder
    :param finderoptions: Dictionary of option to instantiate specific finders
    :type finderoptions: dict
    :param countwords: Enable counting words for text tests (False by default)
    :type countwords: bool
    """
    # Minimum number of unsent logs buffered before a push to the ping endpoint
    STACK_TRIGGER_SIZE = 10
    # Overall run status strings
    FAILURE = "failed"
    ERROR = "error"
    SUCCESS = "success"
    PENDING = "pending"
    # Mapping of scheme name to the RelaxNG file used for validation
    SCHEMES = {
        "tei": "tei.rng",
        "epidoc": "epidoc.rng",
        "ignore": "epidoc.rng",
        "auto": "auto_rng"
    }

    def __init__(
            self, path,
            workers=1, scheme="auto", verbose=0, ping=None, secret="", triggering_size=None, console=False,
            build_manifest=False, finder=DefaultFinder, finderoptions=None, countwords=False, allowfailure=False,
            from_travis_to_hook=False, timeout=30, guidelines=None, **kwargs
    ):
        """ Create a Test object

        :param path: Path where the test should happen
        :type path: str
        :param uuid: Identifier for the test
        :type uuid: str
        :param repository: URI of the repository
        :type repository: str
        :param branch: Identifier of the branch
        :type branch: str
        :param workers: Number of simultaneous workers to be used
        :type workers: str
        :param scheme: Name of the scheme
        :type scheme: str
        :param verbose: Log also rng and unit logs details
        :type verbose: int
        :param ping: URI to ping with data
        :type ping: str
        :param console: If set to true, print logs to the console
        :type console: bool
        :param countwords: Count the number of words for passing texts
        :type countwords: bool
        :param build_manifest: Build a manifest at the end of the test
        :type build_manifest: bool
        """
        self.depth = 10
        self.console = console
        self.build_manifest = build_manifest
        self.path = path
        self.workers = workers
        self.ping = ping
        # The HOOK_SECRET environment variable takes precedence over the
        # secret argument; either way the HMAC key must be bytes.
        if os.environ.get("HOOK_SECRET"):
            self.secret = os.environ.get("HOOK_SECRET").encode()
        else:
            self.secret = bytes(secret, "utf-8")
        self.scheme = scheme
        self.rng = None
        # A list scheme is a (scheme name, custom rng path) pair
        if isinstance(scheme, list):
            self.scheme = scheme[0]
            self.rng = scheme[1]
        self.verbose = verbose
        self.countwords = countwords
        self.allowfailure = allowfailure
        self.__triggering_size = None
        self.timeout = timeout
        self.guidelines = guidelines
        # Default guidelines follow the chosen scheme
        if self.guidelines is None:
            if self.scheme == "epidoc":
                self.guidelines = "2.epidoc"
            else:
                self.guidelines = "2.tei"
        if isinstance(triggering_size, int):
            self.__triggering_size = triggering_size
        if not isinstance(scheme, list) and scheme not in Test.SCHEMES:
            raise ValueError(
                "Scheme {0} unknown, please use one of the following : {1}".format(
                    scheme,
                    ", ".join(Test.SCHEMES.keys())
                )
            )
        # filepath -> UnitLog, in test order
        self.results = OrderedDict()
        # filepath -> bool passing status (defaults to False for unknown keys)
        self.passing = defaultdict(bool)
        # URNs collected from metadata files, fed to the text tests
        self.inventory = []
        self.text_files = []
        self.cts_files = []
        self.progress = None
        self.finder = finder
        if not finder:
            self.finder = DefaultFinder
        if finderoptions:
            self.finder = self.finder(**finderoptions)
        else:
            self.finder = self.finder()
        self.from_travis_to_hook = from_travis_to_hook

    @property
    def successes(self):
        """ Get the number of successful tests

        :returns: Number of successful tests
        :rtype: int
        """
        return len([True for status in self.passing.values() if status is True])

    @property
    def json(self):
        """ Get Json representation of object report

        :return: JSON representing the complete test
        :rtype:
        """
        return Test.dump(self.report)

    @property
    def report(self):
        """ Get the report of the Test

        :return: Report of the test
        :rtype: dict
        """
        coverage = 0
        if len(self.results) > 0:
            coverage = statistics.mean([test.coverage for test in self.results.values()])
        return {
            "status": self.status,
            "units": [unitlog.dict for unitlog in self.results.values()],
            "coverage": coverage
        }

    @property
    def directory(self):
        """ Directory

        :return: Path of the full directory
        :rtype: str
        """
        return self.path

    @property
    def stack(self):
        """ Get the current stack of unsent item

        :return: Unset UnitLog
        :rtype: [UnitLog]
        """
        return [result for result in self.results.values() if result.sent is False]

    @property
    def status(self):
        """ Updated the status string based on available informations

        :return: Status string updated
        :rtype: str
        """
        # ERROR when nothing was found or some files never reported a result
        if self.count_files == 0 or len(self.passing) != self.count_files:
            return Test.ERROR
        elif self.allowfailure is True and self.count_files > 0 and self.successes > 0:
            return Test.SUCCESS
        elif self.count_files > 0 and self.successes == len(self.passing):
            return Test.SUCCESS
        else:
            return Test.FAILURE

    @property
    def triggering_size(self):
        """ Number of buffered logs that triggers a flush to the endpoint:
        an explicit constructor value wins, otherwise 5% of the file count,
        floored at STACK_TRIGGER_SIZE.

        :return:
        """
        percentage = int(self.count_files / 20)
        if self.__triggering_size is not None:
            return self.__triggering_size
        elif percentage > Test.STACK_TRIGGER_SIZE:
            return percentage
        else:
            return Test.STACK_TRIGGER_SIZE

    @property
    def files(self):
        # (text files, metadata files) as found by the finder
        return self.text_files, self.cts_files

    @property
    def count_files(self):
        # Total number of files under test
        return len(self.text_files) + len(self.cts_files)

    def flush(self, stack):
        """ Flush the remaining logs to the endpoint """
        if len(stack) > 0:
            # Mark before sending so a slow request does not re-queue them
            for needle in stack:
                needle.sent = True
            self.send({"units": [needle.dict for needle in stack]})

    def send(self, data):
        """ Send data to self.ping URL

        :param data: Data to send
        :return: Result of request
        """
        if isinstance(data, dict):
            data = Test.dump(data)
        else:
            data = Test.dump({"logs": data})
        data = bytes(data, "utf-8")
        # Payload is authenticated with an HMAC-SHA1 signature header
        hashed = hmac.new(self.secret, data, hashlib.sha1).hexdigest()
        return requests.post(
            self.ping,
            data=data,
            headers={"HookTest-Secure-X": hashed}
        )

    def unit(self, filepath):
        """ Do test for a file and print the results

        :param filepath: Path of the file to be tested
        :type filepath: str
        :returns: A UnitLog
        :rtype: UnitLog
        """
        logs = []
        results = {}
        additional = []
        if filepath.endswith("__cts__.xml"):
            # Metadata file: additional carries the URNs found in it
            unit = HookTest.capitains_units.cts.CTSMetadata_TestUnit(filepath)
            texttype = "CTSMetadata"
            logs.append(">>>> Testing " + filepath)
            for name, status, unitlogs in unit.test():
                if status:
                    status_str = " passed"
                else:
                    status_str = " failed"
                logs.append(">>>>> " + name + status_str)
                if self.verbose > 0 and len(unitlogs) > 0:
                    logs += [log for log in unitlogs if log]
                results[name] = status
            additional += unit.urns
        else:
            # Text file: additional becomes a dict of per-text diagnostics
            unit = HookTest.capitains_units.cts.CTSText_TestUnit(filepath, countwords=self.countwords, timeout=self.timeout)
            texttype = "CTSText"
            logs.append(">>>> Testing " + filepath.split("data")[-1])
            for name, status, unitlogs in unit.test(self.scheme, self.guidelines, self.rng, self.inventory):
                if status:
                    status_str = " passed"
                else:
                    status_str = " failed"
                logs.append(">>>>> " + name + status_str)
                if self.verbose > 0 and len(unitlogs) > 0:
                    logs += [log for log in unitlogs if log]
                results[name] = status
            additional = {}
            additional["citations"] = unit.citation
            additional["duplicates"] = unit.duplicates
            additional["forbiddens"] = unit.forbiddens
            additional["dtd_errors"] = unit.dtd_errors
            additional['language'] = unit.lang
            additional['empties'] = unit.empties
            additional['capitains_errors'] = unit.capitains_errors
            if self.countwords:
                additional["words"] = unit.count
        return self.cover(filepath, results, testtype=texttype, logs=logs, additional=additional), filepath, additional

    def run(self):
        """ Run the tests

        :returns: Status of the test, List of logs, Report
        :rtype: (string, list, dict)
        """
        self.text_files, self.cts_files = self.find()
        self.start()
        # We deal with Inventory files first to get a list of urns
        with Pool(processes=self.workers) as executor:
            # We iterate over a dictionary of completed tasks
            for future in executor.imap_unordered(self.unit, [file for file in self.cts_files]):
                result, filepath, additional = future
                self.results[filepath] = result
                self.passing[filepath] = result.status
                self.inventory += additional
                self.log(self.results[filepath])
            # Required for coverage
            executor.close()
            executor.join()
        self.middle()  # To print the results from the metadata file tests
        # We load a thread pool which has 5 maximum workers
        with Pool(processes=self.workers) as executor:
            # We create a dictionary of tasks which
            for future in executor.imap_unordered(self.unit, [file for file in self.text_files]):
                result, filepath, additional = future
                self.results[filepath] = result
                self.passing[filepath] = result.status
                self.log(self.results[filepath])
            # Required for coverage
            executor.close()
            executor.join()
        self.end()
        return self.status

    def log(self, log):
        """ Deal with middle process situation

        :param log: Result of a test for one unit
        :type log: UnitLog
        :return: None
        """
        if self.console:
            # Progress-bar style output: one char per tested file
            if isinstance(log, UnitLog):
                if log.status is True:
                    sys.stdout.write('.')
                    sys.stdout.flush()
                else:
                    sys.stdout.write(str('X'))
                    sys.stdout.flush()
        elif self.ping and len(self.stack) >= self.triggering_size:
            self.flush(self.stack)

    def start(self):
        """ Deal with the start of the process """
        if self.scheme == "auto":
            self.scheme = "auto_rng"
        if self.console:
            print(">>> Starting tests !", flush=True)
            print(">>> Files to test : "+str(self.count_files), flush=True)
        elif self.ping:
            self.send({
                "logs": [
                    ">>> Starting tests !"
                ],
                "files": self.count_files,
                "texts": len(self.text_files),
                "inventories": len(self.cts_files)
            })

    def download(self):
        """ Information to send or print during download """
        if self.console is not False and self.verbose == 10:
            print("\n".join([f for f in self.progress.json if f]), flush=True)

    def middle(self):
        """ to print out the results for the metadata files that failed the tests

        :return:
        :rtype:
        """
        # At this point only metadata files are in self.results
        self.m_files = self.m_passing = len(self.results.values())
        # NOTE(review): nesting below reconstructed from a collapsed dump —
        # the failure table is assumed to be console+verbose only; confirm
        # against the upstream HookTest repository.
        if self.console and self.verbose > 0:
            print('', flush=True)
            if False not in [unit.status for unit in self.results.values()]:
                print('All Metadata Files Passed', flush=True)
            else:
                display_table = PT(["Filename", "Failed Tests"])
                display_table.align["Filename", "Failed Tests"] = 'c'
                display_table.hrules = pt_all
                for unit in sorted(self.report['units'], key=lambda x: x['name']):
                    if unit['status'] is not True:
                        self.m_passing -= 1
                        display_table.add_row([unit['name'], '\n'.join(['{test} failed'.format(test=x) for x in unit['units'] if unit['units'][x] is False])])
                print(display_table, flush=True)

    def end(self):
        """ Deal with end logs """
        total_units = 0
        total_words = 0
        language_words = defaultdict(int)
        # Names of the tests to display in the "Failed Tests" column
        show = list(HookTest.capitains_units.cts.CTSText_TestUnit.readable.values())
        if self.verbose == 0:
            show.remove("Duplicate passages")
            show.remove("Forbidden characters")
        if self.console:
            duplicate_nodes = ''
            forbidden_chars = ''
            dtd_errors = ''
            capitains_errors = ''
            empty_refs = ''
            num_texts = 0
            num_failed = 0
            print('', flush=True)
            if self.countwords is True:
                display_table = PT(["Identifier", "Words", "Nodes", "Failed Tests"])
                display_table.align["Identifier", "Words", "Nodes", "Failed Tests"] = "c"
            else:
                display_table = PT(["Identifier", "Nodes", "Failed Tests"])
                display_table.align["Identifier", "Nodes", "Failed Tests"] = "c"
            display_table.hrules = pt_all
            # try using self.results and then the UnitLog attributes instead of self.report
            # also use operator.attrgetter('name') instead of lambda x in the for statement
            for unit in sorted(self.results.values(), key=attrgetter('name')):
                if not unit.name.endswith('__cts__.xml'):
                    num_texts += 1
                    # When passage parsing failed, the duplicate/forbidden
                    # results are meaningless, so hide them (they may already
                    # have been removed above, hence the try/except)
                    if unit.units["Passage level parsing"] is False:
                        try:
                            show.remove("Duplicate passages")
                            show.remove("Forbidden characters")
                        except:
                            pass
                    if unit.coverage != 100.0:
                        num_failed += 1
                        text_color = magenta
                    else:
                        text_color = white
                    if unit.coverage == 0.0:
                        failed_tests = 'All'
                    else:
                        failed_tests = '\n'.join([x for x in unit.units if unit.units[x] is False and x in show])
                    if unit.additional['duplicates']:
                        duplicate_nodes += '\t{name}\t{nodes}\n'.format(name=magenta(os.path.basename(unit.name)), nodes=', '.join(unit.additional['duplicates']))
                    if unit.additional['forbiddens']:
                        forbidden_chars += '\t{name}\t{nodes}\n'.format(name=magenta(os.path.basename(unit.name)), nodes=', '.join(unit.additional['forbiddens']))
                    if unit.additional["dtd_errors"] and self.verbose >= 6:
                        dtd_errors += '\t{name}\t{nodes}\n'.format(name=magenta(os.path.basename(unit.name)), nodes=', '.join(unit.additional["dtd_errors"]))
                    if unit.additional["capitains_errors"]:
                        capitains_errors += '\t{name}\t{nodes}\n'.format(name=magenta(os.path.basename(unit.name)), nodes=', '.join(unit.additional["capitains_errors"]))
                    if unit.additional["empties"]:
                        empty_refs += '\t{name}\t{nodes}\n'.format(name=magenta(os.path.basename(unit.name)), nodes=', '.join(unit.additional["empties"]))
                    # Only show per-text rows for failures unless very verbose
                    if self.verbose >= 7 or unit.status is False:
                        if self.countwords:
                            row = [
                                "{}".format(text_color(os.path.basename(unit.name))),
                                "{:,}".format(unit.additional['words']),
                                ';'.join([str(x[1]) for x in unit.additional['citations']]),
                                failed_tests
                            ]
                        else:
                            row = [
                                "{}".format(text_color(os.path.basename(unit.name))),
                                ';'.join([str(x[1]) for x in unit.additional['citations']]),
                                failed_tests
                            ]
                        display_table.add_row(row)
                    for x in unit.additional['citations']:
                        total_units += x[1]
                    if self.countwords:
                        total_words += unit.additional['words']
                        if unit.additional['words'] > 0:
                            language_words[unit.additional['language']] += unit.additional['words']
            print(display_table, flush=True)
            print('', flush=True)
            if self.verbose >= 5:
                if duplicate_nodes:
                    duplicate_nodes = magenta('Duplicate nodes found:\n') + duplicate_nodes + '\n'
                if forbidden_chars:
                    forbidden_chars = magenta('Forbidden characters found:\n') + forbidden_chars + '\n'
                if dtd_errors:
                    dtd_errors = magenta('DTD errors found:\n') + dtd_errors + '\n'
                if empty_refs:
                    empty_refs = magenta('Empty references found:\n') + empty_refs + '\n'
            else:
                duplicate_nodes = forbidden_chars = dtd_errors = empty_refs = ''
            # CapiTainS errors are always shown, regardless of verbosity
            if capitains_errors:
                capitains_errors = magenta('CapiTainS parsing errors found:\n') + capitains_errors + '\n'
            print("{caps}{dupes}{forbs}{dtds}{empts}>>> End of the test !\n".format(caps=capitains_errors, dupes=duplicate_nodes, forbs=forbidden_chars, dtds=dtd_errors, empts=empty_refs))
            t_pass = num_texts - num_failed
            cov = round(statistics.mean([test.coverage for test in self.results.values()]), ndigits=2)
            results_table = PT(["HookTestResults", ""])
            results_table.align["HookTestResults", ""] = "c"
            results_table.hrules = pt_all
            results_table.add_row(["Total Texts", num_texts])
            results_table.add_row(["Passing Texts", t_pass])
            results_table.add_row(["Metadata Files", self.m_files])
            results_table.add_row(["Passing Metadata", self.m_passing])
            results_table.add_row(["Coverage", cov])
            results_table.add_row(["Total Citation Units", "{:,}".format(total_units)])
            if self.countwords is True:
                results_table.add_row(["Total Words", "{:,}".format(total_words)])
                for l, words in language_words.items():
                    results_table.add_row(["Words in {}".format(l.upper()), "{:,}".format(words)])
            print(results_table, flush=True)
            # Pushing to HOOK !
            if isinstance(self.from_travis_to_hook, str):
                args = [num_texts, t_pass, self.m_files, self.m_passing, cov, total_units]
                if self.countwords is True:
                    args.append(language_words)
                print(self.send_to_hook_from_travis(*args).text)
            # Manifest of passing files
            if self.build_manifest:
                passing = self.create_manifest()
                with open('{}/manifest.txt'.format(self.path), mode="w") as f:
                    f.write('\n'.join(passing))
        elif self.ping:
            report = self.report
            report["units"] = [unit.dict for unit in self.stack]
            self.send(report)

    def send_to_hook_from_travis(
            self,
            texts_total, texts_passing,
            metadata_total, metadata_passing,
            coverage, nodes_count, words_dict=None
    ):
        """ Send data to travis

        :return: Request output
        """
        data = dict(
            # Event
            event_type=os.environ.get("TRAVIS_EVENT_TYPE"),
            build_uri="https://travis-ci.org/{slug}/builds/{bid}".format(
                bid=os.environ.get("TRAVIS_BUILD_ID"),
                slug=os.environ.get("TRAVIS_REPO_SLUG")
            ),
            build_id=os.environ.get("TRAVIS_BUILD_NUMBER"),
            commit_sha=os.environ.get("TRAVIS_COMMIT"),
            # Information about the test
            texts_total=texts_total,
            texts_passing=texts_passing,
            metadata_total=metadata_total,
            metadata_passing=metadata_passing,
            coverage=coverage,
            nodes_count=nodes_count,
            units={
                unit_name: log.status
                for unit_name, log in self.results.items()
            },
        )
        # Source is the PR number for pull requests, the branch name otherwise
        if data["event_type"] == "pull_request":
            data["source"] = os.environ.get("TRAVIS_PULL_REQUEST")
        else:
            data["source"] = os.environ.get("TRAVIS_BRANCH")
        if words_dict is not None:
            data["words_count"] = words_dict
        data = Test.dump(data)
        data = bytes(data, "utf-8")
        hashed = hmac.new(self.secret, data, hashlib.sha1).hexdigest()
        return requests.post(
            self.from_travis_to_hook,
            data=data,
            headers={
                "HookTest-Secure-X": hashed,
                "Content-Type": "application/json"
            }
        )

    def create_manifest(self):
        """ Creates a manifest.txt file in the source directory that contains an ordered list of passing files """
        passing_temp = [x.name for x in self.results.values() if x.coverage == 100.0]
        passing = []
        # A text only qualifies when both its work-level and textgroup-level
        # __cts__.xml metadata files passed as well
        for f in passing_temp:
            if not f.endswith('__cts__.xml') and '{}/__cts__.xml'.format(
                    os.path.dirname(f)) in passing_temp and '{}/__cts__.xml'.format(
                    '/'.join(f.split('/')[:-2])) in passing_temp:
                passing.append(f)
                passing.append('{}/__cts__.xml'.format(os.path.dirname(f)))
                passing.append('{}/__cts__.xml'.format('/'.join(f.split('/')[:-2])))
        return sorted(list(set(passing)))

    def find(self):
        """ Find CTS files in a directory

        :param directory: Path of the directory
        :type directory: str
        :returns: Path of xml text files, Path of __cts__.xml files
        :rtype: (list, list)
        """
        return self.finder.find(self.directory)

    def cover(self, name, test, testtype=None, logs=None, additional=None):
        """ Given a dictionary, compute the coverage of one item

        :param name:
        :type name:
        :param test: Dictionary where keys represents test done on a file and value a boolean indicating passing status
        :type test: boolean
        :param logs: List of logs for one unit
        :type logs: list
        :param testtype: the type of file tested (e.g., CTSMetadata or CTSText)
        :type testtype: str
        :returns: Passing status
        :rtype: dict
        """
        results = list(test.values())
        if logs is None:
            logs = list()
        if len(results) > 0:
            return UnitLog(
                directory=self.directory,
                name=name,
                units=test,
                coverage=len([v for v in results if v is True])/len(results)*100,
                status=False not in results,
                logs=logs,
                additional=additional,
                testtype=testtype
            )
        else:
            # No test ran at all: report an empty, failed unit
            return UnitLog(
                directory=self.directory,
                name=name,
                units=list(),
                coverage=0.0,
                status=False,
                logs=logs,
                testtype=testtype
            )

    @staticmethod
    def dump(obj):
        # Compact, key-sorted JSON so HMAC signatures are reproducible
        return json.dumps(obj, separators=(',', ':'), sort_keys=True)
def cmd(console=False, **kwargs):
    """ Generate the complete process of Test

    :param console: Print logs to console
    :type console: bool
    :param kwargs: Named arguments
    :type kwargs: dict
    :return: Status of the test
    """
    runner = HookTest.test.Test(console=console, **kwargs)
    runner.console = console
    status = {}
    try:
        status = runner.run()
    except Exception:
        # Format the full traceback and report it through whichever
        # channel is configured (ping endpoint first, then console).
        exc_type, exc_value, exc_tb = sys.exc_info()
        formatted = "".join(traceback.format_exception(exc_type, exc_value, exc_tb))
        if runner.ping:
            runner.send({"status": Test.ERROR, "message": formatted})
        elif console:
            print(formatted, flush=True)
    if kwargs.get("json"):
        with open(kwargs["json"], "w") as json_file:
            json.dump(runner.report, json_file)
    return status
class UnitLog(object):
    """ Model for logging information

    :param name: Name of the tested unit
    :param units:
    :param coverage: Percentage of successful tests
    :param status: Status of the unit
    :param logs: Logs
    :param sent: Status regarding the logging
    :param additional: Additional informations. Can be used for words counting
    """

    def __init__(self, directory, name, units, coverage, status,
                 testtype=None, logs=None, sent=False, additional=None):
        """ Initiate the object

        :param name: Name of the tested unit
        :param units:
        :param coverage: Percentage of successful tests
        :param status: Status of the unit
        :param logs: Logs
        :param sent: Status regarding the logging
        """
        self.directory = directory
        self.units = units
        self.coverage = coverage
        self.status = status
        self.__logs = []
        self.sent = sent
        # Timestamp taken at creation, used as the "at" field of .dict
        self.time = time.strftime("%Y-%m-%d %H:%M:%S")
        # Directory prefix is stripped from name and logs for readability
        self.name = self.directory_replacer(name)
        self.logs = logs
        self.additional = additional if isinstance(additional, dict) else {}
        self.testtype = testtype

    @property
    def logs(self):
        return self.__logs

    @logs.setter
    def logs(self, logs):
        # Anything but a list (e.g. None) leaves the stored logs untouched
        if isinstance(logs, list):
            cleaned = []
            for entry in logs:
                cleaned.append(self.directory_replacer(entry))
            self.__logs = cleaned

    def directory_replacer(self, data):
        # "." means "current directory": nothing useful to strip
        if self.directory == ".":
            return data
        return data.replace(self.directory, "")

    @property
    def dict(self):
        """ Get the dictionary version of the object

        :return: Dictionary representation of the object
        :rtype: dict
        """
        payload = dict(
            name=self.name,
            units=self.units,
            coverage=self.coverage,
            status=self.status,
            logs=self.logs,
            at=self.time
        )
        payload.update(self.additional)
        return payload

    def __str__(self):
        return "\n".join(self.logs)