
diff --git a/swh/web/api/views/revision.py b/swh/web/api/views/revision.py
index 1083ce56c..73461e780 100644
--- a/swh/web/api/views/revision.py
+++ b/swh/web/api/views/revision.py
@@ -1,485 +1,487 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.http import HttpResponse
from swh.web.common import service
from swh.web.common.utils import reverse
from swh.web.common.utils import parse_timestamp
from swh.web.api import utils
from swh.web.api.apidoc import api_doc
from swh.web.api.apiurls import api_route
from swh.web.api.views.utils import api_lookup
def _revision_directory_by(revision, path, request_path,
limit=100, with_data=False):
"""
Compute the directory or content data of the revision matching the
given criteria.
Args:
revision: dictionary of criteria representing a revision to lookup
path: directory's path to lookup
request_path: request path, used as the context URL when enriching
directory entries
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
sha1_git_root
with_data: indicate to retrieve the content's raw data if path resolves
to a content.
"""
def enrich_directory_local(dir, context_url=request_path):
return utils.enrich_directory(dir, context_url)
rev_id, result = service.lookup_directory_through_revision(
revision, path, limit=limit, with_data=with_data)
content = result['content']
if result['type'] == 'dir': # dir_entries
result['content'] = list(map(enrich_directory_local, content))
- else: # content
+ elif result['type'] == 'file': # content
result['content'] = utils.enrich_content(content)
+ elif result['type'] == 'rev': # revision
+ result['content'] = utils.enrich_revision(content)
return result
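
The hunk above extends the type dispatch so that a path inside a revision's
tree can resolve to a revision object (e.g. a git submodule), in addition to
a sub-directory or a file. A minimal sketch of how a caller might branch on
the three result shapes; the payload fields are assumptions based on the
enrich functions used above, not part of this diff:

    # Hypothetical illustration: dispatch on the 'type' field of a result
    # returned by _revision_directory_by.
    def summarize(result):
        if result['type'] == 'dir':
            # 'content' is a list of enriched directory entries
            return '%d directory entries' % len(result['content'])
        elif result['type'] == 'file':
            # 'content' is a single enriched content dict
            return 'content object'
        elif result['type'] == 'rev':
            # new case: the path points to a revision (git submodule)
            return 'revision %s' % result['content']['id']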
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)'
r'/branch/(?P<branch_name>.+)/log/',
'api-revision-origin-log')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)/log/',
'api-revision-origin-log')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)'
r'/ts/(?P<ts>.+)/log/',
'api-revision-origin-log')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)'
r'/branch/(?P<branch_name>.+)'
r'/ts/(?P<ts>.+)/log/',
'api-revision-origin-log')
@api_doc('/revision/origin/log/')
def api_revision_log_by(request, origin_id,
branch_name='HEAD',
ts=None):
"""
.. http:get:: /api/1/revision/origin/(origin_id)[/branch/(branch_name)][/ts/(timestamp)]/log
Show the commit log for a revision, searching for it based on software origin,
branch name, and/or visit timestamp.
This endpoint behaves like :http:get:`/api/1/revision/(sha1_git)[/prev/(prev_sha1s)]/log/`,
but operates on the revision that has been found at a given software origin,
close to a given point in time, pointed to by a given branch.
:param int origin_id: a software origin identifier
:param string branch_name: optional parameter specifying a fully-qualified branch name
associated with the software origin, e.g., "refs/heads/master". Defaults to the HEAD branch.
:param string timestamp: optional parameter specifying a timestamp close to which the revision
pointed to by the given branch should be looked up. The timestamp can be expressed either
as an ISO 8601 date or as a Unix timestamp (in UTC). Defaults to now.
:reqheader Accept: the requested response content type,
either ``application/json`` (default) or ``application/yaml``
:resheader Content-Type: this depends on :http:header:`Accept` header of request
:>jsonarr object author: information about the author of the revision
:>jsonarr string author_url: link to :http:get:`/api/1/person/(person_id)/` to get
information about the author of the revision
:>jsonarr object committer: information about the committer of the revision
:>jsonarr string committer_url: link to :http:get:`/api/1/person/(person_id)/` to get
information about the committer of the revision
:>jsonarr string committer_date: ISO representation of the commit date (in UTC)
:>jsonarr string date: ISO representation of the revision date (in UTC)
:>jsonarr string directory: the unique identifier of the directory the revision points to
:>jsonarr string directory_url: link to :http:get:`/api/1/directory/(sha1_git)/[(path)/]`
to get information about the directory associated with the revision
:>jsonarr string id: the revision unique identifier
:>jsonarr boolean merge: whether or not the revision corresponds to a merge commit
:>jsonarr string message: the message associated to the revision
:>jsonarr array parents: the parents of the revision, i.e. the previous revisions
that lead directly to it; each entry of that array contains a unique parent
revision identifier along with a link to :http:get:`/api/1/revision/(sha1_git)/`
to get more information about it
:>jsonarr string type: the type of the revision
**Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options`
:statuscode 200: no error
:statuscode 404: no revision matching the given criteria could be found in the archive
**Example:**
.. parsed-literal::
:swh_web_api:`revision/origin/723566/ts/2016-01-17T00:00:00+00:00/log/`
""" # noqa
result = {}
per_page = int(request.query_params.get('per_page', '10'))
def lookup_revision_log_by_with_limit(o_id, br, ts, limit=per_page+1):
return service.lookup_revision_log_by(o_id, br, ts, limit)
error_msg = 'No revision matching origin %s' % origin_id
error_msg += ', branch name %s' % branch_name
error_msg += (' and time stamp %s.' % ts) if ts else '.'
rev_get = api_lookup(
- lookup_revision_log_by_with_limit, origin_id, branch_name, ts,
+ lookup_revision_log_by_with_limit, int(origin_id), branch_name, ts,
notfound_msg=error_msg,
enrich_fn=utils.enrich_revision)
nb_rev = len(rev_get)
if nb_rev == per_page+1:
revisions = rev_get[:-1]
last_sha1_git = rev_get[-1]['id']
params = {k: v for k, v in {'origin_id': origin_id,
'branch_name': branch_name,
'ts': ts,
}.items() if v is not None}
query_params = {}
query_params['sha1_git'] = last_sha1_git
if request.query_params.get('per_page'):
query_params['per_page'] = per_page
result['headers'] = {
'link-next': reverse('api-revision-origin-log', url_args=params,
query_params=query_params)
}
else:
revisions = rev_get
result.update({'results': revisions})
return result
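
The view above fetches per_page+1 revisions and, when the extra one is
present, advertises the next page through a 'link-next' header (rendered as
a standard Link header in the HTTP response). A hedged client-side sketch of
consuming that scheme with the requests library; the host name is an
assumption:

    import requests

    def iter_revision_log(url):
        # Follow 'next' Link headers until the log is exhausted.
        while url:
            response = requests.get(url)
            response.raise_for_status()
            yield from response.json()
            url = response.links.get('next', {}).get('url')

    # e.g. iter_revision_log('https://archive.softwareheritage.org'
    #                        '/api/1/revision/origin/723566/log/')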
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)/directory/',
'api-revision-origin-directory')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)/directory/(?P<path>.+)/',
'api-revision-origin-directory')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)'
r'/branch/(?P<branch_name>.+)/directory/',
'api-revision-origin-directory')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)'
r'/branch/(?P<branch_name>.+)/ts/(?P<ts>.+)/directory/',
'api-revision-origin-directory')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)'
r'/branch/(?P<branch_name>.+)/directory/(?P<path>.+)/',
'api-revision-origin-directory')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)'
r'/branch/(?P<branch_name>.+)/ts/(?P<ts>.+)'
r'/directory/(?P<path>.+)/',
'api-revision-origin-directory')
@api_doc('/revision/origin/directory/', tags=['hidden'])
def api_directory_through_revision_origin(request, origin_id,
- branch_name="refs/heads/master",
+ branch_name='HEAD',
ts=None,
path=None,
with_data=False):
"""
Display directory or content information through a revision identified
by origin/branch/timestamp.
"""
if ts:
ts = parse_timestamp(ts)
- return _revision_directory_by({'origin_id': origin_id,
+ return _revision_directory_by({'origin_id': int(origin_id),
'branch_name': branch_name,
'ts': ts
},
path, request.path,
with_data=with_data)
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)/',
'api-revision-origin')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)'
r'/branch/(?P<branch_name>.+)/',
'api-revision-origin')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)'
r'/branch/(?P<branch_name>.+)/ts/(?P<ts>.+)/',
'api-revision-origin')
@api_route(r'/revision/origin/(?P<origin_id>[0-9]+)/ts/(?P<ts>.+)/',
'api-revision-origin')
@api_doc('/revision/origin/')
def api_revision_with_origin(request, origin_id,
branch_name='HEAD',
ts=None):
"""
.. http:get:: /api/1/revision/origin/(origin_id)/[branch/(branch_name)/][ts/(timestamp)/]
Get information about a revision, searching for it based on software origin,
branch name, and/or visit timestamp.
This endpoint behaves like :http:get:`/api/1/revision/(sha1_git)/`,
but operates on the revision that has been found at a given software origin,
close to a given point in time, pointed to by a given branch.
:param int origin_id: a software origin identifier
:param string branch_name: optional parameter specifying a fully-qualified branch name
associated with the software origin, e.g., "refs/heads/master". Defaults to the HEAD branch.
:param string timestamp: optional parameter specifying a timestamp close to which the revision
pointed to by the given branch should be looked up. The timestamp can be expressed either
as an ISO 8601 date or as a Unix timestamp (in UTC). Defaults to now.
:reqheader Accept: the requested response content type,
either ``application/json`` (default) or ``application/yaml``
:resheader Content-Type: this depends on :http:header:`Accept` header of request
:>json object author: information about the author of the revision
:>json string author_url: link to :http:get:`/api/1/person/(person_id)/` to get
information about the author of the revision
:>json object committer: information about the committer of the revision
:>json string committer_url: link to :http:get:`/api/1/person/(person_id)/` to get
information about the committer of the revision
:>json string committer_date: ISO representation of the commit date (in UTC)
:>json string date: ISO representation of the revision date (in UTC)
:>json string directory: the unique identifier of the directory the revision points to
:>json string directory_url: link to :http:get:`/api/1/directory/(sha1_git)/[(path)/]`
to get information about the directory associated with the revision
:>json string id: the revision unique identifier
:>json boolean merge: whether or not the revision corresponds to a merge commit
:>json string message: the message associated to the revision
:>json array parents: the parents of the revision, i.e. the previous revisions
that lead directly to it; each entry of that array contains a unique parent
revision identifier along with a link to :http:get:`/api/1/revision/(sha1_git)/`
to get more information about it
:>json string type: the type of the revision
**Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options`
:statuscode 200: no error
:statuscode 404: no revision matching the given criteria could be found in the archive
**Example:**
.. parsed-literal::
:swh_web_api:`revision/origin/13706355/branch/refs/heads/2.7/`
""" # noqa
return api_lookup(
- service.lookup_revision_by, origin_id, branch_name, ts,
+ service.lookup_revision_by, int(origin_id), branch_name, ts,
notfound_msg=('Revision with (origin_id: {}, branch_name: {}'
', ts: {}) not found.'.format(origin_id,
branch_name, ts)),
enrich_fn=utils.enrich_revision)
@api_route(r'/revision/(?P<sha1_git>[0-9a-f]+)/', 'api-revision')
@api_doc('/revision/')
def api_revision(request, sha1_git):
"""
.. http:get:: /api/1/revision/(sha1_git)/
Get information about a revision in the archive.
Revisions are identified by **sha1** checksums, compatible with Git commit identifiers.
See :func:`swh.model.identifiers.revision_identifier` in our data model module for details
about how they are computed.
:param string sha1_git: hexadecimal representation of the revision **sha1_git** identifier
:reqheader Accept: the requested response content type,
either ``application/json`` (default) or ``application/yaml``
:resheader Content-Type: this depends on :http:header:`Accept` header of request
:>json object author: information about the author of the revision
:>json string author_url: link to :http:get:`/api/1/person/(person_id)/` to get
information about the author of the revision
:>json object committer: information about the committer of the revision
:>json string committer_url: link to :http:get:`/api/1/person/(person_id)/` to get
information about the committer of the revision
:>json string committer_date: ISO representation of the commit date (in UTC)
:>json string date: ISO representation of the revision date (in UTC)
:>json string directory: the unique identifier of the directory the revision points to
:>json string directory_url: link to :http:get:`/api/1/directory/(sha1_git)/[(path)/]`
to get information about the directory associated with the revision
:>json string id: the revision unique identifier
:>json boolean merge: whether or not the revision corresponds to a merge commit
:>json string message: the message associated to the revision
:>json array parents: the parents of the revision, i.e. the previous revisions
that lead directly to it; each entry of that array contains a unique parent
revision identifier along with a link to :http:get:`/api/1/revision/(sha1_git)/`
to get more information about it
:>json string type: the type of the revision
**Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options`
:statuscode 200: no error
:statuscode 400: an invalid **sha1_git** value has been provided
:statuscode 404: requested revision cannot be found in the archive
**Example:**
.. parsed-literal::
:swh_web_api:`revision/aafb16d69fd30ff58afdd69036a26047f3aebdc6/`
""" # noqa
return api_lookup(
service.lookup_revision, sha1_git,
notfound_msg='Revision with sha1_git {} not found.'.format(sha1_git),
enrich_fn=utils.enrich_revision)
@api_route(r'/revision/(?P<sha1_git>[0-9a-f]+)/raw/',
'api-revision-raw-message')
@api_doc('/revision/raw/', tags=['hidden'], handle_response=True)
def api_revision_raw_message(request, sha1_git):
"""Return the raw data of the message of revision identified by sha1_git
"""
raw = service.lookup_revision_message(sha1_git)
response = HttpResponse(raw['message'],
content_type='application/octet-stream')
response['Content-disposition'] = \
'attachment;filename=rev_%s_raw' % sha1_git
return response
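
Since the endpoint above serves the message as application/octet-stream with
a Content-disposition attachment, clients should treat the body as raw
bytes. A hedged sketch, reusing the revision id from the API documentation
example below (host name assumed):

    import requests

    rev = 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'
    url = ('https://archive.softwareheritage.org'
           '/api/1/revision/%s/raw/' % rev)
    r = requests.get(url)
    r.raise_for_status()
    # Commit messages are not guaranteed to be valid UTF-8, hence 'wb'.
    with open('rev_%s_raw' % rev, 'wb') as f:
        f.write(r.content)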
@api_route(r'/revision/(?P<sha1_git>[0-9a-f]+)/directory/',
'api-revision-directory')
@api_route(r'/revision/(?P<sha1_git>[0-9a-f]+)/directory/(?P<dir_path>.+)/',
'api-revision-directory')
@api_doc('/revision/directory/')
def api_revision_directory(request, sha1_git,
dir_path=None,
with_data=False):
"""
.. http:get:: /api/1/revision/(sha1_git)/directory/[(path)/]
Get information about directory (entry) objects associated with revisions.
Each revision is associated to a single "root" directory.
This endpoint behaves like :http:get:`/api/1/directory/(sha1_git)/[(path)/]`,
but operates on the root directory associated to a given revision.
:param string sha1_git: hexadecimal representation of the revision **sha1_git** identifier
:param string path: optional parameter to get information about the directory entry
pointed by that relative path
:reqheader Accept: the requested response content type,
either ``application/json`` (default) or ``application/yaml``
:resheader Content-Type: this depends on :http:header:`Accept` header of request
:>json array content: directory entries as returned by :http:get:`/api/1/directory/(sha1_git)/[(path)/]`
:>json string path: path of the directory relative to the revision's root directory
:>json string revision: the unique revision identifier
:>json string type: the type of the directory
**Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options`
:statuscode 200: no error
:statuscode 400: an invalid **sha1_git** value has been provided
:statuscode 404: requested revision cannot be found in the archive
**Example:**
.. parsed-literal::
:swh_web_api:`revision/f1b94134a4b879bc55c3dacdb496690c8ebdc03f/directory/`
""" # noqa
return _revision_directory_by({'sha1_git': sha1_git},
dir_path, request.path,
with_data=with_data)
@api_route(r'/revision/(?P<sha1_git>[0-9a-f]+)/log/', 'api-revision-log')
@api_route(r'/revision/(?P<sha1_git>[0-9a-f]+)'
r'/prev/(?P<prev_sha1s>[0-9a-f/]+)/log/',
'api-revision-log')
@api_doc('/revision/log/')
def api_revision_log(request, sha1_git, prev_sha1s=None):
"""
.. http:get:: /api/1/revision/(sha1_git)[/prev/(prev_sha1s)]/log/
Get a list of all revisions heading to a given one, in other words show the commit log.
:param string sha1_git: hexadecimal representation of the revision **sha1_git** identifier
:param string prev_sha1s: optional parameter representing the navigation breadcrumbs
(descendant revisions previously visited). If multiple values, use / as delimiter.
If provided, revision information will be added at the beginning of the returned list.
:query int per_page: number of elements in the returned list, for pagination purpose
:reqheader Accept: the requested response content type,
either ``application/json`` (default) or ``application/yaml``
:resheader Content-Type: this depends on :http:header:`Accept` header of request
:resheader Link: indicates that a subsequent result page is available, and contains
the URL pointing to it
:>jsonarr object author: information about the author of the revision
:>jsonarr string author_url: link to :http:get:`/api/1/person/(person_id)/` to get
information about the author of the revision
:>jsonarr object committer: information about the committer of the revision
:>jsonarr string committer_url: link to :http:get:`/api/1/person/(person_id)/` to get
information about the committer of the revision
:>jsonarr string committer_date: ISO representation of the commit date (in UTC)
:>jsonarr string date: ISO representation of the revision date (in UTC)
:>jsonarr string directory: the unique identifier of the directory the revision points to
:>jsonarr string directory_url: link to :http:get:`/api/1/directory/(sha1_git)/[(path)/]`
to get information about the directory associated with the revision
:>jsonarr string id: the revision unique identifier
:>jsonarr boolean merge: whether or not the revision corresponds to a merge commit
:>jsonarr string message: the message associated to the revision
:>jsonarr array parents: the parents of the revision, i.e. the previous revisions
that lead directly to it; each entry of that array contains a unique parent
revision identifier along with a link to :http:get:`/api/1/revision/(sha1_git)/`
to get more information about it
:>jsonarr string type: the type of the revision
**Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options`
:statuscode 200: no error
:statuscode 400: an invalid **sha1_git** value has been provided
:statuscode 404: requested revision cannot be found in the archive
**Example:**
.. parsed-literal::
:swh_web_api:`revision/e1a315fa3fa734e2a6154ed7b5b9ae0eb8987aad/log/`
""" # noqa
result = {}
per_page = int(request.query_params.get('per_page', '10'))
def lookup_revision_log_with_limit(s, limit=per_page+1):
return service.lookup_revision_log(s, limit)
error_msg = 'Revision with sha1_git %s not found.' % sha1_git
rev_get = api_lookup(lookup_revision_log_with_limit, sha1_git,
notfound_msg=error_msg,
enrich_fn=utils.enrich_revision)
nb_rev = len(rev_get)
if nb_rev == per_page+1:
rev_backward = rev_get[:-1]
new_last_sha1 = rev_get[-1]['id']
query_params = {}
if request.query_params.get('per_page'):
query_params['per_page'] = per_page
result['headers'] = {
'link-next': reverse('api-revision-log',
url_args={'sha1_git': new_last_sha1},
query_params=query_params)
}
else:
rev_backward = rev_get
if not prev_sha1s: # no nav breadcrumbs, so we're done
revisions = rev_backward
else:
rev_forward_ids = prev_sha1s.split('/')
rev_forward = api_lookup(
service.lookup_revision_multiple, rev_forward_ids,
notfound_msg=error_msg,
enrich_fn=utils.enrich_revision)
revisions = rev_forward + rev_backward
result.update({
'results': revisions
})
return result
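
For api_revision_log above, the prev_sha1s segment is a /-separated list of
already-visited descendant revisions that gets prepended to the returned
log. A hedged sketch of building such a URL with the reverse helper used
throughout this file; the identifiers reuse the docstring examples and the
breadcrumb itself is hypothetical:

    from swh.web.common.utils import reverse

    # Resume the log at this revision...
    sha1_git = 'e1a315fa3fa734e2a6154ed7b5b9ae0eb8987aad'
    # ...remembering one previously visited descendant.
    prev_sha1s = 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'
    url = reverse('api-revision-log',
                  url_args={'sha1_git': sha1_git,
                            'prev_sha1s': prev_sha1s})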
diff --git a/swh/web/common/service.py b/swh/web/common/service.py
index 4568f93ba..b6edba537 100644
--- a/swh/web/common/service.py
+++ b/swh/web/common/service.py
@@ -1,1052 +1,1056 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from collections import defaultdict
from swh.model import hashutil
from swh.storage.algos import revisions_walker
from swh.web.common import converters
from swh.web.common import query
from swh.web.common.exc import NotFoundExc
from swh.web.common.origin_visits import get_origin_visit
from swh.web import config
storage = config.storage()
vault = config.vault()
idx_storage = config.indexer_storage()
MAX_LIMIT = 50 # Maximum number of results users can ask for
def _first_element(l):
"""Returns the first element in the provided list or None
if it is empty or None"""
return next(iter(l or []), None)
def lookup_multiple_hashes(hashes):
"""Lookup the passed hashes in a single DB connection, using batch
processing.
Args:
hashes: An array of dicts {filename: X, sha1: Y}, where X is a
string and Y is a hexadecimal sha1 string.
Returns:
The same array with elements updated with elem['found'] = True if
the hash is present in storage, elem['found'] = False if not.
"""
hashlist = [hashutil.hash_to_bytes(elem['sha1']) for elem in hashes]
content_missing = storage.content_missing_per_sha1(hashlist)
missing = [hashutil.hash_to_hex(x) for x in content_missing]
for x in hashes:
x.update({'found': True})
for h in hashes:
if h['sha1'] in missing:
h['found'] = False
return hashes
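
A hedged usage sketch for the helper above; the first checksum reuses a
sha1 appearing later in this diff, the second is the well-known sha1 of
empty content:

    hashes = [
        {'filename': 'webbase-5.7.0.tar.gz',
         'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd'},
        {'filename': 'empty',
         'sha1': 'da39a3ee5e6b4b0d3255bfef95601890afd80709'},
    ]
    for entry in lookup_multiple_hashes(hashes):
        status = 'archived' if entry['found'] else 'missing'
        print(entry['filename'], status)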
def lookup_expression(expression, last_sha1, per_page):
"""Lookup expression in raw content.
Args:
expression (str): An expression to lookup through raw indexed
content
last_sha1 (str): Last sha1 seen
per_page (int): Number of results per page
Yields:
ctags whose content match the expression
"""
limit = min(per_page, MAX_LIMIT)
ctags = idx_storage.content_ctags_search(expression,
last_sha1=last_sha1,
limit=limit)
for ctag in ctags:
ctag = converters.from_swh(ctag, hashess={'id'})
ctag['sha1'] = ctag['id']
ctag.pop('id')
yield ctag
def lookup_hash(q):
"""Checks if the storage contains a given content checksum
Args: query string of the form <hash_algo:hash>
Returns: Dict with key 'found' containing the hash info if the
hash is present, None if not.
"""
algo, hash = query.parse_hash(q)
found = storage.content_find({algo: hash})
return {'found': converters.from_content(found),
'algo': algo}
def search_hash(q):
"""Checks if the storage contains a given content checksum
Args: query string of the form <hash_algo:hash>
Returns: Dict with key 'found' set to True or False, according to
whether the checksum is present or not
"""
algo, hash = query.parse_hash(q)
found = storage.content_find({algo: hash})
return {'found': found is not None}
def _lookup_content_sha1(q):
"""Given a possible input, query for the content's sha1.
Args:
q: query string of the form <hash_algo:hash>
Returns:
binary sha1 if found or None
"""
algo, hash = query.parse_hash(q)
if algo != 'sha1':
hashes = storage.content_find({algo: hash})
if not hashes:
return None
return hashes['sha1']
return hash
def lookup_content_ctags(q):
"""Return ctags information from a specified content.
Args:
q: query string of the form <hash_algo:hash>
Yields:
ctags information dicts if the content is found.
"""
sha1 = _lookup_content_sha1(q)
if not sha1:
return None
ctags = list(idx_storage.content_ctags_get([sha1]))
if not ctags:
return None
for ctag in ctags:
yield converters.from_swh(ctag, hashess={'id'})
def lookup_content_filetype(q):
"""Return filetype information from a specified content.
Args:
q: query string of the form <hash_algo:hash>
Returns:
filetype information dict if the content is found, None otherwise.
"""
sha1 = _lookup_content_sha1(q)
if not sha1:
return None
filetype = _first_element(list(idx_storage.content_mimetype_get([sha1])))
if not filetype:
return None
return converters.from_filetype(filetype)
def lookup_content_language(q):
"""Return language information from a specified content.
Args:
q: query string of the form <hash_algo:hash>
Returns:
language information dict if the content is found, None otherwise.
"""
sha1 = _lookup_content_sha1(q)
if not sha1:
return None
lang = _first_element(list(idx_storage.content_language_get([sha1])))
if not lang:
return None
return converters.from_swh(lang, hashess={'id'})
def lookup_content_license(q):
"""Return license information from a specified content.
Args:
q: query string of the form <hash_algo:hash>
Returns:
license information dict if the content is found, None otherwise.
"""
sha1 = _lookup_content_sha1(q)
if not sha1:
return None
lic = _first_element(idx_storage.content_fossology_license_get([sha1]))
if not lic:
return None
return converters.from_swh({'id': sha1, 'facts': lic[sha1]},
hashess={'id'})
def lookup_origin(origin):
"""Return information about the origin matching dict origin.
Args:
origin: origin's dict with keys either 'id' or
('type' AND 'url')
Returns:
origin information as dict.
"""
origin_info = storage.origin_get(origin)
if not origin_info:
if 'id' in origin and origin['id']:
msg = 'Origin with id %s not found!' % origin['id']
else:
msg = 'Origin with type %s and url %s not found!' % \
(origin['type'], origin['url'])
raise NotFoundExc(msg)
return converters.from_origin(origin_info)
def lookup_origins(origin_from=1, origin_count=100):
"""Get list of archived software origins in a paginated way.
Origins are sorted by id before being returned.
Args:
origin_from (int): The minimum id of the origins to return
origin_count (int): The maximum number of origins to return
Yields:
origins information as dicts
"""
origins = storage.origin_get_range(origin_from, origin_count)
return map(converters.from_origin, origins)
def search_origin(url_pattern, offset=0, limit=50, regexp=False,
with_visit=False):
"""Search for origins whose urls contain a provided string pattern
or match a provided regular expression.
Args:
url_pattern: the string pattern to search for in origin urls
offset: number of found origins to skip before returning results
limit: the maximum number of found origins to return
regexp: whether to treat url_pattern as a regular expression
with_visit: whether to only return origins with at least one visit
Returns:
list of origin information as dict.
"""
origins = storage.origin_search(url_pattern, offset, limit, regexp,
with_visit)
return map(converters.from_origin, origins)
def search_origin_metadata(fulltext, limit=50):
"""Search for origins whose metadata match a provided string pattern.
Args:
fulltext: the string pattern to search for in origin metadata
limit: the maximum number of found origins to return
Returns:
list of origin metadata as dict.
"""
matches = idx_storage.origin_intrinsic_metadata_search_fulltext(
conjunction=[fulltext], limit=limit)
results = []
for match in matches:
match['from_revision'] = hashutil.hash_to_hex(match['from_revision'])
result = converters.from_origin(
storage.origin_get({'id': match.pop('origin_id')}))
result['metadata'] = match
results.append(result)
return results
def lookup_person(person_id):
"""Return information about the person with id person_id.
Args:
person_id: the person's identifier, as a string
Returns:
person information as dict.
Raises:
NotFoundExc if there is no person with the provided id.
"""
person = _first_element(storage.person_get([int(person_id)]))
if not person:
raise NotFoundExc('Person with id %s not found' % person_id)
return converters.from_person(person)
def _to_sha1_bin(sha1_hex):
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_hex,
['sha1'], # HACK: sha1_git really
'Only sha1_git is supported.')
return sha1_git_bin
def _check_directory_exists(sha1_git, sha1_git_bin):
if len(list(storage.directory_missing([sha1_git_bin]))):
raise NotFoundExc('Directory with sha1_git %s not found' % sha1_git)
def lookup_directory(sha1_git):
"""Return information about the directory with id sha1_git.
Args:
sha1_git as string
Returns:
directory information as dict.
"""
empty_dir_sha1 = '4b825dc642cb6eb9a060e54bf8d69288fbee4904'
if sha1_git == empty_dir_sha1:
return []
sha1_git_bin = _to_sha1_bin(sha1_git)
_check_directory_exists(sha1_git, sha1_git_bin)
directory_entries = storage.directory_ls(sha1_git_bin)
return map(converters.from_directory_entry, directory_entries)
def lookup_directory_with_path(sha1_git, path_string):
"""Return directory information for entry with path path_string w.r.t.
root directory pointed by directory_sha1_git
Args:
- directory_sha1_git: sha1_git corresponding to the directory
to which we append paths to (hopefully) find the entry
- the relative path to the entry starting from the directory pointed by
directory_sha1_git
Raises:
NotFoundExc if the directory entry is not found
"""
sha1_git_bin = _to_sha1_bin(sha1_git)
_check_directory_exists(sha1_git, sha1_git_bin)
paths = path_string.strip(os.path.sep).split(os.path.sep)
queried_dir = storage.directory_entry_get_by_path(
sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths)))
if not queried_dir:
raise NotFoundExc(('Directory entry with path %s from %s not found') %
(path_string, sha1_git))
return converters.from_directory_entry(queried_dir)
def lookup_release(release_sha1_git):
"""Return information about the release with sha1 release_sha1_git.
Args:
release_sha1_git: The release's sha1 as hexadecimal
Returns:
Release information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
sha1_git_bin = _to_sha1_bin(release_sha1_git)
release = _first_element(storage.release_get([sha1_git_bin]))
if not release:
raise NotFoundExc('Release with sha1_git %s not found.'
% release_sha1_git)
return converters.from_release(release)
def lookup_release_multiple(sha1_git_list):
"""Return information about the revisions identified with
their sha1_git identifiers.
Args:
sha1_git_list: A list of revision sha1_git identifiers
Returns:
Release information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list)
releases = storage.release_get(sha1_bin_list) or []
return (converters.from_release(r) for r in releases)
def lookup_revision(rev_sha1_git):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
NotFoundExc if there is no revision with the provided sha1_git.
"""
sha1_git_bin = _to_sha1_bin(rev_sha1_git)
revision = _first_element(storage.revision_get([sha1_git_bin]))
if not revision:
raise NotFoundExc('Revision with sha1_git %s not found.'
% rev_sha1_git)
return converters.from_revision(revision)
def lookup_revision_multiple(sha1_git_list):
"""Return information about the revisions identified with
their sha1_git identifiers.
Args:
sha1_git_list: A list of revision sha1_git identifiers
Returns:
Generator of revisions information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list)
revisions = storage.revision_get(sha1_bin_list) or []
return (converters.from_revision(r) for r in revisions)
def lookup_revision_message(rev_sha1_git):
"""Return the raw message of the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Decoded revision message as dict {'message': <the_message>}
Raises:
ValueError if the identifier provided is not of sha1 nature.
NotFoundExc if the revision is not found, or if it has no message
"""
sha1_git_bin = _to_sha1_bin(rev_sha1_git)
revision = _first_element(storage.revision_get([sha1_git_bin]))
if not revision:
raise NotFoundExc('Revision with sha1_git %s not found.'
% rev_sha1_git)
if 'message' not in revision:
raise NotFoundExc('No message for revision with sha1_git %s.'
% rev_sha1_git)
res = {'message': revision['message']}
return res
def _lookup_revision_id_by(origin_id, branch_name, timestamp):
def _get_snapshot_branch(snapshot_id, branch_name):
snapshot = lookup_snapshot(snapshot_id,
branches_from=branch_name,
branches_count=10)
branch = None
if branch_name in snapshot['branches']:
branch = snapshot['branches'][branch_name]
return branch
visit = get_origin_visit({'id': origin_id}, visit_ts=timestamp)
branch = _get_snapshot_branch(visit['snapshot'], branch_name)
rev_id = None
if branch and branch['target_type'] == 'revision':
rev_id = branch['target']
elif branch and branch['target_type'] == 'alias':
branch = _get_snapshot_branch(visit['snapshot'], branch['target'])
if branch and branch['target_type'] == 'revision':
rev_id = branch['target']
if not rev_id:
raise NotFoundExc('Revision for origin %s and branch %s not found.'
% (origin_id, branch_name))
return rev_id
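
The helper above dereferences at most one level of aliasing. A hedged
illustration of the two branch shapes it handles, where 'HEAD' is an alias
that must be followed once to reach a revision target (ids are examples):

    branches = {
        'HEAD': {'target_type': 'alias',
                 'target': 'refs/heads/master'},
        'refs/heads/master': {
            'target_type': 'revision',
            'target': 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'},
    }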
def lookup_revision_by(origin_id,
branch_name='HEAD',
timestamp=None):
"""Lookup revision by origin id, snapshot branch name and visit timestamp.
If branch_name is not provided, lookup using 'HEAD' as default.
If timestamp is not provided, use the most recent.
Args:
origin_id (int): origin of the revision
branch_name (str): snapshot branch name
timestamp (str/int): origin visit time frame
Returns:
dict: The revision matching the criteria
Raises:
NotFoundExc if no revision corresponds to the criterion
"""
rev_id = _lookup_revision_id_by(origin_id, branch_name, timestamp)
return lookup_revision(rev_id)
def lookup_revision_log(rev_sha1_git, limit):
"""Lookup revision log by revision id.
Args:
rev_sha1_git (str): The revision's sha1 as hexadecimal
limit (int): the maximum number of revisions returned
Returns:
list: Revision log as list of revision dicts
Raises:
ValueError: if the identifier provided is not of sha1 nature.
NotFoundExc: if there is no revision with the provided sha1_git.
"""
+ lookup_revision(rev_sha1_git)
sha1_git_bin = _to_sha1_bin(rev_sha1_git)
-
revision_entries = storage.revision_log([sha1_git_bin], limit)
- if not revision_entries:
- raise NotFoundExc('Revision with sha1_git %s not found.'
- % rev_sha1_git)
return map(converters.from_revision, revision_entries)
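
The hunk above moves the existence check before the log walk:
lookup_revision raises NotFoundExc for an unknown id, which is more robust
than testing the (possibly lazy) result of storage.revision_log for
emptiness. A hedged sketch of the resulting behavior, using a hypothetical
id assumed absent from the archive:

    from swh.web.common.exc import NotFoundExc

    try:
        log = list(lookup_revision_log('0' * 40, limit=10))
    except NotFoundExc as e:
        print(e)  # Revision with sha1_git 000...000 not found.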
def lookup_revision_log_by(origin_id, branch_name, timestamp, limit):
"""Lookup revision by origin id, snapshot branch name and visit timestamp.
Args:
origin_id (int): origin of the revision
branch_name (str): snapshot branch
timestamp (str/int): origin visit time frame
limit (int): the maximum number of revisions returned
Returns:
list: Revision log as list of revision dicts
Raises:
NotFoundExc: if no revision corresponds to the criterion
"""
rev_id = _lookup_revision_id_by(origin_id, branch_name, timestamp)
return lookup_revision_log(rev_id, limit)
def lookup_revision_with_context_by(origin_id, branch_name, timestamp,
sha1_git, limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
sha1_git_root being resolved through the lookup of a revision by origin_id,
branch_name and ts.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
- origin_id: origin of the revision.
- branch_name: revision's branch.
- timestamp: revision's time frame.
- sha1_git: one of sha1_git_root's ancestors.
- limit: limit the lookup to that many revisions back (defaults to 100).
Returns:
Pair of (root_revision, revision).
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
- BadInputExc in case of unknown algo_hash or bad hash.
- NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root.
"""
rev_root_id = _lookup_revision_id_by(origin_id, branch_name, timestamp)
rev_root_id_bin = hashutil.hash_to_bytes(rev_root_id)
rev_root = _first_element(storage.revision_get([rev_root_id_bin]))
return (converters.from_revision(rev_root),
lookup_revision_with_context(rev_root, sha1_git, limit))
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision. The type is either a sha1 (as a hex
string) or a non-converted dict.
sha1_git: one of sha1_git_root's ancestors
limit: limit the lookup to that many revisions back (defaults to 100)
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root
"""
sha1_git_bin = _to_sha1_bin(sha1_git)
revision = _first_element(storage.revision_get([sha1_git_bin]))
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
if isinstance(sha1_git_root, str):
sha1_git_root_bin = _to_sha1_bin(sha1_git_root)
revision_root = _first_element(storage.revision_get([sha1_git_root_bin])) # noqa
if not revision_root:
raise NotFoundExc('Revision root %s not found' % sha1_git_root)
else:
sha1_git_root_bin = sha1_git_root['id']
revision_log = storage.revision_log([sha1_git_root_bin], limit)
parents = {}
children = defaultdict(list)
for rev in revision_log:
rev_id = rev['id']
parents[rev_id] = []
for parent_id in rev['parents']:
parents[rev_id].append(parent_id)
children[parent_id].append(rev_id)
if revision['id'] not in parents:
raise NotFoundExc('Revision %s is not an ancestor of %s' %
(sha1_git, sha1_git_root))
revision['children'] = children[revision['id']]
return converters.from_revision(revision)
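
A hedged illustration of the parents/children maps built above, for a
three-revision chain a <- b <- c where c is sha1_git_root (ids shortened
for readability):

    revision_log = [
        {'id': b'c', 'parents': [b'b']},
        {'id': b'b', 'parents': [b'a']},
        {'id': b'a', 'parents': []},
    ]
    # After the loop:
    #   parents  == {b'c': [b'b'], b'b': [b'a'], b'a': []}
    #   children == {b'b': [b'c'], b'a': [b'b']}
    # Looking up b'a' thus yields children [b'b'], while an id absent from
    # 'parents' triggers the 'is not an ancestor' NotFoundExc.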
def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
Args:
sha1_git: revision's hash.
dir_path: optional directory pointed to by that revision.
with_data: boolean that indicates to retrieve the raw data if the path
resolves to a content. Defaults to False (for the API)
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc either if the revision is not found or the path referenced
does not exist.
NotImplementedError if dir_path exists but does not reference an
entry of type 'dir', 'file' or 'rev'.
"""
sha1_git_bin = _to_sha1_bin(sha1_git)
revision = _first_element(storage.revision_get([sha1_git_bin]))
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
dir_sha1_git_bin = revision['directory']
if dir_path:
paths = dir_path.strip(os.path.sep).split(os.path.sep)
entity = storage.directory_entry_get_by_path(
dir_sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths)))
if not entity:
raise NotFoundExc(
"Directory or File '%s' pointed to by revision %s not found"
% (dir_path, sha1_git))
else:
entity = {'type': 'dir', 'target': dir_sha1_git_bin}
if entity['type'] == 'dir':
directory_entries = storage.directory_ls(entity['target']) or []
return {'type': 'dir',
'path': '.' if not dir_path else dir_path,
'revision': sha1_git,
'content': list(map(converters.from_directory_entry,
directory_entries))}
elif entity['type'] == 'file': # content
content = storage.content_find({'sha1_git': entity['target']})
if with_data:
c = _first_element(storage.content_get([content['sha1']]))
content['data'] = c['data']
return {'type': 'file',
'path': '.' if not dir_path else dir_path,
'revision': sha1_git,
'content': converters.from_content(content)}
+ elif entity['type'] == 'rev': # revision
+ revision = next(storage.revision_get([entity['target']]))
+ return {'type': 'rev',
+ 'path': '.' if not dir_path else dir_path,
+ 'revision': sha1_git,
+ 'content': converters.from_revision(revision)}
else:
raise NotImplementedError('Entity of type %s not implemented.'
% entity['type'])
def lookup_content(q):
"""Lookup the content designed by q.
Args:
q: The release's sha1 as hexadecimal
Raises:
NotFoundExc if the requested content is not found
"""
algo, hash = query.parse_hash(q)
c = storage.content_find({algo: hash})
if not c:
raise NotFoundExc('Content with %s checksum equals to %s not found!' %
(algo, hashutil.hash_to_hex(hash)))
return converters.from_content(c)
def lookup_content_raw(q):
"""Lookup the content defined by q.
Args:
q: query string of the form <hash_algo:hash>
Returns:
dict with 'sha1' and 'data' keys, 'data' being the raw content data.
Raises:
NotFoundExc if the requested content is not found or
if the content bytes are not available in the storage
"""
c = lookup_content(q)
content_sha1_bytes = hashutil.hash_to_bytes(c['checksums']['sha1'])
content = _first_element(storage.content_get([content_sha1_bytes]))
if not content:
algo, hash = query.parse_hash(q)
raise NotFoundExc('Bytes of content with %s checksum equals to %s '
'are not available!' %
(algo, hashutil.hash_to_hex(hash)))
return converters.from_content(content)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return storage.stat_counters()
def _lookup_origin_visits(origin_id, last_visit=None, limit=10):
"""Yields the origin origin_ids' visits.
Args:
origin_id (int): origin to list visits for
last_visit (int): last visit to lookup from
limit (int): maximum number of elements to return
Yields:
Dictionaries of origin_visit for that origin
"""
limit = min(limit, MAX_LIMIT)
yield from storage.origin_visit_get(
origin_id, last_visit=last_visit, limit=limit)
def lookup_origin_visits(origin_id, last_visit=None, per_page=10):
"""Yields the origin origin_ids' visits.
Args:
origin_id: origin to list visits for
Yields:
Dictionaries of origin_visit for that origin
"""
+ lookup_origin({'id': origin_id})
visits = _lookup_origin_visits(origin_id, last_visit=last_visit,
limit=per_page)
for visit in visits:
yield converters.from_origin_visit(visit)
def lookup_origin_visit(origin_id, visit_id):
"""Return information about visit visit_id with origin origin_id.
Args:
origin_id: origin concerned by the visit
visit_id: the visit identifier to lookup
Returns:
The origin_visit dict concerned
"""
visit = storage.origin_visit_get_by(origin_id, visit_id)
if not visit:
raise NotFoundExc('Origin with id %s or its visit '
'with id %s not found!' % (origin_id, visit_id))
return converters.from_origin_visit(visit)
def lookup_snapshot_size(snapshot_id):
"""Count the number of branches in the snapshot with the given id
Args:
snapshot_id (str): sha1 identifier of the snapshot
Returns:
dict: A dict whose keys are the target types of branches and
values their corresponding counts
"""
snapshot_id_bin = _to_sha1_bin(snapshot_id)
snapshot_size = storage.snapshot_count_branches(snapshot_id_bin)
if 'revision' not in snapshot_size:
snapshot_size['revision'] = 0
if 'release' not in snapshot_size:
snapshot_size['release'] = 0
return snapshot_size
def lookup_snapshot(snapshot_id, branches_from='', branches_count=1000,
target_types=None):
"""Return information about a snapshot, aka the list of named
branches found during a specific visit of an origin.
Args:
snapshot_id (str): sha1 identifier of the snapshot
branches_from (str): optional parameter used to skip branches
whose name is lexicographically smaller than it before returning them
branches_count (int): optional parameter used to limit
the number of returned branches
target_types (list): optional parameter used to filter the
target types of branch to return (possible values that can be
contained in that list are `'content', 'directory',
'revision', 'release', 'snapshot', 'alias'`)
Returns:
A dict filled with the snapshot content.
"""
snapshot_id_bin = _to_sha1_bin(snapshot_id)
snapshot = storage.snapshot_get_branches(snapshot_id_bin,
branches_from.encode(),
branches_count, target_types)
if not snapshot:
raise NotFoundExc('Snapshot with id %s not found!' % snapshot_id)
return converters.from_snapshot(snapshot)
def lookup_latest_origin_snapshot(origin_id, allowed_statuses=None):
"""Return information about the latest snapshot of an origin.
.. warning:: At most 1000 branches contained in the snapshot
will be returned for performance reasons.
Args:
origin_id: integer identifier of the origin
allowed_statuses: list of visit statuses considered
to find the latest snapshot for the visit. For instance,
``allowed_statuses=['full']`` will only consider visits that
have successfully run to completion.
Returns:
A dict filled with the snapshot content.
"""
snapshot = storage.snapshot_get_latest(origin_id, allowed_statuses)
return converters.from_snapshot(snapshot)
def lookup_revision_through(revision, limit=100):
"""Retrieve a revision from the criterion stored in revision dictionary.
Args:
revision: Dictionary of criterion to lookup the revision with.
Here are the supported combination of possible values:
- origin_id, branch_name, ts, sha1_git
- origin_id, branch_name, ts
- sha1_git_root, sha1_git
- sha1_git
Returns:
The revision if found, None otherwise.
"""
if 'origin_id' in revision and \
'branch_name' in revision and \
'ts' in revision and \
'sha1_git' in revision:
return lookup_revision_with_context_by(revision['origin_id'],
revision['branch_name'],
revision['ts'],
revision['sha1_git'],
limit)
if 'origin_id' in revision and \
'branch_name' in revision and \
'ts' in revision:
return lookup_revision_by(revision['origin_id'],
revision['branch_name'],
revision['ts'])
if 'sha1_git_root' in revision and \
'sha1_git' in revision:
return lookup_revision_with_context(revision['sha1_git_root'],
revision['sha1_git'],
limit)
if 'sha1_git' in revision:
return lookup_revision(revision['sha1_git'])
# this should not happen
raise NotImplementedError('Should not happen!')
def lookup_directory_through_revision(revision, path=None,
limit=100, with_data=False):
"""Retrieve the directory information from the revision.
Args:
revision: dictionary of criteria representing a revision to lookup
path: directory's path to lookup.
limit: optional query parameter to limit the revisions log (default to
100). For now, note that this limit could impede the transitivity
conclusion about sha1_git not being an ancestor of sha1_git_root.
with_data: indicate to retrieve the content's raw data if path resolves
to a content.
Returns:
The directory pointed to by the revision criteria at path.
"""
rev = lookup_revision_through(revision, limit)
if not rev:
raise NotFoundExc('Revision with criterion %s not found!' % revision)
return (rev['id'],
lookup_directory_with_revision(rev['id'], path, with_data))
def vault_cook(obj_type, obj_id, email=None):
"""Cook a vault bundle.
"""
return vault.cook(obj_type, obj_id, email=email)
def vault_fetch(obj_type, obj_id):
"""Fetch a vault bundle.
"""
return vault.fetch(obj_type, obj_id)
def vault_progress(obj_type, obj_id):
"""Get the current progress of a vault bundle.
"""
return vault.progress(obj_type, obj_id)
def diff_revision(rev_id):
"""Get the list of file changes (insertion / deletion / modification /
renaming) for a particular revision.
"""
rev_sha1_git_bin = _to_sha1_bin(rev_id)
changes = storage.diff_revision(rev_sha1_git_bin, track_renaming=True)
for change in changes:
change['from'] = converters.from_directory_entry(change['from'])
change['to'] = converters.from_directory_entry(change['to'])
if change['from_path']:
change['from_path'] = change['from_path'].decode('utf-8')
if change['to_path']:
change['to_path'] = change['to_path'].decode('utf-8')
return changes
class _RevisionsWalkerProxy(object):
"""
Proxy class wrapping a revisions walker iterator from
swh-storage and performing needed conversions.
"""
def __init__(self, rev_walker_type, rev_start, *args, **kwargs):
rev_start_bin = hashutil.hash_to_bytes(rev_start)
self.revisions_walker = \
revisions_walker.get_revisions_walker(rev_walker_type,
storage,
rev_start_bin,
*args, **kwargs)
def export_state(self):
return self.revisions_walker.export_state()
def __next__(self):
return converters.from_revision(next(self.revisions_walker))
def __iter__(self):
return self
def get_revisions_walker(rev_walker_type, rev_start, *args, **kwargs):
"""
Utility function to instantiate a revisions walker of a given type,
see :mod:`swh.storage.algos.revisions_walker`.
Args:
rev_walker_type (str): the type of revisions walker to return,
possible values are: ``committer_date``, ``dfs``, ``dfs_post``,
``bfs`` and ``path``
rev_start (str): hexadecimal representation of a revision identifier
args (list): positional arguments to pass to the revisions walker
constructor
kwargs (dict): keyword arguments to pass to the revisions walker
constructor
"""
# first check if the provided revision is valid
lookup_revision(rev_start)
return _RevisionsWalkerProxy(rev_walker_type, rev_start, *args, **kwargs)
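
A hedged usage sketch for get_revisions_walker, reusing the revision id
from the API documentation examples above:

    # Walk the history breadth-first; the walker validates the starting
    # revision (via lookup_revision) and converts each yielded revision.
    walker = get_revisions_walker(
        'bfs', 'aafb16d69fd30ff58afdd69036a26047f3aebdc6')
    for rev in walker:
        print(rev['id'])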
diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py
index e4a7fa847..32aca865b 100644
--- a/swh/web/tests/api/views/test_revision.py
+++ b/swh/web/tests/api/views/test_revision.py
@@ -1,847 +1,529 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import pytest
+import random
+
+from hypothesis import given
from rest_framework.test import APITestCase
from unittest.mock import patch
+from swh.model.hashutil import hash_to_hex
+
from swh.web.common.exc import NotFoundExc
-from swh.web.api.views.revision import (
- _revision_directory_by
-)
+from swh.web.common.utils import reverse, parse_timestamp
+from swh.web.tests.strategies import (
+ revision, unknown_revision, new_revision,
+ unknown_origin_id, origin, origin_with_multiple_visits
+)
from swh.web.tests.testcase import WebTestCase
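
The strategies imported above draw ids from the test data loaded by
WebTestCase. A hedged sketch of the general shape such a strategy could
take; the actual swh.web.tests.strategies implementation may differ:

    from hypothesis import strategies

    def revision():
        # Hypothetical: sample a known revision id (hex string) from the
        # revisions present in the test archive.
        known = ['aafb16d69fd30ff58afdd69036a26047f3aebdc6']
        return strategies.sampled_from(known)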
class RevisionApiTestCase(WebTestCase, APITestCase):
- @patch('swh.web.api.views.revision.service')
- def test_api_revision(self, mock_service):
- # given
- stub_revision = {
- 'id': '18d8be353ed3480476f032475e7c233eff7371d5',
- 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
- 'author_name': 'Software Heritage',
- 'author_email': 'robot@softwareheritage.org',
- 'committer_name': 'Software Heritage',
- 'committer_email': 'robot@softwareheritage.org',
- 'message': 'synthetic revision message',
- 'date_offset': 0,
- 'committer_date_offset': 0,
- 'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'],
- 'type': 'tar',
- 'synthetic': True,
- 'metadata': {
- 'original_artifact': [{
- 'archive_type': 'tar',
- 'name': 'webbase-5.7.0.tar.gz',
- 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
- 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
- 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
- '309d36484e7edf7bb912'
- }]
- },
- }
- mock_service.lookup_revision.return_value = stub_revision
-
- expected_revision = {
- 'id': '18d8be353ed3480476f032475e7c233eff7371d5',
- 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
- 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e'
- 'ff7371d5/log/',
- 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
- 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6'
- 'a42b7e2a44e6/',
- 'author_name': 'Software Heritage',
- 'author_email': 'robot@softwareheritage.org',
- 'committer_name': 'Software Heritage',
- 'committer_email': 'robot@softwareheritage.org',
- 'message': 'synthetic revision message',
- 'date_offset': 0,
- 'committer_date_offset': 0,
- 'parents': [{
- 'id': '8734ef7e7c357ce2af928115c6c6a42b7e2a44e7',
- 'url': '/api/1/revision/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7/' # noqa
- }],
- 'type': 'tar',
- 'synthetic': True,
- 'metadata': {
- 'original_artifact': [{
- 'archive_type': 'tar',
- 'name': 'webbase-5.7.0.tar.gz',
- 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
- 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
- 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
- '309d36484e7edf7bb912'
- }]
- },
- }
+ @given(revision())
+ def test_api_revision(self, revision):
- # when
- rv = self.client.get('/api/1/revision/'
- '18d8be353ed3480476f032475e7c233eff7371d5/')
+ url = reverse('api-revision', url_args={'sha1_git': revision})
+ rv = self.client.get(url)
+
+ expected_revision = self.revision_get(revision)
+
+ self._enrich_revision(expected_revision)
- # then
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv['Content-Type'], 'application/json')
- self.assertEqual(expected_revision, rv.data)
-
- mock_service.lookup_revision.assert_called_once_with(
- '18d8be353ed3480476f032475e7c233eff7371d5')
+ self.assertEqual(rv.data, expected_revision)
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_not_found(self, mock_service):
- # given
- mock_service.lookup_revision.return_value = None
+ @given(unknown_revision())
+ def test_api_revision_not_found(self, unknown_revision):
- # when
- rv = self.client.get('/api/1/revision/12345/')
+ url = reverse('api-revision', url_args={'sha1_git': unknown_revision})
+ rv = self.client.get(url)
- # then
self.assertEqual(rv.status_code, 404)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, {
'exception': 'NotFoundExc',
- 'reason': 'Revision with sha1_git 12345 not found.'})
+ 'reason': 'Revision with sha1_git %s not found.' %
+ unknown_revision})
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_raw_ok(self, mock_service):
- # given
- stub_revision = {'message': 'synthetic revision message'}
+ @given(revision())
+ def test_api_revision_raw_ok(self, revision):
- mock_service.lookup_revision_message.return_value = stub_revision
+ url = reverse('api-revision-raw-message',
+ url_args={'sha1_git': revision})
+ rv = self.client.get(url)
+
+ expected_message = self.revision_get(revision)['message']
- # when
- rv = self.client.get('/api/1/revision/18d8be353ed3480476f032475e7c2'
- '33eff7371d5/raw/')
- # then
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv['Content-Type'], 'application/octet-stream')
- self.assertEqual(rv.content, b'synthetic revision message')
+ self.assertEqual(rv.content, expected_message.encode())
- mock_service.lookup_revision_message.assert_called_once_with(
- '18d8be353ed3480476f032475e7c233eff7371d5')
+ @given(new_revision())
+ def test_api_revision_raw_ok_no_msg(self, new_revision):
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_raw_ok_no_msg(self, mock_service):
- # given
- mock_service.lookup_revision_message.side_effect = NotFoundExc(
- 'No message for revision')
+ del new_revision['message']
+ self.storage.revision_add([new_revision])
- # when
- rv = self.client.get('/api/1/revision/'
- '18d8be353ed3480476f032475e7c233eff7371d5/raw/')
+ new_revision_id = hash_to_hex(new_revision['id'])
+
+ url = reverse('api-revision-raw-message',
+ url_args={'sha1_git': new_revision_id})
+
+ rv = self.client.get(url)
- # then
self.assertEqual(rv.status_code, 404)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, {
'exception': 'NotFoundExc',
- 'reason': 'No message for revision'})
-
- self.assertEqual
- mock_service.lookup_revision_message.assert_called_once_with(
- '18d8be353ed3480476f032475e7c233eff7371d5')
+ 'reason': 'No message for revision with sha1_git %s.' %
+ new_revision_id})
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_raw_ko_no_rev(self, mock_service):
- # given
- mock_service.lookup_revision_message.side_effect = NotFoundExc(
- 'No revision found')
+ @given(unknown_revision())
+ def test_api_revision_raw_ko_no_rev(self, unknown_revision):
- # when
- rv = self.client.get('/api/1/revision/'
- '18d8be353ed3480476f032475e7c233eff7371d5/raw/')
+ url = reverse('api-revision-raw-message',
+ url_args={'sha1_git': unknown_revision})
+ rv = self.client.get(url)
- # then
self.assertEqual(rv.status_code, 404)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, {
'exception': 'NotFoundExc',
- 'reason': 'No revision found'})
-
- mock_service.lookup_revision_message.assert_called_once_with(
- '18d8be353ed3480476f032475e7c233eff7371d5')
+ 'reason': 'Revision with sha1_git %s not found.' %
+ unknown_revision})
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_with_origin_not_found(self, mock_service):
- mock_service.lookup_revision_by.return_value = None
+ @pytest.mark.xfail(reason="bugs in origin_*get methods from in-memory storage") # noqa
+ @given(unknown_origin_id())
+ def test_api_revision_with_origin_not_found(self, unknown_origin_id):
- rv = self.client.get('/api/1/revision/origin/123/')
+ url = reverse('api-revision-origin',
+ url_args={'origin_id': unknown_origin_id})
+ rv = self.client.get(url)
- # then
self.assertEqual(rv.status_code, 404)
self.assertEqual(rv['Content-Type'], 'application/json')
- self.assertIn('Revision with (origin_id: 123', rv.data['reason'])
- self.assertIn('not found', rv.data['reason'])
- self.assertEqual('NotFoundExc', rv.data['exception'])
-
- mock_service.lookup_revision_by.assert_called_once_with(
- '123',
- 'HEAD',
- None)
-
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_with_origin(self, mock_service):
- mock_revision = {
- 'id': '32',
- 'directory': '21',
- 'message': 'message 1',
- 'type': 'deb',
- }
- expected_revision = {
- 'id': '32',
- 'url': '/api/1/revision/32/',
- 'history_url': '/api/1/revision/32/log/',
- 'directory': '21',
- 'directory_url': '/api/1/directory/21/',
- 'message': 'message 1',
- 'type': 'deb',
- }
- mock_service.lookup_revision_by.return_value = mock_revision
+ self.assertEqual(rv.data, {
+ 'exception': 'NotFoundExc',
+ 'reason': 'Origin with id %s not found!' %
+ unknown_origin_id})
- rv = self.client.get('/api/1/revision/origin/1/')
+ @given(origin())
+ def test_api_revision_with_origin(self, origin):
- # then
- self.assertEqual(rv.status_code, 200)
- self.assertEqual(rv['Content-Type'], 'application/json')
- self.assertEqual(rv.data, expected_revision)
+ url = reverse('api-revision-origin',
+ url_args={'origin_id': origin['id']})
+ rv = self.client.get(url)
- mock_service.lookup_revision_by.assert_called_once_with(
- '1',
- 'HEAD',
- None)
-
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_with_origin_and_branch_name(self, mock_service):
- mock_revision = {
- 'id': '12',
- 'directory': '23',
- 'message': 'message 2',
- 'type': 'tar',
- }
- mock_service.lookup_revision_by.return_value = mock_revision
-
- expected_revision = {
- 'id': '12',
- 'url': '/api/1/revision/12/',
- 'history_url': '/api/1/revision/12/log/',
- 'directory': '23',
- 'directory_url': '/api/1/directory/23/',
- 'message': 'message 2',
- 'type': 'tar',
- }
+ snapshot = self.snapshot_get_latest(origin['id'])
+ expected_revision = self.revision_get(
+ snapshot['branches']['HEAD']['target'])
- rv = self.client.get('/api/1/revision/origin/1'
- '/branch/refs/origin/dev/')
+ self._enrich_revision(expected_revision)
- # then
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, expected_revision)
- mock_service.lookup_revision_by.assert_called_once_with(
- '1',
- 'refs/origin/dev',
- None)
-
- @patch('swh.web.api.views.revision.service')
- @patch('swh.web.api.views.revision.utils')
- def test_api_revision_with_origin_and_branch_name_and_timestamp(self,
- mock_utils,
- mock_service): # noqa
- mock_revision = {
- 'id': '123',
- 'directory': '456',
- 'message': 'message 3',
- 'type': 'tar',
- }
- mock_service.lookup_revision_by.return_value = mock_revision
-
- expected_revision = {
- 'id': '123',
- 'url': '/api/1/revision/123/',
- 'history_url': '/api/1/revision/123/log/',
- 'directory': '456',
- 'directory_url': '/api/1/directory/456/',
- 'message': 'message 3',
- 'type': 'tar',
- }
+ @given(origin())
+ def test_api_revision_with_origin_and_branch_name(self, origin):
- mock_utils.enrich_revision.return_value = expected_revision
+ snapshot = self.snapshot_get_latest(origin['id'])
- rv = self.client.get('/api/1/revision'
- '/origin/1'
- '/branch/refs/origin/dev'
- '/ts/1452591542/')
+ branch_name = random.choice(
+ list(b for b in snapshot['branches'].keys()
+ if snapshot['branches'][b]['target_type'] == 'revision'))
+
+ url = reverse('api-revision-origin',
+ url_args={'origin_id': origin['id'],
+ 'branch_name': branch_name})
+
+ rv = self.client.get(url)
+
+ expected_revision = self.revision_get(
+ snapshot['branches'][branch_name]['target'])
+
+ self._enrich_revision(expected_revision)
- # then
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, expected_revision)
- mock_service.lookup_revision_by.assert_called_once_with(
- '1',
- 'refs/origin/dev',
- '1452591542')
- mock_utils.enrich_revision.assert_called_once_with(
- mock_revision)
-
- @patch('swh.web.api.views.revision.service')
- @patch('swh.web.api.views.revision.utils')
- def test_api_revision_with_origin_and_branch_name_and_timestamp_escapes(
- self,
- mock_utils,
- mock_service):
- mock_revision = {
- 'id': '999',
- }
- mock_service.lookup_revision_by.return_value = mock_revision
+ @given(origin_with_multiple_visits())
+ def test_api_revision_with_origin_and_branch_name_and_ts(self, origin):
- expected_revision = {
- 'id': '999',
- 'url': '/api/1/revision/999/',
- 'history_url': '/api/1/revision/999/log/',
- }
+ visit = random.choice(self.origin_visit_get(origin['id']))
- mock_utils.enrich_revision.return_value = expected_revision
+ snapshot = self.snapshot_get(visit['snapshot'])
- rv = self.client.get('/api/1/revision'
- '/origin/1'
- '/branch/refs%2Forigin%2Fdev'
- '/ts/Today%20is%20'
- 'January%201,%202047%20at%208:21:00AM/')
+ branch_name = random.choice(
+ list(b for b in snapshot['branches'].keys()
+ if snapshot['branches'][b]['target_type'] == 'revision'))
+
+ url = reverse('api-revision-origin',
+ url_args={'origin_id': origin['id'],
+ 'branch_name': branch_name,
+ 'ts': visit['date']})
+
+ rv = self.client.get(url)
+
+ expected_revision = self.revision_get(
+ snapshot['branches'][branch_name]['target'])
+
+ self._enrich_revision(expected_revision)
- # then
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, expected_revision)
- mock_service.lookup_revision_by.assert_called_once_with(
- '1',
- 'refs/origin/dev',
- 'Today is January 1, 2047 at 8:21:00AM')
- mock_utils.enrich_revision.assert_called_once_with(
- mock_revision)
+ @given(origin_with_multiple_visits())
+ def test_api_revision_with_origin_and_branch_name_and_ts_escapes(self,
+ origin):
+ visit = random.choice(self.origin_visit_get(origin['id']))
- @patch('swh.web.api.views.revision.service')
- def test_revision_directory_by_ko_raise(self, mock_service):
- # given
- mock_service.lookup_directory_through_revision.side_effect = NotFoundExc('not') # noqa
+ snapshot = self.snapshot_get(visit['snapshot'])
- # when
- with self.assertRaises(NotFoundExc):
- _revision_directory_by(
- {'sha1_git': 'id'},
- None,
- '/api/1/revision/sha1/directory/')
+ branch_name = random.choice(
+ list(b for b in snapshot['branches'].keys()
+ if snapshot['branches'][b]['target_type'] == 'revision'))
- # then
- mock_service.lookup_directory_through_revision.assert_called_once_with(
- {'sha1_git': 'id'},
- None, limit=100, with_data=False)
+ date = parse_timestamp(visit['date'])
- @patch('swh.web.api.views.revision.service')
- def test_revision_directory_by_type_dir(self, mock_service):
- # given
- mock_service.lookup_directory_through_revision.return_value = (
- 'rev-id',
- {
- 'type': 'dir',
- 'revision': 'rev-id',
- 'path': 'some/path',
- 'content': []
- })
- # when
- actual_dir_content = _revision_directory_by(
- {'sha1_git': 'blah-id'},
- 'some/path', '/api/1/revision/sha1/directory/')
+ formatted_date = date.strftime('Today is %B %d, %Y at %X')
- # then
- self.assertEqual(actual_dir_content, {
- 'type': 'dir',
- 'revision': 'rev-id',
- 'path': 'some/path',
- 'content': []
- })
+ url = reverse('api-revision-origin',
+ url_args={'origin_id': origin['id'],
+ 'branch_name': branch_name,
+ 'ts': formatted_date})
- mock_service.lookup_directory_through_revision.assert_called_once_with(
- {'sha1_git': 'blah-id'},
- 'some/path', limit=100, with_data=False)
+ rv = self.client.get(url)
- @patch('swh.web.api.views.revision.service')
- def test_revision_directory_by_type_file(self, mock_service):
- # given
- mock_service.lookup_directory_through_revision.return_value = (
- 'rev-id',
- {
- 'type': 'file',
- 'revision': 'rev-id',
- 'path': 'some/path',
- 'content': {'blah': 'blah'}
- })
- # when
- actual_dir_content = _revision_directory_by(
- {'sha1_git': 'sha1'},
- 'some/path',
- '/api/1/revision/origin/2/directory/',
- limit=1000, with_data=True)
+ expected_revision = self.revision_get(
+ snapshot['branches'][branch_name]['target'])
- # then
- self.assertEqual(actual_dir_content, {
- 'type': 'file',
- 'revision': 'rev-id',
- 'path': 'some/path',
- 'content': {'blah': 'blah'}
- })
+ self._enrich_revision(expected_revision)
- mock_service.lookup_directory_through_revision.assert_called_once_with(
- {'sha1_git': 'sha1'},
- 'some/path', limit=1000, with_data=True)
+ self.assertEqual(rv.status_code, 200)
+ self.assertEqual(rv['Content-Type'], 'application/json')
+ self.assertEqual(rv.data, expected_revision)
- @patch('swh.web.api.views.revision.parse_timestamp')
- @patch('swh.web.api.views.revision._revision_directory_by')
- @patch('swh.web.api.views.revision.utils')
- def test_api_directory_through_revision_origin_ko_not_found(self,
- mock_utils,
- mock_rev_dir,
- mock_parse_timestamp): # noqa
- mock_rev_dir.side_effect = NotFoundExc('not found')
- mock_parse_timestamp.return_value = '2012-10-20 00:00:00'
-
- rv = self.client.get('/api/1/revision'
- '/origin/10'
- '/branch/refs/remote/origin/dev'
- '/ts/2012-10-20'
- '/directory/')
+ @pytest.mark.xfail(reason="bugs in the origin_*get methods of the in-memory storage") # noqa
+ @given(unknown_origin_id())
+ def test_api_directory_through_revision_origin_ko(self,
+ unknown_origin_id):
+
+ url = reverse('api-revision-origin-directory',
+ url_args={'origin_id': unknown_origin_id})
+ rv = self.client.get(url)
- # then
self.assertEqual(rv.status_code, 404)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, {
'exception': 'NotFoundExc',
- 'reason': 'not found'})
-
- mock_rev_dir.assert_called_once_with(
- {'origin_id': '10',
- 'branch_name': 'refs/remote/origin/dev',
- 'ts': '2012-10-20 00:00:00'}, None,
- '/api/1/revision'
- '/origin/10'
- '/branch/refs/remote/origin/dev'
- '/ts/2012-10-20'
- '/directory/',
- with_data=False)
-
- @patch('swh.web.api.views.revision._revision_directory_by')
- def test_api_directory_through_revision_origin(self,
- mock_revision_dir):
- expected_res = [{
- 'id': '123'
- }]
- mock_revision_dir.return_value = expected_res
+ 'reason': 'Origin with id %s not found!' %
+ unknown_origin_id
+ })
- rv = self.client.get('/api/1/revision/origin/3/directory/')
+ @given(origin())
+ def test_api_directory_through_revision_origin(self, origin):
- # then
- self.assertEqual(rv.status_code, 200)
- self.assertEqual(rv['Content-Type'], 'application/json')
- self.assertEqual(rv.data, expected_res)
-
- mock_revision_dir.assert_called_once_with({
- 'origin_id': '3',
- 'branch_name': 'refs/heads/master',
- 'ts': None}, None, '/api/1/revision/origin/3/directory/',
- with_data=False)
+ url = reverse('api-revision-origin-directory',
+ url_args={'origin_id': origin['id']})
+ rv = self.client.get(url)
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_log(self, mock_service):
- # given
- stub_revisions = [{
- 'id': '18d8be353ed3480476f032475e7c233eff7371d5',
- 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
- 'author_name': 'Software Heritage',
- 'author_email': 'robot@softwareheritage.org',
- 'committer_name': 'Software Heritage',
- 'committer_email': 'robot@softwareheritage.org',
- 'message': 'synthetic revision message',
- 'date_offset': 0,
- 'committer_date_offset': 0,
- 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
- 'type': 'tar',
- 'synthetic': True,
- }]
- mock_service.lookup_revision_log.return_value = stub_revisions
-
- expected_revisions = [{
- 'id': '18d8be353ed3480476f032475e7c233eff7371d5',
- 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
- 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
- 'f7371d5/log/',
- 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
- 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
- '42b7e2a44e6/',
- 'author_name': 'Software Heritage',
- 'author_email': 'robot@softwareheritage.org',
- 'committer_name': 'Software Heritage',
- 'committer_email': 'robot@softwareheritage.org',
- 'message': 'synthetic revision message',
- 'date_offset': 0,
- 'committer_date_offset': 0,
- 'parents': [{
- 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a4345',
- 'url': '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345/', # noqa
- }],
- 'type': 'tar',
- 'synthetic': True,
- }]
-
- # when
- rv = self.client.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
- 'b7e2a44e6/log/')
+ snapshot = self.snapshot_get_latest(origin['id'])
+ revision_id = snapshot['branches']['HEAD']['target']
+ revision = self.revision_get(revision_id)
+ directory = self.directory_ls(revision['directory'])
+
+ for entry in directory:
+ if entry['type'] == 'dir':
+ entry['target_url'] = reverse(
+ 'api-directory',
+ url_args={'sha1_git': entry['target']}
+ )
+ entry['dir_url'] = reverse(
+ 'api-revision-origin-directory',
+ url_args={'origin_id': origin['id'],
+ 'path': entry['name']})
+ elif entry['type'] == 'file':
+ entry['target_url'] = reverse(
+ 'api-content',
+ url_args={'q': 'sha1_git:%s' % entry['target']}
+ )
+ entry['file_url'] = reverse(
+ 'api-revision-origin-directory',
+ url_args={'origin_id': origin['id'],
+ 'path': entry['name']})
+ elif entry['type'] == 'rev':
+ entry['target_url'] = reverse(
+ 'api-revision',
+ url_args={'sha1_git': entry['target']}
+ )
+ entry['rev_url'] = reverse(
+ 'api-revision-origin-directory',
+ url_args={'origin_id': origin['id'],
+ 'path': entry['name']})
+
+ expected_result = {
+ 'content': directory,
+ 'path': '.',
+ 'revision': revision_id,
+ 'type': 'dir'
+ }
- # then
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv['Content-Type'], 'application/json')
+ self.assertEqual(rv.data, expected_result)
- self.assertEqual(rv.data, expected_revisions)
- self.assertFalse(rv.has_header('Link'))
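(Editor's note) The enrichment loop above adds two URLs to every directory entry, keyed by its type. Summarized as a small sketch (key names taken directly from the loop):

# target_url points at the entry's object; the second key re-enters the
# directory listing endpoint at the entry's path.
ENTRY_URL_KEYS = {
    'dir': ('target_url', 'dir_url'),
    'file': ('target_url', 'file_url'),
    'rev': ('target_url', 'rev_url'),
}
assert all(len(keys) == 2 for keys in ENTRY_URL_KEYS.values())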
+ @given(revision())
+ def test_api_revision_log(self, revision):
- mock_service.lookup_revision_log.assert_called_once_with(
- '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 11)
+ per_page = 10
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_log_with_next(self, mock_service):
- # given
- stub_revisions = []
- for i in range(27):
- stub_revisions.append({'id': str(i)})
+ url = reverse('api-revision-log', url_args={'sha1_git': revision},
+ query_params={'per_page': per_page})
- mock_service.lookup_revision_log.return_value = stub_revisions[:26]
+ rv = self.client.get(url)
- expected_revisions = [x for x in stub_revisions if int(x['id']) < 25]
- for e in expected_revisions:
- e['url'] = '/api/1/revision/%s/' % e['id']
- e['history_url'] = '/api/1/revision/%s/log/' % e['id']
+ expected_log = self.revision_log(revision, limit=per_page+1)
+ expected_log = list(map(self._enrich_revision, expected_log))
- # when
- rv = self.client.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
- 'b7e2a44e6/log/?per_page=25')
+ has_next = len(expected_log) > per_page
- # then
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv['Content-Type'], 'application/json')
- self.assertEqual(rv.data, expected_revisions)
- self.assertEqual(rv['Link'],
- '</api/1/revision/25/log/?per_page=25>; rel="next"')
+ self.assertEqual(rv.data,
+ expected_log[:-1] if has_next else expected_log)
- mock_service.lookup_revision_log.assert_called_once_with(
- '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)
+ if has_next:
+ self.assertIn('Link', rv)
+ next_log_url = reverse(
+ 'api-revision-log',
+ url_args={'sha1_git': expected_log[-1]['id']},
+ query_params={'per_page': per_page})
+ self.assertIn(next_log_url, rv['Link'])
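(Editor's note) The per_page/Link assertions above use the usual fetch-one-extra pagination scheme: request per_page + 1 revisions, return per_page, and advertise the extra one as the start of the next page. A self-contained sketch with hypothetical names (not the swh.web implementation):

def paginate(revisions, per_page, next_url_for):
    # revisions holds up to per_page + 1 items; the extra item, if any,
    # signals that a next page exists and becomes its first revision.
    page = revisions[:per_page]
    link = None
    if len(revisions) > per_page:
        link = '<%s>; rel="next"' % next_url_for(revisions[per_page]['id'])
    return page, link

page, link = paginate([{'id': str(i)} for i in range(11)], 10,
                      lambda rid: '/api/1/revision/%s/log/?per_page=10' % rid)
assert len(page) == 10
assert link == '</api/1/revision/10/log/?per_page=10>; rel="next"'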
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_log_not_found(self, mock_service):
- # given
- mock_service.lookup_revision_log.return_value = None
+ @given(unknown_revision())
+ def test_api_revision_log_not_found(self, unknown_revision):
- # when
- rv = self.client.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6'
- 'a42b7e2a44e6/log/')
+ url = reverse('api-revision-log',
+ url_args={'sha1_git': unknown_revision})
+
+ rv = self.client.get(url)
- # then
self.assertEqual(rv.status_code, 404)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, {
'exception': 'NotFoundExc',
- 'reason': 'Revision with sha1_git'
- ' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'})
+ 'reason': 'Revision with sha1_git %s not found.' %
+ unknown_revision})
self.assertFalse(rv.has_header('Link'))
- mock_service.lookup_revision_log.assert_called_once_with(
- '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 11)
+ @given(revision())
+ def test_api_revision_log_context(self, revision):
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_log_context(self, mock_service):
- # given
- stub_revisions = [{
- 'id': '18d8be353ed3480476f032475e7c233eff7371d5',
- 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
- 'author_name': 'Software Heritage',
- 'author_email': 'robot@softwareheritage.org',
- 'committer_name': 'Software Heritage',
- 'committer_email': 'robot@softwareheritage.org',
- 'message': 'synthetic revision message',
- 'date_offset': 0,
- 'committer_date_offset': 0,
- 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
- 'type': 'tar',
- 'synthetic': True,
- }]
-
- mock_service.lookup_revision_log.return_value = stub_revisions
- mock_service.lookup_revision_multiple.return_value = [{
- 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
- 'directory': '18d8be353ed3480476f032475e7c233eff7371d5',
- 'author_name': 'Name Surname',
- 'author_email': 'name@surname.com',
- 'committer_name': 'Name Surname',
- 'committer_email': 'name@surname.com',
- 'message': 'amazing revision message',
- 'date_offset': 0,
- 'committer_date_offset': 0,
- 'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'],
- 'type': 'tar',
- 'synthetic': True,
- }]
-
- expected_revisions = [
- {
- 'url': '/api/1/revision/'
- '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/',
- 'history_url': '/api/1/revision/'
- '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/log/',
- 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
- 'directory': '18d8be353ed3480476f032475e7c233eff7371d5',
- 'directory_url': '/api/1/directory/'
- '18d8be353ed3480476f032475e7c233eff7371d5/',
- 'author_name': 'Name Surname',
- 'author_email': 'name@surname.com',
- 'committer_name': 'Name Surname',
- 'committer_email': 'name@surname.com',
- 'message': 'amazing revision message',
- 'date_offset': 0,
- 'committer_date_offset': 0,
- 'parents': [{
- 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
- 'url': '/api/1/revision/adc83b19e793491b1c6ea0fd8b46cd9f32e592fc/', # noqa
- }],
- 'type': 'tar',
- 'synthetic': True,
- },
- {
- 'url': '/api/1/revision/'
- '18d8be353ed3480476f032475e7c233eff7371d5/',
- 'history_url': '/api/1/revision/'
- '18d8be353ed3480476f032475e7c233eff7371d5/log/',
- 'id': '18d8be353ed3480476f032475e7c233eff7371d5',
- 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
- 'directory_url': '/api/1/directory/'
- '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/',
- 'author_name': 'Software Heritage',
- 'author_email': 'robot@softwareheritage.org',
- 'committer_name': 'Software Heritage',
- 'committer_email': 'robot@softwareheritage.org',
- 'message': 'synthetic revision message',
- 'date_offset': 0,
- 'committer_date_offset': 0,
- 'parents': [{
- 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a4345',
- 'url': '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345/', # noqa
- }],
- 'type': 'tar',
- 'synthetic': True,
- }]
-
- # when
- rv = self.client.get('/api/1/revision/18d8be353ed3480476f0'
- '32475e7c233eff7371d5/prev/21145781e2'
- '6ad1f978e/log/')
+ revisions = self.revision_log(revision, limit=4)
- # then
- self.assertEqual(rv.status_code, 200)
- self.assertEqual(rv['Content-Type'], 'application/json')
- self.assertEqual(expected_revisions, rv.data)
- self.assertFalse(rv.has_header('Link'))
+ prev_rev = revisions[0]['id']
+ rev = revisions[-1]['id']
- mock_service.lookup_revision_log.assert_called_once_with(
- '18d8be353ed3480476f032475e7c233eff7371d5', 11)
- mock_service.lookup_revision_multiple.assert_called_once_with(
- ['21145781e26ad1f978e'])
+ per_page = 10
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_log_by(self, mock_service):
- # given
- stub_revisions = [{
- 'id': '18d8be353ed3480476f032475e7c233eff7371d5',
- 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
- 'author_name': 'Software Heritage',
- 'author_email': 'robot@softwareheritage.org',
- 'committer_name': 'Software Heritage',
- 'committer_email': 'robot@softwareheritage.org',
- 'message': 'synthetic revision message',
- 'date_offset': 0,
- 'committer_date_offset': 0,
- 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
- 'type': 'tar',
- 'synthetic': True,
- }]
- mock_service.lookup_revision_log_by.return_value = stub_revisions
-
- expected_revisions = [{
- 'id': '18d8be353ed3480476f032475e7c233eff7371d5',
- 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
- 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
- 'f7371d5/log/',
- 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
- 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
- '42b7e2a44e6/',
- 'author_name': 'Software Heritage',
- 'author_email': 'robot@softwareheritage.org',
- 'committer_name': 'Software Heritage',
- 'committer_email': 'robot@softwareheritage.org',
- 'message': 'synthetic revision message',
- 'date_offset': 0,
- 'committer_date_offset': 0,
- 'parents': [{
- 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a4345',
- 'url': '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345/' # noqa
- }],
- 'type': 'tar',
- 'synthetic': True,
- }]
-
- # when
- rv = self.client.get('/api/1/revision/origin/1/log/')
+ url = reverse('api-revision-log',
+ url_args={'sha1_git': rev,
+ 'prev_sha1s': prev_rev},
+ query_params={'per_page': per_page})
+
+ rv = self.client.get(url)
+
+ expected_log = self.revision_log(rev, limit=per_page)
+ prev_revision = self.revision_get(prev_rev)
+ expected_log.insert(0, prev_revision)
+ expected_log = list(map(self._enrich_revision, expected_log))
- # then
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv['Content-Type'], 'application/json')
- self.assertEqual(rv.data, expected_revisions)
- self.assertFalse(rv.has_header('Link'))
+ self.assertEqual(rv.data, expected_log)
- mock_service.lookup_revision_log_by.assert_called_once_with(
- '1', 'HEAD', None, 11)
+ @given(origin())
+ def test_api_revision_log_by(self, origin):
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_log_by_with_next(self, mock_service):
- # given
- stub_revisions = []
- for i in range(27):
- stub_revisions.append({'id': str(i)})
+ per_page = 10
- mock_service.lookup_revision_log_by.return_value = stub_revisions[:26]
+ url = reverse('api-revision-origin-log',
+ url_args={'origin_id': origin['id']},
+ query_params={'per_page': per_page})
- expected_revisions = [x for x in stub_revisions if int(x['id']) < 25]
- for e in expected_revisions:
- e['url'] = '/api/1/revision/%s/' % e['id']
- e['history_url'] = '/api/1/revision/%s/log/' % e['id']
+ rv = self.client.get(url)
- # when
- rv = self.client.get('/api/1/revision/origin/1/log/?per_page=25')
+ snapshot = self.snapshot_get_latest(origin['id'])
- # then
- self.assertEqual(rv.status_code, 200)
+ expected_log = self.revision_log(
+ snapshot['branches']['HEAD']['target'], limit=per_page+1)
- self.assertEqual(rv['Content-Type'], 'application/json')
- self.assertIsNotNone(rv['Link'])
- self.assertEqual(rv.data, expected_revisions)
+ expected_log = list(map(self._enrich_revision, expected_log))
- mock_service.lookup_revision_log_by.assert_called_once_with(
- '1', 'HEAD', None, 26)
+ has_next = len(expected_log) > per_page
- @patch('swh.web.api.views.revision.service')
- def test_api_revision_log_by_norev(self, mock_service):
- # given
- mock_service.lookup_revision_log_by.side_effect = NotFoundExc(
- 'No revision')
+ self.assertEqual(rv.status_code, 200)
+ self.assertEqual(rv['Content-Type'], 'application/json')
+ self.assertEqual(rv.data,
+ expected_log[:-1] if has_next else expected_log)
+ if has_next:
+ self.assertIn('Link', rv)
+ next_log_url = reverse(
+ 'api-revision-origin-log',
+ url_args={'origin_id': origin['id'],
+ 'branch_name': 'HEAD'},
+ query_params={'per_page': per_page,
+ 'sha1_git': expected_log[-1]['id']})
+ self.assertIn(next_log_url, rv['Link'])
+
+ @given(origin())
+ def test_api_revision_log_by_ko(self, origin):
+
+ invalid_branch_name = 'foobar'
+
+ url = reverse('api-revision-origin-log',
+ url_args={'origin_id': origin['id'],
+ 'branch_name': invalid_branch_name})
- # when
- rv = self.client.get('/api/1/revision/origin/1/log/')
+ rv = self.client.get(url)
- # then
self.assertEqual(rv.status_code, 404)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertFalse(rv.has_header('Link'))
- self.assertEqual(rv.data, {'exception': 'NotFoundExc',
- 'reason': 'No revision'})
-
- mock_service.lookup_revision_log_by.assert_called_once_with(
- '1', 'HEAD', None, 11)
+ self.assertEqual(
+ rv.data,
+ {'exception': 'NotFoundExc',
+ 'reason': 'Revision for origin %s and branch %s not found.' %
+ (origin['id'], invalid_branch_name)})
@patch('swh.web.api.views.revision._revision_directory_by')
def test_api_revision_directory_ko_not_found(self, mock_rev_dir):
# given
mock_rev_dir.side_effect = NotFoundExc('Not found')
# then
rv = self.client.get('/api/1/revision/999/directory/some/path/to/dir/')
self.assertEqual(rv.status_code, 404)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, {
'exception': 'NotFoundExc',
'reason': 'Not found'})
mock_rev_dir.assert_called_once_with(
{'sha1_git': '999'},
'some/path/to/dir',
'/api/1/revision/999/directory/some/path/to/dir/',
with_data=False)
@patch('swh.web.api.views.revision._revision_directory_by')
def test_api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir):
stub_dir = {
'type': 'dir',
'revision': '999',
'content': [
{
'sha1_git': '789',
'type': 'file',
'target': '101',
'target_url': '/api/1/content/sha1_git:101/',
'name': 'somefile',
'file_url': '/api/1/revision/999/directory/some/path/'
'somefile/'
},
{
'sha1_git': '123',
'type': 'dir',
'target': '456',
'target_url': '/api/1/directory/456/',
'name': 'to-subdir',
'dir_url': '/api/1/revision/999/directory/some/path/'
'to-subdir/',
}]
}
# given
mock_rev_dir.return_value = stub_dir
# then
rv = self.client.get('/api/1/revision/999/directory/some/path/')
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, stub_dir)
mock_rev_dir.assert_called_once_with(
{'sha1_git': '999'},
'some/path',
'/api/1/revision/999/directory/some/path/',
with_data=False)
@patch('swh.web.api.views.revision._revision_directory_by')
def test_api_revision_directory_ok_returns_content(self, mock_rev_dir):
stub_content = {
'type': 'file',
'revision': '999',
'content': {
'sha1_git': '789',
'sha1': '101',
'data_url': '/api/1/content/101/raw/',
}
}
# given
mock_rev_dir.return_value = stub_content
# then
url = '/api/1/revision/666/directory/some/other/path/'
rv = self.client.get(url)
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv['Content-Type'], 'application/json')
self.assertEqual(rv.data, stub_content)
mock_rev_dir.assert_called_once_with(
{'sha1_git': '666'}, 'some/other/path', url, with_data=False)
+
+ def _enrich_revision(self, revision):
+ author_url = reverse(
+ 'api-person',
+ url_args={'person_id': revision['author']['id']})
+
+ committer_url = reverse(
+ 'api-person',
+ url_args={'person_id': revision['committer']['id']})
+
+ directory_url = reverse(
+ 'api-directory',
+ url_args={'sha1_git': revision['directory']})
+
+ history_url = reverse('api-revision-log',
+ url_args={'sha1_git': revision['id']})
+
+ parents_id_url = []
+ for p in revision['parents']:
+ parents_id_url.append({
+ 'id': p,
+ 'url': reverse('api-revision', url_args={'sha1_git': p})
+ })
+
+ revision_url = reverse('api-revision',
+ url_args={'sha1_git': revision['id']})
+
+ revision['author_url'] = author_url
+ revision['committer_url'] = committer_url
+ revision['directory_url'] = directory_url
+ revision['history_url'] = history_url
+ revision['url'] = revision_url
+ revision['parents'] = parents_id_url
+
+ return revision
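(Editor's note) For reference, a hypothetical before/after of the _enrich_revision helper defined above (identifiers invented, person URL pattern assumed):

raw = {'id': 'aaaa', 'directory': 'cccc', 'parents': ['bbbb'],
       'author': {'id': 1}, 'committer': {'id': 2}}
# After _enrich_revision(raw), the dict additionally carries:
#   url           -> /api/1/revision/aaaa/
#   history_url   -> /api/1/revision/aaaa/log/
#   directory_url -> /api/1/directory/cccc/
#   author_url    -> /api/1/person/1/  (pattern assumed)
#   committer_url -> /api/1/person/2/  (pattern assumed)
#   parents       -> [{'id': 'bbbb', 'url': '/api/1/revision/bbbb/'}]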
diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py
index ca5c0b7ae..d0ad0b8f1 100644
--- a/swh/web/tests/common/test_service.py
+++ b/swh/web/tests/common/test_service.py
@@ -1,808 +1,820 @@
-# Copyright (C) 2015-2018 The Software Heritage developers
+# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import itertools
import pytest
import random
from collections import defaultdict
from hypothesis import given
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.web.common import service
from swh.web.common.exc import BadInputExc, NotFoundExc
from swh.web.tests.strategies import (
content, contents, unknown_content, unknown_contents,
contents_with_ctags, origin, new_origin, visit_dates, directory,
release, revision, unknown_revision, revisions, unknown_revisions,
ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256,
revision_with_submodules, unknown_directory, empty_directory,
new_revision, new_origins
)
from swh.web.tests.testcase import (
WebTestCase, ctags_json_missing, fossology_missing
)
class ServiceTestCase(WebTestCase):
@given(contents())
def test_lookup_multiple_hashes_all_present(self, contents):
input_data = []
expected_output = []
for cnt in contents:
input_data.append({'sha1': cnt['sha1']})
expected_output.append({'sha1': cnt['sha1'],
'found': True})
self.assertEqual(service.lookup_multiple_hashes(input_data),
expected_output)
@given(contents(), unknown_contents())
def test_lookup_multiple_hashes_some_missing(self, contents,
unknown_contents):
input_contents = list(itertools.chain(contents, unknown_contents))
random.shuffle(input_contents)
input_data = []
expected_output = []
for cnt in input_contents:
input_data.append({'sha1': cnt['sha1']})
expected_output.append({'sha1': cnt['sha1'],
'found': cnt in contents})
self.assertEqual(service.lookup_multiple_hashes(input_data),
expected_output)
@given(unknown_content())
def test_lookup_hash_does_not_exist(self, unknown_content):
actual_lookup = service.lookup_hash('sha1_git:%s' %
unknown_content['sha1_git'])
self.assertEqual(actual_lookup, {'found': None,
'algo': 'sha1_git'})
@given(content())
def test_lookup_hash_exist(self, content):
actual_lookup = service.lookup_hash('sha1:%s' % content['sha1'])
content_metadata = self.content_get_metadata(content['sha1'])
self.assertEqual({'found': content_metadata,
'algo': 'sha1'}, actual_lookup)
@given(unknown_content())
def test_search_hash_does_not_exist(self, content):
actual_lookup = service.search_hash('sha1_git:%s' %
content['sha1_git'])
self.assertEqual({'found': False}, actual_lookup)
@given(content())
def test_search_hash_exist(self, content):
actual_lookup = service.search_hash('sha1:%s' % content['sha1'])
self.assertEqual({'found': True}, actual_lookup)
@pytest.mark.skipif(ctags_json_missing,
reason="requires ctags with json output support")
@given(contents_with_ctags())
def test_lookup_content_ctags(self, contents_with_ctags):
content_sha1 = random.choice(contents_with_ctags['sha1s'])
self.content_add_ctags(content_sha1)
actual_ctags = \
list(service.lookup_content_ctags('sha1:%s' % content_sha1))
expected_data = list(self.content_get_ctags(content_sha1))
for ctag in expected_data:
ctag['id'] = content_sha1
self.assertEqual(actual_ctags, expected_data)
@given(unknown_content())
def test_lookup_content_ctags_no_hash(self, unknown_content):
actual_ctags = \
list(service.lookup_content_ctags('sha1:%s' %
unknown_content['sha1']))
self.assertEqual(actual_ctags, [])
@given(content())
def test_lookup_content_filetype(self, content):
self.content_add_mimetype(content['sha1'])
actual_filetype = service.lookup_content_filetype(content['sha1'])
expected_filetype = self.content_get_mimetype(content['sha1'])
self.assertEqual(actual_filetype, expected_filetype)
@given(content())
def test_lookup_content_language(self, content):
self.content_add_language(content['sha1'])
actual_language = service.lookup_content_language(content['sha1'])
expected_language = self.content_get_language(content['sha1'])
self.assertEqual(actual_language, expected_language)
@given(contents_with_ctags())
def test_lookup_expression(self, contents_with_ctags):
per_page = 10
expected_ctags = []
for content_sha1 in contents_with_ctags['sha1s']:
if len(expected_ctags) == per_page:
break
self.content_add_ctags(content_sha1)
for ctag in self.content_get_ctags(content_sha1):
if len(expected_ctags) == per_page:
break
if ctag['name'] == contents_with_ctags['symbol_name']:
del ctag['id']
ctag['sha1'] = content_sha1
expected_ctags.append(ctag)
actual_ctags = \
list(service.lookup_expression(contents_with_ctags['symbol_name'],
last_sha1=None, per_page=10))
self.assertEqual(actual_ctags, expected_ctags)
def test_lookup_expression_no_result(self):
expected_ctags = []
actual_ctags = \
list(service.lookup_expression('barfoo', last_sha1=None,
per_page=10))
self.assertEqual(actual_ctags, expected_ctags)
@pytest.mark.skipif(fossology_missing,
reason="requires fossology-nomossa installed")
@given(content())
def test_lookup_content_license(self, content):
self.content_add_license(content['sha1'])
actual_license = service.lookup_content_license(content['sha1'])
expected_license = self.content_get_license(content['sha1'])
self.assertEqual(actual_license, expected_license)
def test_stat_counters(self):
actual_stats = service.stat_counters()
self.assertEqual(actual_stats, self.storage.stat_counters())
@given(new_origin(), visit_dates())
def test_lookup_origin_visits(self, new_origin, visit_dates):
origin_id = self.storage.origin_add_one(new_origin)
for ts in visit_dates:
self.storage.origin_visit_add(origin_id, ts)
actual_origin_visits = list(
service.lookup_origin_visits(origin_id, per_page=100))
expected_visits = self.origin_visit_get(origin_id)
self.assertEqual(actual_origin_visits, expected_visits)
@given(new_origin(), visit_dates())
def test_lookup_origin_visit(self, new_origin, visit_dates):
origin_id = self.storage.origin_add_one(new_origin)
visits = []
for ts in visit_dates:
visits.append(self.storage.origin_visit_add(origin_id, ts))
visit = random.choice(visits)['visit']
actual_origin_visit = service.lookup_origin_visit(origin_id, visit)
expected_visit = dict(self.storage.origin_visit_get_by(origin_id,
visit))
expected_visit['date'] = expected_visit['date'].isoformat()
expected_visit['metadata'] = {}
self.assertEqual(actual_origin_visit, expected_visit)
@given(new_origin())
def test_lookup_origin(self, new_origin):
origin_id = self.storage.origin_add_one(new_origin)
actual_origin = service.lookup_origin({'id': origin_id})
expected_origin = self.storage.origin_get({'id': origin_id})
self.assertEqual(actual_origin, expected_origin)
actual_origin = service.lookup_origin({'type': new_origin['type'],
'url': new_origin['url']})
expected_origin = self.storage.origin_get({'type': new_origin['type'],
'url': new_origin['url']})
self.assertEqual(actual_origin, expected_origin)
@given(invalid_sha1())
def test_lookup_release_ko_id_checksum_not_a_sha1(self, invalid_sha1):
with self.assertRaises(BadInputExc) as cm:
service.lookup_release(invalid_sha1)
self.assertIn('invalid checksum', cm.exception.args[0].lower())
@given(sha256())
def test_lookup_release_ko_id_checksum_too_long(self, sha256):
with self.assertRaises(BadInputExc) as cm:
service.lookup_release(sha256)
self.assertEqual('Only sha1_git is supported.', cm.exception.args[0])
@given(directory())
def test_lookup_directory_with_path_not_found(self, directory):
path = 'some/invalid/path/here'
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_path(directory, path)
self.assertEqual('Directory entry with path %s from %s '
'not found' % (path, directory),
cm.exception.args[0])
@given(directory())
def test_lookup_directory_with_path_found(self, directory):
directory_content = self.directory_ls(directory)
directory_entry = random.choice(directory_content)
path = directory_entry['name']
actual_result = service.lookup_directory_with_path(directory, path)
self.assertEqual(actual_result, directory_entry)
@given(release())
def test_lookup_release(self, release):
actual_release = service.lookup_release(release)
self.assertEqual(actual_release,
self.release_get(release))
@given(revision(), invalid_sha1(), sha256())
def test_lookup_revision_with_context_ko_not_a_sha1(self, revision,
invalid_sha1,
sha256):
sha1_git_root = revision
sha1_git = invalid_sha1
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Invalid checksum query string', cm.exception.args[0])
sha1_git = sha256
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist(
self, revision, unknown_revision):
sha1_git_root = revision
sha1_git = unknown_revision
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision %s not found' % sha1_git, cm.exception.args[0])
@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
self, revision, unknown_revision):
sha1_git_root = unknown_revision
sha1_git = revision
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision root %s not found' % sha1_git_root,
cm.exception.args[0])
@given(ancestor_revisions())
def test_lookup_revision_with_context(self, ancestor_revisions):
sha1_git = ancestor_revisions['sha1_git']
root_sha1_git = ancestor_revisions['sha1_git_root']
for sha1_git_root in (root_sha1_git,
{'id': hash_to_bytes(root_sha1_git)}):
actual_revision = \
service.lookup_revision_with_context(sha1_git_root,
sha1_git)
children = []
for rev in self.revision_log(root_sha1_git):
for p_rev in rev['parents']:
p_rev_hex = hash_to_hex(p_rev)
if p_rev_hex == sha1_git:
children.append(rev['id'])
expected_revision = self.revision_get(sha1_git)
expected_revision['children'] = children
self.assertEqual(actual_revision, expected_revision)
@given(non_ancestor_revisions())
def test_lookup_revision_with_context_ko(self, non_ancestor_revisions):
sha1_git = non_ancestor_revisions['sha1_git']
root_sha1_git = non_ancestor_revisions['sha1_git_root']
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(root_sha1_git, sha1_git)
self.assertIn('Revision %s is not an ancestor of %s' %
(sha1_git, root_sha1_git), cm.exception.args[0])
@given(unknown_revision())
def test_lookup_directory_with_revision_not_found(self, unknown_revision):
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision(unknown_revision)
self.assertIn('Revision %s not found' % unknown_revision,
cm.exception.args[0])
@given(revision())
def test_lookup_directory_with_revision_ko_path_to_nowhere(self, revision):
invalid_path = 'path/to/something/unknown'
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision(revision, invalid_path)
exception_text = cm.exception.args[0].lower()
self.assertIn('directory or file', exception_text)
self.assertIn(invalid_path, exception_text)
self.assertIn('revision %s' % revision, exception_text)
self.assertIn('not found', exception_text)
@given(revision_with_submodules())
- def test_lookup_directory_with_revision_ko_type_not_implemented(
+ def test_lookup_directory_with_revision_submodules(
self, revision_with_submodules):
- with self.assertRaises(NotImplementedError) as cm:
- service.lookup_directory_with_revision(
- revision_with_submodules['rev_sha1_git'],
- revision_with_submodules['rev_dir_rev_path'])
- self.assertIn("Entity of type rev not implemented.",
- cm.exception.args[0])
+ rev_sha1_git = revision_with_submodules['rev_sha1_git']
+ rev_dir_path = revision_with_submodules['rev_dir_rev_path']
+
+ actual_data = service.lookup_directory_with_revision(
+ rev_sha1_git, rev_dir_path)
+
+ revision = self.revision_get(revision_with_submodules['rev_sha1_git'])
+ directory = self.directory_ls(revision['directory'])
+ rev_entry = next(e for e in directory if e['name'] == rev_dir_path)
+
+ expected_data = {
+ 'content': self.revision_get(rev_entry['target']),
+ 'path': rev_dir_path,
+ 'revision': rev_sha1_git,
+ 'type': 'rev'
+ }
+
+ self.assertEqual(actual_data, expected_data)
@given(revision())
def test_lookup_directory_with_revision_without_path(self, revision):
actual_directory_entries = \
service.lookup_directory_with_revision(revision)
revision_data = self.revision_get(revision)
expected_directory_entries = \
self.directory_ls(revision_data['directory'])
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(actual_directory_entries['content'],
expected_directory_entries)
@given(revision())
def test_lookup_directory_with_revision_with_path(self, revision):
revision_data = self.revision_get(revision)
dir_entries = [e for e in self.directory_ls(revision_data['directory'])
if e['type'] in ('file', 'dir')]
expected_dir_entry = random.choice(dir_entries)
actual_dir_entry = \
service.lookup_directory_with_revision(revision,
expected_dir_entry['name'])
self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type'])
self.assertEqual(actual_dir_entry['revision'], revision)
self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name'])
if actual_dir_entry['type'] == 'file':
del actual_dir_entry['content']['checksums']['blake2s256']
for key in ('checksums', 'status', 'length'):
self.assertEqual(actual_dir_entry['content'][key],
expected_dir_entry[key])
else:
sub_dir_entries = self.directory_ls(expected_dir_entry['target'])
self.assertEqual(actual_dir_entry['content'], sub_dir_entries)
@given(revision())
def test_lookup_directory_with_revision_with_path_to_file_and_data(
self, revision):
revision_data = self.revision_get(revision)
dir_entries = [e for e in self.directory_ls(revision_data['directory'])
if e['type'] == 'file']
expected_dir_entry = random.choice(dir_entries)
expected_data = \
self.content_get(expected_dir_entry['checksums']['sha1'])
actual_dir_entry = \
service.lookup_directory_with_revision(revision,
expected_dir_entry['name'],
with_data=True)
self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type'])
self.assertEqual(actual_dir_entry['revision'], revision)
self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name'])
del actual_dir_entry['content']['checksums']['blake2s256']
for key in ('checksums', 'status', 'length'):
self.assertEqual(actual_dir_entry['content'][key],
expected_dir_entry[key])
self.assertEqual(actual_dir_entry['content']['data'],
expected_data['data'])
@given(revision())
def test_lookup_revision(self, revision):
actual_revision = service.lookup_revision(revision)
self.assertEqual(actual_revision, self.revision_get(revision))
@given(new_revision())
def test_lookup_revision_invalid_msg(self, new_revision):
new_revision['message'] = b'elegant fix for bug \xff'
self.storage.revision_add([new_revision])
revision = service.lookup_revision(hash_to_hex(new_revision['id']))
self.assertEqual(revision['message'], None)
self.assertEqual(revision['message_decoding_failed'], True)
@given(new_revision())
def test_lookup_revision_msg_ok(self, new_revision):
self.storage.revision_add([new_revision])
revision_message = service.lookup_revision_message(
hash_to_hex(new_revision['id']))
self.assertEqual(revision_message,
{'message': new_revision['message']})
@given(new_revision())
def test_lookup_revision_msg_absent(self, new_revision):
del new_revision['message']
self.storage.revision_add([new_revision])
new_revision_id = hash_to_hex(new_revision['id'])
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_message(new_revision_id)
self.assertEqual(
cm.exception.args[0],
'No message for revision with sha1_git %s.' % new_revision_id
)
@given(unknown_revision())
def test_lookup_revision_msg_no_rev(self, unknown_revision):
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_message(unknown_revision)
self.assertEqual(
cm.exception.args[0],
'Revision with sha1_git %s not found.' % unknown_revision
)
@given(revisions())
def test_lookup_revision_multiple(self, revisions):
actual_revisions = list(service.lookup_revision_multiple(revisions))
expected_revisions = []
for rev in revisions:
expected_revisions.append(self.revision_get(rev))
self.assertEqual(actual_revisions, expected_revisions)
@given(unknown_revisions())
def test_lookup_revision_multiple_none_found(self, unknown_revisions):
actual_revisions = \
list(service.lookup_revision_multiple(unknown_revisions))
self.assertEqual(actual_revisions, [None] * len(unknown_revisions))
@given(revision())
def test_lookup_revision_log(self, revision):
actual_revision_log = \
list(service.lookup_revision_log(revision, limit=25))
expected_revision_log = self.revision_log(revision, limit=25)
self.assertEqual(actual_revision_log, expected_revision_log)
def _get_origin_branches(self, origin):
- origin_visit = self.origin_visit_get(origin['id'])[0]
+ origin_visit = self.origin_visit_get(origin['id'])[-1]
snapshot = self.snapshot_get(origin_visit['snapshot'])
branches = {k: v for (k, v) in snapshot['branches'].items()
if v['target_type'] == 'revision'}
return branches
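(Editor's note) _get_origin_branches keeps only the revision-typed branches of the latest visit's snapshot, dropping release (and any other) targets. A standalone illustration:

snapshot = {'branches': {
    'HEAD': {'target_type': 'revision', 'target': 'aaaa'},
    'refs/tags/v1.0': {'target_type': 'release', 'target': 'bbbb'},
}}
branches = {k: v for (k, v) in snapshot['branches'].items()
            if v['target_type'] == 'revision'}
assert list(branches) == ['HEAD']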
@given(origin())
def test_lookup_revision_log_by(self, origin):
branches = self._get_origin_branches(origin)
branch_name = random.choice(list(branches.keys()))
actual_log = \
list(service.lookup_revision_log_by(origin['id'], branch_name,
None, limit=25))
expected_log = \
self.revision_log(branches[branch_name]['target'], limit=25)
self.assertEqual(actual_log, expected_log)
@given(origin())
def test_lookup_revision_log_by_notfound(self, origin):
with self.assertRaises(NotFoundExc):
service.lookup_revision_log_by(
origin['id'], 'unknown_branch_name', None, limit=100)
@given(unknown_content())
def test_lookup_content_raw_not_found(self, unknown_content):
with self.assertRaises(NotFoundExc) as cm:
service.lookup_content_raw('sha1:' + unknown_content['sha1'])
self.assertIn(cm.exception.args[0],
'Content with %s checksum equals to %s not found!' %
('sha1', unknown_content['sha1']))
@given(content())
def test_lookup_content_raw(self, content):
actual_content = service.lookup_content_raw(
'sha256:%s' % content['sha256'])
expected_content = self.content_get(content['sha1'])
self.assertEqual(actual_content, expected_content)
@given(unknown_content())
def test_lookup_content_not_found(self, unknown_content):
with self.assertRaises(NotFoundExc) as cm:
service.lookup_content('sha1:%s' % unknown_content['sha1'])
self.assertIn(cm.exception.args[0],
'Content with %s checksum equals to %s not found!' %
('sha1', unknown_content['sha1']))
@given(content())
def test_lookup_content_with_sha1(self, content):
actual_content = service.lookup_content(
'sha1:%s' % content['sha1'])
expected_content = self.content_get_metadata(content['sha1'])
self.assertEqual(actual_content, expected_content)
@given(content())
def test_lookup_content_with_sha256(self, content):
actual_content = service.lookup_content(
'sha256:%s' % content['sha256'])
expected_content = self.content_get_metadata(content['sha1'])
self.assertEqual(actual_content, expected_content)
@given(revision())
def test_lookup_person(self, revision):
rev_data = self.revision_get(revision)
actual_person = service.lookup_person(rev_data['author']['id'])
self.assertEqual(actual_person, rev_data['author'])
def test_lookup_directory_bad_checksum(self):
with self.assertRaises(BadInputExc):
service.lookup_directory('directory_id')
@given(unknown_directory())
def test_lookup_directory_not_found(self, unknown_directory):
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory(unknown_directory)
self.assertIn('Directory with sha1_git %s not found'
% unknown_directory, cm.exception.args[0])
@given(directory())
def test_lookup_directory(self, directory):
actual_directory_ls = list(service.lookup_directory(
directory))
expected_directory_ls = self.directory_ls(directory)
self.assertEqual(actual_directory_ls, expected_directory_ls)
@given(empty_directory())
def test_lookup_directory_empty(self, empty_directory):
actual_directory_ls = list(service.lookup_directory(empty_directory))
self.assertEqual(actual_directory_ls, [])
@given(origin())
def test_lookup_revision_by_nothing_found(self, origin):
with self.assertRaises(NotFoundExc):
service.lookup_revision_by(origin['id'], 'invalid-branch-name')
@given(origin())
def test_lookup_revision_by(self, origin):
branches = self._get_origin_branches(origin)
branch_name = random.choice(list(branches.keys()))
actual_revision = \
service.lookup_revision_by(origin['id'], branch_name, None)
expected_revision = \
self.revision_get(branches[branch_name]['target'])
self.assertEqual(actual_revision, expected_revision)
@given(origin(), revision())
def test_lookup_revision_with_context_by_ko(self, origin, revision):
with self.assertRaises(NotFoundExc):
service.lookup_revision_with_context_by(origin['id'],
'invalid-branch-name',
None,
revision)
@given(origin())
def test_lookup_revision_with_context_by(self, origin):
branches = self._get_origin_branches(origin)
branch_name = random.choice(list(branches.keys()))
root_rev = branches[branch_name]['target']
root_rev_log = self.revision_log(root_rev)
children = defaultdict(list)
for rev in root_rev_log:
for rev_p in rev['parents']:
children[rev_p].append(rev['id'])
rev = root_rev_log[-1]['id']
actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
origin['id'], branch_name, None, rev)
expected_root_rev = self.revision_get(root_rev)
expected_rev = self.revision_get(rev)
expected_rev['children'] = children[rev]
self.assertEqual(actual_root_rev, expected_root_rev)
self.assertEqual(actual_rev, expected_rev)
def test_lookup_revision_through_ko_not_implemented(self):
with self.assertRaises(NotImplementedError):
service.lookup_revision_through({
'something-unknown': 10,
})
@given(origin())
def test_lookup_revision_through_with_context_by(self, origin):
branches = self._get_origin_branches(origin)
branch_name = random.choice(list(branches.keys()))
root_rev = branches[branch_name]['target']
root_rev_log = self.revision_log(root_rev)
rev = root_rev_log[-1]['id']
self.assertEqual(service.lookup_revision_through({
'origin_id': origin['id'],
'branch_name': branch_name,
'ts': None,
'sha1_git': rev
}),
service.lookup_revision_with_context_by(
origin['id'], branch_name, None, rev)
)
@given(origin())
def test_lookup_revision_through_with_revision_by(self, origin):
branches = self._get_origin_branches(origin)
branch_name = random.choice(list(branches.keys()))
self.assertEqual(service.lookup_revision_through({
'origin_id': origin['id'],
'branch_name': branch_name,
'ts': None,
}),
service.lookup_revision_by(
origin['id'], branch_name, None)
)
@given(ancestor_revisions())
def test_lookup_revision_through_with_context(self, ancestor_revisions):
sha1_git = ancestor_revisions['sha1_git']
sha1_git_root = ancestor_revisions['sha1_git_root']
self.assertEqual(service.lookup_revision_through({
'sha1_git_root': sha1_git_root,
'sha1_git': sha1_git,
}),
service.lookup_revision_with_context(
sha1_git_root, sha1_git)
)
@given(revision())
def test_lookup_revision_through_with_revision(self, revision):
self.assertEqual(service.lookup_revision_through({
'sha1_git': revision
}),
service.lookup_revision(revision)
)
@given(revision())
def test_lookup_directory_through_revision_ko_not_found(self, revision):
with self.assertRaises(NotFoundExc):
service.lookup_directory_through_revision(
{'sha1_git': revision}, 'some/invalid/path')
@given(revision())
def test_lookup_directory_through_revision_ok(self, revision):
revision_data = self.revision_get(revision)
dir_entries = [e for e in self.directory_ls(revision_data['directory'])
if e['type'] == 'file']
dir_entry = random.choice(dir_entries)
self.assertEqual(
service.lookup_directory_through_revision({'sha1_git': revision},
dir_entry['name']),
(revision,
service.lookup_directory_with_revision(
revision, dir_entry['name']))
)
@given(revision())
def test_lookup_directory_through_revision_ok_with_data(self, revision):
revision_data = self.revision_get(revision)
dir_entries = [e for e in self.directory_ls(revision_data['directory'])
if e['type'] == 'file']
dir_entry = random.choice(dir_entries)
self.assertEqual(
service.lookup_directory_through_revision({'sha1_git': revision},
dir_entry['name'],
with_data=True),
(revision,
service.lookup_directory_with_revision(
revision, dir_entry['name'], with_data=True))
)
@given(new_origins(20))
def test_lookup_origins(self, new_origins):
nb_origins = len(new_origins)
expected_origins = self.storage.origin_add(new_origins)
origin_from_idx = random.randint(1, nb_origins-1) - 1
origin_from = expected_origins[origin_from_idx]['id']
max_origin_idx = expected_origins[-1]['id']
origin_count = random.randint(1, max_origin_idx - origin_from)
actual_origins = list(service.lookup_origins(origin_from,
origin_count))
expected_origins = list(self.storage.origin_get_range(origin_from,
origin_count))
self.assertEqual(actual_origins, expected_origins)
diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py
index d7c1aa88b..dced201bd 100644
--- a/swh/web/tests/data.py
+++ b/swh/web/tests/data.py
@@ -1,254 +1,260 @@
-# Copyright (C) 2018 The Software Heritage developers
+# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
+import time
from swh.indexer.language import LanguageIndexer
from swh.indexer.fossology_license import FossologyLicenseIndexer
from swh.indexer.mimetype import MimetypeIndexer
from swh.indexer.ctags import CtagsIndexer
from swh.indexer.storage import get_indexer_storage
from swh.model.hashutil import hash_to_hex, hash_to_bytes, DEFAULT_ALGORITHMS
from swh.model.identifiers import directory_identifier
from swh.loader.git.from_disk import GitLoaderFromArchive
from swh.storage.algos.dir_iterators import dir_iterator
# Module used to initialize data that will be provided as test input
# Configuration for git loader
_TEST_LOADER_CONFIG = {
'storage': {
'cls': 'memory',
'args': {}
},
'send_contents': True,
'send_directories': True,
'send_revisions': True,
'send_releases': True,
'send_snapshot': True,
'content_size_limit': 100 * 1024 * 1024,
'content_packet_size': 10,
'content_packet_size_bytes': 100 * 1024 * 1024,
'directory_packet_size': 10,
'revision_packet_size': 10,
'release_packet_size': 10,
'save_data': False,
}
# Base content indexer configuration
_TEST_INDEXER_BASE_CONFIG = {
'storage': {
'cls': 'memory',
'args': {},
},
'objstorage': {
'cls': 'memory',
'args': {},
},
'indexer_storage': {
'cls': 'memory',
'args': {},
}
}
# MimetypeIndexer with custom configuration for tests
class _MimetypeIndexer(MimetypeIndexer):
def parse_config_file(self, *args, **kwargs):
return {
**_TEST_INDEXER_BASE_CONFIG,
'tools': {
'name': 'file',
'version': '1:5.30-1+deb9u1',
'configuration': {
"type": "library",
"debian-package": "python3-magic"
}
}
}
# LanguageIndexer with custom configuration for tests
class _LanguageIndexer(LanguageIndexer):
def parse_config_file(self, *args, **kwargs):
return {
**_TEST_INDEXER_BASE_CONFIG,
'tools': {
'name': 'pygments',
'version': '2.0.1+dfsg-1.1+deb8u1',
'configuration': {
'type': 'library',
'debian-package': 'python3-pygments',
'max_content_size': 10240,
}
}
}
# FossologyLicenseIndexer with custom configuration for tests
class _FossologyLicenseIndexer(FossologyLicenseIndexer):
def parse_config_file(self, *args, **kwargs):
return {
**_TEST_INDEXER_BASE_CONFIG,
'workdir': '/tmp/swh/indexer.fossology.license',
'tools': {
'name': 'nomos',
'version': '3.1.0rc2-31-ga2cbb8c',
'configuration': {
'command_line': 'nomossa <filepath>',
},
}
}
# CtagsIndexer with custom configuration for tests
class _CtagsIndexer(CtagsIndexer):
def parse_config_file(self, *args, **kwargs):
return {
**_TEST_INDEXER_BASE_CONFIG,
'workdir': '/tmp/swh/indexer.ctags',
'languages': {'c': 'c'},
'tools': {
'name': 'universal-ctags',
'version': '~git7859817b',
'configuration': {
'command_line': '''ctags --fields=+lnz --sort=no --links=no ''' # noqa
'''--output-format=json <filepath>'''
},
}
}
# Lightweight git repositories that will be loaded to generate
# input data for tests
_TEST_ORIGINS = [
{
'id': 1,
'type': 'git',
'url': 'https://github.com/wcoder/highlightjs-line-numbers.js',
- 'archive': 'highlightjs-line-numbers.js.zip'
+ 'archives': ['highlightjs-line-numbers.js.zip',
+ 'highlightjs-line-numbers.js_visit2.zip']
},
{
'id': 2,
'type': 'git',
'url': 'https://github.com/memononen/libtess2',
- 'archive': 'libtess2.zip'
+ 'archives': ['libtess2.zip']
},
{
'id': 3,
'type': 'git',
'url': 'repo_with_submodules',
- 'archive': 'repo_with_submodules.tgz'
+ 'archives': ['repo_with_submodules.tgz']
}
]
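# Note: the first origin above is loaded from two archives so that two
# distinct visits get recorded for it in the test archive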
# Tests data initialization
def _init_tests_data():
# Load git repositories from archives
loader = GitLoaderFromArchive(config=_TEST_LOADER_CONFIG)
for origin in _TEST_ORIGINS:
- origin_repo_archive = \
- os.path.join(os.path.dirname(__file__),
- 'resources/repos/%s' % origin['archive'])
- loader.load(origin['url'], origin_repo_archive, None)
+ nb_visits = len(origin['archives'])
+ for i, archive in enumerate(origin['archives']):
+ origin_repo_archive = \
+ os.path.join(os.path.dirname(__file__),
+ 'resources/repos/%s' % archive)
+ loader.load(origin['url'], origin_repo_archive, None)
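+ # space out consecutive visits of the same origin so that each
+ # visit is recorded with a distinct date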
+ if nb_visits > 1 and i != nb_visits - 1:
+ time.sleep(1)
# Get reference to the memory storage
storage = loader.storage
contents = set()
directories = set()
revisions = set()
releases = set()
snapshots = set()
persons = set()
# Get all objects loaded into the test archive
for origin in _TEST_ORIGINS:
snp = storage.snapshot_get_latest(origin['id'])
snapshots.add(hash_to_hex(snp['id']))
for branch_name, branch_data in snp['branches'].items():
if branch_data['target_type'] == 'revision':
revisions.add(branch_data['target'])
elif branch_data['target_type'] == 'release':
release = next(storage.release_get([branch_data['target']]))
revisions.add(release['target'])
releases.add(hash_to_hex(branch_data['target']))
persons.add(release['author']['id'])
for rev_log in storage.revision_shortlog(set(revisions)):
rev_id = rev_log[0]
revisions.add(rev_id)
for rev in storage.revision_get(revisions):
dir_id = rev['directory']
persons.add(rev['author']['id'])
persons.add(rev['committer']['id'])
directories.add(hash_to_hex(dir_id))
for entry in dir_iterator(storage, dir_id):
if entry['type'] == 'file':
contents.add(entry['sha1'])
elif entry['type'] == 'dir':
directories.add(hash_to_hex(entry['target']))
# Get all checksums for each content
contents_metadata = storage.content_get_metadata(contents)
contents = []
for content_metadata in contents_metadata:
contents.append({
algo: hash_to_hex(content_metadata[algo])
for algo in DEFAULT_ALGORITHMS
})
# Create indexer storage instance that will be shared by indexers
idx_storage = get_indexer_storage('memory', {})
# Instantiate content indexers that will be used in tests
# and force them to use the memory storages
indexers = {}
for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer),
('language_indexer', _LanguageIndexer),
('license_indexer', _FossologyLicenseIndexer),
('ctags_indexer', _CtagsIndexer)):
idx = idx_class()
idx.storage = storage
idx.objstorage = storage.objstorage
idx.idx_storage = idx_storage
idx.register_tools(idx.config['tools'])
indexers[idx_name] = idx
# Add the empty directory to the test archive
empty_dir_id = directory_identifier({'entries': []})
empty_dir_id_bin = hash_to_bytes(empty_dir_id)
storage.directory_add([{'id': empty_dir_id_bin, 'entries': []}])
# Return tests data
return {
'storage': storage,
'idx_storage': idx_storage,
**indexers,
'origins': _TEST_ORIGINS,
'contents': contents,
'directories': list(directories),
'persons': list(persons),
'releases': list(releases),
'revisions': list(map(hash_to_hex, revisions)),
'snapshots': list(snapshots)
}
_tests_data = None
def get_tests_data():
"""
Initialize tests data and return them in a dict.
"""
global _tests_data
if _tests_data is None:
_tests_data = _init_tests_data()
return _tests_data
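For context, a minimal sketch of how a test module might consume these fixtures; the assertion is illustrative only and relies solely on names defined above:

from swh.web.tests.data import get_tests_data

tests_data = get_tests_data()
storage = tests_data['storage']
# every loaded origin should have at least one recorded visit
for origin in tests_data['origins']:
    assert len(list(storage.origin_visit_get(origin['id']))) >= 1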
diff --git a/swh/web/tests/resources/repos/highlightjs-line-numbers.js_visit2.zip b/swh/web/tests/resources/repos/highlightjs-line-numbers.js_visit2.zip
new file mode 100644
index 000000000..160f6fdc2
Binary files /dev/null and b/swh/web/tests/resources/repos/highlightjs-line-numbers.js_visit2.zip differ
diff --git a/swh/web/tests/strategies.py b/swh/web/tests/strategies.py
index d0d7bc06f..8b79dfcdb 100644
--- a/swh/web/tests/strategies.py
+++ b/swh/web/tests/strategies.py
@@ -1,446 +1,467 @@
-# Copyright (C) 2018 The Software Heritage developers
+# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random
from collections import defaultdict
from datetime import datetime
from hypothesis import settings, assume
from hypothesis.strategies import (
just, sampled_from, lists, composite, datetimes,
integers, binary, text, characters
)
from swh.model.hashutil import hash_to_hex, hash_to_bytes
from swh.model.identifiers import directory_identifier
from swh.storage.algos.revisions_walker import get_revisions_walker
from swh.storage.tests.algos.test_snapshot import ( # noqa
origins as new_origin_strategy, snapshots as new_snapshot
)
from swh.web.tests.data import get_tests_data
# Module dedicated to the generation of input data for tests through
# the use of hypothesis.
# Some of these data are sampled from a test archive created and populated
# in the swh.web.tests.data module.
# Set the swh-web hypothesis profile if none has been explicitly set
hypothesis_default_settings = settings.get_profile('default')
if repr(settings()) == repr(hypothesis_default_settings):
settings.load_profile('swh-web')
# Import tests data
tests_data = get_tests_data()
storage = tests_data['storage']
# The following strategies exploit the capabilities of hypothesis
def _known_swh_object(object_type):
return sampled_from(tests_data[object_type])
def sha1():
"""
Hypothesis strategy returning a valid hexadecimal sha1 value.
"""
return binary(
min_size=20, max_size=20).filter(
lambda s: int.from_bytes(s, byteorder='little')).map(hash_to_hex)
def invalid_sha1():
"""
Hypothesis strategy returning an invalid sha1 representation.
"""
return binary(
min_size=50, max_size=50).filter(
lambda s: int.from_bytes(s, byteorder='little')).map(hash_to_hex)
def sha256():
"""
Hypothesis strategy returning a valid hexadecimal sha256 value.
"""
return binary(
min_size=32, max_size=32).filter(
lambda s: int.from_bytes(s, byteorder='little')).map(hash_to_hex)
def content():
"""
Hypothesis strategy returning a random content ingested
into the test archive.
"""
return _known_swh_object('contents')
def contents():
"""
Hypothesis strategy returning random contents ingested
into the test archive.
"""
return lists(content(), min_size=2, max_size=8)
@composite
def new_content(draw):
blake2s256_hex = draw(sha256())
sha1_hex = draw(sha1())
sha1_git_hex = draw(sha1())
sha256_hex = draw(sha256())
assume(sha1_hex != sha1_git_hex)
assume(blake2s256_hex != sha256_hex)
return {
'blake2s256': blake2s256_hex,
'sha1': sha1_hex,
'sha1_git': sha1_git_hex,
'sha256': sha256_hex
}
def unknown_content():
"""
Hypothesis strategy returning a random content not ingested
into the test archive.
"""
return new_content().filter(
lambda c: next(storage.content_get(
[hash_to_bytes(c['sha1'])])) is None)
def unknown_contents():
"""
Hypothesis strategy returning random contents not ingested
into the test archive.
"""
return lists(unknown_content(), min_size=2, max_size=8)
def directory():
"""
Hypothesis strategy returning a random directory ingested
into the test archive.
"""
return _known_swh_object('directories')
def empty_directory():
"""
Hypothesis strategy returning the empty directory ingested
into the test archive.
"""
return just(directory_identifier({'entries': []}))
def unknown_directory():
"""
Hypothesis strategy returning a random directory not ingested
into the test archive.
"""
return sha1().filter(
lambda s: len(list(storage.directory_missing([hash_to_bytes(s)]))) > 0)
def origin():
"""
Hypothesis strategy returning a random origin ingested
into the test archive.
"""
return _known_swh_object('origins')
+def origin_with_multiple_visits():
+ """
+ Hypothesis strategy returning a random origin with multiple visits
+ ingested into the test archive.
+ """
+ ret = []
+ for origin in tests_data['origins']:
+ visits = list(storage.origin_visit_get(origin['id']))
+ if len(visits) > 1:
+ ret.append(origin)
+ return sampled_from(ret)
+
+
+def unknown_origin_id():
+ """
+ Hypothesis strategy returning a random origin id not ingested
+ into the test archive.
+ """
+ return integers(min_value=1000000)
+
+
def new_origin():
"""
Hypothesis strategy returning a random origin not ingested
into the test archive.
"""
return new_origin_strategy().filter(
lambda origin: storage.origin_get(origin) is None)
def new_origins(nb_origins=None):
"""
Hypothesis strategy returning random origins not ingested
into the test archive.
"""
min_size = nb_origins if nb_origins is not None else 2
max_size = nb_origins if nb_origins is not None else 8
size = random.randint(min_size, max_size)
return lists(new_origin(), min_size=size, max_size=size,
unique_by=lambda o: tuple(sorted(o.items())))
def visit_dates(nb_dates=None):
"""
Hypothesis strategy returning a list of visit dates.
"""
min_size = nb_dates if nb_dates else 2
max_size = nb_dates if nb_dates else 8
return lists(datetimes(min_value=datetime(2015, 1, 1, 0, 0),
max_value=datetime(2018, 12, 31, 0, 0)),
min_size=min_size, max_size=max_size, unique=True).map(sorted)
def release():
"""
Hypothesis strategy returning a random release ingested
into the test archive.
"""
return _known_swh_object('releases')
def unknown_release():
"""
Hypothesis strategy returning a random release not ingested
into the test archive.
"""
return sha1().filter(
lambda s: next(storage.release_get([hash_to_bytes(s)])) is None)
def revision():
"""
Hypothesis strategy returning a random revision ingested
into the test archive.
"""
return _known_swh_object('revisions')
def unknown_revision():
"""
Hypothesis strategy returning a random revision not ingested
into the test archive.
"""
return sha1().filter(
lambda s: next(storage.revision_get([hash_to_bytes(s)])) is None)
@composite
def new_person(draw):
"""
Hypothesis strategy returning random raw swh person data.
"""
name = draw(text(min_size=5, max_size=30,
alphabet=characters(min_codepoint=0, max_codepoint=255)))
email = '%s@company.org' % name
return {
'name': name.encode(),
'email': email.encode(),
'fullname': ('%s <%s>' % (name, email)).encode()
}
@composite
def new_swh_date(draw):
"""
Hypothesis strategy returning random raw swh date data.
"""
timestamp = draw(
datetimes(min_value=datetime(2015, 1, 1, 0, 0),
max_value=datetime(2018, 12, 31, 0, 0)).map(
lambda d: int(d.timestamp())))
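# a raw swh date is a Unix timestamp together with a UTC offset in minutes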
return {
'timestamp': timestamp,
'offset': 0,
'negative_utc': False,
}
@composite
def new_revision(draw):
"""
Hypothesis strategy returning random raw swh revision data
not ingested into the test archive.
"""
return {
'id': draw(unknown_revision().map(hash_to_bytes)),
'directory': draw(sha1().map(hash_to_bytes)),
'author': draw(new_person()),
'committer': draw(new_person()),
'message': draw(
text(min_size=20, max_size=100).map(lambda t: t.encode())),
'date': draw(new_swh_date()),
'committer_date': draw(new_swh_date()),
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}
def revisions():
"""
Hypothesis strategy returning random revisions ingested
into the test archive.
"""
return lists(revision(), min_size=2, max_size=8)
def unknown_revisions():
"""
Hypothesis strategy returning random revisions not ingested
into the test archive.
"""
return lists(unknown_revision(), min_size=2, max_size=8)
def snapshot():
"""
Hypothesis strategy returning a random snapshot ingested
into the test archive.
"""
return _known_swh_object('snapshots')
def new_snapshots(nb_snapshots=None):
min_size = nb_snapshots if nb_snapshots else 2
max_size = nb_snapshots if nb_snapshots else 8
return lists(new_snapshot(min_size=2, max_size=10, only_objects=True),
min_size=min_size, max_size=max_size)
def unknown_snapshot():
"""
Hypothesis strategy returning a random snapshot not ingested
into the test archive.
"""
return sha1().filter(
lambda s: storage.snapshot_get(hash_to_bytes(s)) is None)
def person():
"""
Hypothesis strategy returning a random person ingested
into the test archive.
"""
return _known_swh_object('persons')
def unknown_person():
"""
Hypothesis strategy returning a random person not ingested
into the test archive.
"""
persons = tests_data['persons']
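# person ids are plain integers, so any value above the largest
# known id cannot belong to the test archive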
return integers(min_value=max(persons)+1)
def _get_origin_dfs_revisions_walker():
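# sample among all test origins except the last one (repo_with_submodules)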
origin = random.choice(tests_data['origins'][:-1])
snapshot = storage.snapshot_get_latest(origin['id'])
head = snapshot['branches'][b'HEAD']['target']
return get_revisions_walker('dfs', storage, head)
def ancestor_revisions():
"""
Hypothesis strategy returning a pair of revisions ingested into the
test archive with an ancestor relation.
"""
# get a dfs revisions walker for one of the origins
# loaded into the test archive
revisions_walker = _get_origin_dfs_revisions_walker()
master_revisions = []
children = defaultdict(list)
init_rev_found = False
# get revisions only authored in the master branch
for rev in revisions_walker:
for rev_p in rev['parents']:
children[rev_p].append(rev['id'])
if not init_rev_found:
master_revisions.append(rev)
if not rev['parents']:
init_rev_found = True
# head revision
root_rev = master_revisions[0]
# pick a random revision, different from the head and the initial
# revision, only authored in the master branch
ancestor_rev_idx = random.choice(list(range(1, len(master_revisions)-1)))
ancestor_rev = master_revisions[ancestor_rev_idx]
ancestor_child_revs = children[ancestor_rev['id']]
return just({
'sha1_git_root': hash_to_hex(root_rev['id']),
'sha1_git': hash_to_hex(ancestor_rev['id']),
'children': [hash_to_hex(r) for r in ancestor_child_revs]
})
def non_ancestor_revisions():
"""
Hypothesis strategy returning a pair of revisions ingested into the
test archive with no ancestor relation.
"""
# get a dfs revisions walker for one of the origins
# loaded into the test archive
revisions_walker = _get_origin_dfs_revisions_walker()
merge_revs = []
children = defaultdict(list)
# get all merge revisions
for rev in revisions_walker:
if len(rev['parents']) > 1:
merge_revs.append(rev)
for rev_p in rev['parents']:
children[rev_p].append(rev['id'])
# find a merge revision whose parents each have a unique child revision
random.shuffle(merge_revs)
selected_revs = None
for merge_rev in merge_revs:
if all(len(children[rev_p]) == 1
for rev_p in merge_rev['parents']):
selected_revs = merge_rev['parents']
break
return just({
'sha1_git_root': hash_to_hex(selected_revs[0]),
'sha1_git': hash_to_hex(selected_revs[1])
})
# The following strategies return data specific to some tests
# that cannot be generated and thus are hardcoded.
def contents_with_ctags():
"""
Hypothesis strategy returning contents ingested into the test
archive. Those contents are ctags compatible, that is, running
ctags on them yields results.
"""
return just({
'sha1s': ['0ab37c02043ebff946c1937523f60aadd0844351',
'15554cf7608dde6bfefac7e3d525596343a85b6f',
'2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd',
'30acd0b47fc25e159e27a980102ddb1c4bea0b95',
'4f81f05aaea3efb981f9d90144f746d6b682285b',
'5153aa4b6e4455a62525bc4de38ed0ff6e7dd682',
'59d08bafa6a749110dfb65ba43a61963d5a5bf9f',
'7568285b2d7f31ae483ae71617bd3db873deaa2c',
'7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4',
'8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03',
'9b3557f1ab4111c8607a4f2ea3c1e53c6992916c',
'9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd',
'c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b',
'e89e55a12def4cd54d5bff58378a3b5119878eb7',
'e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e',
'eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5'],
'symbol_name': 'ABS'
})
def revision_with_submodules():
"""
Hypothesis strategy returning a revision that is known to
point to a directory with revision entries (aka git submodule)
"""
return just({
'rev_sha1_git': 'ffcb69001f3f6745dfd5b48f72ab6addb560e234',
'rev_dir_sha1_git': 'd92a21446387fa28410e5a74379c934298f39ae2',
'rev_dir_rev_path': 'libtess2'
})
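As an aside, a minimal sketch of how such strategies are typically consumed with hypothesis; the test body is illustrative only and uses the module paths from this diff:

from hypothesis import given
from swh.web.tests.data import get_tests_data
from swh.web.tests.strategies import revision, unknown_revision

tests_data = get_tests_data()

@given(revision(), unknown_revision())
def test_sampled_revisions(rev, unknown_rev):
    # rev is sampled from the loaded test archive, unknown_rev is not
    assert rev in tests_data['revisions']
    assert unknown_rev not in tests_data['revisions']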
