diff --git a/swh/web/common/service.py b/swh/web/common/service.py index 95f5f1ca..5070b655 100644 --- a/swh/web/common/service.py +++ b/swh/web/common/service.py @@ -1,1056 +1,1055 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from collections import defaultdict from swh.model import hashutil from swh.storage.algos import revisions_walker from swh.web.common import converters from swh.web.common import query from swh.web.common.exc import NotFoundExc from swh.web.common.origin_visits import get_origin_visit from swh.web import config storage = config.storage() vault = config.vault() idx_storage = config.indexer_storage() MAX_LIMIT = 50 # Top limit the users can ask for def _first_element(l): """Returns the first element in the provided list or None if it is empty or None""" return next(iter(l or []), None) def lookup_multiple_hashes(hashes): """Lookup the passed hashes in a single DB connection, using batch processing. Args: An array of {filename: X, sha1: Y}, string X, hex sha1 string Y. Returns: The same array with elements updated with elem['found'] = true if the hash is present in storage, elem['found'] = false if not. """ hashlist = [hashutil.hash_to_bytes(elem['sha1']) for elem in hashes] content_missing = storage.content_missing_per_sha1(hashlist) missing = [hashutil.hash_to_hex(x) for x in content_missing] for x in hashes: x.update({'found': True}) for h in hashes: if h['sha1'] in missing: h['found'] = False return hashes def lookup_expression(expression, last_sha1, per_page): """Lookup expression in raw content. Args: expression (str): An expression to lookup through raw indexed content last_sha1 (str): Last sha1 seen per_page (int): Number of results per page Yields: ctags whose content match the expression """ limit = min(per_page, MAX_LIMIT) ctags = idx_storage.content_ctags_search(expression, last_sha1=last_sha1, limit=limit) for ctag in ctags: ctag = converters.from_swh(ctag, hashess={'id'}) ctag['sha1'] = ctag['id'] ctag.pop('id') yield ctag def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found containing the hash info if the hash is present, None if not. """ algo, hash = query.parse_hash(q) - found = storage.content_find({algo: hash}) + found = _first_element(storage.content_find({algo: hash})) return {'found': converters.from_content(found), 'algo': algo} def search_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ algo, hash = query.parse_hash(q) - found = storage.content_find({algo: hash}) + found = _first_element(storage.content_find({algo: hash})) return {'found': found is not None} def _lookup_content_sha1(q): """Given a possible input, query for the content's sha1. Args: q: query string of the form Returns: binary sha1 if found or None """ algo, hash = query.parse_hash(q) if algo != 'sha1': - hashes = storage.content_find({algo: hash}) + hashes = _first_element(storage.content_find({algo: hash})) if not hashes: return None return hashes['sha1'] return hash def lookup_content_ctags(q): """Return ctags information from a specified content. Args: q: query string of the form Yields: ctags information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None ctags = list(idx_storage.content_ctags_get([sha1])) if not ctags: return None for ctag in ctags: yield converters.from_swh(ctag, hashess={'id'}) def lookup_content_filetype(q): """Return filetype information from a specified content. Args: q: query string of the form Yields: filetype information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None filetype = _first_element(list(idx_storage.content_mimetype_get([sha1]))) if not filetype: return None return converters.from_filetype(filetype) def lookup_content_language(q): """Return language information from a specified content. Args: q: query string of the form Yields: language information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lang = _first_element(list(idx_storage.content_language_get([sha1]))) if not lang: return None return converters.from_swh(lang, hashess={'id'}) def lookup_content_license(q): """Return license information from a specified content. Args: q: query string of the form Yields: license information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lic = _first_element(idx_storage.content_fossology_license_get([sha1])) if not lic: return None return converters.from_swh({'id': sha1, 'facts': lic[sha1]}, hashess={'id'}) def lookup_origin(origin): """Return information about the origin matching dict origin. Args: origin: origin's dict with keys either 'id' or ('type' AND 'url') Returns: origin information as dict. """ origin_info = storage.origin_get(origin) if not origin_info: if 'id' in origin and origin['id']: msg = 'Origin with id %s not found!' % origin['id'] else: msg = 'Origin with type %s and url %s not found!' % \ (origin['type'], origin['url']) raise NotFoundExc(msg) return converters.from_origin(origin_info) def lookup_origins(origin_from=1, origin_count=100): """Get list of archived software origins in a paginated way. Origins are sorted by id before returning them Args: origin_from (int): The minimum id of the origins to return origin_count (int): The maximum number of origins to return Yields: origins information as dicts """ origins = storage.origin_get_range(origin_from, origin_count) return map(converters.from_origin, origins) def search_origin(url_pattern, offset=0, limit=50, regexp=False, with_visit=False): """Search for origins whose urls contain a provided string pattern or match a provided regular expression. Args: url_pattern: the string pattern to search for in origin urls offset: number of found origins to skip before returning results limit: the maximum number of found origins to return Returns: list of origin information as dict. """ origins = storage.origin_search(url_pattern, offset, limit, regexp, with_visit) return map(converters.from_origin, origins) def search_origin_metadata(fulltext, limit=50): """Search for origins whose metadata match a provided string pattern. Args: fulltext: the string pattern to search for in origin metadata offset: number of found origins to skip before returning results limit: the maximum number of found origins to return Returns: list of origin metadata as dict. """ matches = idx_storage.origin_intrinsic_metadata_search_fulltext( conjunction=[fulltext], limit=limit) results = [] for match in matches: match['from_revision'] = hashutil.hash_to_hex(match['from_revision']) result = converters.from_origin( storage.origin_get({'id': match.pop('id')})) result['metadata'] = match results.append(result) return results def lookup_person(person_id): """Return information about the person with id person_id. Args: person_id as string Returns: person information as dict. Raises: NotFoundExc if there is no person with the provided id. """ person = _first_element(storage.person_get([int(person_id)])) if not person: raise NotFoundExc('Person with id %s not found' % person_id) return converters.from_person(person) def _to_sha1_bin(sha1_hex): _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_hex, ['sha1'], # HACK: sha1_git really 'Only sha1_git is supported.') return sha1_git_bin def _check_directory_exists(sha1_git, sha1_git_bin): if len(list(storage.directory_missing([sha1_git_bin]))): raise NotFoundExc('Directory with sha1_git %s not found' % sha1_git) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ empty_dir_sha1 = '4b825dc642cb6eb9a060e54bf8d69288fbee4904' if sha1_git == empty_dir_sha1: return [] sha1_git_bin = _to_sha1_bin(sha1_git) _check_directory_exists(sha1_git, sha1_git_bin) directory_entries = storage.directory_ls(sha1_git_bin) return map(converters.from_directory_entry, directory_entries) def lookup_directory_with_path(sha1_git, path_string): """Return directory information for entry with path path_string w.r.t. root directory pointed by directory_sha1_git Args: - directory_sha1_git: sha1_git corresponding to the directory to which we append paths to (hopefully) find the entry - the relative path to the entry starting from the directory pointed by directory_sha1_git Raises: NotFoundExc if the directory entry is not found """ sha1_git_bin = _to_sha1_bin(sha1_git) _check_directory_exists(sha1_git, sha1_git_bin) paths = path_string.strip(os.path.sep).split(os.path.sep) queried_dir = storage.directory_entry_get_by_path( sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not queried_dir: raise NotFoundExc(('Directory entry with path %s from %s not found') % (path_string, sha1_git)) return converters.from_directory_entry(queried_dir) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_git_bin = _to_sha1_bin(release_sha1_git) release = _first_element(storage.release_get([sha1_git_bin])) if not release: raise NotFoundExc('Release with sha1_git %s not found.' % release_sha1_git) return converters.from_release(release) def lookup_release_multiple(sha1_git_list): """Return information about the revisions identified with their sha1_git identifiers. Args: sha1_git_list: A list of revision sha1_git identifiers Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list) releases = storage.release_get(sha1_bin_list) or [] return (converters.from_release(r) for r in releases) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if there is no revision with the provided sha1_git. """ sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) return converters.from_revision(revision) def lookup_revision_multiple(sha1_git_list): """Return information about the revisions identified with their sha1_git identifiers. Args: sha1_git_list: A list of revision sha1_git identifiers Returns: Generator of revisions information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list) revisions = storage.revision_get(sha1_bin_list) or [] return (converters.from_revision(r) for r in revisions) def lookup_revision_message(rev_sha1_git): """Return the raw message of the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Decoded revision message as dict {'message': } Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if the revision is not found, or if it has no message """ sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) if 'message' not in revision: raise NotFoundExc('No message for revision with sha1_git %s.' % rev_sha1_git) res = {'message': revision['message']} return res def _lookup_revision_id_by(origin_id, branch_name, timestamp): def _get_snapshot_branch(snapshot, branch_name): snapshot = lookup_snapshot(visit['snapshot'], branches_from=branch_name, branches_count=10) branch = None if branch_name in snapshot['branches']: branch = snapshot['branches'][branch_name] return branch visit = get_origin_visit({'id': origin_id}, visit_ts=timestamp) branch = _get_snapshot_branch(visit['snapshot'], branch_name) rev_id = None if branch and branch['target_type'] == 'revision': rev_id = branch['target'] elif branch and branch['target_type'] == 'alias': branch = _get_snapshot_branch(visit['snapshot'], branch['target']) if branch and branch['target_type'] == 'revision': rev_id = branch['target'] if not rev_id: raise NotFoundExc('Revision for origin %s and branch %s not found.' % (origin_id, branch_name)) return rev_id def lookup_revision_by(origin_id, branch_name='HEAD', timestamp=None): """Lookup revision by origin id, snapshot branch name and visit timestamp. If branch_name is not provided, lookup using 'HEAD' as default. If timestamp is not provided, use the most recent. Args: origin_id (int): origin of the revision branch_name (str): snapshot branch name timestamp (str/int): origin visit time frame Returns: dict: The revision matching the criterions Raises: NotFoundExc if no revision corresponds to the criterion """ rev_id = _lookup_revision_id_by(origin_id, branch_name, timestamp) return lookup_revision(rev_id) def lookup_revision_log(rev_sha1_git, limit): """Lookup revision log by revision id. Args: rev_sha1_git (str): The revision's sha1 as hexadecimal limit (int): the maximum number of revisions returned Returns: list: Revision log as list of revision dicts Raises: ValueError: if the identifier provided is not of sha1 nature. NotFoundExc: if there is no revision with the provided sha1_git. """ lookup_revision(rev_sha1_git) sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision_entries = storage.revision_log([sha1_git_bin], limit) return map(converters.from_revision, revision_entries) def lookup_revision_log_by(origin_id, branch_name, timestamp, limit): """Lookup revision by origin id, snapshot branch name and visit timestamp. Args: origin_id (int): origin of the revision branch_name (str): snapshot branch timestamp (str/int): origin visit time frame limit (int): the maximum number of revisions returned Returns: list: Revision log as list of revision dicts Raises: NotFoundExc: if no revision corresponds to the criterion """ rev_id = _lookup_revision_id_by(origin_id, branch_name, timestamp) return lookup_revision_log(rev_id, limit) def lookup_revision_with_context_by(origin_id, branch_name, timestamp, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. sha1_git_root being resolved through the lookup of a revision by origin_id, branch_name and ts. In other words, sha1_git is an ancestor of sha1_git_root. Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. - sha1_git: one of sha1_git_root's ancestors. - limit: limit the lookup to 100 revisions back. Returns: Pair of (root_revision, revision). Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ rev_root_id = _lookup_revision_id_by(origin_id, branch_name, timestamp) rev_root_id_bin = hashutil.hash_to_bytes(rev_root_id) rev_root = _first_element(storage.revision_get([rev_root_id_bin])) return (converters.from_revision(rev_root), lookup_revision_with_context(rev_root, sha1_git, limit)) def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision. The type is either a sha1 (as an hex string) or a non converted dict. sha1_git: one of sha1_git_root's ancestors limit: limit the lookup to 100 revisions back Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ sha1_git_bin = _to_sha1_bin(sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) if isinstance(sha1_git_root, str): sha1_git_root_bin = _to_sha1_bin(sha1_git_root) revision_root = _first_element(storage.revision_get([sha1_git_root_bin])) # noqa if not revision_root: raise NotFoundExc('Revision root %s not found' % sha1_git_root) else: sha1_git_root_bin = sha1_git_root['id'] revision_log = storage.revision_log([sha1_git_root_bin], limit) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = rev['id'] parents[rev_id] = [] for parent_id in rev['parents']: parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return converters.from_revision(revision) def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. with_data: boolean that indicates to retrieve the raw data if the path resolves to a content. Default to False (for the api) Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist. NotImplementedError in case of dir_path exists but do not reference a type 'dir' or 'file'. """ sha1_git_bin = _to_sha1_bin(sha1_git) - revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) - dir_sha1_git_bin = revision['directory'] - if dir_path: paths = dir_path.strip(os.path.sep).split(os.path.sep) entity = storage.directory_entry_get_by_path( dir_sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) - if not entity: raise NotFoundExc( "Directory or File '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: entity = {'type': 'dir', 'target': dir_sha1_git_bin} - if entity['type'] == 'dir': directory_entries = storage.directory_ls(entity['target']) or [] return {'type': 'dir', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': list(map(converters.from_directory_entry, directory_entries))} elif entity['type'] == 'file': # content - content = storage.content_find({'sha1_git': entity['target']}) + content = _first_element( + storage.content_find({'sha1_git': entity['target']})) + if not content: + raise NotFoundExc('Content not found for revision %s' + % sha1_git) if with_data: c = _first_element(storage.content_get([content['sha1']])) content['data'] = c['data'] return {'type': 'file', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_content(content)} elif entity['type'] == 'rev': # revision revision = next(storage.revision_get([entity['target']])) return {'type': 'rev', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_revision(revision)} else: raise NotImplementedError('Entity of type %s not implemented.' % entity['type']) def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal Raises: NotFoundExc if the requested content is not found """ algo, hash = query.parse_hash(q) - c = storage.content_find({algo: hash}) + c = _first_element(storage.content_find({algo: hash})) if not c: raise NotFoundExc('Content with %s checksum equals to %s not found!' % (algo, hashutil.hash_to_hex(hash))) return converters.from_content(c) def lookup_content_raw(q): """Lookup the content defined by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. Raises: NotFoundExc if the requested content is not found or if the content bytes are not available in the storage """ c = lookup_content(q) content_sha1_bytes = hashutil.hash_to_bytes(c['checksums']['sha1']) content = _first_element(storage.content_get([content_sha1_bytes])) if not content: algo, hash = query.parse_hash(q) raise NotFoundExc('Bytes of content with %s checksum equals to %s ' 'are not available!' % (algo, hashutil.hash_to_hex(hash))) return converters.from_content(content) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return storage.stat_counters() def _lookup_origin_visits(origin_id, last_visit=None, limit=10): """Yields the origin origin_ids' visits. Args: origin_id (int): origin to list visits for last_visit (int): last visit to lookup from limit (int): Number of elements max to display Yields: Dictionaries of origin_visit for that origin """ limit = min(limit, MAX_LIMIT) yield from storage.origin_visit_get( origin_id, last_visit=last_visit, limit=limit) def lookup_origin_visits(origin_id, last_visit=None, per_page=10): """Yields the origin origin_ids' visits. Args: origin_id: origin to list visits for Yields: Dictionaries of origin_visit for that origin """ lookup_origin({'id': origin_id}) visits = _lookup_origin_visits(origin_id, last_visit=last_visit, limit=per_page) for visit in visits: yield converters.from_origin_visit(visit) def lookup_origin_visit(origin_id, visit_id): """Return information about visit visit_id with origin origin_id. Args: origin_id: origin concerned by the visit visit_id: the visit identifier to lookup Yields: The dict origin_visit concerned """ visit = storage.origin_visit_get_by(origin_id, visit_id) if not visit: raise NotFoundExc('Origin with id %s or its visit ' 'with id %s not found!' % (origin_id, visit_id)) return converters.from_origin_visit(visit) def lookup_snapshot_size(snapshot_id): """Count the number of branches in the snapshot with the given id Args: snapshot_id (str): sha1 identifier of the snapshot Returns: dict: A dict whose keys are the target types of branches and values their corresponding amount """ snapshot_id_bin = _to_sha1_bin(snapshot_id) snapshot_size = storage.snapshot_count_branches(snapshot_id_bin) if 'revision' not in snapshot_size: snapshot_size['revision'] = 0 if 'release' not in snapshot_size: snapshot_size['release'] = 0 return snapshot_size def lookup_snapshot(snapshot_id, branches_from='', branches_count=1000, target_types=None): """Return information about a snapshot, aka the list of named branches found during a specific visit of an origin. Args: snapshot_id (str): sha1 identifier of the snapshot branches_from (str): optional parameter used to skip branches whose name is lesser than it before returning them branches_count (int): optional parameter used to restrain the amount of returned branches target_types (list): optional parameter used to filter the target types of branch to return (possible values that can be contained in that list are `'content', 'directory', 'revision', 'release', 'snapshot', 'alias'`) Returns: A dict filled with the snapshot content. """ snapshot_id_bin = _to_sha1_bin(snapshot_id) snapshot = storage.snapshot_get_branches(snapshot_id_bin, branches_from.encode(), branches_count, target_types) if not snapshot: raise NotFoundExc('Snapshot with id %s not found!' % snapshot_id) return converters.from_snapshot(snapshot) def lookup_latest_origin_snapshot(origin_id, allowed_statuses=None): """Return information about the latest snapshot of an origin. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. Args: origin_id: integer identifier of the origin allowed_statuses: list of visit statuses considered to find the latest snapshot for the visit. For instance, ``allowed_statuses=['full']`` will only consider visits that have successfully run to completion. Returns: A dict filled with the snapshot content. """ snapshot = storage.snapshot_get_latest(origin_id, allowed_statuses) return converters.from_snapshot(snapshot) def lookup_revision_through(revision, limit=100): """Retrieve a revision from the criterion stored in revision dictionary. Args: revision: Dictionary of criterion to lookup the revision with. Here are the supported combination of possible values: - origin_id, branch_name, ts, sha1_git - origin_id, branch_name, ts - sha1_git_root, sha1_git - sha1_git Returns: None if the revision is not found or the actual revision. """ if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context_by(revision['origin_id'], revision['branch_name'], revision['ts'], revision['sha1_git'], limit) if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision: return lookup_revision_by(revision['origin_id'], revision['branch_name'], revision['ts']) if 'sha1_git_root' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context(revision['sha1_git_root'], revision['sha1_git'], limit) if 'sha1_git' in revision: return lookup_revision(revision['sha1_git']) # this should not happen raise NotImplementedError('Should not happen!') def lookup_directory_through_revision(revision, path=None, limit=100, with_data=False): """Retrieve the directory information from the revision. Args: revision: dictionary of criterion representing a revision to lookup path: directory's path to lookup. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of. with_data: indicate to retrieve the content's raw data if path resolves to a content. Returns: The directory pointing to by the revision criterions at path. """ rev = lookup_revision_through(revision, limit) if not rev: raise NotFoundExc('Revision with criterion %s not found!' % revision) return (rev['id'], lookup_directory_with_revision(rev['id'], path, with_data)) def vault_cook(obj_type, obj_id, email=None): """Cook a vault bundle. """ return vault.cook(obj_type, obj_id, email=email) def vault_fetch(obj_type, obj_id): """Fetch a vault bundle. """ return vault.fetch(obj_type, obj_id) def vault_progress(obj_type, obj_id): """Get the current progress of a vault bundle. """ return vault.progress(obj_type, obj_id) def diff_revision(rev_id): """Get the list of file changes (insertion / deletion / modification / renaming) for a particular revision. """ rev_sha1_git_bin = _to_sha1_bin(rev_id) changes = storage.diff_revision(rev_sha1_git_bin, track_renaming=True) for change in changes: change['from'] = converters.from_directory_entry(change['from']) change['to'] = converters.from_directory_entry(change['to']) if change['from_path']: change['from_path'] = change['from_path'].decode('utf-8') if change['to_path']: change['to_path'] = change['to_path'].decode('utf-8') return changes class _RevisionsWalkerProxy(object): """ Proxy class wrapping a revisions walker iterator from swh-storage and performing needed conversions. """ def __init__(self, rev_walker_type, rev_start, *args, **kwargs): rev_start_bin = hashutil.hash_to_bytes(rev_start) self.revisions_walker = \ revisions_walker.get_revisions_walker(rev_walker_type, storage, rev_start_bin, *args, **kwargs) def export_state(self): return self.revisions_walker.export_state() def __next__(self): return converters.from_revision(next(self.revisions_walker)) def __iter__(self): return self def get_revisions_walker(rev_walker_type, rev_start, *args, **kwargs): """ Utility function to instantiate a revisions walker of a given type, see :mod:`swh.storage.algos.revisions_walker`. Args: rev_walker_type (str): the type of revisions walker to return, possible values are: ``committer_date``, ``dfs``, ``dfs_post``, ``bfs`` and ``path`` rev_start (str): hexadecimal representation of a revision identifier args (list): position arguments to pass to the revisions walker constructor kwargs (dict): keyword arguments to pass to the revisions walker constructor """ # first check if the provided revision is valid lookup_revision(rev_start) return _RevisionsWalkerProxy(rev_walker_type, rev_start, *args, **kwargs) diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index 3a5eb0be..b0c5588d 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,821 +1,875 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import itertools import pytest import random from collections import defaultdict from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex +from swh.model.from_disk import DentryPerms from swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.tests.strategies import ( content, contents, unknown_content, unknown_contents, contents_with_ctags, origin, new_origin, visit_dates, directory, release, revision, unknown_revision, revisions, unknown_revisions, ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, revision_with_submodules, unknown_directory, empty_directory, new_revision, new_origins ) from swh.web.tests.testcase import ( WebTestCase, ctags_json_missing, fossology_missing ) class ServiceTestCase(WebTestCase): @given(contents()) def test_lookup_multiple_hashes_all_present(self, contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': True}) self.assertEqual(service.lookup_multiple_hashes(input_data), expected_output) @given(contents(), unknown_contents()) def test_lookup_multiple_hashes_some_missing(self, contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': cnt in contents}) self.assertEqual(service.lookup_multiple_hashes(input_data), expected_output) @given(unknown_content()) def test_lookup_hash_does_not_exist(self, unknown_content): actual_lookup = service.lookup_hash('sha1_git:%s' % unknown_content['sha1_git']) self.assertEqual(actual_lookup, {'found': None, 'algo': 'sha1_git'}) @given(content()) def test_lookup_hash_exist(self, content): actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) content_metadata = self.content_get_metadata(content['sha1']) self.assertEqual({'found': content_metadata, 'algo': 'sha1'}, actual_lookup) @given(unknown_content()) def test_search_hash_does_not_exist(self, content): actual_lookup = service.search_hash('sha1_git:%s' % content['sha1_git']) self.assertEqual({'found': False}, actual_lookup) @given(content()) def test_search_hash_exist(self, content): actual_lookup = service.search_hash('sha1:%s' % content['sha1']) self.assertEqual({'found': True}, actual_lookup) @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(contents_with_ctags()) def test_lookup_content_ctags(self, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags['sha1s']) self.content_add_ctags(content_sha1) actual_ctags = \ list(service.lookup_content_ctags('sha1:%s' % content_sha1)) expected_data = list(self.content_get_ctags(content_sha1)) for ctag in expected_data: ctag['id'] = content_sha1 self.assertEqual(actual_ctags, expected_data) @given(unknown_content()) def test_lookup_content_ctags_no_hash(self, unknown_content): actual_ctags = \ list(service.lookup_content_ctags('sha1:%s' % unknown_content['sha1'])) self.assertEqual(actual_ctags, []) @given(content()) def test_lookup_content_filetype(self, content): self.content_add_mimetype(content['sha1']) actual_filetype = service.lookup_content_filetype(content['sha1']) expected_filetype = self.content_get_mimetype(content['sha1']) self.assertEqual(actual_filetype, expected_filetype) @pytest.mark.xfail # Language indexer is disabled. @given(content()) def test_lookup_content_language(self, content): self.content_add_language(content['sha1']) actual_language = service.lookup_content_language(content['sha1']) expected_language = self.content_get_language(content['sha1']) self.assertEqual(actual_language, expected_language) @given(contents_with_ctags()) def test_lookup_expression(self, contents_with_ctags): per_page = 10 expected_ctags = [] for content_sha1 in contents_with_ctags['sha1s']: if len(expected_ctags) == per_page: break self.content_add_ctags(content_sha1) for ctag in self.content_get_ctags(content_sha1): if len(expected_ctags) == per_page: break if ctag['name'] == contents_with_ctags['symbol_name']: del ctag['id'] ctag['sha1'] = content_sha1 expected_ctags.append(ctag) actual_ctags = \ list(service.lookup_expression(contents_with_ctags['symbol_name'], last_sha1=None, per_page=10)) self.assertEqual(actual_ctags, expected_ctags) def test_lookup_expression_no_result(self): expected_ctags = [] actual_ctags = \ list(service.lookup_expression('barfoo', last_sha1=None, per_page=10)) self.assertEqual(actual_ctags, expected_ctags) @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_lookup_content_license(self, content): self.content_add_license(content['sha1']) actual_license = service.lookup_content_license(content['sha1']) expected_license = self.content_get_license(content['sha1']) self.assertEqual(actual_license, expected_license) def test_stat_counters(self): actual_stats = service.stat_counters() self.assertEqual(actual_stats, self.storage.stat_counters()) @given(new_origin(), visit_dates()) def test_lookup_origin_visits(self, new_origin, visit_dates): origin_id = self.storage.origin_add_one(new_origin) for ts in visit_dates: self.storage.origin_visit_add(origin_id, ts) actual_origin_visits = list( service.lookup_origin_visits(origin_id, per_page=100)) expected_visits = self.origin_visit_get(origin_id) self.assertEqual(actual_origin_visits, expected_visits) @given(new_origin(), visit_dates()) def test_lookup_origin_visit(self, new_origin, visit_dates): origin_id = self.storage.origin_add_one(new_origin) visits = [] for ts in visit_dates: visits.append(self.storage.origin_visit_add(origin_id, ts)) visit = random.choice(visits)['visit'] actual_origin_visit = service.lookup_origin_visit(origin_id, visit) expected_visit = dict(self.storage.origin_visit_get_by(origin_id, visit)) expected_visit['date'] = expected_visit['date'].isoformat() expected_visit['metadata'] = {} self.assertEqual(actual_origin_visit, expected_visit) @given(new_origin()) def test_lookup_origin(self, new_origin): origin_id = self.storage.origin_add_one(new_origin) actual_origin = service.lookup_origin({'id': origin_id}) expected_origin = self.storage.origin_get({'id': origin_id}) self.assertEqual(actual_origin, expected_origin) actual_origin = service.lookup_origin({'type': new_origin['type'], 'url': new_origin['url']}) expected_origin = self.storage.origin_get({'type': new_origin['type'], 'url': new_origin['url']}) self.assertEqual(actual_origin, expected_origin) @given(invalid_sha1()) def test_lookup_release_ko_id_checksum_not_a_sha1(self, invalid_sha1): with self.assertRaises(BadInputExc) as cm: service.lookup_release(invalid_sha1) self.assertIn('invalid checksum', cm.exception.args[0].lower()) @given(sha256()) def test_lookup_release_ko_id_checksum_too_long(self, sha256): with self.assertRaises(BadInputExc) as cm: service.lookup_release(sha256) self.assertEqual('Only sha1_git is supported.', cm.exception.args[0]) @given(directory()) def test_lookup_directory_with_path_not_found(self, directory): path = 'some/invalid/path/here' with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_path(directory, path) self.assertEqual('Directory entry with path %s from %s ' 'not found' % (path, directory), cm.exception.args[0]) @given(directory()) def test_lookup_directory_with_path_found(self, directory): directory_content = self.directory_ls(directory) directory_entry = random.choice(directory_content) path = directory_entry['name'] actual_result = service.lookup_directory_with_path(directory, path) self.assertEqual(actual_result, directory_entry) @given(release()) def test_lookup_release(self, release): actual_release = service.lookup_release(release) self.assertEqual(actual_release, self.release_get(release)) @given(revision(), invalid_sha1(), sha256()) def test_lookup_revision_with_context_ko_not_a_sha1(self, revision, invalid_sha1, sha256): sha1_git_root = revision sha1_git = invalid_sha1 with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Invalid checksum query string', cm.exception.args[0]) sha1_git = sha256 with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( self, revision, unknown_revision): sha1_git_root = revision sha1_git = unknown_revision with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision %s not found' % sha1_git, cm.exception.args[0]) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, revision, unknown_revision): sha1_git_root = unknown_revision sha1_git = revision - with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision root %s not found' % sha1_git_root, cm.exception.args[0]) @given(ancestor_revisions()) def test_lookup_revision_with_context(self, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] root_sha1_git = ancestor_revisions['sha1_git_root'] for sha1_git_root in (root_sha1_git, {'id': hash_to_bytes(root_sha1_git)}): actual_revision = \ service.lookup_revision_with_context(sha1_git_root, sha1_git) children = [] for rev in self.revision_log(root_sha1_git): for p_rev in rev['parents']: p_rev_hex = hash_to_hex(p_rev) if p_rev_hex == sha1_git: children.append(rev['id']) expected_revision = self.revision_get(sha1_git) expected_revision['children'] = children self.assertEqual(actual_revision, expected_revision) @given(non_ancestor_revisions()) def test_lookup_revision_with_context_ko(self, non_ancestor_revisions): sha1_git = non_ancestor_revisions['sha1_git'] root_sha1_git = non_ancestor_revisions['sha1_git_root'] with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(root_sha1_git, sha1_git) self.assertIn('Revision %s is not an ancestor of %s' % (sha1_git, root_sha1_git), cm.exception.args[0]) @given(unknown_revision()) def test_lookup_directory_with_revision_not_found(self, unknown_revision): with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision(unknown_revision) self.assertIn('Revision %s not found' % unknown_revision, cm.exception.args[0]) - @given(revision()) - def test_lookup_directory_with_revision_ko_path_to_nowhere(self, revision): + @given(unknown_content(), unknown_revision(), unknown_directory()) + def test_lookup_directory_with_revision_unknown_content( + self, unknown_content, unknown_revision, unknown_directory): + + dir_path = 'README.md' + # Create a revision that points to a directory + # Which points to unknown content + revision = { + 'author': { + 'name': b'abcd', + 'email': b'abcd@company.org', + 'fullname': b'abcd abcd' + }, + 'committer': { + 'email': b'aaaa@company.org', + 'fullname': b'aaaa aaa', + 'name': b'aaa' + }, + 'committer_date': { + 'negative_utc': False, + 'offset': 0, + 'timestamp': 1437511651 + }, + 'date': { + 'negative_utc': False, + 'offset': 0, + 'timestamp': 1437511651 + }, + 'message': b'bleh', + 'metadata': [], + 'parents': [], + 'synthetic': False, + 'type': 'file', + 'id': hash_to_bytes(unknown_revision), + 'directory': hash_to_bytes(unknown_directory) + } + # A directory that points to unknown content + dir = { + 'id': hash_to_bytes(unknown_directory), + 'entries': [{ + 'name': bytes(dir_path.encode('utf-8')), + 'type': 'file', + 'target': hash_to_bytes(unknown_content['sha1_git']), + 'perms': DentryPerms.content + }] + } + # Add the directory and revision in mem + self.storage.directory_add([dir]) + self.storage.revision_add([revision]) + with self.assertRaises(NotFoundExc) as cm: + service.lookup_directory_with_revision(unknown_revision, dir_path) + self.assertIn('Content not found for revision %s' % unknown_revision, + cm.exception.args[0]) + @given(revision()) + def test_lookup_directory_with_revision_ko_path_to_nowhere( + self, revision): invalid_path = 'path/to/something/unknown' with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision(revision, invalid_path) exception_text = cm.exception.args[0].lower() self.assertIn('directory or file', exception_text) self.assertIn(invalid_path, exception_text) self.assertIn('revision %s' % revision, exception_text) self.assertIn('not found', exception_text) @given(revision_with_submodules()) def test_lookup_directory_with_revision_submodules( self, revision_with_submodules): rev_sha1_git = revision_with_submodules['rev_sha1_git'] rev_dir_path = revision_with_submodules['rev_dir_rev_path'] actual_data = service.lookup_directory_with_revision( rev_sha1_git, rev_dir_path) revision = self.revision_get(revision_with_submodules['rev_sha1_git']) directory = self.directory_ls(revision['directory']) rev_entry = next(e for e in directory if e['name'] == rev_dir_path) expected_data = { 'content': self.revision_get(rev_entry['target']), 'path': rev_dir_path, 'revision': rev_sha1_git, 'type': 'rev' } self.assertEqual(actual_data, expected_data) @given(revision()) def test_lookup_directory_with_revision_without_path(self, revision): actual_directory_entries = \ service.lookup_directory_with_revision(revision) revision_data = self.revision_get(revision) expected_directory_entries = \ self.directory_ls(revision_data['directory']) self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(actual_directory_entries['content'], expected_directory_entries) @given(revision()) def test_lookup_directory_with_revision_with_path(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] in ('file', 'dir')] expected_dir_entry = random.choice(dir_entries) actual_dir_entry = \ service.lookup_directory_with_revision(revision, expected_dir_entry['name']) self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) self.assertEqual(actual_dir_entry['revision'], revision) self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) if actual_dir_entry['type'] == 'file': del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): self.assertEqual(actual_dir_entry['content'][key], expected_dir_entry[key]) else: sub_dir_entries = self.directory_ls(expected_dir_entry['target']) self.assertEqual(actual_dir_entry['content'], sub_dir_entries) @given(revision()) def test_lookup_directory_with_revision_with_path_to_file_and_data( self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] expected_dir_entry = random.choice(dir_entries) expected_data = \ self.content_get(expected_dir_entry['checksums']['sha1']) actual_dir_entry = \ service.lookup_directory_with_revision(revision, expected_dir_entry['name'], with_data=True) self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) self.assertEqual(actual_dir_entry['revision'], revision) self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): self.assertEqual(actual_dir_entry['content'][key], expected_dir_entry[key]) self.assertEqual(actual_dir_entry['content']['data'], expected_data['data']) @given(revision()) def test_lookup_revision(self, revision): actual_revision = service.lookup_revision(revision) self.assertEqual(actual_revision, self.revision_get(revision)) @given(new_revision()) def test_lookup_revision_invalid_msg(self, new_revision): new_revision['message'] = b'elegant fix for bug \xff' self.storage.revision_add([new_revision]) revision = service.lookup_revision(hash_to_hex(new_revision['id'])) self.assertEqual(revision['message'], None) self.assertEqual(revision['message_decoding_failed'], True) @given(new_revision()) def test_lookup_revision_msg_ok(self, new_revision): self.storage.revision_add([new_revision]) revision_message = service.lookup_revision_message( hash_to_hex(new_revision['id'])) self.assertEqual(revision_message, {'message': new_revision['message']}) @given(new_revision()) def test_lookup_revision_msg_absent(self, new_revision): del new_revision['message'] self.storage.revision_add([new_revision]) new_revision_id = hash_to_hex(new_revision['id']) with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message(new_revision_id) self.assertEqual( cm.exception.args[0], 'No message for revision with sha1_git %s.' % new_revision_id ) @given(unknown_revision()) def test_lookup_revision_msg_no_rev(self, unknown_revision): with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message(unknown_revision) self.assertEqual( cm.exception.args[0], 'Revision with sha1_git %s not found.' % unknown_revision ) @given(revisions()) def test_lookup_revision_multiple(self, revisions): actual_revisions = list(service.lookup_revision_multiple(revisions)) expected_revisions = [] for rev in revisions: expected_revisions.append(self.revision_get(rev)) self.assertEqual(actual_revisions, expected_revisions) @given(unknown_revisions()) def test_lookup_revision_multiple_none_found(self, unknown_revisions): actual_revisions = \ list(service.lookup_revision_multiple(unknown_revisions)) self.assertEqual(actual_revisions, [None] * len(unknown_revisions)) @given(revision()) def test_lookup_revision_log(self, revision): actual_revision_log = \ list(service.lookup_revision_log(revision, limit=25)) expected_revision_log = self.revision_log(revision, limit=25) self.assertEqual(actual_revision_log, expected_revision_log) def _get_origin_branches(self, origin): origin_visit = self.origin_visit_get(origin['id'])[-1] snapshot = self.snapshot_get(origin_visit['snapshot']) branches = {k: v for (k, v) in snapshot['branches'].items() if v['target_type'] == 'revision'} return branches @given(origin()) def test_lookup_revision_log_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) actual_log = \ list(service.lookup_revision_log_by(origin['id'], branch_name, None, limit=25)) expected_log = \ self.revision_log(branches[branch_name]['target'], limit=25) self.assertEqual(actual_log, expected_log) @given(origin()) def test_lookup_revision_log_by_notfound(self, origin): with self.assertRaises(NotFoundExc): service.lookup_revision_log_by( origin['id'], 'unknown_branch_name', None, limit=100) @given(unknown_content()) def test_lookup_content_raw_not_found(self, unknown_content): with self.assertRaises(NotFoundExc) as cm: service.lookup_content_raw('sha1:' + unknown_content['sha1']) self.assertIn(cm.exception.args[0], 'Content with %s checksum equals to %s not found!' % ('sha1', unknown_content['sha1'])) @given(content()) def test_lookup_content_raw(self, content): actual_content = service.lookup_content_raw( 'sha256:%s' % content['sha256']) expected_content = self.content_get(content['sha1']) self.assertEqual(actual_content, expected_content) @given(unknown_content()) def test_lookup_content_not_found(self, unknown_content): with self.assertRaises(NotFoundExc) as cm: service.lookup_content('sha1:%s' % unknown_content['sha1']) self.assertIn(cm.exception.args[0], 'Content with %s checksum equals to %s not found!' % ('sha1', unknown_content['sha1'])) @given(content()) def test_lookup_content_with_sha1(self, content): actual_content = service.lookup_content( 'sha1:%s' % content['sha1']) expected_content = self.content_get_metadata(content['sha1']) self.assertEqual(actual_content, expected_content) @given(content()) def test_lookup_content_with_sha256(self, content): actual_content = service.lookup_content( 'sha256:%s' % content['sha256']) expected_content = self.content_get_metadata(content['sha1']) self.assertEqual(actual_content, expected_content) @given(revision()) def test_lookup_person(self, revision): rev_data = self.revision_get(revision) actual_person = service.lookup_person(rev_data['author']['id']) self.assertEqual(actual_person, rev_data['author']) def test_lookup_directory_bad_checksum(self): with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') @given(unknown_directory()) def test_lookup_directory_not_found(self, unknown_directory): with self.assertRaises(NotFoundExc) as cm: service.lookup_directory(unknown_directory) self.assertIn('Directory with sha1_git %s not found' % unknown_directory, cm.exception.args[0]) @given(directory()) def test_lookup_directory(self, directory): actual_directory_ls = list(service.lookup_directory( directory)) expected_directory_ls = self.directory_ls(directory) self.assertEqual(actual_directory_ls, expected_directory_ls) @given(empty_directory()) def test_lookup_directory_empty(self, empty_directory): actual_directory_ls = list(service.lookup_directory(empty_directory)) self.assertEqual(actual_directory_ls, []) @given(origin()) def test_lookup_revision_by_nothing_found(self, origin): with self.assertRaises(NotFoundExc): service.lookup_revision_by(origin['id'], 'invalid-branch-name') @given(origin()) def test_lookup_revision_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) actual_revision = \ service.lookup_revision_by(origin['id'], branch_name, None) expected_revision = \ self.revision_get(branches[branch_name]['target']) self.assertEqual(actual_revision, expected_revision) @given(origin(), revision()) def test_lookup_revision_with_context_by_ko(self, origin, revision): with self.assertRaises(NotFoundExc): service.lookup_revision_with_context_by(origin['id'], 'invalid-branch-name', None, revision) @given(origin()) def test_lookup_revision_with_context_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = self.revision_log(root_rev) children = defaultdict(list) for rev in root_rev_log: for rev_p in rev['parents']: children[rev_p].append(rev['id']) rev = root_rev_log[-1]['id'] actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin['id'], branch_name, None, rev) expected_root_rev = self.revision_get(root_rev) expected_rev = self.revision_get(rev) expected_rev['children'] = children[rev] self.assertEqual(actual_root_rev, expected_root_rev) self.assertEqual(actual_rev, expected_rev) def test_lookup_revision_through_ko_not_implemented(self): with self.assertRaises(NotImplementedError): service.lookup_revision_through({ 'something-unknown': 10, }) @given(origin()) def test_lookup_revision_through_with_context_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = self.revision_log(root_rev) rev = root_rev_log[-1]['id'] self.assertEqual(service.lookup_revision_through({ 'origin_id': origin['id'], 'branch_name': branch_name, 'ts': None, 'sha1_git': rev }), service.lookup_revision_with_context_by( origin['id'], branch_name, None, rev) ) @given(origin()) def test_lookup_revision_through_with_revision_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) self.assertEqual(service.lookup_revision_through({ 'origin_id': origin['id'], 'branch_name': branch_name, 'ts': None, }), service.lookup_revision_by( origin['id'], branch_name, None) ) @given(ancestor_revisions()) def test_lookup_revision_through_with_context(self, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] sha1_git_root = ancestor_revisions['sha1_git_root'] self.assertEqual(service.lookup_revision_through({ 'sha1_git_root': sha1_git_root, 'sha1_git': sha1_git, }), service.lookup_revision_with_context( sha1_git_root, sha1_git) ) @given(revision()) def test_lookup_revision_through_with_revision(self, revision): self.assertEqual(service.lookup_revision_through({ 'sha1_git': revision }), service.lookup_revision(revision) ) @given(revision()) def test_lookup_directory_through_revision_ko_not_found(self, revision): with self.assertRaises(NotFoundExc): service.lookup_directory_through_revision( {'sha1_git': revision}, 'some/invalid/path') @given(revision()) def test_lookup_directory_through_revision_ok(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) self.assertEqual( service.lookup_directory_through_revision({'sha1_git': revision}, dir_entry['name']), (revision, service.lookup_directory_with_revision( revision, dir_entry['name'])) ) @given(revision()) def test_lookup_directory_through_revision_ok_with_data(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) self.assertEqual( service.lookup_directory_through_revision({'sha1_git': revision}, dir_entry['name'], with_data=True), (revision, service.lookup_directory_with_revision( revision, dir_entry['name'], with_data=True)) ) @given(new_origins(20)) def test_lookup_origins(self, new_origins): nb_origins = len(new_origins) expected_origins = self.storage.origin_add(new_origins) origin_from_idx = random.randint(1, nb_origins-1) - 1 origin_from = expected_origins[origin_from_idx]['id'] max_origin_idx = expected_origins[-1]['id'] origin_count = random.randint(1, max_origin_idx - origin_from) actual_origins = list(service.lookup_origins(origin_from, origin_count)) expected_origins = list(self.storage.origin_get_range(origin_from, origin_count)) self.assertEqual(actual_origins, expected_origins)