Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/web/common/service.py b/swh/web/common/service.py
index d37e55c8e..71e531a56 100644
--- a/swh/web/common/service.py
+++ b/swh/web/common/service.py
@@ -1,1036 +1,1032 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from collections import defaultdict
from swh.model import hashutil
from swh.storage.algos import revisions_walker
from swh.web.common import converters
from swh.web.common import query
from swh.web.common.exc import NotFoundExc
from swh.web.common.origin_visits import get_origin_visit
from swh.web import config
storage = config.storage()
vault = config.vault()
idx_storage = config.indexer_storage()
MAX_LIMIT = 50 # Top limit the users can ask for
def _first_element(l):
"""Returns the first element in the provided list or None
if it is empty or None"""
return next(iter(l or []), None)
def lookup_multiple_hashes(hashes):
    """Lookup the passed hashes in a single DB connection, using batch
    processing.

    Args:
        hashes: array of {filename: X, sha1: Y} dicts, Y a hex sha1 string.

    Returns:
        The same array with elements updated with elem['found'] = True if
        the hash is present in storage, elem['found'] = False if not.
    """
    hashlist = [hashutil.hash_to_bytes(elem['sha1']) for elem in hashes]
    content_missing = storage.content_missing_per_sha1(hashlist)
    # Use a set for O(1) membership tests instead of scanning a list for
    # each element, and mark found/missing in one pass instead of two.
    missing = {hashutil.hash_to_hex(x) for x in content_missing}
    for h in hashes:
        h['found'] = h['sha1'] not in missing
    return hashes
def lookup_expression(expression, last_sha1, per_page):
    """Search the raw indexed content for *expression*.

    Args:
        expression (str): expression to look up in the indexed raw content
        last_sha1 (str): last sha1 seen (pagination cursor)
        per_page (int): number of results per page, capped at MAX_LIMIT

    Yields:
        dicts of ctags whose content matches the expression
    """
    page_size = min(per_page, MAX_LIMIT)
    results = idx_storage.content_ctags_search(expression,
                                               last_sha1=last_sha1,
                                               limit=page_size)
    for raw_ctag in results:
        converted = converters.from_swh(raw_ctag, hashess={'id'})
        # expose the indexer's 'id' field under the 'sha1' key
        converted['sha1'] = converted.pop('id')
        yield converted
def lookup_hash(q):
    """Check whether storage contains the content with a given checksum.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        dict with key 'found' holding the converted content info (or None
        when absent) and key 'algo' holding the parsed algorithm name.
    """
    algo, hash_value = query.parse_hash(q)
    match = storage.content_find({algo: hash_value})
    return {'found': converters.from_content(match), 'algo': algo}
def search_hash(q):
    """Check whether storage contains the content with a given checksum.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        dict with key 'found' set to True or False, according to whether
        the checksum is present in storage.
    """
    algo, hash_value = query.parse_hash(q)
    match = storage.content_find({algo: hash_value})
    return {'found': match is not None}
def _lookup_content_sha1(q):
    """Resolve a content query to the content's binary sha1.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        binary sha1 if the content is found, None otherwise.
    """
    algo, hash_value = query.parse_hash(q)
    # A sha1 query needs no storage round-trip; anything else must be
    # resolved to its sha1 through content_find.
    if algo == 'sha1':
        return hash_value
    found = storage.content_find({algo: hash_value})
    return found['sha1'] if found else None
def lookup_content_ctags(q):
    """Yield ctags information for the content matching query q.

    Args:
        q: query string of the form <hash_algo:hash>

    Yields:
        ctags information dicts, when the content is found and indexed.
    """
    sha1 = _lookup_content_sha1(q)
    if not sha1:
        return None
    for raw_ctag in idx_storage.content_ctags_get([sha1]):
        yield converters.from_swh(raw_ctag, hashess={'id'})
def lookup_content_filetype(q):
    """Return filetype information for the content matching query q.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        filetype information dict, or None when the content is unknown or
        has no filetype indexed.
    """
    sha1 = _lookup_content_sha1(q)
    if not sha1:
        return None
    mimetype_info = _first_element(idx_storage.content_mimetype_get([sha1]))
    return converters.from_filetype(mimetype_info) if mimetype_info else None
def lookup_content_language(q):
    """Return language information for the content matching query q.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        language information dict, or None when the content is unknown or
        has no language indexed.
    """
    sha1 = _lookup_content_sha1(q)
    if not sha1:
        return None
    lang_info = _first_element(idx_storage.content_language_get([sha1]))
    if not lang_info:
        return None
    return converters.from_swh(lang_info, hashess={'id'})
def lookup_content_license(q):
    """Return license information for the content matching query q.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        license information dict, or None when the content is unknown or
        has no license indexed.
    """
    sha1 = _lookup_content_sha1(q)
    if not sha1:
        return None
    license_info = _first_element(
        idx_storage.content_fossology_license_get([sha1]))
    if not license_info:
        return None
    # the indexer keys the license facts by content sha1
    return converters.from_swh({'id': sha1, 'facts': license_info[sha1]},
                               hashess={'id'})
def lookup_origin(origin):
    """Return information about the origin matching dict origin.

    Args:
        origin: origin's dict with keys either 'id' or ('type' AND 'url')

    Returns:
        origin information as dict.

    Raises:
        NotFoundExc: when no origin matches the criteria.
    """
    origin_info = storage.origin_get(origin)
    if origin_info:
        return converters.from_origin(origin_info)
    # build an error message matching whichever lookup key was supplied
    if origin.get('id'):
        raise NotFoundExc('Origin with id %s not found!' % origin['id'])
    raise NotFoundExc('Origin with type %s and url %s not found!'
                      % (origin['type'], origin['url']))
def search_origin(url_pattern, offset=0, limit=50, regexp=False,
                  with_visit=False):
    """Search for origins whose urls contain a provided string pattern
    or match a provided regular expression.

    Args:
        url_pattern: the string pattern to search for in origin urls
        offset: number of found origins to skip before returning results
        limit: the maximum number of found origins to return
        regexp: whether url_pattern is a regular expression
        with_visit: whether to restrict the search to visited origins

    Returns:
        iterator of origin information as dict.
    """
    matches = storage.origin_search(url_pattern, offset, limit, regexp,
                                    with_visit)
    return map(converters.from_origin, matches)
def search_origin_metadata(fulltext, limit=50):
    """Search for origins whose intrinsic metadata match a provided
    string pattern.

    Args:
        fulltext: the string pattern to search for in origin metadata
        limit: the maximum number of found origins to return

    Returns:
        list of origin metadata as dict, with 'from_revision' rewritten
        from binary to hex.
    """
    results = idx_storage.origin_intrinsic_metadata_search_fulltext(
        conjunction=[fulltext], limit=limit)
    for result in results:
        # the indexer returns binary revision ids; expose them as hex
        result['from_revision'] = hashutil.hash_to_hex(result['from_revision'])
    return results
def lookup_person(person_id):
    """Return information about the person with id person_id.

    Args:
        person_id: the person's identifier, as string

    Returns:
        person information as dict.

    Raises:
        NotFoundExc: if there is no person with the provided id.
    """
    person = _first_element(storage.person_get([person_id]))
    if not person:
        raise NotFoundExc('Person with id %s not found' % person_id)
    converted = converters.from_person(person)
    return converted
def _to_sha1_bin(sha1_hex):
    """Validate a hex sha1_git string and return its binary form.

    Raises (via the query helper) on a malformed or unsupported hash.
    """
    _, sha1_bin = query.parse_hash_with_algorithms_or_throws(
        sha1_hex,
        ['sha1'],  # HACK: sha1_git really
        'Only sha1_git is supported.')
    return sha1_bin
def _check_directory_exists(sha1_git, sha1_git_bin):
    """Raise NotFoundExc when the directory sha1_git is not in storage.

    Args:
        sha1_git: hex sha1_git of the directory (used in the error message)
        sha1_git_bin: its binary counterpart, passed to storage

    Raises:
        NotFoundExc: if storage reports the directory as missing.
    """
    # directory_missing yields the subset of the given ids that are unknown;
    # any yielded element means our directory is absent. Checking with any()
    # avoids materializing the whole result just to take its length.
    if any(True for _ in storage.directory_missing([sha1_git_bin])):
        raise NotFoundExc('Directory with sha1_git %s not found' % sha1_git)
def lookup_directory(sha1_git):
    """Return information about the directory with id sha1_git.

    Args:
        sha1_git: the directory's sha1_git, as a hex string

    Returns:
        directory entries as an iterable of dicts (empty list for the
        well-known empty directory).
    """
    # git's canonical empty-tree identifier: no storage lookup needed
    if sha1_git == '4b825dc642cb6eb9a060e54bf8d69288fbee4904':
        return []
    sha1_git_bin = _to_sha1_bin(sha1_git)
    _check_directory_exists(sha1_git, sha1_git_bin)
    entries = storage.directory_ls(sha1_git_bin)
    return map(converters.from_directory_entry, entries)
def lookup_directory_with_path(sha1_git, path_string):
    """Return directory information for the entry at path_string, relative
    to the root directory identified by sha1_git.

    Args:
        sha1_git: sha1_git of the root directory
        path_string: relative path of the entry, starting from that root

    Returns:
        the converted directory entry as dict.

    Raises:
        NotFoundExc: if the directory or the entry is not found.
    """
    sha1_git_bin = _to_sha1_bin(sha1_git)
    _check_directory_exists(sha1_git, sha1_git_bin)
    path_parts = [part.encode('utf-8') for part in
                  path_string.strip(os.path.sep).split(os.path.sep)]
    queried_dir = storage.directory_entry_get_by_path(sha1_git_bin,
                                                      path_parts)
    if not queried_dir:
        raise NotFoundExc('Directory entry with path %s from %s not found' %
                          (path_string, sha1_git))
    return converters.from_directory_entry(queried_dir)
def lookup_release(release_sha1_git):
    """Return information about the release with sha1 release_sha1_git.

    Args:
        release_sha1_git: the release's sha1, as hexadecimal

    Returns:
        release information as dict.

    Raises:
        ValueError: if the identifier provided is not of sha1 nature.
        NotFoundExc: if there is no release with the provided sha1_git.
    """
    release = _first_element(
        storage.release_get([_to_sha1_bin(release_sha1_git)]))
    if not release:
        raise NotFoundExc('Release with sha1_git %s not found.'
                          % release_sha1_git)
    return converters.from_release(release)
