diff --git a/swh/web/ui/backend.py b/swh/web/ui/backend.py index 4ab7b3fe1..52c83b9f1 100644 --- a/swh/web/ui/backend.py +++ b/swh/web/ui/backend.py @@ -1,272 +1,270 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from swh.web.ui import main def content_get(sha1_bin): """Lookup the content designed by {algo: hash_bin}. Args: sha1_bin: content's binary sha1. Returns: Content as dict with 'sha1' and 'data' keys. data representing its raw data. """ contents = main.storage().content_get([sha1_bin]) if contents and len(contents) >= 1: return contents[0] return None def content_find(algo, hash_bin): """Retrieve the content with binary hash hash_bin Args: algo: nature of the hash hash_bin. hash_bin: content's hash searched for. Returns: A triplet (sha1, sha1_git, sha256) if the content exist or None otherwise. """ return main.storage().content_find({algo: hash_bin}) def content_find_occurrence(algo, hash_bin): """Find the content's occurrence. Args: algo: nature of the hash hash_bin. hash_bin: content's hash searched for. Returns: The occurrence of the content. """ return main.storage().content_find_occurrence({algo: hash_bin}) def content_missing_per_sha1(sha1list): """List content missing from storage based on sha1 Args: sha1s: Iterable of sha1 to check for absence Returns: an iterable of sha1s missing from the storage """ return main.storage().content_missing_per_sha1(sha1list) def directory_get(sha1_bin): """Retrieve information on one directory. Args: sha1_bin: Directory's identifier Returns: The directory's information. """ res = main.storage().directory_get([sha1_bin]) if res and len(res) >= 1: return res[0] def origin_get(origin_id): """Return information about the origin with id origin_id. Args: origin_id: origin's identifier Returns: Origin information as dict. """ return main.storage().origin_get({'id': origin_id}) def person_get(person_id): """Return information about the person with id person_id. Args: person_id: person's identifier.v Returns: Person information as dict. """ res = main.storage().person_get([person_id]) if res and len(res) >= 1: return res[0] def directory_ls(sha1_git_bin, recursive=False): """Return information about the directory with id sha1_git. Args: sha1_git: directory's identifier. recursive: Optional recursive flag default to False Returns: Directory information as dict. """ directory_entries = main.storage().directory_ls(sha1_git_bin, recursive) if not directory_entries: return [] return directory_entries def release_get(sha1_git_bin): """Return information about the release with sha1 sha1_git_bin. Args: sha1_git_bin: The release's sha1 as bytes. Returns: Release information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ res = main.storage().release_get([sha1_git_bin]) if res and len(res) >= 1: return res[0] return None def revision_get(sha1_git_bin): """Return information about the revision with sha1 sha1_git_bin. Args: sha1_git_bin: The revision's sha1 as bytes. Returns: Revision information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ res = main.storage().revision_get([sha1_git_bin]) if res and len(res) >= 1: return res[0] return None def revision_get_multiple(sha1_git_bin_list): """Return information about the revisions in sha1_git_bin_list Args: sha1_git_bin_list: The revisions' sha1s as a list of bytes. Returns: Revisions' information as an iterable of dicts if any found, an empty list otherwise Raises: ValueError if the identifier provided is not of sha1 nature. """ res = main.storage().revision_get(sha1_git_bin_list) if res and len(res) >= 1: return res return [] -def revision_log(sha1_git_bin, limit=100): +def revision_log(sha1_git_bin, limit): """Return information about the revision with sha1 sha1_git_bin. Args: sha1_git_bin: The revision's sha1 as bytes. limit: the maximum number of revisions returned. Returns: Revision information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ return main.storage().revision_log([sha1_git_bin], limit) -def revision_log_by(origin_id, branch_name, ts, limit=100): +def revision_log_by(origin_id, branch_name, ts, limit): """Return information about the revision matching the timestamp ts, from origin origin_id, in branch branch_name. Args: origin_id: origin of the revision - branch_name: revision's branch. - timestamp: revision's time frame. Returns: Information for the revision matching the criterions. """ - rev_list = main.storage().revision_log_by(origin_id, - branch_name, - ts) - if rev_list is None: - return None - return rev_list[:limit] + return main.storage().revision_log_by(origin_id, + branch_name, + ts, + limit=limit) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return main.storage().stat_counters() def stat_origin_visits(origin_id): """Return the dates at which the given origin was scanned for content. Returns: An array of dates """ return main.storage().origin_visit_get(origin_id) def revision_get_by(origin_id, branch_name, timestamp): """Return occurrence information matching the criterions origin_id, branch_name, ts. """ res = main.storage().revision_get_by(origin_id, branch_name, timestamp=timestamp, limit=1) if not res: return None return res[0] def directory_entry_get_by_path(directory, path): """Return a directory entry by its path. """ paths = path.strip(os.path.sep).split(os.path.sep) return main.storage().directory_entry_get_by_path( directory, list(map(lambda p: p.encode('utf-8'), paths))) def entity_get(uuid): """Retrieve the entity per its uuid. """ return main.storage().entity_get(uuid) diff --git a/swh/web/ui/converters.py b/swh/web/ui/converters.py index d3621771f..287f95a41 100644 --- a/swh/web/ui/converters.py +++ b/swh/web/ui/converters.py @@ -1,232 +1,232 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from swh.core import hashutil from swh.core.utils import decode_with_escape from swh.web.ui import utils def from_swh(dict_swh, hashess={}, bytess={}, dates={}, blacklist={}, convert={}, convert_fn=lambda x: x): """Convert from an swh dictionary to something reasonably json serializable. Args: - dict_swh: the origin dictionary needed to be transformed - hashess: list/set of keys representing hashes values (sha1, sha256, sha1_git, etc...) as bytes. Those need to be transformed in hexadecimal string - bytess: list/set of keys representing bytes values which needs to be decoded - blacklist: set of keys to filter out from the conversion - convert: set of keys whose associated values need to be converted using convert_fn - convert_fn: the conversion function to apply on the value of key in 'convert' The remaining keys are copied as is in the output. Returns: dictionary equivalent as dict_swh only with its keys `converted`. """ def convert_hashes_bytes(v): """v is supposedly a hash as bytes, returns it converted in hex. """ if v and isinstance(v, bytes): return hashutil.hash_to_hex(v) return v def convert_bytes(v): """v is supposedly a bytes string, decode as utf-8. FIXME: Improve decoding policy. If not utf-8, break! """ if v and isinstance(v, bytes): return v.decode('utf-8') return v def convert_date(v): """v is a dict with three keys: timestamp offset negative_utc We convert it to a human-readable string """ tz = datetime.timezone(datetime.timedelta(minutes=v['offset'])) date = datetime.datetime.fromtimestamp(v['timestamp'], tz=tz) datestr = date.isoformat() if v['offset'] == 0 and v['negative_utc']: # remove the rightmost + and replace it with a - return '-'.join(datestr.rsplit('+', 1)) return datestr if not dict_swh: return dict_swh new_dict = {} for key, value in dict_swh.items(): if key in blacklist: continue elif key in dates: new_dict[key] = convert_date(value) elif isinstance(value, dict): new_dict[key] = from_swh(value, hashess, bytess, dates, blacklist, convert, convert_fn) elif key in hashess: new_dict[key] = utils.fmap(convert_hashes_bytes, value) elif key in bytess: try: new_dict[key] = utils.fmap(convert_bytes, value) except UnicodeDecodeError: if 'decoding_failures' not in new_dict: new_dict['decoding_failures'] = [key] else: new_dict['decoding_failures'].append(key) new_dict[key] = utils.fmap(decode_with_escape, value) elif key in convert: new_dict[key] = convert_fn(value) else: new_dict[key] = value return new_dict def from_origin(origin): """Convert from an SWH origin to an origin dictionary. """ return from_swh(origin, hashess=set(['revision']), bytess=set(['path'])) def from_release(release): """Convert from an SWH release to a json serializable release dictionary. Args: release: Dict with the following keys - id: identifier of the revision (sha1 in bytes) - revision: identifier of the revision the release points to (sha1 in bytes) - comment: release's comment message (bytes) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) Returns: Release dictionary with the following keys: - id: hexadecimal sha1 (string) - revision: hexadecimal sha1 (string) - comment: release's comment message (string) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) """ return from_swh( release, hashess=set(['id', 'target']), bytess=set(['message', 'name', 'fullname', 'email']), dates={'date'}, ) def from_revision(revision): """Convert from an SWH revision to a json serializable revision dictionary. Args: revision: Dict with the following keys - id: identifier of the revision (sha1 in bytes) - directory: identifier of the directory the revision points to (sha1 in bytes) - author_name, author_email: author's revision name and email - committer_name, committer_email: committer's revision name and email - message: revision's message - date, date_offset: revision's author date - committer_date, committer_date_offset: revision's commit date - parents: list of parents for such revision - synthetic: revision's property nature - type: revision's type (git, tar or dsc at the moment) - metadata: if the revision is synthetic, this can reference dynamic properties. Returns: Revision dictionary with the same keys as inputs, only: - sha1s are in hexadecimal strings (id, directory) - bytes are decoded in string (author_name, committer_name, author_email, committer_email) - remaining keys are left as is """ revision = from_swh(revision, hashess=set(['id', 'directory', 'parents', 'children']), bytess=set(['name', 'fullname', 'email']), dates={'date', 'committer_date'}) if revision: if 'parents' in revision: revision['merge'] = len(revision['parents']) > 1 if 'message' in revision: try: revision['message'] = revision['message'].decode('utf-8') except UnicodeDecodeError: revision['message_decoding_failed'] = True revision['message'] = None return revision def from_content(content): """Convert swh content to serializable content dictionary. """ - if content and 'ctime' in content: - del content['ctime'] + if content: + content = {k: v for k, v in content.items() if k not in ['ctime']} return from_swh(content, hashess={'sha1', 'sha1_git', 'sha256'}, bytess={}, blacklist={}, convert={'status'}, convert_fn=lambda v: 'absent' if v == 'hidden' else v) def from_person(person): """Convert swh person to serializable person dictionary. """ return from_swh(person, hashess=set(), bytess=set(['name', 'fullname', 'email'])) def from_directory_entry(dir_entry): """Convert swh person to serializable person dictionary. """ return from_swh(dir_entry, hashess=set(['dir_id', 'sha1_git', 'sha1', 'sha256', 'target']), bytess=set(['name']), blacklist={}, convert={'status'}, convert_fn=lambda v: 'absent' if v == 'hidden' else v) diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py index e3773ea2b..2112ef465 100644 --- a/swh/web/ui/service.py +++ b/swh/web/ui/service.py @@ -1,621 +1,621 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from swh.core import hashutil from swh.web.ui import converters, query, backend from swh.web.ui.exc import NotFoundExc def lookup_multiple_hashes(hashes): """Lookup the passed hashes in a single DB connection, using batch processing. Args: An array of {filename: X, sha1: Y}, string X, hex sha1 string Y. Returns: The same array with elements updated with elem['found'] = true if the hash is present in storage, elem['found'] = false if not. """ hashlist = [hashutil.hex_to_hash(elem['sha1']) for elem in hashes] content_missing = backend.content_missing_per_sha1(hashlist) missing = [hashutil.hash_to_hex(x) for x in content_missing] for x in hashes: x.update({'found': True}) for h in hashes: if h['sha1'] in missing: h['found'] = False return hashes def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found containing the hash info if the hash is present, None if not. """ algo, hash = query.parse_hash(q) found = backend.content_find(algo, hash) return {'found': found, 'algo': algo} def search_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ algo, hash = query.parse_hash(q) found = backend.content_find(algo, hash) return {'found': found is not None} def lookup_hash_origin(q): """Return information about the checksum contained in the query q. Args: query string of the form Returns: origin as dictionary if found for the given content. """ algo, hash = query.parse_hash(q) origin = backend.content_find_occurrence(algo, hash) return converters.from_origin(origin) def lookup_origin(origin_id): """Return information about the origin with id origin_id. Args: origin_id as string Returns: origin information as dict. """ return backend.origin_get(origin_id) def lookup_person(person_id): """Return information about the person with id person_id. Args: person_id as string Returns: person information as dict. """ person = backend.person_get(person_id) return converters.from_person(person) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], # HACK: sha1_git really 'Only sha1_git is supported.') dir = backend.directory_get(sha1_git_bin) if not dir: return None directory_entries = backend.directory_ls(sha1_git_bin) return map(converters.from_directory_entry, directory_entries) def lookup_directory_with_path(directory_sha1_git, path_string): """Return directory information for entry with path path_string w.r.t. root directory pointed by directory_sha1_git Args: - directory_sha1_git: sha1_git corresponding to the directory to which we append paths to (hopefully) find the entry - the relative path to the entry starting from the directory pointed by directory_sha1_git Raises: NotFoundExc if the directory entry is not found """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( directory_sha1_git, ['sha1'], 'Only sha1_git is supported.') queried_dir = backend.directory_entry_get_by_path( sha1_git_bin, path_string) if not queried_dir: raise NotFoundExc(('Directory entry with path %s from %s not found') % (path_string, directory_sha1_git)) return converters.from_directory_entry(queried_dir) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( release_sha1_git, ['sha1'], 'Only sha1_git is supported.') res = backend.release_get(sha1_git_bin) return converters.from_release(res) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) return converters.from_revision(revision) def lookup_revision_multiple(sha1_git_list): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ def to_sha1_bin(sha1_hex): _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_hex, ['sha1'], 'Only sha1_git is supported.') return sha1_git_bin sha1_bin_list = (to_sha1_bin(x) for x in sha1_git_list) revisions = backend.revision_get_multiple(sha1_bin_list) return (converters.from_revision(x) for x in revisions) def lookup_revision_message(rev_sha1_git): """Return the raw message of the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Decoded revision message as dict {'message': } Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if the revision is not found, or if it has no message """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) if 'message' not in revision: raise NotFoundExc('No message for revision with sha1_git %s.' % rev_sha1_git) res = {'message': revision['message']} return res def lookup_revision_by(origin_id, branch_name="refs/heads/master", timestamp=None): """Lookup revisions by origin_id, branch_name and timestamp. If: - branch_name is not provided, lookup using 'refs/heads/master' as default. - ts is not provided, use the most recent Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. Yields: The revisions matching the criterions. """ res = backend.revision_get_by(origin_id, branch_name, timestamp) return converters.from_revision(res) -def lookup_revision_log(rev_sha1_git, limit=25): +def lookup_revision_log(rev_sha1_git, limit): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision_entries = backend.revision_log(sha1_git_bin, limit) return map(converters.from_revision, revision_entries) -def lookup_revision_log_by(origin_id, branch_name, timestamp, limit=25): +def lookup_revision_log_by(origin_id, branch_name, timestamp, limit): """Return information about the revision with sha1 revision_sha1_git. Args: origin_id: origin of the revision branch_name: revision's branch timestamp: revision's time frame limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: NotFoundExc if no revision corresponds to the criterion NotFoundExc if the corresponding revision has no log """ revision_entries = backend.revision_log_by(origin_id, branch_name, timestamp, limit) if not revision_entries: return None return map(converters.from_revision, revision_entries) def lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. sha1_git_root being resolved through the lookup of a revision by origin_id, branch_name and ts. In other words, sha1_git is an ancestor of sha1_git_root. Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. - sha1_git: one of sha1_git_root's ancestors. - limit: limit the lookup to 100 revisions back. Returns: Pair of (root_revision, revision). Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ rev_root = backend.revision_get_by(origin_id, branch_name, ts) if not rev_root: raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts)) return (converters.from_revision(rev_root), lookup_revision_with_context(rev_root, sha1_git, limit)) def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision. The type is either a sha1 (as an hex string) or a non converted dict. sha1_git: one of sha1_git_root's ancestors limit: limit the lookup to 100 revisions back Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) if isinstance(sha1_git_root, str): _, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws( sha1_git_root, ['sha1'], 'Only sha1_git is supported.') revision_root = backend.revision_get(sha1_git_root_bin) if not revision_root: raise NotFoundExc('Revision root %s not found' % sha1_git_root) else: sha1_git_root_bin = sha1_git_root['id'] revision_log = backend.revision_log(sha1_git_root_bin, limit) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = rev['id'] parents[rev_id] = [] for parent_id in rev['parents']: parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return converters.from_revision(revision) def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. with_data: boolean that indicates to retrieve the raw data if the path resolves to a content. Default to False (for the api) Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist. NotImplementedError in case of dir_path exists but do not reference a type 'dir' or 'file'. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) dir_sha1_git_bin = revision['directory'] if dir_path: entity = backend.directory_entry_get_by_path(dir_sha1_git_bin, dir_path) if not entity: raise NotFoundExc( "Directory or File '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: entity = {'type': 'dir', 'target': dir_sha1_git_bin} if entity['type'] == 'dir': directory_entries = backend.directory_ls(entity['target']) return {'type': 'dir', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': map(converters.from_directory_entry, directory_entries)} elif entity['type'] == 'file': # content content = backend.content_find('sha1_git', entity['target']) if with_data: content['data'] = backend.content_get(content['sha1'])['data'] return {'type': 'file', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_content(content)} else: raise NotImplementedError('Entity of type %s not implemented.' % entity['type']) def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal """ algo, hash = query.parse_hash(q) c = backend.content_find(algo, hash) return converters.from_content(c) def lookup_content_raw(q): """Lookup the content defined by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. """ algo, hash = query.parse_hash(q) c = backend.content_find(algo, hash) if not c: return None content = backend.content_get(c['sha1']) return converters.from_content(content) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return backend.stat_counters() def stat_origin_visits(origin_id): """Return the dates at which the given origin was scanned for content. Returns: An array of dates in the datetime format """ for visit in backend.stat_origin_visits(origin_id): visit['date'] = visit['date'].timestamp() yield(visit) def lookup_entity_by_uuid(uuid): """Return the entity's hierarchy from its uuid. Args: uuid: entity's identifier. Returns: List of hierarchy entities from the entity with uuid. """ uuid = query.parse_uuid4(uuid) return backend.entity_get(uuid) def lookup_revision_through(revision, limit=100): """Retrieve a revision from the criterion stored in revision dictionary. Args: revision: Dictionary of criterion to lookup the revision with. Here are the supported combination of possible values: - origin_id, branch_name, ts, sha1_git - origin_id, branch_name, ts - sha1_git_root, sha1_git - sha1_git Returns: None if the revision is not found or the actual revision. """ if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context_by(revision['origin_id'], revision['branch_name'], revision['ts'], revision['sha1_git'], limit) if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision: return lookup_revision_by(revision['origin_id'], revision['branch_name'], revision['ts']) if 'sha1_git_root' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context(revision['sha1_git_root'], revision['sha1_git'], limit) if 'sha1_git' in revision: return lookup_revision(revision['sha1_git']) # this should not happen raise NotImplementedError('Should not happen!') def lookup_directory_through_revision(revision, path=None, limit=100, with_data=False): """Retrieve the directory information from the revision. Args: revision: dictionary of criterion representing a revision to lookup path: directory's path to lookup. limit: optional query parameter to limit the revisions log. (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of. with_data: indicate to retrieve the content's raw data if path resolves to a content. Returns: The directory pointing to by the revision criterions at path. """ rev = lookup_revision_through(revision, limit) if not rev: raise NotFoundExc('Revision with criterion %s not found!' % revision) return (rev['id'], lookup_directory_with_revision(rev['id'], path, with_data)) diff --git a/swh/web/ui/tests/test_backend.py b/swh/web/ui/tests/test_backend.py index ed01c08ee..516c73bc8 100644 --- a/swh/web/ui/tests/test_backend.py +++ b/swh/web/ui/tests/test_backend.py @@ -1,697 +1,699 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from nose.tools import istest from unittest.mock import MagicMock from swh.core import hashutil from swh.web.ui import backend from swh.web.ui.tests import test_app class BackendTestCase(test_app.SWHApiTestCase): @istest def content_get_ko_not_found_1(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f777') self.storage.content_get = MagicMock(return_value=None) # when actual_content = backend.content_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_get_ko_not_found_empty_result(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_get = MagicMock(return_value=[]) # when actual_content = backend.content_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_get(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') stub_contents = [{ 'sha1': sha1_bin, 'data': b'binary data', }, {}] self.storage.content_get = MagicMock(return_value=stub_contents) # when actual_content = backend.content_get(sha1_bin) # then self.assertEquals(actual_content, stub_contents[0]) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_find_ko_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=None) # when actual_lookup = backend.content_find('sha1_git', sha1_bin) # then self.assertIsNone(actual_lookup) self.storage.content_find.assert_called_once_with( {'sha1_git': sha1_bin}) @istest def content_find(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=(1, 2, 3)) # when actual_content = backend.content_find('sha1', sha1_bin) # then self.assertEquals(actual_content, (1, 2, 3)) # check the function has been called with parameters self.storage.content_find.assert_called_with({'sha1': sha1_bin}) @istest def content_find_occurrence_ko_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find_occurrence = MagicMock(return_value=None) # when actual_lookup = backend.content_find_occurrence('sha1_git', sha1_bin) # then self.assertIsNone(actual_lookup) self.storage.content_find_occurrence.assert_called_once_with( {'sha1_git': sha1_bin}) @istest def content_find_occurrence(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find_occurrence = MagicMock( return_value=(1, 2, 3)) # when actual_content = backend.content_find_occurrence('sha1', sha1_bin) # then self.assertEquals(actual_content, (1, 2, 3)) # check the function has been called with parameters self.storage.content_find_occurrence.assert_called_with( {'sha1': sha1_bin}) @istest def content_missing_per_sha1_none(self): # given sha1s_bin = [hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f'), hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )] self.storage.content_missing_per_sha1 = MagicMock(return_value=[]) # when actual_content = backend.content_missing_per_sha1(sha1s_bin) # then self.assertEquals(actual_content, []) self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin) @istest def content_missing_per_sha1_some(self): # given sha1s_bin = [hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f'), hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )] self.storage.content_missing_per_sha1 = MagicMock(return_value=[ hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )]) # when actual_content = backend.content_missing_per_sha1(sha1s_bin) # then self.assertEquals(actual_content, [hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )]) self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin) @istest def origin_get(self): # given self.storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = backend.origin_get('origin-id') # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) self.storage.origin_get.assert_called_with({'id': 'origin-id'}) @istest def person_get(self): # given self.storage.person_get = MagicMock(return_value=[{ 'id': 'person-id', 'name': 'blah'}]) # when actual_person = backend.person_get('person-id') # then self.assertEqual(actual_person, {'id': 'person-id', 'name': 'blah'}) self.storage.person_get.assert_called_with(['person-id']) @istest def directory_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') self.storage.directory_get = MagicMock(return_value=None) # when actual_directory = backend.directory_get(sha1_bin) # then self.assertEquals(actual_directory, None) self.storage.directory_get.assert_called_with([sha1_bin]) @istest def directory_get(self): # given sha1_bin = hashutil.hex_to_hash( '51f71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') sha1_bin2 = hashutil.hex_to_hash( '62071b8614fcd89ccd17ca2b1d9e66c5b00a6d03') stub_dir = {'id': sha1_bin, 'revision': b'sha1-blah'} stub_dir2 = {'id': sha1_bin2, 'revision': b'sha1-foobar'} self.storage.directory_get = MagicMock(return_value=[stub_dir, stub_dir2]) # when actual_directory = backend.directory_get(sha1_bin) # then self.assertEquals(actual_directory, stub_dir) self.storage.directory_get.assert_called_with([sha1_bin]) @istest def directory_ls_empty_result(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') self.storage.directory_ls = MagicMock(return_value=[]) # when actual_directory = backend.directory_ls(sha1_bin) # then self.assertEquals(actual_directory, []) self.storage.directory_ls.assert_called_with(sha1_bin, False) @istest def directory_ls(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') stub_dir_entries = [{ 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, }] self.storage.directory_ls = MagicMock( return_value=stub_dir_entries) actual_directory = backend.directory_ls(sha1_bin, recursive=True) # then self.assertIsNotNone(actual_directory) self.assertEqual(list(actual_directory), stub_dir_entries) self.storage.directory_ls.assert_called_with(sha1_bin, True) @istest def release_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') self.storage.release_get = MagicMock(return_value=[]) # when actual_release = backend.release_get(sha1_bin) # then self.assertIsNone(actual_release) self.storage.release_get.assert_called_with([sha1_bin]) @istest def release_get(self): # given sha1_bin = hashutil.hex_to_hash( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') stub_releases = [{ 'id': sha1_bin, 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }] self.storage.release_get = MagicMock(return_value=stub_releases) # when actual_release = backend.release_get(sha1_bin) # then self.assertEqual(actual_release, stub_releases[0]) self.storage.release_get.assert_called_with([sha1_bin]) @istest def revision_get_by_not_found(self): # given self.storage.revision_get_by = MagicMock(return_value=[]) # when actual_revision = backend.revision_get_by(10, 'master', 'ts2') # then self.assertIsNone(actual_revision) self.storage.revision_get_by.assert_called_with(10, 'master', timestamp='ts2', limit=1) @istest def revision_get_by(self): # given self.storage.revision_get_by = MagicMock(return_value=[{'id': 1}]) # when actual_revisions = backend.revision_get_by(100, 'dev', 'ts') # then self.assertEquals(actual_revisions, {'id': 1}) self.storage.revision_get_by.assert_called_with(100, 'dev', timestamp='ts', limit=1) @istest def revision_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') self.storage.revision_get = MagicMock(return_value=[]) # when actual_revision = backend.revision_get(sha1_bin) # then self.assertIsNone(actual_revision) self.storage.revision_get.assert_called_with([sha1_bin]) @istest def revision_get(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') stub_revisions = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_get = MagicMock(return_value=stub_revisions) # when actual_revision = backend.revision_get(sha1_bin) # then self.assertEqual(actual_revision, stub_revisions[0]) self.storage.revision_get.assert_called_with([sha1_bin]) @istest def revision_get_multiple(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') sha1_other = hashutil.hex_to_hash( 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc') stub_revisions = [ { 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }, { 'id': sha1_other, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'name', 'email': b'name@surname.org', }, 'committer': { 'name': b'name', 'email': b'name@surname.org', }, 'message': b'ugly fix for bug 42', 'date': datetime.datetime(2000, 1, 12, 5, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 12, 5, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } ] self.storage.revision_get = MagicMock( return_value=stub_revisions) # when actual_revision = backend.revision_get_multiple([sha1_bin, sha1_other]) # then self.assertEqual(actual_revision, stub_revisions) self.storage.revision_get.assert_called_with( [sha1_bin, sha1_other]) @istest def revision_get_multiple_none_found(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') sha1_other = hashutil.hex_to_hash( 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc') self.storage.revision_get = MagicMock( return_value=[]) # when actual_revision = backend.revision_get_multiple([sha1_bin, sha1_other]) # then self.assertEqual(actual_revision, []) self.storage.revision_get.assert_called_with( [sha1_bin, sha1_other]) @istest def revision_log(self): # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') stub_revision_log = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_log = MagicMock(return_value=stub_revision_log) # when - actual_revision = backend.revision_log(sha1_bin) + actual_revision = backend.revision_log(sha1_bin, limit=1) # then self.assertEqual(list(actual_revision), stub_revision_log) - self.storage.revision_log.assert_called_with([sha1_bin], 100) + self.storage.revision_log.assert_called_with([sha1_bin], 1) @istest def revision_log_by(self): # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') stub_revision_log = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_log_by = MagicMock( return_value=stub_revision_log) # when - actual_log = backend.revision_log_by(1, 'refs/heads/master', None) + actual_log = backend.revision_log_by(1, 'refs/heads/master', + None, limit=1) # then self.assertEqual(actual_log, stub_revision_log) - self.storage.revision_log.assert_called_with([sha1_bin], 100) + self.storage.revision_log.assert_called_with([sha1_bin], 1) @istest def revision_log_by_norev(self): # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') self.storage.revision_log_by = MagicMock(return_value=None) # when - actual_log = backend.revision_log_by(1, 'refs/heads/master', None) + actual_log = backend.revision_log_by(1, 'refs/heads/master', + None, limit=1) # then self.assertEqual(actual_log, None) - self.storage.revision_log.assert_called_with([sha1_bin], 100) + self.storage.revision_log.assert_called_with([sha1_bin], 1) @istest def stat_counters(self): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } self.storage.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = backend.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) self.storage.stat_counters.assert_called_with() @istest def stat_origin_visits(self): # given expected_dates = [ { 'date': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 1 }, { 'date': datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 2 }, { 'date': datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 3 } ] self.storage.origin_visit_get = MagicMock(return_value=expected_dates) # when actual_dates = backend.stat_origin_visits(5) # then self.assertEqual(actual_dates, expected_dates) self.storage.origin_visit_get.assert_called_with(5) @istest def directory_entry_get_by_path(self): # given stub_dir_entry = {'id': b'dir-id', 'type': 'dir', 'name': b'some/path/foo'} self.storage.directory_entry_get_by_path = MagicMock( return_value=stub_dir_entry) # when actual_dir_entry = backend.directory_entry_get_by_path(b'dir-sha1', 'some/path/foo') self.assertEquals(actual_dir_entry, stub_dir_entry) self.storage.directory_entry_get_by_path.assert_called_once_with( b'dir-sha1', [b'some', b'path', b'foo']) @istest def entity_get(self): # given stub_entities = [{'uuid': 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'}, {'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'parent': None}] self.storage.entity_get = MagicMock(return_value=stub_entities) # when actual_entities = backend.entity_get( 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7') # then self.assertEquals(actual_entities, stub_entities) self.storage.entity_get.assert_called_once_with( 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7') diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py index 88a959e99..0d19e35d8 100644 --- a/swh/web/ui/tests/test_service.py +++ b/swh/web/ui/tests/test_service.py @@ -1,1716 +1,1717 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from nose.tools import istest from unittest.mock import MagicMock, patch, call from swh.core.hashutil import hex_to_hash, hash_to_hex from swh.web.ui import service from swh.web.ui.exc import BadInputExc, NotFoundExc from swh.web.ui.tests import test_app class ServiceTestCase(test_app.SWHApiTestCase): def setUp(self): self.SHA1_SAMPLE = '18d8be353ed3480476f032475e7c233eff7371d5' self.SHA1_SAMPLE_BIN = hex_to_hash(self.SHA1_SAMPLE) self.SHA256_SAMPLE = ('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') self.SHA256_SAMPLE_BIN = hex_to_hash(self.SHA256_SAMPLE) self.SHA1GIT_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' self.SHA1GIT_SAMPLE_BIN = hex_to_hash(self.SHA1GIT_SAMPLE) self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6' self.DIRECTORY_ID_BIN = hex_to_hash(self.DIRECTORY_ID) self.AUTHOR_ID_BIN = { 'name': b'author', 'email': b'author@company.org', } self.AUTHOR_ID = { 'name': 'author', 'email': 'author@company.org', } self.COMMITTER_ID_BIN = { 'name': b'committer', 'email': b'committer@corp.org', } self.COMMITTER_ID = { 'name': 'committer', 'email': 'committer@corp.org', } self.SAMPLE_DATE_RAW = { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc, ).timestamp(), 'offset': 0, 'negative_utc': False, } self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00' self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957' self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957' self.SAMPLE_REVISION = { 'id': self.SHA1_SAMPLE, 'directory': self.DIRECTORY_ID, 'author': self.AUTHOR_ID, 'committer': self.COMMITTER_ID, 'message': self.SAMPLE_MESSAGE, 'date': self.SAMPLE_DATE, 'committer_date': self.SAMPLE_DATE, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], 'merge': False } self.SAMPLE_REVISION_RAW = { 'id': self.SHA1_SAMPLE_BIN, 'directory': self.DIRECTORY_ID_BIN, 'author': self.AUTHOR_ID_BIN, 'committer': self.COMMITTER_ID_BIN, 'message': self.SAMPLE_MESSAGE_BIN, 'date': self.SAMPLE_DATE_RAW, 'committer_date': self.SAMPLE_DATE_RAW, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } self.SAMPLE_CONTENT = { 'sha1': self.SHA1_SAMPLE, 'sha256': self.SHA256_SAMPLE, 'sha1_git': self.SHA1GIT_SAMPLE, 'length': 190, 'status': 'absent' } self.SAMPLE_CONTENT_RAW = { 'sha1': self.SHA1_SAMPLE_BIN, 'sha256': self.SHA256_SAMPLE_BIN, 'sha1_git': self.SHA1GIT_SAMPLE_BIN, 'length': 190, 'status': 'hidden' } @patch('swh.web.ui.service.backend') @istest def lookup_multiple_hashes_ball_missing(self, mock_backend): # given mock_backend.content_missing_per_sha1 = MagicMock(return_value=[]) # when actual_lookup = service.lookup_multiple_hashes( [{'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f'}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}]) # then self.assertEquals(actual_lookup, [ {'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': True}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', 'found': True} ]) @patch('swh.web.ui.service.backend') @istest def lookup_multiple_hashes_some_missing(self, mock_backend): # given mock_backend.content_missing_per_sha1 = MagicMock(return_value=[ hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') ]) # when actual_lookup = service.lookup_multiple_hashes( [{'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f'}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}]) # then self.assertEquals(actual_lookup, [ {'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': False}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', 'found': True} ]) @patch('swh.web.ui.service.backend') @istest def lookup_hash_does_not_exist(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_lookup = service.lookup_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': None, 'algo': 'sha1_git'}, actual_lookup) # check the function has been called with parameters mock_backend.content_find.assert_called_with( 'sha1_git', hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def lookup_hash_exist(self, mock_backend): # given stub_content = { 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') } mock_backend.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.lookup_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': stub_content, 'algo': 'sha1'}, actual_lookup) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'), ) @patch('swh.web.ui.service.backend') @istest def search_hash_does_not_exist(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_lookup = service.search_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': False}, actual_lookup) # check the function has been called with parameters mock_backend.content_find.assert_called_with( 'sha1_git', hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def search_hash_exist(self, mock_backend): # given stub_content = { 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') } mock_backend.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.search_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': True}, actual_lookup) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'), ) @patch('swh.web.ui.service.backend') @istest def lookup_hash_origin(self, mock_backend): # given mock_backend.content_find_occurrence = MagicMock(return_value={ 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa 'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa }) expected_origin = { 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc' '_002dS_005fISREG.html' } # when actual_origin = service.lookup_hash_origin( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_origin, expected_origin) mock_backend.content_find_occurrence.assert_called_with( 'sha1_git', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def stat_counters(self, mock_backend): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_backend.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = service.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) mock_backend.stat_counters.assert_called_with() @patch('swh.web.ui.service.backend') @istest def stat_origin_visits(self, mock_backend): # given stub_result = [ { 'date': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 1 }, { 'date': datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 2 }, { 'date': datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 3 } ] mock_backend.stat_origin_visits.return_value = stub_result # when expected_dates = [ { 'date': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'origin': 1, 'visit': 1 }, { 'date': datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'origin': 1, 'visit': 2 }, { 'date': datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'origin': 1, 'visit': 3 } ] actual_dates = service.stat_origin_visits(6) # then self.assertEqual(expected_dates, list(actual_dates)) mock_backend.stat_origin_visits.assert_called_once_with(6) @patch('swh.web.ui.service.backend') @istest def lookup_origin(self, mock_backend): # given mock_backend.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = service.lookup_origin('origin-id') # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) mock_backend.origin_get.assert_called_with('origin-id') @patch('swh.web.ui.service.backend') @istest def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self, mock_backend): # given mock_backend.release_get = MagicMock() with self.assertRaises(BadInputExc) as cm: # when service.lookup_release('not-a-sha1') self.assertIn('invalid checksum', cm.exception.args[0]) mock_backend.release_get.called = False @patch('swh.web.ui.service.backend') @istest def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend): # given mock_backend.release_get = MagicMock() # when with self.assertRaises(BadInputExc) as cm: service.lookup_release( '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5' '1aea892abe') self.assertIn('sha1_git supported', cm.exception.args[0]) mock_backend.release_get.called = False @patch('swh.web.ui.service.backend') @istest def lookup_directory_with_path_not_found(self, mock_backend): # given mock_backend.lookup_directory_with_path = MagicMock(return_value=None) sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' # when actual_directory = mock_backend.lookup_directory_with_path( sha1_git, 'some/path/here') self.assertIsNone(actual_directory) @patch('swh.web.ui.service.backend') @istest def lookup_directory_with_path_found(self, mock_backend): # given sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' entry = {'id': 'dir-id', 'type': 'dir', 'name': 'some/path/foo'} mock_backend.lookup_directory_with_path = MagicMock(return_value=entry) # when actual_directory = mock_backend.lookup_directory_with_path( sha1_git, 'some/path/here') self.assertEqual(entry, actual_directory) @patch('swh.web.ui.service.backend') @istest def lookup_release(self, mock_backend): # given mock_backend.release_get = MagicMock(return_value={ 'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'target': None, 'date': { 'timestamp': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': True, }, 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }) # when actual_release = service.lookup_release( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') # then self.assertEqual(actual_release, { 'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'target': None, 'date': '2015-01-01T22:00:00-00:00', 'name': 'v0.0.1', 'message': 'synthetic release', 'synthetic': True, }) mock_backend.release_get.assert_called_with( hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db')) @istest def lookup_revision_with_context_ko_not_a_sha1_1(self): # given sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \ 'daf51aea892abe' sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @istest def lookup_revision_with_context_ko_not_a_sha1_2(self): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \ '2d4daf51aea892abe' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_ko_sha1_git_does_not_exist( self, mock_backend): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_bin = hex_to_hash(sha1_git) mock_backend.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_backend.revision_get.assert_called_once_with( sha1_git_bin) @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, mock_backend): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_root_bin = hex_to_hash(sha1_git_root) sha1_git_bin = hex_to_hash(sha1_git) mock_backend.revision_get.side_effect = ['foo', None] # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_backend.revision_get.assert_has_calls([call(sha1_git_bin), call(sha1_git_root_bin)]) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_revision_with_context(self, mock_query, mock_backend): # given sha1_git_root = '666' sha1_git = '883' sha1_git_root_bin = b'666' sha1_git_bin = b'883' sha1_git_root_dict = { 'id': sha1_git_root_bin, 'parents': [b'999'], } sha1_git_dict = { 'id': sha1_git_bin, 'parents': [], 'directory': b'278', } stub_revisions = [ sha1_git_root_dict, { 'id': b'999', 'parents': [b'777', b'883', b'888'], }, { 'id': b'777', 'parents': [b'883'], }, sha1_git_dict, { 'id': b'888', 'parents': [b'889'], }, { 'id': b'889', 'parents': [], }, ] # inputs ok mock_query.parse_hash_with_algorithms_or_throws.side_effect = [ ('sha1', sha1_git_bin), ('sha1', sha1_git_root_bin) ] # lookup revision first 883, then 666 (both exists) mock_backend.revision_get.side_effect = [ sha1_git_dict, sha1_git_root_dict ] mock_backend.revision_log = MagicMock( return_value=stub_revisions) # when actual_revision = service.lookup_revision_with_context( sha1_git_root, sha1_git) # then self.assertEquals(actual_revision, { 'id': hash_to_hex(sha1_git_bin), 'parents': [], 'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], 'directory': hash_to_hex(b'278'), 'merge': False }) mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls( [call(sha1_git, ['sha1'], 'Only sha1_git is supported.'), call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')]) mock_backend.revision_log.assert_called_with( sha1_git_root_bin, 100) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_revision_with_context_sha1_git_root_already_retrieved_as_dict( self, mock_query, mock_backend): # given sha1_git = '883' sha1_git_root_bin = b'666' sha1_git_bin = b'883' sha1_git_root_dict = { 'id': sha1_git_root_bin, 'parents': [b'999'], } sha1_git_dict = { 'id': sha1_git_bin, 'parents': [], 'directory': b'278', } stub_revisions = [ sha1_git_root_dict, { 'id': b'999', 'parents': [b'777', b'883', b'888'], }, { 'id': b'777', 'parents': [b'883'], }, sha1_git_dict, { 'id': b'888', 'parents': [b'889'], }, { 'id': b'889', 'parents': [], }, ] # inputs ok mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', sha1_git_bin) # lookup only on sha1 mock_backend.revision_get.return_value = sha1_git_dict mock_backend.revision_log.return_value = stub_revisions # when actual_revision = service.lookup_revision_with_context( {'id': sha1_git_root_bin}, sha1_git) # then self.assertEquals(actual_revision, { 'id': hash_to_hex(sha1_git_bin), 'parents': [], 'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], 'directory': hash_to_hex(b'278'), 'merge': False }) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa sha1_git, ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(sha1_git_bin) mock_backend.revision_log.assert_called_with( sha1_git_root_bin, 100) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_ko_revision_not_found(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') mock_backend.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision('123') self.assertIn('Revision 123 not found', cm.exception.args[0]) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_ko_revision_with_path_to_nowhere( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision( '123', 'path/to/something/unknown') self.assertIn("Directory/File 'path/to/something/unknown' " + "pointed to by revision 123 not found", cm.exception.args[0]) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'path/to/something/unknown') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_ko_type_not_implemented( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = { 'type': 'rev', 'name': b'some/path/to/rev', 'target': b'456' } stub_content = { 'id': b'12', 'type': 'file' } mock_backend.content_get.return_value = stub_content # when with self.assertRaises(NotImplementedError) as cm: service.lookup_directory_with_revision( '123', 'some/path/to/rev') self.assertIn("Entity of type 'rev' not implemented.", cm.exception.args[0]) # then mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'some/path/to/rev') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_without_path(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } stub_dir_entries = [{ 'id': b'123', 'type': 'dir' }, { 'id': b'456', 'type': 'file' }] mock_backend.directory_ls.return_value = stub_dir_entries # when actual_directory_entries = service.lookup_directory_with_revision( '123') self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(list(actual_directory_entries['content']), stub_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_ls.assert_called_once_with(dir_id) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_dir(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } stub_dir_entries = [{ 'id': b'12', 'type': 'dir' }, { 'id': b'34', 'type': 'file' }] mock_backend.directory_entry_get_by_path.return_value = { 'type': 'dir', 'name': b'some/path', 'target': b'456' } mock_backend.directory_ls.return_value = stub_dir_entries # when actual_directory_entries = service.lookup_directory_with_revision( '123', 'some/path') self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(actual_directory_entries['revision'], '123') self.assertEqual(actual_directory_entries['path'], 'some/path') self.assertEqual(list(actual_directory_entries['content']), stub_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( dir_id, 'some/path') mock_backend.directory_ls.assert_called_once_with(b'456') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_file_without_data( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = { 'type': 'file', 'name': b'some/path/to/file', 'target': b'789' } stub_content = { 'status': 'visible', } mock_backend.content_find.return_value = stub_content # when actual_content = service.lookup_directory_with_revision( '123', 'some/path/to/file') # then self.assertEqual(actual_content, {'type': 'file', 'revision': '123', 'path': 'some/path/to/file', 'content': stub_content}) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'some/path/to/file') mock_backend.content_find.assert_called_once_with('sha1_git', b'789') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_file_with_data( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = { 'type': 'file', 'name': b'some/path/to/file', 'target': b'789' } stub_content = { 'status': 'visible', 'sha1': b'content-sha1' } mock_backend.content_find.return_value = stub_content mock_backend.content_get.return_value = { 'sha1': b'content-sha1', 'data': b'some raw data' } expected_content = { 'status': 'visible', 'sha1': hash_to_hex(b'content-sha1'), 'data': b'some raw data' } # when actual_content = service.lookup_directory_with_revision( '123', 'some/path/to/file', with_data=True) # then self.assertEqual(actual_content, {'type': 'file', 'revision': '123', 'path': 'some/path/to/file', 'content': expected_content}) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'some/path/to/file') mock_backend.content_find.assert_called_once_with('sha1_git', b'789') mock_backend.content_get.assert_called_once_with(b'content-sha1') @patch('swh.web.ui.service.backend') @istest def lookup_revision(self, mock_backend): # given mock_backend.revision_get = MagicMock( return_value=self.SAMPLE_REVISION_RAW) # when actual_revision = service.lookup_revision( self.SHA1_SAMPLE) # then self.assertEqual(actual_revision, self.SAMPLE_REVISION) mock_backend.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) @patch('swh.web.ui.service.backend') @istest def lookup_revision_invalid_msg(self, mock_backend): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['message'] = b'elegant fix for bug \xff' expected_revision = self.SAMPLE_REVISION expected_revision['message'] = None expected_revision['message_decoding_failed'] = True mock_backend.revision_get = MagicMock(return_value=stub_rev) # when actual_revision = service.lookup_revision( self.SHA1_SAMPLE) # then self.assertEqual(actual_revision, expected_revision) mock_backend.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) @patch('swh.web.ui.service.backend') @istest def lookup_revision_msg_ok(self, mock_backend): # given mock_backend.revision_get.return_value = self.SAMPLE_REVISION_RAW # when rv = service.lookup_revision_message( self.SHA1_SAMPLE) # then self.assertEquals(rv, {'message': self.SAMPLE_MESSAGE_BIN}) mock_backend.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) @patch('swh.web.ui.service.backend') @istest def lookup_revision_msg_absent(self, mock_backend): # given stub_revision = self.SAMPLE_REVISION_RAW del stub_revision['message'] mock_backend.revision_get.return_value = stub_revision # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message( self.SHA1_SAMPLE) # then mock_backend.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) self.assertEqual(cm.exception.args[0], 'No message for revision ' 'with sha1_git ' '18d8be353ed3480476f032475e7c233eff7371d5.') @patch('swh.web.ui.service.backend') @istest def lookup_revision_msg_norev(self, mock_backend): # given mock_backend.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message( self.SHA1_SAMPLE) # then mock_backend.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) self.assertEqual(cm.exception.args[0], 'Revision with sha1_git ' '18d8be353ed3480476f032475e7c233eff7371d5 ' 'not found.') @patch('swh.web.ui.service.backend') @istest def lookup_revision_multiple(self, mock_backend): # given sha1 = self.SHA1_SAMPLE sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' stub_revisions = [ self.SAMPLE_REVISION_RAW, { 'id': hex_to_hash(sha1_other), 'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5', 'author': { 'name': b'name', 'email': b'name@surname.org', }, 'committer': { 'name': b'name', 'email': b'name@surname.org', }, 'message': b'ugly fix for bug 42', 'date': { 'timestamp': datetime.datetime( 2000, 1, 12, 5, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False }, 'date_offset': 0, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 12, 5, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False }, 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } ] mock_backend.revision_get_multiple.return_value = stub_revisions # when actual_revisions = service.lookup_revision_multiple( [sha1, sha1_other]) # then self.assertEqual(list(actual_revisions), [ self.SAMPLE_REVISION, { 'id': sha1_other, 'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5', 'author': { 'name': 'name', 'email': 'name@surname.org', }, 'committer': { 'name': 'name', 'email': 'name@surname.org', }, 'message': 'ugly fix for bug 42', 'date': '2000-01-12T05:23:54+00:00', 'date_offset': 0, 'committer_date': '2000-01-12T05:23:54+00:00', 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], 'merge': False } ]) self.assertEqual( list(mock_backend.revision_get_multiple.call_args[0][0]), [hex_to_hash(sha1), hex_to_hash(sha1_other)]) @patch('swh.web.ui.service.backend') @istest def lookup_revision_multiple_none_found(self, mock_backend): # given sha1_bin = self.SHA1_SAMPLE sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' mock_backend.revision_get_multiple.return_value = [] # then actual_revisions = service.lookup_revision_multiple( [sha1_bin, sha1_other]) self.assertEqual(list(actual_revisions), []) self.assertEqual( list(mock_backend.revision_get_multiple.call_args[0][0]), [hex_to_hash(self.SHA1_SAMPLE), hex_to_hash(sha1_other)]) @patch('swh.web.ui.service.backend') @istest def lookup_revision_log(self, mock_backend): # given stub_revision_log = [self.SAMPLE_REVISION_RAW] mock_backend.revision_log = MagicMock(return_value=stub_revision_log) # when actual_revision = service.lookup_revision_log( - 'abcdbe353ed3480476f032475e7c233eff7371d5') + 'abcdbe353ed3480476f032475e7c233eff7371d5', + limit=25) # then self.assertEqual(list(actual_revision), [self.SAMPLE_REVISION]) mock_backend.revision_log.assert_called_with( hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 25) @patch('swh.web.ui.service.backend') @istest def lookup_revision_log_by(self, mock_backend): # given stub_revision_log = [self.SAMPLE_REVISION_RAW] mock_backend.revision_log_by = MagicMock( return_value=stub_revision_log) # when actual_log = service.lookup_revision_log_by( 1, 'refs/heads/master', None, limit=100) # then self.assertEqual(list(actual_log), [self.SAMPLE_REVISION]) mock_backend.revision_log_by.assert_called_with( 1, 'refs/heads/master', None, 100) @patch('swh.web.ui.service.backend') @istest def lookup_revision_log_by_nolog(self, mock_backend): # given mock_backend.revision_log_by = MagicMock(return_value=None) # when res = service.lookup_revision_log_by( 1, 'refs/heads/master', None, limit=100) # then self.assertEquals(res, None) mock_backend.revision_log_by.assert_called_with( 1, 'refs/heads/master', None, 100) @patch('swh.web.ui.service.backend') @istest def lookup_content_raw_not_found(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content_raw( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash(self.SHA1_SAMPLE)) @patch('swh.web.ui.service.backend') @istest def lookup_content_raw(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value={ 'sha1': self.SHA1_SAMPLE, }) mock_backend.content_get = MagicMock(return_value={ 'data': b'binary data'}) # when actual_content = service.lookup_content_raw( 'sha256:%s' % self.SHA256_SAMPLE) # then self.assertEquals(actual_content, {'data': b'binary data'}) mock_backend.content_find.assert_called_once_with( 'sha256', self.SHA256_SAMPLE_BIN) mock_backend.content_get.assert_called_once_with( self.SHA1_SAMPLE) @patch('swh.web.ui.service.backend') @istest def lookup_content_not_found(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content( 'sha1:%s' % self.SHA1_SAMPLE) # then self.assertIsNone(actual_content) mock_backend.content_find.assert_called_with( 'sha1', self.SHA1_SAMPLE_BIN) @patch('swh.web.ui.service.backend') @istest def lookup_content_with_sha1(self, mock_backend): # given mock_backend.content_find = MagicMock( return_value=self.SAMPLE_CONTENT_RAW) # when actual_content = service.lookup_content( 'sha1:%s' % self.SHA1_SAMPLE) # then self.assertEqual(actual_content, self.SAMPLE_CONTENT) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash(self.SHA1_SAMPLE)) @patch('swh.web.ui.service.backend') @istest def lookup_content_with_sha256(self, mock_backend): # given stub_content = self.SAMPLE_CONTENT_RAW stub_content['status'] = 'visible' expected_content = self.SAMPLE_CONTENT expected_content['status'] = 'visible' mock_backend.content_find = MagicMock( return_value=stub_content) # when actual_content = service.lookup_content( 'sha256:%s' % self.SHA256_SAMPLE) # then self.assertEqual(actual_content, expected_content) mock_backend.content_find.assert_called_with( 'sha256', self.SHA256_SAMPLE_BIN) @patch('swh.web.ui.service.backend') @istest def lookup_person(self, mock_backend): # given mock_backend.person_get = MagicMock(return_value={ 'id': 'person_id', 'name': b'some_name', 'email': b'some-email', }) # when actual_person = service.lookup_person('person_id') # then self.assertEqual(actual_person, { 'id': 'person_id', 'name': 'some_name', 'email': 'some-email', }) mock_backend.person_get.assert_called_with('person_id') @patch('swh.web.ui.service.backend') @istest def lookup_directory_bad_checksum(self, mock_backend): # given mock_backend.directory_ls = MagicMock() # when with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') # then mock_backend.directory_ls.called = False @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_not_found(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', 'directory-id-bin') mock_backend.directory_get.return_value = None # when actual_dir = service.lookup_directory('directory_id') # then self.assertIsNone(actual_dir) mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( 'directory_id', ['sha1'], 'Only sha1_git is supported.') mock_backend.directory_get.assert_called_with('directory-id-bin') mock_backend.directory_ls.called = False @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory(self, mock_query, mock_backend): mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', 'directory-sha1-bin') # something that exists is all that matters here mock_backend.directory_get.return_value = {'id': b'directory-sha1-bin'} # given stub_dir_entries = [{ 'sha1': self.SHA1_SAMPLE_BIN, 'sha256': self.SHA256_SAMPLE_BIN, 'sha1_git': self.SHA1GIT_SAMPLE_BIN, 'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': self.DIRECTORY_ID_BIN, 'name': b'bob', 'type': 10, }] expected_dir_entries = [{ 'sha1': self.SHA1_SAMPLE, 'sha256': self.SHA256_SAMPLE, 'sha1_git': self.SHA1GIT_SAMPLE, 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': self.DIRECTORY_ID, 'name': 'bob', 'type': 10, }] mock_backend.directory_ls.return_value = stub_dir_entries # when actual_directory_ls = list(service.lookup_directory( 'directory-sha1')) # then self.assertEqual(actual_directory_ls, expected_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( 'directory-sha1', ['sha1'], 'Only sha1_git is supported.') mock_backend.directory_ls.assert_called_with( 'directory-sha1-bin') @patch('swh.web.ui.service.backend') @istest def lookup_revision_by_nothing_found(self, mock_backend): # given mock_backend.revision_get_by.return_value = None # when actual_revisions = service.lookup_revision_by(1) # then self.assertIsNone(actual_revisions) mock_backend.revision_get_by(1, 'master', None) @patch('swh.web.ui.service.backend') @istest def lookup_revision_by(self, mock_backend): # given stub_rev = self.SAMPLE_REVISION_RAW expected_rev = self.SAMPLE_REVISION mock_backend.revision_get_by.return_value = stub_rev # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_backend.revision_get_by(1, 'master2', 'some-ts') @patch('swh.web.ui.service.backend') @istest def lookup_revision_by_nomerge(self, mock_backend): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['parents'] = [ hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')] expected_rev = self.SAMPLE_REVISION expected_rev['parents'] = ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'] mock_backend.revision_get_by.return_value = stub_rev # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_backend.revision_get_by(1, 'master2', 'some-ts') @patch('swh.web.ui.service.backend') @istest def lookup_revision_by_merge(self, mock_backend): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['parents'] = [ hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'), hex_to_hash('ffff3b19e793491b1c6db0fd8b46cd9f32e592fc') ] expected_rev = self.SAMPLE_REVISION expected_rev['parents'] = [ 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', 'ffff3b19e793491b1c6db0fd8b46cd9f32e592fc' ] expected_rev['merge'] = True mock_backend.revision_get_by.return_value = stub_rev # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_backend.revision_get_by(1, 'master2', 'some-ts') @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_by_ko(self, mock_backend): # given mock_backend.revision_get_by.return_value = None # when with self.assertRaises(NotFoundExc) as cm: origin_id = 1 branch_name = 'master3' ts = None service.lookup_revision_with_context_by(origin_id, branch_name, ts, 'sha1') # then self.assertIn( 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts), cm.exception.args[0]) mock_backend.revision_get_by.assert_called_once_with( origin_id, branch_name, ts) @patch('swh.web.ui.service.lookup_revision_with_context') @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_by(self, mock_backend, mock_lookup_revision_with_context): # given stub_root_rev = {'id': 'root-rev-id'} mock_backend.revision_get_by.return_value = {'id': 'root-rev-id'} stub_rev = {'id': 'rev-found'} mock_lookup_revision_with_context.return_value = stub_rev # when origin_id = 1 branch_name = 'master3' ts = None sha1_git = 'sha1' actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin_id, branch_name, ts, sha1_git) # then self.assertEquals(actual_root_rev, stub_root_rev) self.assertEquals(actual_rev, stub_rev) mock_backend.revision_get_by.assert_called_once_with( origin_id, branch_name, ts) mock_lookup_revision_with_context.assert_called_once_with( stub_root_rev, sha1_git, 100) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_entity_by_uuid(self, mock_query, mock_backend): # given uuid_test = 'correct-uuid' mock_query.parse_uuid4.return_value = uuid_test stub_entities = [{'uuid': uuid_test}] mock_backend.entity_get.return_value = stub_entities # when actual_entities = service.lookup_entity_by_uuid(uuid_test) # then self.assertEquals(actual_entities, stub_entities) mock_query.parse_uuid4.assert_called_once_with(uuid_test) mock_backend.entity_get.assert_called_once_with(uuid_test) @istest def lookup_revision_through_ko_not_implemented(self): # then with self.assertRaises(NotImplementedError): service.lookup_revision_through({ 'something-unknown': 10, }) @patch('swh.web.ui.service.lookup_revision_with_context_by') @istest def lookup_revision_through_with_context_by(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'origin_id': 1, 'branch_name': 'master', 'ts': None, 'sha1_git': 'sha1-git' }, limit=1000) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 1, 'master', None, 'sha1-git', 1000) @patch('swh.web.ui.service.lookup_revision_by') @istest def lookup_revision_through_with_revision_by(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'origin_id': 2, 'branch_name': 'master2', 'ts': 'some-ts', }, limit=10) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 2, 'master2', 'some-ts') @patch('swh.web.ui.service.lookup_revision_with_context') @istest def lookup_revision_through_with_context(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'sha1_git_root': 'some-sha1-root', 'sha1_git': 'some-sha1', }) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 'some-sha1-root', 'some-sha1', 100) @patch('swh.web.ui.service.lookup_revision') @istest def lookup_revision_through_with_revision(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'sha1_git': 'some-sha1', }) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 'some-sha1') @patch('swh.web.ui.service.lookup_revision_through') @istest def lookup_directory_through_revision_ko_not_found( self, mock_lookup_rev): # given mock_lookup_rev.return_value = None # when with self.assertRaises(NotFoundExc): service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 100) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100) @patch('swh.web.ui.service.lookup_revision_through') @patch('swh.web.ui.service.lookup_directory_with_revision') @istest def lookup_directory_through_revision_ok_with_data( self, mock_lookup_dir, mock_lookup_rev): # given mock_lookup_rev.return_value = {'id': 'rev-id'} mock_lookup_dir.return_value = {'type': 'dir', 'content': []} # when rev_id, dir_result = service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 100) # then self.assertEquals(rev_id, 'rev-id') self.assertEquals(dir_result, {'type': 'dir', 'content': []}) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100) mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', False) @patch('swh.web.ui.service.lookup_revision_through') @patch('swh.web.ui.service.lookup_directory_with_revision') @istest def lookup_directory_through_revision_ok_with_content( self, mock_lookup_dir, mock_lookup_rev): # given mock_lookup_rev.return_value = {'id': 'rev-id'} stub_result = {'type': 'file', 'revision': 'rev-id', 'content': {'data': b'blah', 'sha1': 'sha1'}} mock_lookup_dir.return_value = stub_result # when rev_id, dir_result = service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 10, with_data=True) # then self.assertEquals(rev_id, 'rev-id') self.assertEquals(dir_result, stub_result) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 10) mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', True) diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py index b5acac980..d055bc937 100644 --- a/swh/web/ui/views/api.py +++ b/swh/web/ui/views/api.py @@ -1,728 +1,728 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from types import GeneratorType -from flask import request, url_for, Response, redirect +from flask import request, url_for, Response from swh.web.ui import service, utils from swh.web.ui.exc import NotFoundExc from swh.web.ui.main import app @app.route('/api/1/stat/counters/') def api_stats(): """Return statistics on SWH storage. Returns: SWH storage's statistics. """ return service.stat_counters() @app.route('/api/1/stat/visits//') def api_origin_visits(origin_id): """Return visit dates for the given revision. Returns: A list of SWH visit occurrence timestamps, sorted from oldest to newest. """ date_gen = (item['date'] for item in service.stat_origin_visits(origin_id)) return sorted(date_gen) @app.route('/api/1/search/', methods=['POST']) @app.route('/api/1/search//') def api_search(q=None): """Search a content per hash. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256). Returns: Dictionary with 'found' key and the associated result. Raises: BadInputExc in case of unknown algo_hash or bad hash. Example: GET /api/1/search/sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5/ """ response = {'search_res': None, 'search_stats': None} search_stats = {'nbfiles': 0, 'pct': 0} search_res = None # Single hash request route if q: r = service.search_hash(q) search_res = [{'filename': None, 'sha1': q, 'found': r['found']}] search_stats['nbfiles'] = 1 search_stats['pct'] = 100 if r['found'] else 0 # Post form submission with many hash requests elif request.method == 'POST': data = request.form queries = [] # Remove potential inputs with no associated value for k, v in data.items(): if v is not None: if k == 'q' and len(v) > 0: queries.append({'filename': None, 'sha1': v}) elif v != '': queries.append({'filename': k, 'sha1': v}) if len(queries) > 0: lookup = service.lookup_multiple_hashes(queries) result = [] for el in lookup: result.append({'filename': el['filename'], 'sha1': el['sha1'], 'found': el['found']}) search_res = result nbfound = len([x for x in lookup if x['found']]) search_stats['nbfiles'] = len(queries) search_stats['pct'] = (nbfound / len(queries))*100 response['search_res'] = search_res response['search_stats'] = search_stats return response def _api_lookup(criteria, lookup_fn, error_msg_if_not_found, enrich_fn=lambda x: x, *args): """Capture a redundant behavior of: - looking up the backend with a criteria (be it an identifier or checksum) passed to the function lookup_fn - if nothing is found, raise an NotFoundExc exception with error message error_msg_if_not_found. - Otherwise if something is returned: - either as list, map or generator, map the enrich_fn function to it and return the resulting data structure as list. - either as dict and pass to enrich_fn and return the dict enriched. Args: - criteria: discriminating criteria to lookup - lookup_fn: function expects one criteria and optional supplementary *args. - error_msg_if_not_found: if nothing matching the criteria is found, raise NotFoundExc with this error message. - enrich_fn: Function to use to enrich the result returned by lookup_fn. Default to the identity function if not provided. - *args: supplementary arguments to pass to lookup_fn. Raises: NotFoundExp or whatever `lookup_fn` raises. """ res = lookup_fn(criteria, *args) if not res: raise NotFoundExc(error_msg_if_not_found) if isinstance(res, (map, list, GeneratorType)): enriched_data = [] for e in res: enriched_data.append(enrich_fn(e)) return enriched_data return enrich_fn(res) @app.route('/api/1/origin/') @app.route('/api/1/origin//') def api_origin(origin_id): """Return information about origin with id origin_id. Args: origin_id: the origin's identifier. Returns: Information on the origin if found. Raises: NotFoundExc if the origin is not found. Example: GET /api/1/origin/1/ """ return _api_lookup( origin_id, lookup_fn=service.lookup_origin, error_msg_if_not_found='Origin with id %s not found.' % origin_id) @app.route('/api/1/person/') @app.route('/api/1/person//') def api_person(person_id): """Return information about person with identifier person_id. Args: person_id: the person's identifier. Returns: Information on the person if found. Raises: NotFoundExc if the person is not found. Example: GET /api/1/person/1/ """ return _api_lookup( person_id, lookup_fn=service.lookup_person, error_msg_if_not_found='Person with id %s not found.' % person_id) @app.route('/api/1/release/') @app.route('/api/1/release//') def api_release(sha1_git): """Return information about release with id sha1_git. Args: sha1_git: the release's hash. Returns: Information on the release if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the release is not found. Example: GET /api/1/release/b307094f00c3641b0c9da808d894f3a325371414 """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_release, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_release) def _revision_directory_by(revision, path, request_path, limit=100, with_data=False): """Compute the revision matching criterion's directory or content data. Args: revision: dictionary of criterions representing a revision to lookup path: directory's path to lookup request_path: request path which holds the original context to limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of with_data: indicate to retrieve the content's raw data if path resolves to a content. """ def enrich_directory_local(dir, context_url=request_path): return utils.enrich_directory(dir, context_url) rev_id, result = service.lookup_directory_through_revision( revision, path, limit=limit, with_data=with_data) content = result['content'] if result['type'] == 'dir': # dir_entries result['content'] = list(map(enrich_directory_local, content)) else: # content result['content'] = utils.enrich_content(content) return result @app.route('/api/1/revision' '/origin/' '/directory/') @app.route('/api/1/revision' '/origin/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory//') def api_directory_through_revision_origin(origin_id, branch_name="refs/heads/master", ts=None, path=None, with_data=False): """Display directory or content information through a revision identified by origin/branch/timestamp. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). path: Path to directory or file to display. with_data: indicate to retrieve the content's raw data if path resolves to a content. Returns: Information on the directory or content pointed to by such revision. Raises: NotFoundExc if the revision is not found or the path pointed to is not found. """ if ts: ts = utils.parse_timestamp(ts) return _revision_directory_by( { 'origin_id': origin_id, 'branch_name': branch_name, 'ts': ts }, path, request.path, with_data=with_data) @app.route('/api/1/revision' '/origin//') @app.route('/api/1/revision' '/origin/' '/branch//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//') @app.route('/api/1/revision' '/origin/' '/ts//') def api_revision_with_origin(origin_id, branch_name="refs/heads/master", ts=None): """Instead of having to specify a (root) revision by SHA1_GIT, users might want to specify a place and a time. In SWH a "place" is an origin; a "time" is a timestamp at which some place has been observed by SWH crawlers. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. """ if ts: ts = utils.parse_timestamp(ts) return _api_lookup( origin_id, service.lookup_revision_by, 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts), utils.enrich_revision, branch_name, ts) @app.route('/api/1/revision/') @app.route('/api/1/revision//') @app.route('/api/1/revision//prev//') def api_revision(sha1_git, context=None): """Return information about revision with id sha1_git. Args: sha1_git: the revision's hash. Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. Example: GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e """ def _enrich_revision(revision, context=context): return utils.enrich_revision(revision, context) return _api_lookup( sha1_git, service.lookup_revision, 'Revision with sha1_git %s not found.' % sha1_git, _enrich_revision) @app.route('/api/1/revision//raw/') def api_revision_raw_message(sha1_git): """Return the raw data of the revision's message Args: sha1_git: the revision's hash Returns: The raw revision message, possibly in an illegible format for humans, decoded in utf-8 by default. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found or the revision has no message Example: GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e/raw/ """ raw = service.lookup_revision_message(sha1_git) return Response(raw['message'], headers={'Content-disposition': 'attachment;' 'filename=rev_%s_raw' % sha1_git}, mimetype='application/octet-stream') @app.route('/api/1/revision//directory/') @app.route('/api/1/revision//directory//') def api_revision_directory(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. with_data: indicate to retrieve the content's raw data if path resolves to a content Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist Example: GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e/directory/ """ return _revision_directory_by( { 'sha1_git': sha1_git }, dir_path, request.path, with_data=with_data) @app.route('/api/1/revision//log/') @app.route('/api/1/revision//prev//log/') def api_revision_log(sha1_git, prev_sha1s=None): """Show all revisions (~git log) starting from sha1_git. The first element returned is the given sha1_git. Args: sha1_git: the revision's hash. prev_sha1s: the navigation breadcrumb limit: optional query parameter to limit the revisions log (default to 100). Returns: Information on the revision if found, complemented with the revision's children if we have navigation breadcrumbs for them. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. """ limit = app.config['conf']['max_log_revs'] response = {'revisions': None, 'next_revs_url': None} revisions = None next_revs_url = None def lookup_revision_log_with_limit(s, limit=limit+1): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' % sha1_git rev_get = _api_lookup(sha1_git, lookup_fn=lookup_revision_log_with_limit, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) if len(rev_get) == limit+1: rev_backward = rev_get[:-1] next_revs_url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) else: rev_backward = rev_get if not prev_sha1s: # no nav breadcrumbs, so we're done revisions = rev_backward else: rev_forward_ids = prev_sha1s.split('/') rev_forward = _api_lookup(rev_forward_ids, lookup_fn=service.lookup_revision_multiple, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) revisions = rev_forward + rev_backward response['revisions'] = revisions response['next_revs_url'] = next_revs_url return response @app.route('/api/1/revision' '/origin/log/') @app.route('/api/1/revision' '/origin//log/') @app.route('/api/1/revision' '/origin/' '/branch//log/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//log/') @app.route('/api/1/revision' '/origin/' '/ts//log/') def api_revision_log_by(origin_id, branch_name='refs/heads/master', ts=None): """Show all revisions (~git log) starting from the revision described by its origin_id, optional branch name and timestamp. The first element returned is the described revision. Args: origin_id: the revision's origin. branch_name: the branch of the revision (optional, defaults to master ts: the requested timeframe near which the revision was created. limit: optional query parameter to limit the revisions log (default to 100). Returns: Information on the revision log if found. Raises: NotFoundExc if the revision is not found. """ limit = app.config['conf']['max_log_revs'] response = {'revisions': None, 'next_revs_url': None} next_revs_url = None if ts: ts = utils.parse_timestamp(ts) def lookup_revision_log_by_with_limit(o_id, br, ts, limit=limit+1): return service.lookup_revision_log_by(o_id, br, ts, limit) error_msg = 'No revision matching origin %s ' % origin_id error_msg += ', branch name %s' % branch_name error_msg += (' and time stamp %s.' % ts) if ts else '.' rev_get = _api_lookup(origin_id, lookup_revision_log_by_with_limit, error_msg, utils.enrich_revision, branch_name, ts) if len(rev_get) == limit+1: revisions = rev_get[:-1] next_revs_url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) else: revisions = rev_get response['revisions'] = revisions response['next_revs_url'] = next_revs_url return response @app.route('/api/1/directory/') @app.route('/api/1/directory//') @app.route('/api/1/directory///') def api_directory(sha1_git, path=None): """Return information about release with id sha1_git. Args: sha1_git: Directory's sha1_git. If path exists: starting directory for relative navigation. path: The path to the queried directory Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the content is not found. Example: GET /api/1/directory/8d7dc91d18546a91564606c3e3695a5ab568d179 GET /api/1/directory/8d7dc91d18546a91564606c3e3695a5ab568d179/path/dir/ """ if path: error_msg_path = ('Entry with path %s relative to directory ' 'with sha1_git %s not found.') % (path, sha1_git) return _api_lookup( sha1_git, service.lookup_directory_with_path, error_msg_path, utils.enrich_directory, path) else: error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, service.lookup_directory, error_msg_nopath, utils.enrich_directory) # @app.route('/api/1/browse/') # @app.route('/api/1/browse//') def api_content_checksum_to_origin(q): """Return content information up to one of its origin if the content is found. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256). Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the content is not found. Example: GET /api/1/browse/sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242 """ found = service.lookup_hash(q)['found'] if not found: raise NotFoundExc('Content with %s not found.' % q) return service.lookup_hash_origin(q) @app.route('/api/1/content//raw/') def api_content_raw(q): """Return content's raw data if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's raw data in application/octet-stream. Raises: - BadInputExc in case of unknown algo_hash or bad hash - NotFoundExc if the content is not found. """ def generate(content): yield content['data'] content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' % q) return Response(generate(content), mimetype='application/octet-stream') @app.route('/api/1/content/') @app.route('/api/1/content//') def api_content_metadata(q): """Return content information if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's information. Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if the content is not found. Example: GET /api/1/content/sha256:e2c76e40866bb6b28916387bdfc8649beceb 523015738ec6d4d540c7fe65232b """ return _api_lookup( q, lookup_fn=service.lookup_content, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=utils.enrich_content) @app.route('/api/1/entity/') @app.route('/api/1/entity//') def api_entity_by_uuid(uuid): """Return content information if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's information. Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if the content is not found. Example: - GET /api/1/entity/5f4d4c51-498a-4e28-88b3-b3e4e8396cba/ - GET /api/1/entity/7c33636b-8f11-4bda-89d9-ba8b76a42cec/ """ return _api_lookup( uuid, lookup_fn=service.lookup_entity_by_uuid, error_msg_if_not_found="Entity with uuid '%s' not found." % uuid, enrich_fn=utils.enrich_entity)