diff --git a/swh/web/assets/src/bundles/browse/origin-search.js b/swh/web/assets/src/bundles/browse/origin-search.js
index 12861fb7..374057f7 100644
--- a/swh/web/assets/src/bundles/browse/origin-search.js
+++ b/swh/web/assets/src/bundles/browse/origin-search.js
@@ -1,244 +1,225 @@
/**
* Copyright (C) 2018-2019 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU Affero General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
-import {heapsPermute} from 'utils/heaps-permute';
import {handleFetchError} from 'utils/functions';
const limit = 100;
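// Pagination state: linksPrev is a stack of previously visited result pages,
// linkNext/linkCurrent point to the next and current result pages, and
// inSearch guards against launching concurrent searches.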
let linksPrev = [];
let linkNext = null;
let linkCurrent = null;
let inSearch = false;
function parseLinkHeader(s) {
  let re = /<(.+)>; rel="next"/;
  let match = s.match(re);
  return match ? match[1] : undefined;
}
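// For illustration, given a (hypothetical) header value such as
// '<https://archive.example.org/api/1/origin/search/foo/?page_token=10>; rel="next"',
// parseLinkHeader returns the URL between the angle brackets.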
function fixTableRowsStyle() {
setTimeout(() => {
$('#origin-search-results tbody tr').removeAttr('style');
});
}
function clearOriginSearchResultsTable() {
$('#origin-search-results tbody tr').remove();
}
function populateOriginSearchResultsTable(origins) {
if (origins.length > 0) {
$('#swh-origin-search-results').show();
$('#swh-no-result').hide();
clearOriginSearchResultsTable();
let table = $('#origin-search-results tbody');
for (let [i, origin] of origins.entries()) {
let browseUrl = Urls.browse_origin(origin.url);
      let tableRow = `<tr id="origin-${i}">`;
      tableRow += `<td><a href="${browseUrl}">${encodeURI(origin.url)}</a></td>`;
      tableRow += `<td id="visit-type-origin-${i}"></td>`;
      tableRow += `<td id="visit-status-origin-${i}"></td>`;
      tableRow += '</tr>';
table.append(tableRow);
// get async latest visit snapshot and update visit status icon
let latestSnapshotUrl = Urls.api_1_origin_visit_latest(origin.url);
latestSnapshotUrl += '?require_snapshot=true';
fetch(latestSnapshotUrl)
.then(response => response.json())
.then(data => {
$(`#visit-type-origin-${i}`).text(data.type);
$(`#visit-status-origin-${i}`).children().remove();
if (data) {
            $(`#visit-status-origin-${i}`).append('<i class="fa fa-check" title="Origin has been visited and archived by Software Heritage"></i>');
          } else {
            $(`#visit-status-origin-${i}`).append('<i class="fa fa-times" title="Origin has not yet been visited by Software Heritage"></i>');
if ($('#swh-filter-empty-visits').prop('checked')) {
$(`#origin-${i}`).remove();
}
}
});
}
fixTableRowsStyle();
} else {
$('#swh-origin-search-results').hide();
$('#swh-no-result').text('No origins matching the search criteria were found.');
$('#swh-no-result').show();
}
if (linkNext === null) {
$('#origins-next-results-button').addClass('disabled');
} else {
$('#origins-next-results-button').removeClass('disabled');
}
if (linksPrev.length === 0) {
$('#origins-prev-results-button').addClass('disabled');
} else {
$('#origins-prev-results-button').removeClass('disabled');
}
inSearch = false;
setTimeout(() => {
window.scrollTo(0, 0);
});
}
-function escapeStringRegexp(str) {
- let matchOperatorsRe = /[|\\{}()[\]^$+*?.]/g;
- return str.replace(matchOperatorsRe, '%5C$&');
-}
-
-function searchOriginsFirst(patterns, limit) {
+function searchOriginsFirst(searchQueryText, limit) {
let baseSearchUrl;
let searchMetadata = $('#swh-search-origin-metadata').prop('checked');
if (searchMetadata) {
- baseSearchUrl = Urls.api_1_origin_metadata_search() + `?fulltext=${patterns}`;
+ baseSearchUrl = Urls.api_1_origin_metadata_search() + '?fulltext=' + encodeURIComponent(searchQueryText);
} else {
- let patternsArray = patterns.trim().replace(/\s+/g, ' ').split(' ');
- for (let i = 0; i < patternsArray.length; ++i) {
- patternsArray[i] = escapeStringRegexp(patternsArray[i]);
- }
- // url length must be less than 4096 for modern browsers
- // assuming average word length, 6 is max patternArray.length
- if (patternsArray.length < 7) {
- let patternsPermut = [];
- heapsPermute(patternsArray, p => patternsPermut.push(p.join('.*')));
- let regex = patternsPermut.join('|');
- baseSearchUrl = Urls.api_1_origin_search(regex) + `?regexp=true`;
- } else {
- baseSearchUrl = Urls.api_1_origin_search(patternsArray.join('.*')) + `?regexp=true`;
- }
+ baseSearchUrl = Urls.api_1_origin_search(searchQueryText);
}
let withVisit = $('#swh-search-origins-with-visit').prop('checked');
-  let searchUrl = baseSearchUrl + `&limit=${limit}&with_visit=${withVisit}`;
+  let separator = searchMetadata ? '&' : '?';
+  let searchUrl = baseSearchUrl + `${separator}limit=${limit}&with_visit=${withVisit}`;
searchOrigins(searchUrl);
}
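// For illustration only (the exact endpoint paths come from the Urls helpers
// and are assumptions here), the URLs built above look like:
//   metadata search: /api/1/origin/metadata-search/?fulltext=foo&limit=100&with_visit=false
//   url search:      /api/1/origin/search/foo/?limit=100&with_visit=false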
function searchOrigins(searchUrl) {
clearOriginSearchResultsTable();
$('.swh-loading').addClass('show');
let response = fetch(searchUrl)
.then(handleFetchError)
.then(resp => {
response = resp;
return response.json();
})
.then(data => {
// Save link to the current results page
linkCurrent = searchUrl;
// Save link to the next results page.
linkNext = null;
if (response.headers.has('Link')) {
let parsedLink = parseLinkHeader(response.headers.get('Link'));
if (parsedLink !== undefined) {
linkNext = parsedLink;
}
}
      // linksPrev is updated by the caller, which knows whether we are
      // moving forward or backward through the result pages.
$('.swh-loading').removeClass('show');
populateOriginSearchResultsTable(data);
})
.catch(response => {
$('.swh-loading').removeClass('show');
inSearch = false;
$('#swh-origin-search-results').hide();
$('#swh-no-result').text(`Error ${response.status}: ${response.statusText}`);
$('#swh-no-result').show();
});
}
function doSearch() {
$('#swh-no-result').hide();
- let patterns = $('#origins-url-patterns').val();
+ let searchQueryText = $('#origins-url-patterns').val();
inSearch = true;
// first try to resolve a swh persistent identifier
- let resolvePidUrl = Urls.api_1_resolve_swh_pid(patterns);
+ let resolvePidUrl = Urls.api_1_resolve_swh_pid(searchQueryText);
fetch(resolvePidUrl)
.then(handleFetchError)
.then(response => response.json())
.then(data => {
// pid has been successfully resolved,
// so redirect to browse page
window.location = data.browse_url;
})
.catch(response => {
// pid resolving failed
- if (patterns.startsWith('swh:')) {
+ if (searchQueryText.startsWith('swh:')) {
// display a useful error message if the input
// looks like a swh pid
response.json().then(data => {
$('#swh-origin-search-results').hide();
$('.swh-search-pagination').hide();
$('#swh-no-result').text(data.reason);
$('#swh-no-result').show();
});
} else {
// otherwise, proceed with origins search
$('#swh-origin-search-results').show();
$('.swh-search-pagination').show();
- searchOriginsFirst(patterns, limit);
+ searchOriginsFirst(searchQueryText, limit);
}
});
}
export function initOriginSearch() {
$(document).ready(() => {
$('#swh-search-origins').submit(event => {
event.preventDefault();
- let patterns = $('#origins-url-patterns').val().trim();
+ let searchQueryText = $('#origins-url-patterns').val().trim();
let withVisit = $('#swh-search-origins-with-visit').prop('checked');
let withContent = $('#swh-filter-empty-visits').prop('checked');
let searchMetadata = $('#swh-search-origin-metadata').prop('checked');
- let queryParameters = '?q=' + encodeURIComponent(patterns);
+ let queryParameters = '?q=' + encodeURIComponent(searchQueryText);
if (withVisit) {
queryParameters += '&with_visit';
}
if (withContent) {
queryParameters += '&with_content';
}
if (searchMetadata) {
queryParameters += '&search_metadata';
}
// Update the url, triggering page reload and effective search
window.location.search = queryParameters;
});
$('#origins-next-results-button').click(event => {
if ($('#origins-next-results-button').hasClass('disabled') || inSearch) {
return;
}
inSearch = true;
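      // going forward: remember the page we are leaving so that the
      // "previous" button can pop it back off the linksPrev stack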
linksPrev.push(linkCurrent);
searchOrigins(linkNext);
event.preventDefault();
});
$('#origins-prev-results-button').click(event => {
if ($('#origins-prev-results-button').hasClass('disabled') || inSearch) {
return;
}
inSearch = true;
searchOrigins(linksPrev.pop());
event.preventDefault();
});
let urlParams = new URLSearchParams(window.location.search);
let query = urlParams.get('q');
let withVisit = urlParams.has('with_visit');
let withContent = urlParams.has('with_content');
let searchMetadata = urlParams.has('search_metadata');
if (query) {
$('#origins-url-patterns').val(query);
$('#swh-search-origins-with-visit').prop('checked', withVisit);
$('#swh-filter-empty-visits').prop('checked', withContent);
$('#swh-search-origin-metadata').prop('checked', searchMetadata);
doSearch();
}
});
}
diff --git a/swh/web/assets/src/utils/heaps-permute.js b/swh/web/assets/src/utils/heaps-permute.js
deleted file mode 100644
index ef103160..00000000
--- a/swh/web/assets/src/utils/heaps-permute.js
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Copyright (C) 2018 The Software Heritage developers
- * See the AUTHORS file at the top-level directory of this distribution
- * License: GNU Affero General Public License version 3, or any later version
- * See top-level LICENSE file for more information
- */
-
-// http://dsernst.com/2014/12/14/heaps-permutation-algorithm-in-javascript/
-
-function swap(array, pos1, pos2) {
- let temp = array[pos1];
- array[pos1] = array[pos2];
- array[pos2] = temp;
-}
-
-export function heapsPermute(array, output, n) {
- n = n || array.length; // set n default to array.length
- if (n === 1) {
- output(array);
- } else {
- for (let i = 1; i <= n; i += 1) {
- heapsPermute(array, output, n - 1);
- let j;
- if (n % 2) {
- j = 1;
- } else {
- j = i;
- }
- swap(array, j - 1, n - 1); // -1 to account for javascript zero-indexing
- }
- }
-}
diff --git a/swh/web/common/service.py b/swh/web/common/service.py
index fbb0889e..00bd949f 100644
--- a/swh/web/common/service.py
+++ b/swh/web/common/service.py
@@ -1,1147 +1,1161 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import itertools
import os
+import re
from collections import defaultdict
from typing import Any, Dict
from swh.model import hashutil
from swh.storage.algos import diff, revisions_walker
from swh.model.identifiers import (
CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
)
from swh.web.common import converters
from swh.web.common import query
from swh.web.common.exc import BadInputExc, NotFoundExc
from swh.web.common.origin_visits import get_origin_visit
from swh.web import config
storage = config.storage()
vault = config.vault()
idx_storage = config.indexer_storage()
MAX_LIMIT = 50 # Top limit the users can ask for
def _first_element(l):
"""Returns the first element in the provided list or None
if it is empty or None"""
return next(iter(l or []), None)
def lookup_multiple_hashes(hashes):
"""Lookup the passed hashes in a single DB connection, using batch
processing.
Args:
        hashes: an array of {filename: X, sha1: Y} dicts, where X is a
            string and Y a hex-encoded sha1 string.
    Returns:
        The same array with elements updated with elem['found'] = True if
        the hash is present in storage, elem['found'] = False if not.
"""
hashlist = [hashutil.hash_to_bytes(elem['sha1']) for elem in hashes]
content_missing = storage.content_missing_per_sha1(hashlist)
missing = [hashutil.hash_to_hex(x) for x in content_missing]
for x in hashes:
x.update({'found': True})
for h in hashes:
if h['sha1'] in missing:
h['found'] = False
return hashes
def lookup_expression(expression, last_sha1, per_page):
"""Lookup expression in raw content.
Args:
expression (str): An expression to lookup through raw indexed
content
last_sha1 (str): Last sha1 seen
per_page (int): Number of results per page
Yields:
ctags whose content match the expression
"""
limit = min(per_page, MAX_LIMIT)
ctags = idx_storage.content_ctags_search(expression,
last_sha1=last_sha1,
limit=limit)
for ctag in ctags:
ctag = converters.from_swh(ctag, hashess={'id'})
ctag['sha1'] = ctag['id']
ctag.pop('id')
yield ctag
def lookup_hash(q):
"""Checks if the storage contains a given content checksum
    Args: query string of the form <hash_algo:hash>
Returns: Dict with key found containing the hash info if the
hash is present, None if not.
"""
algo, hash = query.parse_hash(q)
found = _first_element(storage.content_find({algo: hash}))
return {'found': converters.from_content(found),
'algo': algo}
def search_hash(q):
"""Checks if the storage contains a given content checksum
    Args: query string of the form <hash_algo:hash>
Returns: Dict with key found to True or False, according to
whether the checksum is present or not
"""
algo, hash = query.parse_hash(q)
found = _first_element(storage.content_find({algo: hash}))
return {'found': found is not None}
def _lookup_content_sha1(q):
"""Given a possible input, query for the content's sha1.
Args:
        q: query string of the form <hash_algo:hash>
Returns:
binary sha1 if found or None
"""
algo, hash = query.parse_hash(q)
if algo != 'sha1':
hashes = _first_element(storage.content_find({algo: hash}))
if not hashes:
return None
return hashes['sha1']
return hash
def lookup_content_ctags(q):
"""Return ctags information from a specified content.
Args:
        q: query string of the form <hash_algo:hash>
Yields:
ctags information (dict) list if the content is found.
"""
sha1 = _lookup_content_sha1(q)
if not sha1:
return None
ctags = list(idx_storage.content_ctags_get([sha1]))
if not ctags:
return None
for ctag in ctags:
yield converters.from_swh(ctag, hashess={'id'})
def lookup_content_filetype(q):
"""Return filetype information from a specified content.
Args:
        q: query string of the form <hash_algo:hash>
Yields:
filetype information (dict) list if the content is found.
"""
sha1 = _lookup_content_sha1(q)
if not sha1:
return None
filetype = _first_element(list(idx_storage.content_mimetype_get([sha1])))
if not filetype:
return None
return converters.from_filetype(filetype)
def lookup_content_language(q):
"""Return language information from a specified content.
Args:
        q: query string of the form <hash_algo:hash>
Yields:
language information (dict) list if the content is found.
"""
sha1 = _lookup_content_sha1(q)
if not sha1:
return None
lang = _first_element(list(idx_storage.content_language_get([sha1])))
if not lang:
return None
return converters.from_swh(lang, hashess={'id'})
def lookup_content_license(q):
"""Return license information from a specified content.
Args:
        q: query string of the form <hash_algo:hash>
Yields:
license information (dict) list if the content is found.
"""
sha1 = _lookup_content_sha1(q)
if not sha1:
return None
lic = _first_element(idx_storage.content_fossology_license_get([sha1]))
if not lic:
return None
return converters.from_swh({'id': sha1, 'facts': lic[sha1]},
hashess={'id'})
def lookup_origin(origin):
"""Return information about the origin matching dict origin.
Args:
origin: origin's dict with 'url' key
Returns:
origin information as dict.
"""
origin_info = storage.origin_get(origin)
if not origin_info:
msg = 'Origin with url %s not found!' % origin['url']
raise NotFoundExc(msg)
return converters.from_origin(origin_info)
def lookup_origins(origin_from=1, origin_count=100):
"""Get list of archived software origins in a paginated way.
Origins are sorted by id before returning them
Args:
origin_from (int): The minimum id of the origins to return
origin_count (int): The maximum number of origins to return
Yields:
origins information as dicts
"""
origins = storage.origin_get_range(origin_from, origin_count)
return map(converters.from_origin, origins)
def search_origin(url_pattern, offset=0, limit=50, regexp=False,
with_visit=False):
"""Search for origins whose urls contain a provided string pattern
or match a provided regular expression.
    Args:
        url_pattern: the string pattern to search for in origin urls
        offset: number of found origins to skip before returning results
        limit: the maximum number of found origins to return
        regexp: whether to consider url_pattern as a regular expression
        with_visit: whether to only return origins with at least one visit
    Returns:
        list of origin information as dict.
    """
+    if not regexp:
+        # If the query is not a regexp, rewrite it as a regexp matching
+        # every permutation of the whitespace-separated search words, so
+        # that the words may appear in any order in a matching origin url.
+        regexp = True
+        search_words = [re.escape(word) for word in url_pattern.split()]
+        if len(search_words) >= 7:
+            # with n words the permutation regexp has n! alternations;
+            # past 6 words, fall back to matching them in the given order
+            url_pattern = '.*'.join(search_words)
+        else:
+            pattern_parts = []
+            for permut in itertools.permutations(search_words):
+                pattern_parts.append('.*'.join(permut))
+            url_pattern = '|'.join(pattern_parts)
+
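+    # For illustration, the two-word query 'foo bar' is rewritten above to
+    # the regexp 'foo.*bar|bar.*foo', so the words match in either order.
+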
origins = storage.origin_search(url_pattern, offset, limit, regexp,
with_visit)
return map(converters.from_origin, origins)
def search_origin_metadata(fulltext, limit=50):
"""Search for origins whose metadata match a provided string pattern.
Args:
fulltext: the string pattern to search for in origin metadata
        limit: the maximum number of found origins to return
Returns:
list of origin metadata as dict.
"""
matches = idx_storage.origin_intrinsic_metadata_search_fulltext(
conjunction=[fulltext], limit=limit)
results = []
for match in matches:
match['from_revision'] = hashutil.hash_to_hex(match['from_revision'])
origin = storage.origin_get({'url': match['id']})
del match['id']
result = converters.from_origin(origin)
if result:
result['metadata'] = match
results.append(result)
return results
def lookup_origin_intrinsic_metadata(origin_dict):
"""Return intrinsic metadata for origin whose origin matches given
origin.
Args:
origin_dict: origin's dict with keys ('type' AND 'url')
Returns:
origin metadata.
"""
origin_info = storage.origin_get(origin_dict)
if not origin_info:
msg = 'Origin with url %s not found!' % origin_dict['url']
raise NotFoundExc(msg)
origins = [origin_info['url']]
match = _first_element(
idx_storage.origin_intrinsic_metadata_get(origins))
result = {}
if match:
result = match['metadata']
return result
def _to_sha1_bin(sha1_hex):
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_hex,
['sha1'], # HACK: sha1_git really
'Only sha1_git is supported.')
return sha1_git_bin
def _check_directory_exists(sha1_git, sha1_git_bin):
if len(list(storage.directory_missing([sha1_git_bin]))):
raise NotFoundExc('Directory with sha1_git %s not found' % sha1_git)
def lookup_directory(sha1_git):
"""Return information about the directory with id sha1_git.
Args:
sha1_git as string
Returns:
directory information as dict.
"""
empty_dir_sha1 = '4b825dc642cb6eb9a060e54bf8d69288fbee4904'
if sha1_git == empty_dir_sha1:
return []
sha1_git_bin = _to_sha1_bin(sha1_git)
_check_directory_exists(sha1_git, sha1_git_bin)
directory_entries = storage.directory_ls(sha1_git_bin)
return map(converters.from_directory_entry, directory_entries)
def lookup_directory_with_path(sha1_git, path_string):
"""Return directory information for entry with path path_string w.r.t.
root directory pointed by directory_sha1_git
    Args:
        - sha1_git: sha1_git of the directory to which we append paths
          in order to (hopefully) find the entry
        - path_string: the relative path to the entry, starting from the
          directory pointed to by sha1_git
Raises:
NotFoundExc if the directory entry is not found
"""
sha1_git_bin = _to_sha1_bin(sha1_git)
_check_directory_exists(sha1_git, sha1_git_bin)
paths = path_string.strip(os.path.sep).split(os.path.sep)
queried_dir = storage.directory_entry_get_by_path(
sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths)))
if not queried_dir:
raise NotFoundExc(('Directory entry with path %s from %s not found') %
(path_string, sha1_git))
return converters.from_directory_entry(queried_dir)
def lookup_release(release_sha1_git):
"""Return information about the release with sha1 release_sha1_git.
Args:
release_sha1_git: The release's sha1 as hexadecimal
Returns:
Release information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
sha1_git_bin = _to_sha1_bin(release_sha1_git)
release = _first_element(storage.release_get([sha1_git_bin]))
if not release:
raise NotFoundExc('Release with sha1_git %s not found.'
% release_sha1_git)
return converters.from_release(release)
def lookup_release_multiple(sha1_git_list):
"""Return information about the revisions identified with
their sha1_git identifiers.
Args:
sha1_git_list: A list of revision sha1_git identifiers
Returns:
Release information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list)
releases = storage.release_get(sha1_bin_list) or []
return (converters.from_release(r) for r in releases)
def lookup_revision(rev_sha1_git):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
NotFoundExc if there is no revision with the provided sha1_git.
"""
sha1_git_bin = _to_sha1_bin(rev_sha1_git)
revision = _first_element(storage.revision_get([sha1_git_bin]))
if not revision:
raise NotFoundExc('Revision with sha1_git %s not found.'
% rev_sha1_git)
return converters.from_revision(revision)
def lookup_revision_multiple(sha1_git_list):
"""Return information about the revisions identified with
their sha1_git identifiers.
Args:
sha1_git_list: A list of revision sha1_git identifiers
Returns:
Generator of revisions information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list)
revisions = storage.revision_get(sha1_bin_list) or []
return (converters.from_revision(r) for r in revisions)
def lookup_revision_message(rev_sha1_git):
"""Return the raw message of the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
        Decoded revision message as dict {'message': <the_message>}
Raises:
ValueError if the identifier provided is not of sha1 nature.
NotFoundExc if the revision is not found, or if it has no message
"""
sha1_git_bin = _to_sha1_bin(rev_sha1_git)
revision = _first_element(storage.revision_get([sha1_git_bin]))
if not revision:
raise NotFoundExc('Revision with sha1_git %s not found.'
% rev_sha1_git)
if 'message' not in revision:
raise NotFoundExc('No message for revision with sha1_git %s.'
% rev_sha1_git)
res = {'message': revision['message']}
return res
def _lookup_revision_id_by(origin, branch_name, timestamp):
    def _get_snapshot_branch(snapshot, branch_name):
        snapshot = lookup_snapshot(snapshot,
                                   branches_from=branch_name,
                                   branches_count=10)
branch = None
if branch_name in snapshot['branches']:
branch = snapshot['branches'][branch_name]
return branch
if isinstance(origin, int):
origin = {'id': origin}
elif isinstance(origin, str):
origin = {'url': origin}
else:
raise TypeError('"origin" must be an int or a string.')
visit = get_origin_visit(origin, visit_ts=timestamp)
branch = _get_snapshot_branch(visit['snapshot'], branch_name)
rev_id = None
if branch and branch['target_type'] == 'revision':
rev_id = branch['target']
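    # the branch is an alias (e.g. HEAD): dereference it once and retry
    # with the branch it points to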
elif branch and branch['target_type'] == 'alias':
branch = _get_snapshot_branch(visit['snapshot'], branch['target'])
if branch and branch['target_type'] == 'revision':
rev_id = branch['target']
if not rev_id:
raise NotFoundExc('Revision for origin %s and branch %s not found.'
% (origin.get('url'), branch_name))
return rev_id
def lookup_revision_by(origin,
branch_name='HEAD',
timestamp=None):
"""Lookup revision by origin, snapshot branch name and visit timestamp.
If branch_name is not provided, lookup using 'HEAD' as default.
If timestamp is not provided, use the most recent.
Args:
origin (Union[int,str]): origin of the revision
branch_name (str): snapshot branch name
timestamp (str/int): origin visit time frame
Returns:
dict: The revision matching the criterions
Raises:
NotFoundExc if no revision corresponds to the criterion
"""
rev_id = _lookup_revision_id_by(origin, branch_name, timestamp)
return lookup_revision(rev_id)
def lookup_revision_log(rev_sha1_git, limit):
"""Lookup revision log by revision id.
Args:
rev_sha1_git (str): The revision's sha1 as hexadecimal
limit (int): the maximum number of revisions returned
Returns:
list: Revision log as list of revision dicts
Raises:
ValueError: if the identifier provided is not of sha1 nature.
NotFoundExc: if there is no revision with the provided sha1_git.
"""
lookup_revision(rev_sha1_git)
sha1_git_bin = _to_sha1_bin(rev_sha1_git)
revision_entries = storage.revision_log([sha1_git_bin], limit)
return map(converters.from_revision, revision_entries)
def lookup_revision_log_by(origin, branch_name, timestamp, limit):
"""Lookup revision by origin, snapshot branch name and visit timestamp.
Args:
origin (Union[int,str]): origin of the revision
branch_name (str): snapshot branch
timestamp (str/int): origin visit time frame
limit (int): the maximum number of revisions returned
Returns:
list: Revision log as list of revision dicts
Raises:
NotFoundExc: if no revision corresponds to the criterion
"""
rev_id = _lookup_revision_id_by(origin, branch_name, timestamp)
return lookup_revision_log(rev_id, limit)
def lookup_revision_with_context_by(origin, branch_name, timestamp,
sha1_git, limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
sha1_git_root being resolved through the lookup of a revision by origin,
branch_name and ts.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
- origin: origin of the revision.
- branch_name: revision's branch.
- timestamp: revision's time frame.
- sha1_git: one of sha1_git_root's ancestors.
- limit: limit the lookup to 100 revisions back.
Returns:
Pair of (root_revision, revision).
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
- BadInputExc in case of unknown algo_hash or bad hash.
- NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root.
"""
rev_root_id = _lookup_revision_id_by(origin, branch_name, timestamp)
rev_root_id_bin = hashutil.hash_to_bytes(rev_root_id)
rev_root = _first_element(storage.revision_get([rev_root_id_bin]))
return (converters.from_revision(rev_root),
lookup_revision_with_context(rev_root, sha1_git, limit))
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision. The type is either a sha1 (as an hex
string) or a non converted dict.
sha1_git: one of sha1_git_root's ancestors
limit: limit the lookup to 100 revisions back
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root
"""
sha1_git_bin = _to_sha1_bin(sha1_git)
revision = _first_element(storage.revision_get([sha1_git_bin]))
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
if isinstance(sha1_git_root, str):
sha1_git_root_bin = _to_sha1_bin(sha1_git_root)
revision_root = _first_element(storage.revision_get([sha1_git_root_bin])) # noqa
if not revision_root:
raise NotFoundExc('Revision root %s not found' % sha1_git_root)
else:
sha1_git_root_bin = sha1_git_root['id']
revision_log = storage.revision_log([sha1_git_root_bin], limit)
parents = {}
children = defaultdict(list)
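    # walk the log of sha1_git_root and build both the parents map and its
    # reverse (children), i.e. the ancestry graph of sha1_git_root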
for rev in revision_log:
rev_id = rev['id']
parents[rev_id] = []
for parent_id in rev['parents']:
parents[rev_id].append(parent_id)
children[parent_id].append(rev_id)
if revision['id'] not in parents:
raise NotFoundExc('Revision %s is not an ancestor of %s' %
(sha1_git, sha1_git_root))
revision['children'] = children[revision['id']]
return converters.from_revision(revision)
def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
Args:
sha1_git: revision's hash.
dir_path: optional directory pointed to by that revision.
with_data: boolean that indicates to retrieve the raw data if the path
resolves to a content. Default to False (for the api)
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc either if the revision is not found or the path referenced
does not exist.
        NotImplementedError in case dir_path exists but does not reference
        a type 'dir' or 'file'.
"""
sha1_git_bin = _to_sha1_bin(sha1_git)
revision = _first_element(storage.revision_get([sha1_git_bin]))
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
dir_sha1_git_bin = revision['directory']
if dir_path:
paths = dir_path.strip(os.path.sep).split(os.path.sep)
entity = storage.directory_entry_get_by_path(
dir_sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths)))
if not entity:
raise NotFoundExc(
"Directory or File '%s' pointed to by revision %s not found"
% (dir_path, sha1_git))
else:
entity = {'type': 'dir', 'target': dir_sha1_git_bin}
if entity['type'] == 'dir':
directory_entries = storage.directory_ls(entity['target']) or []
return {'type': 'dir',
'path': '.' if not dir_path else dir_path,
'revision': sha1_git,
'content': list(map(converters.from_directory_entry,
directory_entries))}
elif entity['type'] == 'file': # content
content = _first_element(
storage.content_find({'sha1_git': entity['target']}))
if not content:
raise NotFoundExc('Content not found for revision %s'
% sha1_git)
if with_data:
c = _first_element(storage.content_get([content['sha1']]))
content['data'] = c['data']
return {'type': 'file',
'path': '.' if not dir_path else dir_path,
'revision': sha1_git,
'content': converters.from_content(content)}
elif entity['type'] == 'rev': # revision
revision = next(storage.revision_get([entity['target']]))
return {'type': 'rev',
'path': '.' if not dir_path else dir_path,
'revision': sha1_git,
'content': converters.from_revision(revision)}
else:
raise NotImplementedError('Entity of type %s not implemented.'
% entity['type'])
def lookup_content(q):
"""Lookup the content designed by q.
Args:
q: The release's sha1 as hexadecimal
Raises:
NotFoundExc if the requested content is not found
"""
algo, hash = query.parse_hash(q)
c = _first_element(storage.content_find({algo: hash}))
if not c:
raise NotFoundExc('Content with %s checksum equals to %s not found!' %
(algo, hashutil.hash_to_hex(hash)))
return converters.from_content(c)
def lookup_content_raw(q):
"""Lookup the content defined by q.
Args:
        q: query string of the form <hash_algo:hash>
Returns:
dict with 'sha1' and 'data' keys.
data representing its raw data decoded.
Raises:
NotFoundExc if the requested content is not found or
if the content bytes are not available in the storage
"""
c = lookup_content(q)
content_sha1_bytes = hashutil.hash_to_bytes(c['checksums']['sha1'])
content = _first_element(storage.content_get([content_sha1_bytes]))
if not content:
algo, hash = query.parse_hash(q)
raise NotFoundExc('Bytes of content with %s checksum equals to %s '
'are not available!' %
(algo, hashutil.hash_to_hex(hash)))
return converters.from_content(content)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return storage.stat_counters()
def _lookup_origin_visits(origin_url, last_visit=None, limit=10):
"""Yields the origin origins' visits.
Args:
origin_url (str): origin to list visits for
last_visit (int): last visit to lookup from
limit (int): Number of elements max to display
Yields:
Dictionaries of origin_visit for that origin
"""
limit = min(limit, MAX_LIMIT)
for visit in storage.origin_visit_get(
origin_url, last_visit=last_visit, limit=limit):
visit['origin'] = origin_url
yield visit
def lookup_origin_visits(origin, last_visit=None, per_page=10):
"""Yields the origin origins' visits.
Args:
origin: origin to list visits for
Yields:
Dictionaries of origin_visit for that origin
"""
visits = _lookup_origin_visits(origin, last_visit=last_visit,
limit=per_page)
for visit in visits:
yield converters.from_origin_visit(visit)
def lookup_origin_visit_latest(origin_url, require_snapshot):
"""Return the origin's latest visit
Args:
origin_url (str): origin to list visits for
require_snapshot (bool): filter out origins without a snapshot
Returns:
dict: The origin_visit concerned
"""
visit = storage.origin_visit_get_latest(
origin_url, require_snapshot=require_snapshot)
return converters.from_origin_visit(visit)
def lookup_origin_visit(origin_url, visit_id):
"""Return information about visit visit_id with origin origin.
Args:
origin (str): origin concerned by the visit
visit_id: the visit identifier to lookup
Yields:
The dict origin_visit concerned
"""
visit = storage.origin_visit_get_by(origin_url, visit_id)
if not visit:
raise NotFoundExc('Origin %s or its visit '
'with id %s not found!' % (origin_url, visit_id))
visit['origin'] = origin_url
return converters.from_origin_visit(visit)
def lookup_snapshot_sizes(snapshot_id):
"""Count the number of branches in the snapshot with the given id
Args:
snapshot_id (str): sha1 identifier of the snapshot
Returns:
dict: A dict whose keys are the target types of branches and
values their corresponding amount
"""
snapshot_id_bin = _to_sha1_bin(snapshot_id)
snapshot_sizes = storage.snapshot_count_branches(snapshot_id_bin)
if 'revision' not in snapshot_sizes:
snapshot_sizes['revision'] = 0
if 'release' not in snapshot_sizes:
snapshot_sizes['release'] = 0
# adjust revision / release count for display if aliases are defined
if 'alias' in snapshot_sizes:
aliases = lookup_snapshot(snapshot_id,
branches_count=snapshot_sizes['alias'],
target_types=['alias'])
for alias in aliases['branches'].values():
if lookup_snapshot(snapshot_id,
branches_from=alias['target'],
branches_count=1,
target_types=['revision']):
snapshot_sizes['revision'] += 1
else:
snapshot_sizes['release'] += 1
del snapshot_sizes['alias']
return snapshot_sizes
def lookup_snapshot(snapshot_id, branches_from='', branches_count=1000,
target_types=None):
"""Return information about a snapshot, aka the list of named
branches found during a specific visit of an origin.
Args:
snapshot_id (str): sha1 identifier of the snapshot
branches_from (str): optional parameter used to skip branches
whose name is lesser than it before returning them
branches_count (int): optional parameter used to restrain
the amount of returned branches
target_types (list): optional parameter used to filter the
target types of branch to return (possible values that can be
contained in that list are `'content', 'directory',
'revision', 'release', 'snapshot', 'alias'`)
Returns:
A dict filled with the snapshot content.
"""
snapshot_id_bin = _to_sha1_bin(snapshot_id)
snapshot = storage.snapshot_get_branches(snapshot_id_bin,
branches_from.encode(),
branches_count, target_types)
if not snapshot:
raise NotFoundExc('Snapshot with id %s not found!' % snapshot_id)
return converters.from_snapshot(snapshot)
def lookup_latest_origin_snapshot(origin, allowed_statuses=None):
"""Return information about the latest snapshot of an origin.
.. warning:: At most 1000 branches contained in the snapshot
will be returned for performance reasons.
Args:
origin: URL or integer identifier of the origin
allowed_statuses: list of visit statuses considered
to find the latest snapshot for the visit. For instance,
``allowed_statuses=['full']`` will only consider visits that
have successfully run to completion.
Returns:
A dict filled with the snapshot content.
"""
snapshot = storage.snapshot_get_latest(origin, allowed_statuses)
return converters.from_snapshot(snapshot)
def lookup_revision_through(revision, limit=100):
"""Retrieve a revision from the criterion stored in revision dictionary.
Args:
revision: Dictionary of criterion to lookup the revision with.
Here are the supported combination of possible values:
- origin_url, branch_name, ts, sha1_git
- origin_url, branch_name, ts
- sha1_git_root, sha1_git
- sha1_git
Returns:
None if the revision is not found or the actual revision.
"""
if (
'origin_url' in revision and
'branch_name' in revision and
'ts' in revision and
'sha1_git' in revision):
return lookup_revision_with_context_by(revision['origin_url'],
revision['branch_name'],
revision['ts'],
revision['sha1_git'],
limit)
if (
'origin_url' in revision and
'branch_name' in revision and
'ts' in revision):
return lookup_revision_by(revision['origin_url'],
revision['branch_name'],
revision['ts'])
if (
'sha1_git_root' in revision and
'sha1_git' in revision):
return lookup_revision_with_context(revision['sha1_git_root'],
revision['sha1_git'],
limit)
if 'sha1_git' in revision:
return lookup_revision(revision['sha1_git'])
# this should not happen
raise NotImplementedError('Should not happen!')
def lookup_directory_through_revision(revision, path=None,
limit=100, with_data=False):
"""Retrieve the directory information from the revision.
Args:
revision: dictionary of criterion representing a revision to lookup
path: directory's path to lookup.
        limit: optional query parameter to limit the revisions log (default to
            100). For now, note that this limit could impede the transitivity
            conclusion about sha1_git not being an ancestor of sha1_git_root.
with_data: indicate to retrieve the content's raw data if path resolves
to a content.
Returns:
        The directory pointed to by the revision criteria at path.
"""
rev = lookup_revision_through(revision, limit)
if not rev:
raise NotFoundExc('Revision with criterion %s not found!' % revision)
return (rev['id'],
lookup_directory_with_revision(rev['id'], path, with_data))
def vault_cook(obj_type, obj_id, email=None):
"""Cook a vault bundle.
"""
return vault.cook(obj_type, obj_id, email=email)
def vault_fetch(obj_type, obj_id):
"""Fetch a vault bundle.
"""
return vault.fetch(obj_type, obj_id)
def vault_progress(obj_type, obj_id):
"""Get the current progress of a vault bundle.
"""
return vault.progress(obj_type, obj_id)
def diff_revision(rev_id):
"""Get the list of file changes (insertion / deletion / modification /
renaming) for a particular revision.
"""
rev_sha1_git_bin = _to_sha1_bin(rev_id)
changes = diff.diff_revision(storage, rev_sha1_git_bin,
track_renaming=True)
for change in changes:
change['from'] = converters.from_directory_entry(change['from'])
change['to'] = converters.from_directory_entry(change['to'])
if change['from_path']:
change['from_path'] = change['from_path'].decode('utf-8')
if change['to_path']:
change['to_path'] = change['to_path'].decode('utf-8')
return changes
class _RevisionsWalkerProxy(object):
"""
Proxy class wrapping a revisions walker iterator from
swh-storage and performing needed conversions.
"""
def __init__(self, rev_walker_type, rev_start, *args, **kwargs):
rev_start_bin = hashutil.hash_to_bytes(rev_start)
self.revisions_walker = \
revisions_walker.get_revisions_walker(rev_walker_type,
storage,
rev_start_bin,
*args, **kwargs)
def export_state(self):
return self.revisions_walker.export_state()
def __next__(self):
return converters.from_revision(next(self.revisions_walker))
def __iter__(self):
return self
def get_revisions_walker(rev_walker_type, rev_start, *args, **kwargs):
"""
Utility function to instantiate a revisions walker of a given type,
see :mod:`swh.storage.algos.revisions_walker`.
Args:
rev_walker_type (str): the type of revisions walker to return,
possible values are: ``committer_date``, ``dfs``, ``dfs_post``,
``bfs`` and ``path``
rev_start (str): hexadecimal representation of a revision identifier
args (list): position arguments to pass to the revisions walker
constructor
kwargs (dict): keyword arguments to pass to the revisions walker
constructor
"""
# first check if the provided revision is valid
lookup_revision(rev_start)
return _RevisionsWalkerProxy(rev_walker_type, rev_start, *args, **kwargs)
def lookup_object(object_type: str, object_id: str) -> Dict[str, Any]:
"""
Utility function for looking up an object in the archive by its type
and id.
Args:
object_type (str): the type of object to lookup, either *content*,
*directory*, *release*, *revision* or *snapshot*
object_id (str): the *sha1_git* checksum identifier in hexadecimal
form of the object to lookup
Returns:
        Dict[str, Any]: A dictionary describing the object, or a list of
        dictionaries for the directory object type.
Raises:
NotFoundExc: if the object could not be found in the archive
BadInputExc: if the object identifier is invalid
"""
if object_type == CONTENT:
return lookup_content(f'sha1_git:{object_id}')
elif object_type == DIRECTORY:
return {
'id': object_id,
'content': list(lookup_directory(object_id))
}
elif object_type == RELEASE:
return lookup_release(object_id)
elif object_type == REVISION:
return lookup_revision(object_id)
elif object_type == SNAPSHOT:
return lookup_snapshot(object_id)
raise BadInputExc(('Invalid swh object type! Valid types are '
                       f'{CONTENT}, {DIRECTORY}, {RELEASE}, '
f'{REVISION} or {SNAPSHOT}.'))
diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py
index 02edcae3..1c1560a7 100644
--- a/swh/web/tests/api/views/test_origin.py
+++ b/swh/web/tests/api/views/test_origin.py
@@ -1,643 +1,686 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from hypothesis import given
import pytest
from requests.utils import parse_header_links
from swh.storage.exc import StorageDBError, StorageAPIError
from swh.web.common.exc import BadInputExc
from swh.web.common.utils import reverse
from swh.web.common.origin_visits import get_origin_visits
from swh.web.tests.strategies import (
origin, new_origin, visit_dates, new_snapshots
)
def _scroll_results(api_client, url):
"""Iterates through pages of results, and returns them all."""
results = []
while True:
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
results.extend(rv.data)
if 'Link' in rv:
for link in parse_header_links(rv['Link']):
if link['rel'] == 'next':
# Found link to next page of results
url = link['url']
break
else:
# No link with 'rel=next'
break
else:
# No Link header
break
return results
def test_api_lookup_origin_visits_raise_error(api_client, mocker):
mock_get_origin_visits = mocker.patch(
'swh.web.api.views.origin.get_origin_visits')
err_msg = 'voluntary error to check the bad request middleware.'
mock_get_origin_visits.side_effect = BadInputExc(err_msg)
url = reverse('api-1-origin-visits', url_args={'origin_url': 'http://foo'})
rv = api_client.get(url)
assert rv.status_code == 400, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'BadInputExc',
'reason': err_msg
}
def test_api_lookup_origin_visits_raise_swh_storage_error_db(api_client,
mocker):
mock_get_origin_visits = mocker.patch(
'swh.web.api.views.origin.get_origin_visits')
err_msg = 'Storage exploded! Will be back online shortly!'
mock_get_origin_visits.side_effect = StorageDBError(err_msg)
url = reverse('api-1-origin-visits', url_args={'origin_url': 'http://foo'})
rv = api_client.get(url)
assert rv.status_code == 503, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'StorageDBError',
'reason':
'An unexpected error occurred in the backend: %s' % err_msg
}
def test_api_lookup_origin_visits_raise_swh_storage_error_api(api_client,
mocker):
mock_get_origin_visits = mocker.patch(
'swh.web.api.views.origin.get_origin_visits')
err_msg = 'Storage API dropped dead! Will resurrect asap!'
mock_get_origin_visits.side_effect = StorageAPIError(err_msg)
url = reverse(
'api-1-origin-visits', url_args={'origin_url': 'http://foo'})
rv = api_client.get(url)
assert rv.status_code == 503, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'StorageAPIError',
'reason':
'An unexpected error occurred in the api backend: %s' % err_msg
}
@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visits(api_client, archive_data, new_origin,
visit_dates, new_snapshots):
archive_data.origin_add_one(new_origin)
for i, visit_date in enumerate(visit_dates):
origin_visit = archive_data.origin_visit_add(
new_origin['url'], visit_date, type='git')
archive_data.snapshot_add([new_snapshots[i]])
archive_data.origin_visit_update(
new_origin['url'], origin_visit['visit'],
snapshot=new_snapshots[i]['id'])
all_visits = list(reversed(get_origin_visits(new_origin)))
for last_visit, expected_visits in (
(None, all_visits[:2]),
(all_visits[1]['visit'], all_visits[2:4])):
url = reverse('api-1-origin-visits',
url_args={'origin_url': new_origin['url']},
query_params={'per_page': 2,
'last_visit': last_visit})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
for expected_visit in expected_visits:
origin_visit_url = reverse(
'api-1-origin-visit',
url_args={'origin_url': new_origin['url'],
'visit_id': expected_visit['visit']})
snapshot_url = reverse(
'api-1-snapshot',
url_args={'snapshot_id': expected_visit['snapshot']})
expected_visit['origin'] = new_origin['url']
expected_visit['origin_visit_url'] = origin_visit_url
expected_visit['snapshot_url'] = snapshot_url
assert rv.data == expected_visits
@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visits_by_id(api_client, archive_data, new_origin,
visit_dates, new_snapshots):
archive_data.origin_add_one(new_origin)
for i, visit_date in enumerate(visit_dates):
origin_visit = archive_data.origin_visit_add(
new_origin['url'], visit_date, type='git')
archive_data.snapshot_add([new_snapshots[i]])
archive_data.origin_visit_update(
new_origin['url'], origin_visit['visit'],
snapshot=new_snapshots[i]['id'])
all_visits = list(reversed(get_origin_visits(new_origin)))
for last_visit, expected_visits in (
(None, all_visits[:2]),
(all_visits[1]['visit'], all_visits[2:4])):
url = reverse('api-1-origin-visits',
url_args={'origin_url': new_origin['url']},
query_params={'per_page': 2,
'last_visit': last_visit})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
for expected_visit in expected_visits:
origin_visit_url = reverse(
'api-1-origin-visit',
url_args={'origin_url': new_origin['url'],
'visit_id': expected_visit['visit']})
snapshot_url = reverse(
'api-1-snapshot',
url_args={'snapshot_id': expected_visit['snapshot']})
expected_visit['origin'] = new_origin['url']
expected_visit['origin_visit_url'] = origin_visit_url
expected_visit['snapshot_url'] = snapshot_url
assert rv.data == expected_visits
@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visit(api_client, archive_data, new_origin,
visit_dates, new_snapshots):
archive_data.origin_add_one(new_origin)
for i, visit_date in enumerate(visit_dates):
origin_visit = archive_data.origin_visit_add(
new_origin['url'], visit_date, type='git')
visit_id = origin_visit['visit']
archive_data.snapshot_add([new_snapshots[i]])
archive_data.origin_visit_update(
new_origin['url'], origin_visit['visit'],
snapshot=new_snapshots[i]['id'])
url = reverse('api-1-origin-visit',
url_args={'origin_url': new_origin['url'],
'visit_id': visit_id})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
expected_visit = archive_data.origin_visit_get_by(
new_origin['url'], visit_id)
origin_url = reverse('api-1-origin',
url_args={'origin_url': new_origin['url']})
snapshot_url = reverse(
'api-1-snapshot',
url_args={'snapshot_id': expected_visit['snapshot']})
expected_visit['origin'] = new_origin['url']
expected_visit['origin_url'] = origin_url
expected_visit['snapshot_url'] = snapshot_url
assert rv.data == expected_visit
@given(new_origin())
def test_api_lookup_origin_visit_latest_no_visit(api_client, archive_data,
new_origin):
archive_data.origin_add_one(new_origin)
url = reverse('api-1-origin-visit-latest',
url_args={'origin_url': new_origin['url']})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'No visit for origin %s found' % new_origin['url']
}
@given(new_origin(), visit_dates(2), new_snapshots(1))
def test_api_lookup_origin_visit_latest(api_client, archive_data, new_origin,
visit_dates, new_snapshots):
archive_data.origin_add_one(new_origin)
visit_dates.sort()
visit_ids = []
for i, visit_date in enumerate(visit_dates):
origin_visit = archive_data.origin_visit_add(
new_origin['url'], visit_date, type='git')
visit_ids.append(origin_visit['visit'])
archive_data.snapshot_add([new_snapshots[0]])
archive_data.origin_visit_update(
new_origin['url'], visit_ids[0],
snapshot=new_snapshots[0]['id'])
url = reverse('api-1-origin-visit-latest',
url_args={'origin_url': new_origin['url']})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
expected_visit = archive_data.origin_visit_get_by(
new_origin['url'], visit_ids[1])
origin_url = reverse('api-1-origin',
url_args={'origin_url': new_origin['url']})
expected_visit['origin'] = new_origin['url']
expected_visit['origin_url'] = origin_url
expected_visit['snapshot_url'] = None
assert rv.data == expected_visit
@given(new_origin(), visit_dates(2), new_snapshots(1))
def test_api_lookup_origin_visit_latest_with_snapshot(api_client, archive_data,
new_origin, visit_dates,
new_snapshots):
archive_data.origin_add_one(new_origin)
visit_dates.sort()
visit_ids = []
for i, visit_date in enumerate(visit_dates):
origin_visit = archive_data.origin_visit_add(
new_origin['url'], visit_date, type='git')
visit_ids.append(origin_visit['visit'])
archive_data.snapshot_add([new_snapshots[0]])
archive_data.origin_visit_update(
new_origin['url'], visit_ids[0],
snapshot=new_snapshots[0]['id'])
url = reverse('api-1-origin-visit-latest',
url_args={'origin_url': new_origin['url']})
url += '?require_snapshot=true'
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
expected_visit = archive_data.origin_visit_get_by(
new_origin['url'], visit_ids[0])
origin_url = reverse('api-1-origin',
url_args={'origin_url': new_origin['url']})
snapshot_url = reverse(
'api-1-snapshot',
url_args={'snapshot_id': expected_visit['snapshot']})
expected_visit['origin'] = new_origin['url']
expected_visit['origin_url'] = origin_url
expected_visit['snapshot_url'] = snapshot_url
assert rv.data == expected_visit
@given(origin())
def test_api_lookup_origin_visit_not_found(api_client, origin):
all_visits = list(reversed(get_origin_visits(origin)))
max_visit_id = max([v['visit'] for v in all_visits])
url = reverse('api-1-origin-visit',
url_args={'origin_url': origin['url'],
'visit_id': max_visit_id + 1})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'Origin %s or its visit with id %s not found!' %
(origin['url'], max_visit_id+1)
}
def test_api_origins(api_client, archive_data):
origins = list(archive_data.origin_get_range(0, 10000))
origin_urls = {origin['url'] for origin in origins}
# Get only one
url = reverse('api-1-origins',
query_params={'origin_count': 1})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert len(rv.data) == 1
assert {origin['url'] for origin in rv.data} <= origin_urls
# Get all
url = reverse('api-1-origins',
query_params={'origin_count': len(origins)})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert len(rv.data) == len(origins)
assert {origin['url'] for origin in rv.data} == origin_urls
# Get "all + 10"
url = reverse('api-1-origins',
query_params={'origin_count': len(origins)+10})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert len(rv.data) == len(origins)
assert {origin['url'] for origin in rv.data} == origin_urls
@pytest.mark.parametrize('origin_count', [1, 2, 10, 100])
def test_api_origins_scroll(api_client, archive_data, origin_count):
origins = list(archive_data.origin_get_range(0, 10000))
origin_urls = {origin['url'] for origin in origins}
url = reverse('api-1-origins',
query_params={'origin_count': origin_count})
results = _scroll_results(api_client, url)
assert len(results) == len(origins)
assert {origin['url'] for origin in results} == origin_urls
@given(origin())
def test_api_origin_by_url(api_client, archive_data, origin):
url = reverse('api-1-origin',
url_args={'origin_url': origin['url']})
rv = api_client.get(url)
expected_origin = archive_data.origin_get(origin)
origin_visits_url = reverse('api-1-origin-visits',
url_args={'origin_url': origin['url']})
expected_origin['origin_visits_url'] = origin_visits_url
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == expected_origin
@given(new_origin())
def test_api_origin_not_found(api_client, new_origin):
url = reverse('api-1-origin',
url_args={'origin_url': new_origin['url']})
rv = api_client.get(url)
assert rv.status_code == 404, rv.data
assert rv['Content-Type'] == 'application/json'
assert rv.data == {
'exception': 'NotFoundExc',
'reason': 'Origin with url %s not found!' % new_origin['url']
}
def test_api_origin_search(api_client):
expected_origins = {
'https://github.com/wcoder/highlightjs-line-numbers.js',
'https://github.com/memononen/libtess2',
}
# Search for 'github.com', get only one
url = reverse('api-1-origin-search',
url_args={'url_pattern': 'github.com'},
query_params={'limit': 1})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert len(rv.data) == 1
assert {origin['url'] for origin in rv.data} <= expected_origins
# Search for 'github.com', get all
url = reverse('api-1-origin-search',
url_args={'url_pattern': 'github.com'},
query_params={'limit': 2})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert {origin['url'] for origin in rv.data} == expected_origins
# Search for 'github.com', get more than available
url = reverse('api-1-origin-search',
url_args={'url_pattern': 'github.com'},
query_params={'limit': 10})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert {origin['url'] for origin in rv.data} == expected_origins
+def test_api_origin_search_words(api_client):
+ expected_origins = {
+ 'https://github.com/wcoder/highlightjs-line-numbers.js',
+ 'https://github.com/memononen/libtess2',
+ }
+
+ url = reverse('api-1-origin-search',
+ url_args={'url_pattern': 'github com'},
+ query_params={'limit': 2})
+ rv = api_client.get(url)
+ assert rv.status_code == 200, rv.data
+ assert rv['Content-Type'] == 'application/json'
+ assert {origin['url'] for origin in rv.data} == expected_origins
+
+ url = reverse('api-1-origin-search',
+ url_args={'url_pattern': 'com github'},
+ query_params={'limit': 2})
+ rv = api_client.get(url)
+ assert rv.status_code == 200, rv.data
+ assert rv['Content-Type'] == 'application/json'
+ assert {origin['url'] for origin in rv.data} == expected_origins
+
+ url = reverse('api-1-origin-search',
+ url_args={'url_pattern': 'memononen libtess2'},
+ query_params={'limit': 2})
+ rv = api_client.get(url)
+ assert rv.status_code == 200, rv.data
+ assert rv['Content-Type'] == 'application/json'
+ assert len(rv.data) == 1
+ assert {origin['url'] for origin in rv.data} \
+ == {'https://github.com/memononen/libtess2'}
+
+ url = reverse('api-1-origin-search',
+ url_args={'url_pattern': 'libtess2 memononen'},
+ query_params={'limit': 2})
+ rv = api_client.get(url)
+ assert rv.status_code == 200, rv.data
+ assert rv['Content-Type'] == 'application/json'
+ assert len(rv.data) == 1
+ assert {origin['url'] for origin in rv.data} \
+ == {'https://github.com/memononen/libtess2'}
+
+
def test_api_origin_search_regexp(api_client):
expected_origins = {
'https://github.com/memononen/libtess2',
'repo_with_submodules'
}
url = reverse('api-1-origin-search',
url_args={'url_pattern': '(repo|libtess)'},
query_params={'limit': 10,
'regexp': True})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert {origin['url'] for origin in rv.data} == expected_origins
@pytest.mark.parametrize('limit', [1, 2, 3, 10])
def test_api_origin_search_scroll(api_client, archive_data, limit):
expected_origins = {
'https://github.com/wcoder/highlightjs-line-numbers.js',
'https://github.com/memononen/libtess2',
}
url = reverse('api-1-origin-search',
url_args={'url_pattern': 'github.com'},
query_params={'limit': limit})
results = _scroll_results(api_client, url)
assert {origin['url'] for origin in results} == expected_origins
def test_api_origin_search_limit(api_client, archive_data):
archive_data.origin_add([
{'url': 'http://foobar/{}'.format(i)}
for i in range(2000)
])
url = reverse('api-1-origin-search',
url_args={'url_pattern': 'foobar'},
query_params={'limit': 1050})
rv = api_client.get(url)
assert rv.status_code == 200, rv.data
assert rv['Content-Type'] == 'application/json'
assert len(rv.data) == 1000
@given(origin())
def test_api_origin_metadata_search(api_client, mocker, origin):
mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage')
oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext
oimsft.side_effect = lambda conjunction, limit: [{
'from_revision': (
b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed '
b'\xf2U\xfa\x05B8'),
'metadata': {'author': 'Jane Doe'},
'id': origin['url'],
'tool': {
'configuration': {
'context': ['NpmMapping', 'CodemetaMapping'],
'type': 'local'
},
'id': 3,
'name': 'swh-metadata-detector',
'version': '0.0.1'
}
}]
url = reverse('api-1-origin-metadata-search',
query_params={'fulltext': 'Jane Doe'})
rv = api_client.get(url)
assert rv.status_code == 200, rv.content
assert rv['Content-Type'] == 'application/json'
expected_data = [{
'url': origin['url'],
'metadata': {
'metadata': {'author': 'Jane Doe'},
'from_revision': (
'7026b7c1a2af56521e951c01ed20f255fa054238'),
'tool': {
'configuration': {
'context': ['NpmMapping', 'CodemetaMapping'],
'type': 'local'
},
'id': 3,
'name': 'swh-metadata-detector',
'version': '0.0.1',
}
}
}]
assert rv.data == expected_data
oimsft.assert_called_with(conjunction=['Jane Doe'], limit=70)
@given(origin())
def test_api_origin_metadata_search_limit(api_client, mocker, origin):
mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage')
oimsft = mock_idx_storage.origin_intrinsic_metadata_search_fulltext
oimsft.side_effect = lambda conjunction, limit: [{
'from_revision': (
b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed '
b'\xf2U\xfa\x05B8'),
'metadata': {'author': 'Jane Doe'},
'id': origin['url'],
'tool': {
'configuration': {
'context': ['NpmMapping', 'CodemetaMapping'],
'type': 'local'
},
'id': 3,
'name': 'swh-metadata-detector',
'version': '0.0.1'
}
}]
url = reverse('api-1-origin-metadata-search',
query_params={'fulltext': 'Jane Doe'})
rv = api_client.get(url)
assert rv.status_code == 200, rv.content
assert rv['Content-Type'] == 'application/json'
assert len(rv.data) == 1
oimsft.assert_called_with(conjunction=['Jane Doe'], limit=70)
url = reverse('api-1-origin-metadata-search',
query_params={'fulltext': 'Jane Doe',
'limit': 10})
rv = api_client.get(url)
assert rv.status_code == 200, rv.content
assert rv['Content-Type'] == 'application/json'
assert len(rv.data) == 1
oimsft.assert_called_with(conjunction=['Jane Doe'], limit=10)
url = reverse('api-1-origin-metadata-search',
query_params={'fulltext': 'Jane Doe',
'limit': 987})
rv = api_client.get(url)
assert rv.status_code == 200, rv.content
assert rv['Content-Type'] == 'application/json'
assert len(rv.data) == 1
oimsft.assert_called_with(conjunction=['Jane Doe'], limit=100)
@given(origin())
def test_api_origin_intrinsic_metadata(api_client, mocker, origin):
mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage')
oimg = mock_idx_storage.origin_intrinsic_metadata_get
oimg.side_effect = lambda origin_urls: [{
'from_revision': (
b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed '
b'\xf2U\xfa\x05B8'),
'metadata': {'author': 'Jane Doe'},
'id': origin['url'],
'tool': {
'configuration': {
'context': ['NpmMapping', 'CodemetaMapping'],
'type': 'local'
},
'id': 3,
'name': 'swh-metadata-detector',
'version': '0.0.1'
}
}]
url = reverse('api-origin-intrinsic-metadata',
url_args={'origin_url': origin['url']})
rv = api_client.get(url)
oimg.assert_called_once_with([origin['url']])
assert rv.status_code == 200, rv.content
assert rv['Content-Type'] == 'application/json'
expected_data = {'author': 'Jane Doe'}
assert rv.data == expected_data
def test_api_origin_metadata_search_invalid(api_client, mocker):
mock_idx_storage = mocker.patch('swh.web.common.service.idx_storage')
url = reverse('api-1-origin-metadata-search')
rv = api_client.get(url)
assert rv.status_code == 400, rv.content
mock_idx_storage.assert_not_called()