def lookup_release_multiple(sha1_git_list):
    """Return information about the releases identified with
    their sha1_git identifiers.

    Args:
        sha1_git_list: A list of release sha1_git identifiers

    Returns:
        Generator of release information as dict.

    Raises:
        ValueError if the identifier provided is not of sha1 nature.
    """
    sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list)
    releases = storage.release_get(sha1_bin_list) or []
    return (converters.from_release(r) for r in releases)
def lookup_revision(rev_sha1_git):
    """Return information about the revision with sha1 rev_sha1_git.

    Args:
        rev_sha1_git: the revision's sha1, as hexadecimal

    Returns:
        revision information as dict.

    Raises:
        ValueError: if the identifier provided is not of sha1 nature.
        NotFoundExc: if there is no revision with the provided sha1_git.
    """
    revision = _first_element(
        storage.revision_get([_to_sha1_bin(rev_sha1_git)]))
    if not revision:
        raise NotFoundExc('Revision with sha1_git %s not found.'
                          % rev_sha1_git)
    return converters.from_revision(revision)
def lookup_revision_multiple(sha1_git_list):
    """Return information about the revisions identified with
    their sha1_git identifiers.

    Args:
        sha1_git_list: a list of revision sha1_git identifiers

    Returns:
        generator of revision information as dict.

    Raises:
        ValueError: if an identifier provided is not of sha1 nature.
    """
    bin_ids = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list)
    found = storage.revision_get(bin_ids) or []
    return (converters.from_revision(rev) for rev in found)
def lookup_revision_message(rev_sha1_git):
    """Return the raw message of the revision with sha1 rev_sha1_git.

    Args:
        rev_sha1_git: the revision's sha1, as hexadecimal

    Returns:
        dict of the form {'message': <the_message>}.

    Raises:
        ValueError: if the identifier provided is not of sha1 nature.
        NotFoundExc: if the revision is not found, or if it has no message.
    """
    revision = _first_element(
        storage.revision_get([_to_sha1_bin(rev_sha1_git)]))
    if not revision:
        raise NotFoundExc('Revision with sha1_git %s not found.'
                          % rev_sha1_git)
    if 'message' not in revision:
        raise NotFoundExc('No message for revision with sha1_git %s.'
                          % rev_sha1_git)
    return {'message': revision['message']}
def _lookup_revision_id_by(origin_id, branch_name, timestamp):
    """Resolve the revision id pointed at by a snapshot branch of an
    origin visit.

    Args:
        origin_id: origin of the revision
        branch_name: snapshot branch name to resolve
        timestamp: origin visit time frame

    Returns:
        the resolved revision id (hex string).

    Raises:
        NotFoundExc: when the branch does not resolve to a revision.
    """
    def _get_snapshot_branch(snapshot, branch_name):
        # BUGFIX: the original closed over visit['snapshot'] instead of
        # using its own `snapshot` parameter (which shadowed it). Callers
        # always passed visit['snapshot'] so behavior was identical, but
        # the ignored parameter was a latent bug.
        resolved = lookup_snapshot(snapshot,
                                   branches_from=branch_name,
                                   branches_count=10)
        branch = None
        if branch_name in resolved['branches']:
            branch = resolved['branches'][branch_name]
        return branch

    visit = get_origin_visit({'id': origin_id}, visit_ts=timestamp)
    branch = _get_snapshot_branch(visit['snapshot'], branch_name)
    rev_id = None
    if branch and branch['target_type'] == 'revision':
        rev_id = branch['target']
    elif branch and branch['target_type'] == 'alias':
        # follow one level of alias indirection
        branch = _get_snapshot_branch(visit['snapshot'], branch['target'])
        if branch and branch['target_type'] == 'revision':
            rev_id = branch['target']
    if not rev_id:
        raise NotFoundExc('Revision for origin %s and branch %s not found.'
                          % (origin_id, branch_name))
    return rev_id
def lookup_revision_by(origin_id,
                       branch_name='HEAD',
                       timestamp=None):
    """Lookup a revision by origin id, snapshot branch name and visit
    timestamp.

    Defaults to the 'HEAD' branch and the most recent visit when the
    corresponding arguments are omitted.

    Args:
        origin_id (int): origin of the revision
        branch_name (str): snapshot branch name
        timestamp (str/int): origin visit time frame

    Returns:
        dict: the revision matching the criteria.

    Raises:
        NotFoundExc: if no revision corresponds to the criteria.
    """
    resolved_id = _lookup_revision_id_by(origin_id, branch_name, timestamp)
    return lookup_revision(resolved_id)
def lookup_revision_log(rev_sha1_git, limit):
    """Lookup a revision log by revision id.

    Args:
        rev_sha1_git (str): the revision's sha1, as hexadecimal
        limit (int): the maximum number of revisions returned

    Returns:
        iterable of revision dicts making up the log.

    Raises:
        ValueError: if the identifier provided is not of sha1 nature.
        NotFoundExc: if there is no revision with the provided sha1_git.
    """
    log_entries = storage.revision_log([_to_sha1_bin(rev_sha1_git)], limit)
    if not log_entries:
        raise NotFoundExc('Revision with sha1_git %s not found.'
                          % rev_sha1_git)
    return map(converters.from_revision, log_entries)
def lookup_revision_log_by(origin_id, branch_name, timestamp, limit):
    """Lookup a revision log by origin id, snapshot branch name and visit
    timestamp.

    Args:
        origin_id (int): origin of the revision
        branch_name (str): snapshot branch
        timestamp (str/int): origin visit time frame
        limit (int): the maximum number of revisions returned

    Returns:
        iterable of revision dicts making up the log.

    Raises:
        NotFoundExc: if no revision corresponds to the criteria.
    """
    resolved_id = _lookup_revision_id_by(origin_id, branch_name, timestamp)
    return lookup_revision_log(resolved_id, limit)
def lookup_revision_with_context_by(origin_id, branch_name, timestamp,
                                    sha1_git, limit=100):
    """Return information about revision sha1_git, limited to the
    sub-graph of all transitive parents of sha1_git_root.

    sha1_git_root is resolved through the lookup of a revision by
    origin_id, branch_name and timestamp. In other words, sha1_git is an
    ancestor of sha1_git_root.

    Args:
        origin_id: origin of the revision.
        branch_name: revision's branch.
        timestamp: revision's time frame.
        sha1_git: one of sha1_git_root's ancestors.
        limit: limit the lookup to 100 revisions back.

    Returns:
        Pair of (root_revision, revision).
        Information on sha1_git if it is an ancestor of sha1_git_root
        including children leading to sha1_git_root.

    Raises:
        BadInputExc: in case of unknown algo_hash or bad hash.
        NotFoundExc: if either revision is not found or if sha1_git is not
            an ancestor of sha1_git_root.
    """
    # NOTE(review): this span was a mangled diff hunk (raw '-'/'+' markers
    # left in the file); restored to the hunk's post-image, which resolves
    # the root revision through _lookup_revision_id_by.
    rev_root_id = _lookup_revision_id_by(origin_id, branch_name, timestamp)

    rev_root_id_bin = hashutil.hash_to_bytes(rev_root_id)

    rev_root = _first_element(storage.revision_get([rev_root_id_bin]))

    return (converters.from_revision(rev_root),
            lookup_revision_with_context(rev_root, sha1_git, limit))
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
    """Return information about revision sha1_git, limited to the
    sub-graph of all transitive parents of sha1_git_root.

    In other words, sha1_git is an ancestor of sha1_git_root.

    Args:
        sha1_git_root: latest revision. The type is either a sha1 (as an
            hex string) or a non converted dict.
        sha1_git: one of sha1_git_root's ancestors
        limit: limit the lookup to 100 revisions back

    Returns:
        Information on sha1_git if it is an ancestor of sha1_git_root
        including children leading to sha1_git_root

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash
        NotFoundExc if either revision is not found or if sha1_git is not an
        ancestor of sha1_git_root
    """
    # resolve and check the candidate ancestor revision first
    sha1_git_bin = _to_sha1_bin(sha1_git)
    revision = _first_element(storage.revision_get([sha1_git_bin]))
    if not revision:
        raise NotFoundExc('Revision %s not found' % sha1_git)
    # sha1_git_root may be given either as a hex string or as an
    # already-fetched (non converted) revision dict
    if isinstance(sha1_git_root, str):
        sha1_git_root_bin = _to_sha1_bin(sha1_git_root)
        revision_root = _first_element(storage.revision_get([sha1_git_root_bin])) # noqa
        if not revision_root:
            raise NotFoundExc('Revision root %s not found' % sha1_git_root)
    else:
        sha1_git_root_bin = sha1_git_root['id']
    revision_log = storage.revision_log([sha1_git_root_bin], limit)
    # build the parent and child adjacency maps of the bounded sub-graph
    # rooted at sha1_git_root
    parents = {}
    children = defaultdict(list)
    for rev in revision_log:
        rev_id = rev['id']
        parents[rev_id] = []
        for parent_id in rev['parents']:
            parents[rev_id].append(parent_id)
            children[parent_id].append(rev_id)
    # sha1_git must appear in the walked sub-graph to be an ancestor
    # (within the `limit` bound)
    if revision['id'] not in parents:
        raise NotFoundExc('Revision %s is not an ancestor of %s' %
                          (sha1_git, sha1_git_root))
    # annotate the revision with its children on the path to the root
    revision['children'] = children[revision['id']]
    return converters.from_revision(revision)
def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False):
    """Return information on the directory pointed to by revision sha1_git.

    If dir_path is not provided, display the top level directory.
    Otherwise, display the directory pointed to by dir_path (if it exists).

    Args:
        sha1_git: revision's hash.
        dir_path: optional directory pointed to by that revision.
        with_data: boolean that indicates to retrieve the raw data if the
            path resolves to a content. Default to False (for the api)

    Returns:
        Information on the directory pointed to by that revision.

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash.
        NotFoundExc either if the revision is not found or the path
        referenced does not exist.
        NotImplementedError in case of dir_path exists but do not reference
        a type 'dir' or 'file'.
    """
    sha1_git_bin = _to_sha1_bin(sha1_git)
    revision = _first_element(storage.revision_get([sha1_git_bin]))
    if not revision:
        raise NotFoundExc('Revision %s not found' % sha1_git)
    # root directory of the revision
    dir_sha1_git_bin = revision['directory']
    if dir_path:
        # resolve dir_path, component by component, under the root
        paths = dir_path.strip(os.path.sep).split(os.path.sep)
        entity = storage.directory_entry_get_by_path(
            dir_sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths)))
        if not entity:
            raise NotFoundExc(
                "Directory or File '%s' pointed to by revision %s not found"
                % (dir_path, sha1_git))
    else:
        # no path: synthesize an entry for the root directory itself
        entity = {'type': 'dir', 'target': dir_sha1_git_bin}
    if entity['type'] == 'dir':
        directory_entries = storage.directory_ls(entity['target']) or []
        return {'type': 'dir',
                'path': '.' if not dir_path else dir_path,
                'revision': sha1_git,
                'content': list(map(converters.from_directory_entry,
                                    directory_entries))}
    elif entity['type'] == 'file':  # content
        content = storage.content_find({'sha1_git': entity['target']})
        if with_data:
            # fetch the raw bytes only when explicitly requested
            c = _first_element(storage.content_get([content['sha1']]))
            content['data'] = c['data']
        return {'type': 'file',
                'path': '.' if not dir_path else dir_path,
                'revision': sha1_git,
                'content': converters.from_content(content)}
    else:
        raise NotImplementedError('Entity of type %s not implemented.'
                                  % entity['type'])
def lookup_content(q):
    """Lookup the content designated by q.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        the converted content info as dict.

    Raises:
        NotFoundExc: if the requested content is not found.
    """
    algo, hash_value = query.parse_hash(q)
    content = storage.content_find({algo: hash_value})
    if not content:
        raise NotFoundExc('Content with %s checksum equals to %s not found!' %
                          (algo, hashutil.hash_to_hex(hash_value)))
    return converters.from_content(content)
def lookup_content_raw(q):
    """Lookup the raw data of the content designated by q.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        dict with 'sha1' and 'data' keys, data representing its raw data
        decoded.

    Raises:
        NotFoundExc: if the requested content is not found, or if its
            bytes are not available in the storage.
    """
    content_meta = lookup_content(q)
    sha1_bytes = hashutil.hash_to_bytes(content_meta['checksums']['sha1'])
    content = _first_element(storage.content_get([sha1_bytes]))
    if not content:
        algo, hash_value = query.parse_hash(q)
        raise NotFoundExc('Bytes of content with %s checksum equals to %s '
                          'are not available!' %
                          (algo, hashutil.hash_to_hex(hash_value)))
    return converters.from_content(content)
def stat_counters():
    """Return the stat counters for Software Heritage.

    Returns:
        a dict mapping textual labels to integer values.
    """
    counters = storage.stat_counters()
    return counters
def _lookup_origin_visits(origin_id, last_visit=None, limit=10):
    """Yield the visits of the given origin.

    Args:
        origin_id (int): origin to list visits for
        last_visit (int): last visit to lookup from
        limit (int): maximum number of elements, capped at MAX_LIMIT

    Yields:
        dictionaries of origin_visit for that origin
    """
    capped_limit = min(limit, MAX_LIMIT)
    yield from storage.origin_visit_get(
        origin_id, last_visit=last_visit, limit=capped_limit)
def lookup_origin_visits(origin_id, last_visit=None, per_page=10):
    """Yield the converted visits of the given origin.

    Args:
        origin_id: origin to list visits for
        last_visit: last visit to lookup from
        per_page: maximum number of visits per page

    Yields:
        dictionaries of origin_visit for that origin
    """
    yield from map(converters.from_origin_visit,
                   _lookup_origin_visits(origin_id, last_visit=last_visit,
                                         limit=per_page))
def lookup_origin_visit(origin_id, visit_id):
    """Return information about visit visit_id of origin origin_id.

    Args:
        origin_id: origin concerned by the visit
        visit_id: the visit identifier to lookup

    Returns:
        the converted origin_visit dict.

    Raises:
        NotFoundExc: when the origin or its visit is unknown.
    """
    visit = storage.origin_visit_get_by(origin_id, visit_id)
    if not visit:
        raise NotFoundExc('Origin with id %s or its visit '
                          'with id %s not found!' % (origin_id, visit_id))
    return converters.from_origin_visit(visit)
def lookup_snapshot_size(snapshot_id):
    """Count the number of branches in the snapshot with the given id.

    Args:
        snapshot_id (str): sha1 identifier of the snapshot

    Returns:
        dict: a dict whose keys are the target types of branches and
        values their corresponding amount ('revision' and 'release' are
        always present, defaulting to 0).
    """
    snapshot_id_bin = _to_sha1_bin(snapshot_id)
    branch_counts = storage.snapshot_count_branches(snapshot_id_bin)
    # always report the revision and release counts, even when zero
    for target_type in ('revision', 'release'):
        branch_counts.setdefault(target_type, 0)
    return branch_counts
def lookup_snapshot(snapshot_id, branches_from='', branches_count=1000,
                    target_types=None):
    """Return information about a snapshot, aka the list of named branches
    found during a specific visit of an origin.

    Args:
        snapshot_id (str): sha1 identifier of the snapshot
        branches_from (str): optional parameter used to skip branches
            whose name is lesser than it before returning them
        branches_count (int): optional parameter used to restrain
            the amount of returned branches
        target_types (list): optional parameter used to filter the
            target types of branch to return (possible values that can be
            contained in that list are `'content', 'directory',
            'revision', 'release', 'snapshot', 'alias'`)

    Returns:
        a dict filled with the snapshot content.

    Raises:
        NotFoundExc: when the snapshot is unknown.
    """
    snapshot_id_bin = _to_sha1_bin(snapshot_id)
    raw_snapshot = storage.snapshot_get_branches(snapshot_id_bin,
                                                 branches_from.encode(),
                                                 branches_count, target_types)
    if not raw_snapshot:
        raise NotFoundExc('Snapshot with id %s not found!' % snapshot_id)
    return converters.from_snapshot(raw_snapshot)
def lookup_latest_origin_snapshot(origin_id, allowed_statuses=None):
    """Return information about the latest snapshot of an origin.

    .. warning:: At most 1000 branches contained in the snapshot
        will be returned for performance reasons.

    Args:
        origin_id: integer identifier of the origin
        allowed_statuses: list of visit statuses considered
            to find the latest snapshot for the visit. For instance,
            ``allowed_statuses=['full']`` will only consider visits that
            have successfully run to completion.

    Returns:
        a dict filled with the snapshot content.
    """
    raw_snapshot = storage.snapshot_get_latest(origin_id, allowed_statuses)
    return converters.from_snapshot(raw_snapshot)
def lookup_revision_through(revision, limit=100):
    """Retrieve a revision from the criteria stored in the revision dict.

    Args:
        revision: dictionary of criteria to lookup the revision with.
            Supported key combinations, tried in order:
            - origin_id, branch_name, ts, sha1_git
            - origin_id, branch_name, ts
            - sha1_git_root, sha1_git
            - sha1_git
        limit: bound on the ancestry lookups that need one.

    Returns:
        None if the revision is not found or the actual revision.
    """
    keys = revision.keys()
    if {'origin_id', 'branch_name', 'ts', 'sha1_git'} <= keys:
        return lookup_revision_with_context_by(revision['origin_id'],
                                               revision['branch_name'],
                                               revision['ts'],
                                               revision['sha1_git'],
                                               limit)
    if {'origin_id', 'branch_name', 'ts'} <= keys:
        return lookup_revision_by(revision['origin_id'],
                                  revision['branch_name'],
                                  revision['ts'])
    if {'sha1_git_root', 'sha1_git'} <= keys:
        return lookup_revision_with_context(revision['sha1_git_root'],
                                            revision['sha1_git'],
                                            limit)
    if 'sha1_git' in keys:
        return lookup_revision(revision['sha1_git'])
    # this should not happen
    raise NotImplementedError('Should not happen!')
def lookup_directory_through_revision(revision, path=None,
                                      limit=100, with_data=False):
    """Retrieve the directory information from the revision.

    Args:
        revision: dictionary of criteria representing a revision to lookup
        path: directory's path to lookup.
        limit: optional query parameter to limit the revisions log (default
            to 100). For now, note that this limit could impede the
            transitivity conclusion about sha1_git not being an ancestor of.
        with_data: indicate to retrieve the content's raw data if path
            resolves to a content.

    Returns:
        (revision_id, directory_info) for the revision matching the
        criteria, at path.

    Raises:
        NotFoundExc: when no revision matches the criteria.
    """
    rev = lookup_revision_through(revision, limit)
    if not rev:
        raise NotFoundExc('Revision with criterion %s not found!' % revision)
    rev_id = rev['id']
    return rev_id, lookup_directory_with_revision(rev_id, path, with_data)
def vault_cook(obj_type, obj_id, email=None):
    """Cook a vault bundle.

    Thin wrapper around the vault backend's cook endpoint.
    # NOTE(review): email is forwarded as-is — presumably used to notify
    # when the bundle is ready; confirm against the vault API.
    """
    return vault.cook(obj_type, obj_id, email=email)
def vault_fetch(obj_type, obj_id):
    """Fetch a vault bundle.

    Thin wrapper around the vault backend's fetch endpoint.
    """
    return vault.fetch(obj_type, obj_id)
def vault_progress(obj_type, obj_id):
    """Get the current progress of a vault bundle.

    Thin wrapper around the vault backend's progress endpoint.
    """
    return vault.progress(obj_type, obj_id)
def diff_revision(rev_id):
    """Get the list of file changes (insertion / deletion / modification /
    renaming) for a particular revision.

    Args:
        rev_id: the revision's sha1, as hexadecimal

    Returns:
        the list of change dicts, with entries converted and paths
        decoded to text.
    """
    changes = storage.diff_revision(_to_sha1_bin(rev_id),
                                    track_renaming=True)
    for change in changes:
        # convert both sides of the change to their web representation
        for side in ('from', 'to'):
            change[side] = converters.from_directory_entry(change[side])
        # expose paths as text rather than bytes
        for path_key in ('from_path', 'to_path'):
            if change[path_key]:
                change[path_key] = change[path_key].decode('utf-8')
    return changes
class _RevisionsWalkerProxy(object):
    """Iterator proxy over a swh-storage revisions walker, converting each
    yielded revision to its web-friendly representation.
    """
    def __init__(self, rev_walker_type, rev_start, *args, **kwargs):
        start_bin = hashutil.hash_to_bytes(rev_start)
        self.revisions_walker = revisions_walker.get_revisions_walker(
            rev_walker_type, storage, start_bin, *args, **kwargs)

    def export_state(self):
        # expose the underlying walker's state unchanged
        return self.revisions_walker.export_state()

    def __iter__(self):
        return self

    def __next__(self):
        return converters.from_revision(next(self.revisions_walker))
def get_revisions_walker(rev_walker_type, rev_start, *args, **kwargs):
    """Instantiate a revisions walker of a given type, see
    :mod:`swh.storage.algos.revisions_walker`.

    Args:
        rev_walker_type (str): the type of revisions walker to return,
            possible values are: ``committer_date``, ``dfs``, ``dfs_post``,
            ``bfs`` and ``path``
        rev_start (str): hexadecimal representation of a revision identifier
        args (list): position arguments to pass to the revisions walker
            constructor
        kwargs (dict): keyword arguments to pass to the revisions walker
            constructor
    """
    # validate the starting revision first (raises if unknown)
    lookup_revision(rev_start)
    walker = _RevisionsWalkerProxy(rev_walker_type, rev_start,
                                   *args, **kwargs)
    return walker
diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py
index 2d79ed033..8a705df32 100644
--- a/swh/web/tests/common/test_service.py
+++ b/swh/web/tests/common/test_service.py
@@ -1,1915 +1,1925 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from unittest.mock import MagicMock, patch, call
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.web.common import service
from swh.web.common.exc import BadInputExc, NotFoundExc
from swh.web.tests.testcase import WebTestCase
class ServiceTestCase(WebTestCase):
def setUp(self):
self.BLAKE2S256_SAMPLE = ('685395c5dc57cada459364f0946d3dd45b'
'ad5fcbabc1048edb44380f1d31d0aa')
self.BLAKE2S256_SAMPLE_BIN = hash_to_bytes(self.BLAKE2S256_SAMPLE)
self.SHA1_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
self.SHA1_SAMPLE_BIN = hash_to_bytes(self.SHA1_SAMPLE)
self.SHA256_SAMPLE = ('8abb0aa566452620ecce816eecdef4792d77a'
'293ad8ea82a4d5ecb4d36f7e560')
self.SHA256_SAMPLE_BIN = hash_to_bytes(self.SHA256_SAMPLE)
self.SHA1GIT_SAMPLE = '25d1a2e8f32937b0f498a5ca87f823d8df013c01'
self.SHA1GIT_SAMPLE_BIN = hash_to_bytes(self.SHA1GIT_SAMPLE)
self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'
self.DIRECTORY_ID_BIN = hash_to_bytes(self.DIRECTORY_ID)
self.AUTHOR_ID_BIN = {
'name': b'author',
'email': b'author@company.org',
}
self.AUTHOR_ID = {
'name': 'author',
'email': 'author@company.org',
}
self.COMMITTER_ID_BIN = {
'name': b'committer',
'email': b'committer@corp.org',
}
self.COMMITTER_ID = {
'name': 'committer',
'email': 'committer@corp.org',
}
self.SAMPLE_DATE_RAW = {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc,
).timestamp(),
'offset': 0,
'negative_utc': False,
}
self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00'
self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957'
self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957'
self.SAMPLE_REVISION = {
'id': self.SHA1_SAMPLE,
'directory': self.DIRECTORY_ID,
'author': self.AUTHOR_ID,
'committer': self.COMMITTER_ID,
'message': self.SAMPLE_MESSAGE,
'date': self.SAMPLE_DATE,
'committer_date': self.SAMPLE_DATE,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': {},
'merge': False
}
self.SAMPLE_REVISION_RAW = {
'id': self.SHA1_SAMPLE_BIN,
'directory': self.DIRECTORY_ID_BIN,
'author': self.AUTHOR_ID_BIN,
'committer': self.COMMITTER_ID_BIN,
'message': self.SAMPLE_MESSAGE_BIN,
'date': self.SAMPLE_DATE_RAW,
'committer_date': self.SAMPLE_DATE_RAW,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}
self.SAMPLE_CONTENT = {
'checksums': {
'blake2s256': self.BLAKE2S256_SAMPLE,
'sha1': self.SHA1_SAMPLE,
'sha256': self.SHA256_SAMPLE,
'sha1_git': self.SHA1GIT_SAMPLE,
},
'length': 190,
'status': 'absent'
}
self.SAMPLE_CONTENT_RAW = {
'blake2s256': self.BLAKE2S256_SAMPLE_BIN,
'sha1': self.SHA1_SAMPLE_BIN,
'sha256': self.SHA256_SAMPLE_BIN,
'sha1_git': self.SHA1GIT_SAMPLE_BIN,
'length': 190,
'status': 'hidden'
}
self.date_origin_visit1 = datetime.datetime(
2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc)
self.origin_visit1 = {
'date': self.date_origin_visit1,
'origin': 1,
'visit': 1
}
@patch('swh.web.common.service.storage')
def test_lookup_multiple_hashes_ball_missing(self, mock_storage):
# given
mock_storage.content_missing_per_sha1 = MagicMock(return_value=[])
# when
actual_lookup = service.lookup_multiple_hashes(
[{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}])
# then
self.assertEqual(actual_lookup, [
{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': True},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865',
'found': True}
])
@patch('swh.web.common.service.storage')
def test_lookup_multiple_hashes_some_missing(self, mock_storage):
# given
mock_storage.content_missing_per_sha1 = MagicMock(return_value=[
hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')
])
# when
actual_lookup = service.lookup_multiple_hashes(
[{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}])
# then
self.assertEqual(actual_lookup, [
{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': False},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865',
'found': True}
])
@patch('swh.web.common.service.storage')
def test_lookup_hash_does_not_exist(self, mock_storage):
# given
mock_storage.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.lookup_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual({'found': None,
'algo': 'sha1_git'}, actual_lookup)
# check the function has been called with parameters
mock_storage.content_find.assert_called_with(
{'sha1_git':
hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')})
@patch('swh.web.common.service.storage')
def test_lookup_hash_exist(self, mock_storage):
# given
stub_content = {
'sha1': hash_to_bytes(
'456caf10e9535160d90e874b45aa426de762f19f')
}
mock_storage.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.lookup_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual({
'found': {
'checksums': {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'
}
},
'algo': 'sha1'
}, actual_lookup)
mock_storage.content_find.assert_called_with(
{'sha1':
hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')}
)
@patch('swh.web.common.service.storage')
def test_search_hash_does_not_exist(self, mock_storage):
# given
mock_storage.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.search_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual({'found': False}, actual_lookup)
# check the function has been called with parameters
mock_storage.content_find.assert_called_with(
{'sha1_git':
hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')})
@patch('swh.web.common.service.storage')
def test_search_hash_exist(self, mock_storage):
# given
stub_content = {
'sha1': hash_to_bytes(
'456caf10e9535160d90e874b45aa426de762f19f')
}
mock_storage.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.search_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual({'found': True}, actual_lookup)
mock_storage.content_find.assert_called_with(
{'sha1':
hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')},
)
@patch('swh.web.common.service.idx_storage')
def test_lookup_content_ctags(self, mock_idx_storage):
# given
mock_idx_storage.content_ctags_get = MagicMock(
return_value=[{
'id': hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f'),
'line': 100,
'name': 'hello',
'kind': 'function',
'tool_name': 'ctags',
'tool_version': 'some-version',
}])
expected_ctags = [{
'id': '123caf10e9535160d90e874b45aa426de762f19f',
'line': 100,
'name': 'hello',
'kind': 'function',
'tool_name': 'ctags',
'tool_version': 'some-version',
}]
# when
actual_ctags = list(service.lookup_content_ctags(
'sha1:123caf10e9535160d90e874b45aa426de762f19f'))
# then
self.assertEqual(actual_ctags, expected_ctags)
mock_idx_storage.content_ctags_get.assert_called_with(
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')])
@patch('swh.web.common.service.idx_storage')
def test_lookup_content_ctags_no_hash(self, mock_idx_storage):
# given
mock_idx_storage.content_ctags_get = MagicMock(return_value=[])
# when
actual_ctags = list(service.lookup_content_ctags(
'sha1:123caf10e9535160d90e874b45aa426de762f19f'))
# then
self.assertEqual(actual_ctags, [])
@patch('swh.web.common.service.idx_storage')
def test_lookup_content_filetype(self, mock_idx_storage):
# given
mock_idx_storage.content_mimetype_get = MagicMock(
return_value=[{
'id': hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f'),
'mimetype': 'text/x-c++',
'encoding': 'us-ascii',
}])
expected_filetype = {
'id': '123caf10e9535160d90e874b45aa426de762f19f',
'mimetype': 'text/x-c++',
'encoding': 'us-ascii',
}
# when
actual_filetype = service.lookup_content_filetype(
'sha1:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_filetype, expected_filetype)
mock_idx_storage.content_mimetype_get.assert_called_with(
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')])
@patch('swh.web.common.service.idx_storage')
@patch('swh.web.common.service.storage')
def test_lookup_content_filetype_2(self, mock_storage, mock_idx_storage):
# given
mock_storage.content_find = MagicMock(
return_value={
'sha1': hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f')
}
)
mock_idx_storage.content_mimetype_get = MagicMock(
return_value=[{
'id': hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f'),
'mimetype': 'text/x-python',
'encoding': 'us-ascii',
}]
)
expected_filetype = {
'id': '123caf10e9535160d90e874b45aa426de762f19f',
'mimetype': 'text/x-python',
'encoding': 'us-ascii',
}
# when
actual_filetype = service.lookup_content_filetype(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_filetype, expected_filetype)
mock_storage.content_find(
'sha1_git', hash_to_bytes(
'456caf10e9535160d90e874b45aa426de762f19f')
)
mock_idx_storage.content_mimetype_get.assert_called_with(
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')])
@patch('swh.web.common.service.idx_storage')
def test_lookup_content_language(self, mock_idx_storage):
# given
mock_idx_storage.content_language_get = MagicMock(
return_value=[{
'id': hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f'),
'lang': 'python',
}])
expected_language = {
'id': '123caf10e9535160d90e874b45aa426de762f19f',
'lang': 'python',
}
# when
actual_language = service.lookup_content_language(
'sha1:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_language, expected_language)
mock_idx_storage.content_language_get.assert_called_with(
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')])
@patch('swh.web.common.service.idx_storage')
@patch('swh.web.common.service.storage')
def test_lookup_content_language_2(self, mock_storage, mock_idx_storage):
# given
mock_storage.content_find = MagicMock(
return_value={
'sha1': hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f')
}
)
mock_idx_storage.content_language_get = MagicMock(
return_value=[{
'id': hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f'),
'lang': 'haskell',
}]
)
expected_language = {
'id': '123caf10e9535160d90e874b45aa426de762f19f',
'lang': 'haskell',
}
# when
actual_language = service.lookup_content_language(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_language, expected_language)
mock_storage.content_find(
'sha1_git', hash_to_bytes(
'456caf10e9535160d90e874b45aa426de762f19f')
)
mock_idx_storage.content_language_get.assert_called_with(
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')])
@patch('swh.web.common.service.idx_storage')
def test_lookup_expression(self, mock_idx_storage):
# given
mock_idx_storage.content_ctags_search = MagicMock(
return_value=[{
'id': hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f'),
'name': 'foobar',
'kind': 'variable',
'lang': 'C',
'line': 10
}])
expected_ctags = [{
'sha1': '123caf10e9535160d90e874b45aa426de762f19f',
'name': 'foobar',
'kind': 'variable',
'lang': 'C',
'line': 10
}]
# when
actual_ctags = list(service.lookup_expression(
'foobar', last_sha1='hash', per_page=10))
# then
self.assertEqual(actual_ctags, expected_ctags)
mock_idx_storage.content_ctags_search.assert_called_with(
'foobar', last_sha1='hash', limit=10)
@patch('swh.web.common.service.idx_storage')
def test_lookup_expression_no_result(self, mock_idx_storage):
# given
mock_idx_storage.content_ctags_search = MagicMock(
return_value=[])
expected_ctags = []
# when
actual_ctags = list(service.lookup_expression(
'barfoo', last_sha1='hash', per_page=10))
# then
self.assertEqual(actual_ctags, expected_ctags)
mock_idx_storage.content_ctags_search.assert_called_with(
'barfoo', last_sha1='hash', limit=10)
@patch('swh.web.common.service.idx_storage')
def test_lookup_content_license(self, mock_idx_storage):
# given
mock_idx_storage.content_fossology_license_get = MagicMock(
return_value=[{
hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f'): [{
'licenses': ['GPL-3.0+'],
'tool': {}
}]
}])
expected_license = {
'id': '123caf10e9535160d90e874b45aa426de762f19f',
'facts': [{
'licenses': ['GPL-3.0+'],
'tool': {}
}]
}
# when
actual_license = service.lookup_content_license(
'sha1:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_license, expected_license)
mock_idx_storage.content_fossology_license_get.assert_called_with(
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')])
@patch('swh.web.common.service.idx_storage')
@patch('swh.web.common.service.storage')
def test_lookup_content_license_2(self, mock_storage, mock_idx_storage):
# given
mock_storage.content_find = MagicMock(
return_value={
'sha1': hash_to_bytes(
'123caf10e9535160d90e874b45aa426de762f19f')
}
)
mock_idx_storage.content_fossology_license_get = MagicMock(
return_value=[{
hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f'): [{
'licenses': ['BSD-2-Clause'],
'tool': {}
}]
}]
)
expected_license = {
'id': '123caf10e9535160d90e874b45aa426de762f19f',
'facts': [{
'licenses': ['BSD-2-Clause'],
'tool': {}
}]
}
# when
actual_license = service.lookup_content_license(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_license, expected_license)
mock_storage.content_find(
'sha1_git', hash_to_bytes(
'456caf10e9535160d90e874b45aa426de762f19f')
)
mock_idx_storage.content_fossology_license_get.assert_called_with(
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')])
@patch('swh.web.common.service.storage')
def test_stat_counters(self, mock_storage):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_storage.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = service.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
mock_storage.stat_counters.assert_called_with()
@patch('swh.web.common.service._lookup_origin_visits')
def test_lookup_origin_visits(self, mock_lookup_visits):
# given
date_origin_visit2 = datetime.datetime(
2013, 7, 1, 20, 0, 0,
tzinfo=datetime.timezone.utc)
date_origin_visit3 = datetime.datetime(
2015, 1, 1, 21, 0, 0,
tzinfo=datetime.timezone.utc)
stub_result = [self.origin_visit1, {
'date': date_origin_visit2,
'origin': 1,
'visit': 2,
'target': hash_to_bytes(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'branch': b'master',
'target_type': 'release',
'metadata': None,
}, {
'date': date_origin_visit3,
'origin': 1,
'visit': 3
}]
mock_lookup_visits.return_value = stub_result
# when
expected_origin_visits = [{
'date': self.origin_visit1['date'].isoformat(),
'origin': self.origin_visit1['origin'],
'visit': self.origin_visit1['visit']
}, {
'date': date_origin_visit2.isoformat(),
'origin': 1,
'visit': 2,
'target': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'branch': 'master',
'target_type': 'release',
'metadata': {},
}, {
'date': date_origin_visit3.isoformat(),
'origin': 1,
'visit': 3
}]
actual_origin_visits = service.lookup_origin_visits(6)
# then
self.assertEqual(list(actual_origin_visits), expected_origin_visits)
mock_lookup_visits.assert_called_once_with(
6, last_visit=None, limit=10)
@patch('swh.web.common.service.storage')
def test_lookup_origin_visit(self, mock_storage):
# given
stub_result = self.origin_visit1
mock_storage.origin_visit_get_by.return_value = stub_result
expected_origin_visit = {
'date': self.origin_visit1['date'].isoformat(),
'origin': self.origin_visit1['origin'],
'visit': self.origin_visit1['visit']
}
# when
actual_origin_visit = service.lookup_origin_visit(1, 1)
# then
self.assertEqual(actual_origin_visit, expected_origin_visit)
mock_storage.origin_visit_get_by.assert_called_once_with(1, 1)
@patch('swh.web.common.service.storage')
def test_lookup_origin(self, mock_storage):
# given
mock_storage.origin_get = MagicMock(return_value={
'id': 'origin-id',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = service.lookup_origin({'id': 'origin-id'})
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
mock_storage.origin_get.assert_called_with({'id': 'origin-id'})
@patch('swh.web.common.service.storage')
def test_lookup_release_ko_id_checksum_not_a_sha1(self, mock_storage):
# given
mock_storage.release_get = MagicMock()
with self.assertRaises(BadInputExc) as cm:
# when
service.lookup_release('not-a-sha1')
self.assertIn('invalid checksum', cm.exception.args[0].lower())
mock_storage.release_get.called = False
@patch('swh.web.common.service.storage')
def test_lookup_release_ko_id_checksum_too_long(self, mock_storage):
# given
mock_storage.release_get = MagicMock()
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_release(
'13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
'1aea892abe')
self.assertEqual('Only sha1_git is supported.', cm.exception.args[0])
mock_storage.release_get.called = False
@patch('swh.web.common.service.storage')
def test_lookup_directory_with_path_not_found(self, mock_storage):
# given
mock_storage.lookup_directory_with_path = MagicMock(return_value=None)
sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
# when
actual_directory = mock_storage.lookup_directory_with_path(
sha1_git, 'some/path/here')
self.assertIsNone(actual_directory)
@patch('swh.web.common.service.storage')
def test_lookup_directory_with_path_found(self, mock_storage):
# given
sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
entry = {'id': 'dir-id',
'type': 'dir',
'name': 'some/path/foo'}
mock_storage.lookup_directory_with_path = MagicMock(return_value=entry)
# when
actual_directory = mock_storage.lookup_directory_with_path(
sha1_git, 'some/path/here')
self.assertEqual(entry, actual_directory)
@patch('swh.web.common.service.storage')
def test_lookup_release(self, mock_storage):
# given
mock_storage.release_get = MagicMock(return_value=[{
'id': hash_to_bytes('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'target': None,
'date': {
'timestamp': datetime.datetime(
2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': True,
},
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
}])
# when
actual_release = service.lookup_release(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
# then
self.assertEqual(actual_release, {
'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'target': None,
'date': '2015-01-01T22:00:00-00:00',
'name': 'v0.0.1',
'message': 'synthetic release',
'synthetic': True,
})
mock_storage.release_get.assert_called_with(
[hash_to_bytes('65a55bbdf3629f916219feb3dcc7393ded1bc8db')])
def test_lookup_revision_with_context_ko_not_a_sha1_1(self):
# given
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
'daf51aea892abe'
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
def test_lookup_revision_with_context_ko_not_a_sha1_2(self):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \
'2d4daf51aea892abe'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@patch('swh.web.common.service.storage')
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist(
self,
mock_storage):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_bin = hash_to_bytes(sha1_git)
mock_storage.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_storage.revision_get.assert_called_once_with(
[sha1_git_bin])
@patch('swh.web.common.service.storage')
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
self,
mock_storage):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_root_bin = hash_to_bytes(sha1_git_root)
sha1_git_bin = hash_to_bytes(sha1_git)
mock_storage.revision_get.side_effect = ['foo', None]
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision root 65a55bbdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_storage.revision_get.assert_has_calls([call([sha1_git_bin]),
call([sha1_git_root_bin])])
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_revision_with_context(self, mock_query, mock_storage):
# given
sha1_git_root = '666'
sha1_git = '883'
sha1_git_root_bin = b'666'
sha1_git_bin = b'883'
sha1_git_root_dict = {
'id': sha1_git_root_bin,
'parents': [b'999'],
}
sha1_git_dict = {
'id': sha1_git_bin,
'parents': [],
'directory': b'278',
}
stub_revisions = [
sha1_git_root_dict,
{
'id': b'999',
'parents': [b'777', b'883', b'888'],
},
{
'id': b'777',
'parents': [b'883'],
},
sha1_git_dict,
{
'id': b'888',
'parents': [b'889'],
},
{
'id': b'889',
'parents': [],
},
]
# inputs ok
mock_query.parse_hash_with_algorithms_or_throws.side_effect = [
('sha1', sha1_git_bin),
('sha1', sha1_git_root_bin)
]
# lookup revision first 883, then 666 (both exists)
mock_storage.revision_get.return_value = [
sha1_git_dict,
sha1_git_root_dict
]
mock_storage.revision_log = MagicMock(
return_value=stub_revisions)
# when
actual_revision = service.lookup_revision_with_context(
sha1_git_root,
sha1_git)
# then
self.assertEqual(actual_revision, {
'id': hash_to_hex(sha1_git_bin),
'parents': [],
'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
'directory': hash_to_hex(b'278'),
'merge': False
})
mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls(
[call(sha1_git, ['sha1'], 'Only sha1_git is supported.'),
call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')])
mock_storage.revision_log.assert_called_with(
[sha1_git_root_bin], 100)
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_revision_with_context_retrieved_as_dict(
self, mock_query, mock_storage):
# given
sha1_git = '883'
sha1_git_root_bin = b'666'
sha1_git_bin = b'883'
sha1_git_root_dict = {
'id': sha1_git_root_bin,
'parents': [b'999'],
}
sha1_git_dict = {
'id': sha1_git_bin,
'parents': [],
'directory': b'278',
}
stub_revisions = [
sha1_git_root_dict,
{
'id': b'999',
'parents': [b'777', b'883', b'888'],
},
{
'id': b'777',
'parents': [b'883'],
},
sha1_git_dict,
{
'id': b'888',
'parents': [b'889'],
},
{
'id': b'889',
'parents': [],
},
]
# inputs ok
mock_query.parse_hash_with_algorithms_or_throws.return_value = (
'sha1', sha1_git_bin)
# lookup only on sha1
mock_storage.revision_get.return_value = [sha1_git_dict]
mock_storage.revision_log.return_value = stub_revisions
# when
actual_revision = service.lookup_revision_with_context(
{'id': sha1_git_root_bin},
sha1_git)
# then
self.assertEqual(actual_revision, {
'id': hash_to_hex(sha1_git_bin),
'parents': [],
'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
'directory': hash_to_hex(b'278'),
'merge': False
})
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa
sha1_git, ['sha1'], 'Only sha1_git is supported.')
mock_storage.revision_get.assert_called_once_with([sha1_git_bin])
mock_storage.revision_log.assert_called_with(
[sha1_git_root_bin], 100)
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_directory_with_revision_not_found(self,
mock_query,
mock_storage):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
mock_storage.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision('123')
self.assertIn('Revision 123 not found', cm.exception.args[0])
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_storage.revision_get.assert_called_once_with([b'123'])
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_directory_with_revision_ko_revision_with_path_to_nowhere(
self,
mock_query,
mock_storage):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_storage.revision_get.return_value = [{
'directory': dir_id,
}]
mock_storage.directory_entry_get_by_path.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision(
'123',
'path/to/something/unknown')
exception_text = cm.exception.args[0].lower()
self.assertIn('directory or file', exception_text)
self.assertIn('path/to/something/unknown', exception_text)
self.assertIn('revision 123', exception_text)
self.assertIn('not found', exception_text)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_storage.revision_get.assert_called_once_with([b'123'])
mock_storage.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', [b'path', b'to', b'something', b'unknown'])
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_directory_with_revision_ko_type_not_implemented(
self,
mock_query,
mock_storage):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_storage.revision_get.return_value = [{
'directory': dir_id,
}]
mock_storage.directory_entry_get_by_path.return_value = {
'type': 'rev',
'name': b'some/path/to/rev',
'target': b'456'
}
stub_content = {
'id': b'12',
'type': 'file'
}
mock_storage.content_get.return_value = stub_content
# when
with self.assertRaises(NotImplementedError) as cm:
service.lookup_directory_with_revision(
'123',
'some/path/to/rev')
self.assertIn("Entity of type rev not implemented.",
cm.exception.args[0])
# then
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_storage.revision_get.assert_called_once_with([b'123'])
mock_storage.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', [b'some', b'path', b'to', b'rev'])
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_directory_with_revision_revision_without_path(
self, mock_query, mock_storage,
):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_storage.revision_get.return_value = [{
'directory': dir_id,
}]
stub_dir_entries = [{
'id': b'123',
'type': 'dir'
}, {
'id': b'456',
'type': 'file'
}]
mock_storage.directory_ls.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_storage.revision_get.assert_called_once_with([b'123'])
mock_storage.directory_ls.assert_called_once_with(dir_id)
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_directory_with_revision_with_path_to_dir(self,
mock_query,
mock_storage):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_storage.revision_get.return_value = [{
'directory': dir_id,
}]
stub_dir_entries = [{
'id': b'12',
'type': 'dir'
}, {
'id': b'34',
'type': 'file'
}]
mock_storage.directory_entry_get_by_path.return_value = {
'type': 'dir',
'name': b'some/path',
'target': b'456'
}
mock_storage.directory_ls.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123',
'some/path')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(actual_directory_entries['revision'], '123')
self.assertEqual(actual_directory_entries['path'], 'some/path')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_storage.revision_get.assert_called_once_with([b'123'])
mock_storage.directory_entry_get_by_path.assert_called_once_with(
dir_id,
[b'some', b'path'])
mock_storage.directory_ls.assert_called_once_with(b'456')
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_directory_with_revision_with_path_to_file_wo_data(
self,
mock_query,
mock_storage):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_storage.revision_get.return_value = [{
'directory': dir_id,
}]
mock_storage.directory_entry_get_by_path.return_value = {
'type': 'file',
'name': b'some/path/to/file',
'target': b'789'
}
stub_content = {
'status': 'visible',
}
mock_storage.content_find.return_value = stub_content
# when
actual_content = service.lookup_directory_with_revision(
'123',
'some/path/to/file')
# then
self.assertEqual(actual_content, {'type': 'file',
'revision': '123',
'path': 'some/path/to/file',
'content': stub_content})
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_storage.revision_get.assert_called_once_with([b'123'])
mock_storage.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', [b'some', b'path', b'to', b'file'])
mock_storage.content_find.assert_called_once_with({'sha1_git': b'789'})
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_directory_with_revision_with_path_to_file_w_data(
self,
mock_query,
mock_storage):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_storage.revision_get.return_value = [{
'directory': dir_id,
}]
mock_storage.directory_entry_get_by_path.return_value = {
'type': 'file',
'name': b'some/path/to/file',
'target': b'789'
}
stub_content = {
'status': 'visible',
'sha1': b'content-sha1'
}
mock_storage.content_find.return_value = stub_content
mock_storage.content_get.return_value = [{
'sha1': b'content-sha1',
'data': b'some raw data'
}]
expected_content = {
'status': 'visible',
'checksums': {
'sha1': hash_to_hex(b'content-sha1'),
},
'data': b'some raw data'
}
# when
actual_content = service.lookup_directory_with_revision(
'123',
'some/path/to/file',
with_data=True)
# then
self.assertEqual(actual_content, {'type': 'file',
'revision': '123',
'path': 'some/path/to/file',
'content': expected_content})
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_storage.revision_get.assert_called_once_with([b'123'])
mock_storage.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', [b'some', b'path', b'to', b'file'])
mock_storage.content_find.assert_called_once_with({'sha1_git': b'789'})
mock_storage.content_get.assert_called_once_with([b'content-sha1'])
@patch('swh.web.common.service.storage')
def test_lookup_revision(self, mock_storage):
# given
mock_storage.revision_get = MagicMock(
return_value=[self.SAMPLE_REVISION_RAW])
# when
actual_revision = service.lookup_revision(
self.SHA1_SAMPLE)
# then
self.assertEqual(actual_revision, self.SAMPLE_REVISION)
mock_storage.revision_get.assert_called_with(
[self.SHA1_SAMPLE_BIN])
@patch('swh.web.common.service.storage')
def test_lookup_revision_invalid_msg(self, mock_storage):
# given
stub_rev = self.SAMPLE_REVISION_RAW
stub_rev['message'] = b'elegant fix for bug \xff'
expected_revision = self.SAMPLE_REVISION
expected_revision['message'] = None
expected_revision['message_decoding_failed'] = True
mock_storage.revision_get = MagicMock(return_value=[stub_rev])
# when
actual_revision = service.lookup_revision(
self.SHA1_SAMPLE)
# then
self.assertEqual(actual_revision, expected_revision)
mock_storage.revision_get.assert_called_with(
[self.SHA1_SAMPLE_BIN])
@patch('swh.web.common.service.storage')
def test_lookup_revision_msg_ok(self, mock_storage):
# given
mock_storage.revision_get.return_value = [self.SAMPLE_REVISION_RAW]
# when
rv = service.lookup_revision_message(
self.SHA1_SAMPLE)
# then
self.assertEqual(rv, {'message': self.SAMPLE_MESSAGE_BIN})
mock_storage.revision_get.assert_called_with(
[self.SHA1_SAMPLE_BIN])
@patch('swh.web.common.service.storage')
def test_lookup_revision_msg_absent(self, mock_storage):
# given
stub_revision = self.SAMPLE_REVISION_RAW
del stub_revision['message']
mock_storage.revision_get.return_value = stub_revision
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_message(
self.SHA1_SAMPLE)
# then
mock_storage.revision_get.assert_called_with(
[self.SHA1_SAMPLE_BIN])
self.assertEqual(
cm.exception.args[0],
'No message for revision with sha1_git %s.' % self.SHA1_SAMPLE,
)
@patch('swh.web.common.service.storage')
def test_lookup_revision_msg_norev(self, mock_storage):
# given
mock_storage.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_message(
self.SHA1_SAMPLE)
# then
mock_storage.revision_get.assert_called_with(
[self.SHA1_SAMPLE_BIN])
self.assertEqual(
cm.exception.args[0],
'Revision with sha1_git %s not found.' % self.SHA1_SAMPLE,
)
    @patch('swh.web.common.service.storage')
    def test_lookup_revision_multiple(self, mock_storage):
        """Every raw revision returned by storage is converted to its API
        (hex/str/isoformat) form, preserving order."""
        # given
        sha1 = self.SHA1_SAMPLE
        sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
        # two raw revisions: the shared sample fixture plus an ad-hoc one
        stub_revisions = [
            self.SAMPLE_REVISION_RAW,
            {
                'id': hash_to_bytes(sha1_other),
                'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5',
                'author': {
                    'name': b'name',
                    'email': b'name@surname.org',
                },
                'committer': {
                    'name': b'name',
                    'email': b'name@surname.org',
                },
                'message': b'ugly fix for bug 42',
                'date': {
                    'timestamp': datetime.datetime(
                        2000, 1, 12, 5, 23, 54,
                        tzinfo=datetime.timezone.utc).timestamp(),
                    'offset': 0,
                    'negative_utc': False
                },
                'date_offset': 0,
                'committer_date': {
                    'timestamp': datetime.datetime(
                        2000, 1, 12, 5, 23, 54,
                        tzinfo=datetime.timezone.utc).timestamp(),
                    'offset': 0,
                    'negative_utc': False
                },
                'committer_date_offset': 0,
                'synthetic': False,
                'type': 'git',
                'parents': [],
                'metadata': [],
            }
        ]
        mock_storage.revision_get.return_value = stub_revisions
        # when
        actual_revisions = service.lookup_revision_multiple(
            [sha1, sha1_other])
        # then: bytes fields decoded, dates rendered in isoformat,
        # empty metadata normalized to {}
        self.assertEqual(list(actual_revisions), [
            self.SAMPLE_REVISION,
            {
                'id': sha1_other,
                'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5',
                'author': {
                    'name': 'name',
                    'email': 'name@surname.org',
                },
                'committer': {
                    'name': 'name',
                    'email': 'name@surname.org',
                },
                'message': 'ugly fix for bug 42',
                'date': '2000-01-12T05:23:54+00:00',
                'date_offset': 0,
                'committer_date': '2000-01-12T05:23:54+00:00',
                'committer_date_offset': 0,
                'synthetic': False,
                'type': 'git',
                'parents': [],
                'metadata': {},
                'merge': False
            }
        ])
        # storage was queried with the binary forms, in the same order
        self.assertEqual(
            list(mock_storage.revision_get.call_args[0][0]),
            [hash_to_bytes(sha1),
             hash_to_bytes(sha1_other)])
@patch('swh.web.common.service.storage')
def test_lookup_revision_multiple_none_found(self, mock_storage):
    """When storage yields no revision at all, the lookup yields an
    empty result while still querying storage with binary sha1s."""
    first_sha1 = self.SHA1_SAMPLE
    second_sha1 = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
    mock_storage.revision_get.return_value = []

    revisions = service.lookup_revision_multiple(
        [first_sha1, second_sha1])

    self.assertEqual(list(revisions), [])
    requested = list(mock_storage.revision_get.call_args[0][0])
    self.assertEqual(
        requested,
        [hash_to_bytes(first_sha1), hash_to_bytes(second_sha1)])
@patch('swh.web.common.service.storage')
def test_lookup_revision_log(self, mock_storage):
    """lookup_revision_log converts the storage log into API revisions
    and forwards the limit to storage.revision_log."""
    rev_id = 'abcdbe353ed3480476f032475e7c233eff7371d5'
    mock_storage.revision_log = MagicMock(
        return_value=[self.SAMPLE_REVISION_RAW])

    log = service.lookup_revision_log(rev_id, limit=25)

    self.assertEqual(list(log), [self.SAMPLE_REVISION])
    mock_storage.revision_log.assert_called_with(
        [hash_to_bytes(rev_id)], 25)
@patch('swh.web.common.service.lookup_revision_log')
@patch('swh.web.common.service.lookup_snapshot')
@patch('swh.web.common.service.get_origin_visit')
def test_lookup_revision_log_by(self, mock_get_origin_visit,
                                mock_lookup_snapshot,
                                mock_lookup_revision_log):
    """lookup_revision_log_by resolves the named branch of the origin
    visit's snapshot and delegates to lookup_revision_log."""
    # given: the visit points at a snapshot whose 'refs/heads/master'
    # branch targets the sample revision
    mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE}
    mock_lookup_snapshot.return_value = \
        {
            'branches': {
                'refs/heads/master': {
                    'target_type': 'revision',
                    'target': self.SAMPLE_REVISION['id']
                }
            }
        }
    mock_lookup_revision_log.return_value = [self.SAMPLE_REVISION]
    # when
    actual_log = service.lookup_revision_log_by(
        1, 'refs/heads/master', None, limit=100)
    # then
    self.assertEqual(list(actual_log), [self.SAMPLE_REVISION])
@patch('swh.web.common.service.lookup_snapshot')
@patch('swh.web.common.service.get_origin_visit')
def test_lookup_revision_log_by_notfound(self, mock_get_origin_visit,
                                         mock_lookup_snapshot):
    """A snapshot without any branch makes lookup_revision_log_by
    raise NotFoundExc."""
    mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE}
    mock_lookup_snapshot.return_value = {'branches': {}}

    with self.assertRaises(NotFoundExc):
        service.lookup_revision_log_by(
            1, 'refs/heads/master', None, limit=100)
@patch('swh.web.common.service.storage')
def test_lookup_content_raw_not_found(self, mock_storage):
    """lookup_content_raw raises NotFoundExc, with an explicit error
    message, when content_find returns nothing."""
    # given
    mock_storage.content_find = MagicMock(return_value=None)
    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_content_raw('sha1:' + self.SHA1_SAMPLE)
    # then
    # FIX: the original assertIn had member/container swapped
    # (assertIn(actual, expected)), so it also passed when the actual
    # message was any substring of the expected one.  Assert that the
    # expected message is contained in the actual exception text.
    self.assertIn('Content with %s checksum equals to %s not found!' %
                  ('sha1', self.SHA1_SAMPLE),
                  cm.exception.args[0])
    mock_storage.content_find.assert_called_with(
        {'sha1': hash_to_bytes(self.SHA1_SAMPLE)})
@patch('swh.web.common.service.storage')
def test_lookup_content_raw(self, mock_storage):
    """The raw data of a content found by sha256 is fetched via its
    sha1 and returned as-is."""
    mock_storage.content_find = MagicMock(
        return_value={'sha1': self.SHA1_SAMPLE})
    mock_storage.content_get = MagicMock(
        return_value=[{'data': b'binary data'}])

    content = service.lookup_content_raw(
        'sha256:%s' % self.SHA256_SAMPLE)

    self.assertEqual(content, {'data': b'binary data'})
    mock_storage.content_find.assert_called_once_with(
        {'sha256': self.SHA256_SAMPLE_BIN})
    mock_storage.content_get.assert_called_once_with(
        [hash_to_bytes(self.SHA1_SAMPLE)])
@patch('swh.web.common.service.storage')
def test_lookup_content_not_found(self, mock_storage):
    """lookup_content raises NotFoundExc, with an explicit error
    message, when content_find returns nothing."""
    # given
    mock_storage.content_find = MagicMock(return_value=None)
    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_content('sha1:%s' % self.SHA1_SAMPLE)
    # then
    # FIX: the original assertIn had member/container swapped
    # (assertIn(actual, expected)), weakening the check; assert the
    # expected message appears in the actual exception text instead.
    self.assertIn('Content with %s checksum equals to %s not found!' %
                  ('sha1', self.SHA1_SAMPLE),
                  cm.exception.args[0])
    mock_storage.content_find.assert_called_with(
        {'sha1': self.SHA1_SAMPLE_BIN})
@patch('swh.web.common.service.storage')
def test_lookup_content_with_sha1(self, mock_storage):
    """A content found by sha1 is returned in its API form."""
    mock_storage.content_find = MagicMock(
        return_value=self.SAMPLE_CONTENT_RAW)

    content = service.lookup_content('sha1:%s' % self.SHA1_SAMPLE)

    self.assertEqual(content, self.SAMPLE_CONTENT)
    mock_storage.content_find.assert_called_with(
        {'sha1': hash_to_bytes(self.SHA1_SAMPLE)})
@patch('swh.web.common.service.storage')
def test_lookup_content_with_sha256(self, mock_storage):
    """A content found by sha256 is returned with its status field."""
    # given
    # FIX: work on shallow copies -- the original mutated the
    # class-level fixtures SAMPLE_CONTENT_RAW / SAMPLE_CONTENT in
    # place, leaking the 'status' key into every other test that
    # reuses those fixtures.
    stub_content = dict(self.SAMPLE_CONTENT_RAW)
    stub_content['status'] = 'visible'
    expected_content = dict(self.SAMPLE_CONTENT)
    expected_content['status'] = 'visible'
    mock_storage.content_find = MagicMock(
        return_value=stub_content)
    # when
    actual_content = service.lookup_content(
        'sha256:%s' % self.SHA256_SAMPLE)
    # then
    self.assertEqual(actual_content, expected_content)
    mock_storage.content_find.assert_called_with(
        {'sha256': self.SHA256_SAMPLE_BIN})
@patch('swh.web.common.service.storage')
def test_lookup_person(self, mock_storage):
    """lookup_person decodes the raw person's name/email bytes."""
    raw_person = {
        'id': 'person_id',
        'name': b'some_name',
        'email': b'some-email',
    }
    mock_storage.person_get = MagicMock(return_value=[raw_person])

    person = service.lookup_person('person_id')

    self.assertEqual(person, {
        'id': 'person_id',
        'name': 'some_name',
        'email': 'some-email',
    })
    mock_storage.person_get.assert_called_with(['person_id'])
@patch('swh.web.common.service.storage')
def test_lookup_directory_bad_checksum(self, mock_storage):
    """An invalid checksum raises BadInputExc before storage is hit."""
    # given
    mock_storage.directory_ls = MagicMock()
    # when
    with self.assertRaises(BadInputExc):
        service.lookup_directory('directory_id')
    # then
    # FIX: the original wrote `mock_storage.directory_ls.called = False`,
    # a plain attribute assignment that asserted nothing; actually
    # verify that storage was never queried.
    self.assertFalse(mock_storage.directory_ls.called)
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_directory_not_found(self, mock_query, mock_storage):
    """A directory reported missing by storage raises NotFoundExc."""
    mock_query.parse_hash_with_algorithms_or_throws.return_value = (
        'sha1', 'directory-id-bin')
    mock_storage.directory_missing.return_value = ['directory-id-bin']

    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_directory('directory_id')

    self.assertIn('Directory with sha1_git directory_id not found',
                  cm.exception.args[0])
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_with(
        'directory_id', ['sha1'], 'Only sha1_git is supported.')
@patch('swh.web.common.service.storage')
@patch('swh.web.common.service.query')
def test_lookup_directory(self, mock_query, mock_storage):
    """lookup_directory lists a directory's entries with all binary
    checksums and targets converted to hex strings, and per-entry
    checksums grouped under a 'checksums' key."""
    mock_query.parse_hash_with_algorithms_or_throws.return_value = (
        'sha1',
        'directory-sha1-bin')
    # given: one raw entry as storage.directory_ls would yield it
    stub_dir_entries = [{
        'sha1': self.SHA1_SAMPLE_BIN,
        'sha256': self.SHA256_SAMPLE_BIN,
        'sha1_git': self.SHA1GIT_SAMPLE_BIN,
        'blake2s256': self.BLAKE2S256_SAMPLE_BIN,
        'target': hash_to_bytes(
            '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'),
        'dir_id': self.DIRECTORY_ID_BIN,
        'name': b'bob',
        'type': 10,
    }]
    expected_dir_entries = [{
        'checksums': {
            'sha1': self.SHA1_SAMPLE,
            'sha256': self.SHA256_SAMPLE,
            'sha1_git': self.SHA1GIT_SAMPLE,
            'blake2s256': self.BLAKE2S256_SAMPLE
        },
        'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        'dir_id': self.DIRECTORY_ID,
        'name': 'bob',
        'type': 10,
    }]
    mock_storage.directory_ls.return_value = stub_dir_entries
    # empty missing-list means the directory exists in storage
    mock_storage.directory_missing.return_value = []
    # when
    actual_directory_ls = list(service.lookup_directory(
        'directory-sha1'))
    # then
    self.assertEqual(actual_directory_ls, expected_dir_entries)
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_with(
        'directory-sha1', ['sha1'], 'Only sha1_git is supported.')
    mock_storage.directory_ls.assert_called_with(
        'directory-sha1-bin')
@patch('swh.web.common.service.storage')
def test_lookup_directory_empty(self, mock_storage):
    """The well-known empty-directory hash yields an empty listing
    without querying storage at all."""
    empty_dir_sha1 = '4b825dc642cb6eb9a060e54bf8d69288fbee4904'
    mock_storage.directory_ls.return_value = []

    entries = list(service.lookup_directory(empty_dir_sha1))

    self.assertEqual(entries, [])
    self.assertFalse(mock_storage.directory_ls.called)
@patch('swh.web.common.service.lookup_snapshot')
@patch('swh.web.common.service.get_origin_visit')
def test_lookup_revision_by_nothing_found(self, mock_get_origin_visit,
                                          mock_lookup_snapshot):
    """A snapshot with no branches makes lookup_revision_by raise
    NotFoundExc."""
    mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE}
    mock_lookup_snapshot.return_value = {'branches': {}}

    with self.assertRaises(NotFoundExc):
        service.lookup_revision_by(1)
@patch('swh.web.common.service.lookup_revision')
@patch('swh.web.common.service.lookup_snapshot')
@patch('swh.web.common.service.get_origin_visit')
def test_lookup_revision_by(self, mock_get_origin_visit,
                            mock_lookup_snapshot, mock_lookup_revision):
    """lookup_revision_by resolves the requested branch in the origin
    visit's snapshot and returns the revision it targets."""
    # given
    expected_rev = self.SAMPLE_REVISION
    mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE}
    # snapshot with a single branch targeting the expected revision
    mock_lookup_snapshot.return_value = \
        {
            'branches': {
                'master2': {
                    'target_type': 'revision',
                    'target': expected_rev['id']
                }
            }
        }
    mock_lookup_revision.return_value = expected_rev
    # when
    actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
    # then
    self.assertEqual(actual_revision, expected_rev)
- @patch('swh.web.common.service.storage')
- def test_lookup_revision_with_context_by_ko(self, mock_storage):
+ @patch('swh.web.common.service.lookup_snapshot')
+ @patch('swh.web.common.service.get_origin_visit')
+ def test_lookup_revision_with_context_by_ko(self, mock_get_origin_visit,
+ mock_lookup_snapshot):
# given
- mock_storage.revision_get_by.return_value = None
+ mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE}
+ mock_lookup_snapshot.return_value = {'branches': {}}
# when
origin_id = 1
branch_name = 'master3'
ts = None
- with self.assertRaises(NotFoundExc) as cm:
+ with self.assertRaises(NotFoundExc):
service.lookup_revision_with_context_by(origin_id, branch_name, ts,
'sha1')
- # then
- self.assertIn(
- 'Revision with (origin_id: %s, branch_name: %s'
- ', ts: %s) not found.' % (origin_id,
- branch_name,
- ts), cm.exception.args[0])
-
- mock_storage.revision_get_by.assert_called_once_with(
- origin_id, branch_name, limit=1, timestamp=ts)
- @patch('swh.web.common.service.lookup_revision_with_context')
@patch('swh.web.common.service.storage')
+ @patch('swh.web.common.service.lookup_revision')
+ @patch('swh.web.common.service.lookup_snapshot')
+ @patch('swh.web.common.service.get_origin_visit')
+ @patch('swh.web.common.service.lookup_revision_with_context')
def test_lookup_revision_with_context_by(
- self, mock_storage, mock_lookup_revision_with_context,
+ self, mock_lookup_revision_with_context, mock_get_origin_visit,
+ mock_lookup_snapshot, mock_lookup_revision, mock_storage
):
# given
- stub_root_rev = {'id': 'root-rev-id'}
- mock_storage.revision_get_by.return_value = [{'id': 'root-rev-id'}]
+ stub_root_rev = self.SAMPLE_REVISION
+
+ mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE}
+ mock_lookup_snapshot.return_value = \
+ {
+ 'branches': {
+ 'master2': {
+ 'target_type': 'revision',
+ 'target': stub_root_rev['id']
+ }
+ }
+ }
+
+ mock_lookup_revision.return_value = stub_root_rev
stub_rev = {'id': 'rev-found'}
mock_lookup_revision_with_context.return_value = stub_rev
+ mock_storage.revision_get.return_value = [self.SAMPLE_REVISION_RAW]
+
# when
origin_id = 1
- branch_name = 'master3'
+ branch_name = 'master2'
ts = None
sha1_git = 'sha1'
actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
origin_id, branch_name, ts, sha1_git)
# then
self.assertEqual(actual_root_rev, stub_root_rev)
self.assertEqual(actual_rev, stub_rev)
- mock_storage.revision_get_by.assert_called_once_with(
- origin_id, branch_name, limit=1, timestamp=ts)
mock_lookup_revision_with_context.assert_called_once_with(
- stub_root_rev, sha1_git, 100)
+ self.SAMPLE_REVISION_RAW, sha1_git, 100)
def test_lookup_revision_through_ko_not_implemented(self):
    """An unrecognized criteria dict raises NotImplementedError."""
    with self.assertRaises(NotImplementedError):
        service.lookup_revision_through({
            'something-unknown': 10,
        })
@patch('swh.web.common.service.lookup_revision_with_context_by')
def test_lookup_revision_through_with_context_by(self, mock_lookup):
    """Criteria with origin_id/branch_name/ts/sha1_git dispatch to
    lookup_revision_with_context_by, forwarding the limit."""
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev

    criteria = {
        'origin_id': 1,
        'branch_name': 'master',
        'ts': None,
        'sha1_git': 'sha1-git'
    }
    revision = service.lookup_revision_through(criteria, limit=1000)

    self.assertEqual(revision, stub_rev)
    mock_lookup.assert_called_once_with(
        1, 'master', None, 'sha1-git', 1000)
@patch('swh.web.common.service.lookup_revision_by')
def test_lookup_revision_through_with_revision_by(self, mock_lookup):
    """Criteria with origin_id/branch_name/ts (but no sha1_git)
    dispatch to lookup_revision_by; the limit is not forwarded."""
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev

    revision = service.lookup_revision_through({
        'origin_id': 2,
        'branch_name': 'master2',
        'ts': 'some-ts',
    }, limit=10)

    self.assertEqual(revision, stub_rev)
    mock_lookup.assert_called_once_with(
        2, 'master2', 'some-ts')
@patch('swh.web.common.service.lookup_revision_with_context')
def test_lookup_revision_through_with_context(self, mock_lookup):
    """Criteria with sha1_git_root + sha1_git dispatch to
    lookup_revision_with_context with the default limit of 100."""
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev

    revision = service.lookup_revision_through({
        'sha1_git_root': 'some-sha1-root',
        'sha1_git': 'some-sha1',
    })

    self.assertEqual(revision, stub_rev)
    mock_lookup.assert_called_once_with(
        'some-sha1-root', 'some-sha1', 100)
@patch('swh.web.common.service.lookup_revision')
def test_lookup_revision_through_with_revision(self, mock_lookup):
    """Criteria holding only sha1_git dispatch to lookup_revision."""
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev

    revision = service.lookup_revision_through({
        'sha1_git': 'some-sha1',
    })

    self.assertEqual(revision, stub_rev)
    mock_lookup.assert_called_once_with(
        'some-sha1')
@patch('swh.web.common.service.lookup_revision_through')
def test_lookup_directory_through_revision_ko_not_found(
        self, mock_lookup_rev):
    """When no revision can be resolved, NotFoundExc is raised."""
    mock_lookup_rev.return_value = None

    with self.assertRaises(NotFoundExc):
        service.lookup_directory_through_revision(
            {'id': 'rev'}, 'some/path', 100)

    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100)
@patch('swh.web.common.service.lookup_revision_through')
@patch('swh.web.common.service.lookup_directory_with_revision')
def test_lookup_directory_through_revision_ok_with_data(
        self, mock_lookup_dir, mock_lookup_rev):
    """The resolved revision id and its directory listing are returned
    as a pair; with_data defaults to False."""
    # given
    mock_lookup_rev.return_value = {'id': 'rev-id'}
    mock_lookup_dir.return_value = {'type': 'dir',
                                    'content': []}
    # when
    rev_id, dir_result = service.lookup_directory_through_revision(
        {'id': 'rev'}, 'some/path', 100)
    # then
    self.assertEqual(rev_id, 'rev-id')
    self.assertEqual(dir_result, {'type': 'dir',
                                  'content': []})
    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100)
    # with_data was not supplied, so False must be forwarded
    mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', False)
@patch('swh.web.common.service.lookup_revision_through')
@patch('swh.web.common.service.lookup_directory_with_revision')
def test_lookup_directory_through_revision_ok_with_content(
        self, mock_lookup_dir, mock_lookup_rev):
    """When with_data=True, the file content dict is forwarded
    unchanged alongside the resolved revision id."""
    expected = {'type': 'file',
                'revision': 'rev-id',
                'content': {'data': b'blah',
                            'sha1': 'sha1'}}
    mock_lookup_rev.return_value = {'id': 'rev-id'}
    mock_lookup_dir.return_value = expected

    rev_id, result = service.lookup_directory_through_revision(
        {'id': 'rev'}, 'some/path', 10, with_data=True)

    self.assertEqual((rev_id, result), ('rev-id', expected))
    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 10)
    mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', True)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jun 21, 8:46 PM (3 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3302894

Event Timeline