diff --git a/swh/web/ui/backend.py b/swh/web/ui/backend.py index d1f914352..bb8ad44de 100644 --- a/swh/web/ui/backend.py +++ b/swh/web/ui/backend.py @@ -1,273 +1,273 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from . import main def content_get(sha1_bin): """Lookup the content designed by {algo: hash_bin}. Args: sha1_bin: content's binary sha1. Returns: Content as dict with 'sha1' and 'data' keys. data representing its raw data. """ contents = main.storage().content_get([sha1_bin]) if contents and len(contents) >= 1: return contents[0] return None def content_find(algo, hash_bin): """Retrieve the content with binary hash hash_bin Args: algo: nature of the hash hash_bin. hash_bin: content's hash searched for. Returns: A triplet (sha1, sha1_git, sha256) if the content exist or None otherwise. """ return main.storage().content_find({algo: hash_bin}) def content_find_provenance(algo, hash_bin): """Find the content's provenance information. Args: algo: nature of the hash hash_bin. hash_bin: content's hash corresponding to algo searched for. Yields: Yields the list of provenance information for that content if any (this can be empty if the cache is not populated) """ return main.storage().content_find_provenance({algo: hash_bin}) def content_missing_per_sha1(sha1list): """List content missing from storage based on sha1 Args: sha1s: Iterable of sha1 to check for absence Returns: an iterable of sha1s missing from the storage """ return main.storage().content_missing_per_sha1(sha1list) def directory_get(sha1_bin): """Retrieve information on one directory. Args: sha1_bin: Directory's identifier Returns: The directory's information. """ res = main.storage().directory_get([sha1_bin]) if res and len(res) >= 1: return res[0] def origin_get(origin): """Return information about the origin matching dict origin. Args: origin: origin's dict with keys either 'id' or ('type' AND 'url') Returns: Origin information as dict. """ return main.storage().origin_get(origin) def person_get(person_id): """Return information about the person with id person_id. Args: person_id: person's identifier.v Returns: Person information as dict. """ res = main.storage().person_get([person_id]) if res and len(res) >= 1: return res[0] def directory_ls(sha1_git_bin, recursive=False): """Return information about the directory with id sha1_git. Args: sha1_git: directory's identifier. recursive: Optional recursive flag default to False Returns: Directory information as dict. """ directory_entries = main.storage().directory_ls(sha1_git_bin, recursive) if not directory_entries: return [] return directory_entries def release_get(sha1_git_bin): """Return information about the release with sha1 sha1_git_bin. Args: sha1_git_bin: The release's sha1 as bytes. Returns: Release information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ res = main.storage().release_get([sha1_git_bin]) if res and len(res) >= 1: return res[0] return None def revision_get(sha1_git_bin): """Return information about the revision with sha1 sha1_git_bin. Args: sha1_git_bin: The revision's sha1 as bytes. Returns: Revision information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ res = main.storage().revision_get([sha1_git_bin]) if res and len(res) >= 1: return res[0] return None def revision_get_multiple(sha1_git_bin_list): """Return information about the revisions in sha1_git_bin_list Args: sha1_git_bin_list: The revisions' sha1s as a list of bytes. Returns: Revisions' information as an iterable of dicts if any found, an empty list otherwise Raises: ValueError if the identifier provided is not of sha1 nature. """ res = main.storage().revision_get(sha1_git_bin_list) if res and len(res) >= 1: return res return [] def revision_log(sha1_git_bin, limit): """Return information about the revision with sha1 sha1_git_bin. Args: sha1_git_bin: The revision's sha1 as bytes. limit: the maximum number of revisions returned. Returns: Revision information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ return main.storage().revision_log([sha1_git_bin], limit) def revision_log_by(origin_id, branch_name, ts, limit): """Return information about the revision matching the timestamp ts, from origin origin_id, in branch branch_name. Args: origin_id: origin of the revision - branch_name: revision's branch. - timestamp: revision's time frame. Returns: Information for the revision matching the criterions. """ return main.storage().revision_log_by(origin_id, branch_name, ts, limit=limit) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return main.storage().stat_counters() -def stat_origin_visits(origin_id): +def lookup_origin_visits(origin_id): """Return the dates at which the given origin was scanned for content. Returns: An array of dates """ return main.storage().origin_visit_get(origin_id) def revision_get_by(origin_id, branch_name, timestamp): """Return occurrence information matching the criterions origin_id, branch_name, ts. """ res = main.storage().revision_get_by(origin_id, branch_name, timestamp=timestamp, limit=1) if not res: return None return res[0] def directory_entry_get_by_path(directory, path): """Return a directory entry by its path. """ paths = path.strip(os.path.sep).split(os.path.sep) return main.storage().directory_entry_get_by_path( directory, list(map(lambda p: p.encode('utf-8'), paths))) def entity_get(uuid): """Retrieve the entity per its uuid. """ return main.storage().entity_get(uuid) diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py index e33ff0c0e..98269aa22 100644 --- a/swh/web/ui/service.py +++ b/swh/web/ui/service.py @@ -1,627 +1,627 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from swh.core import hashutil from swh.web.ui import converters, query, backend from swh.web.ui.exc import NotFoundExc def lookup_multiple_hashes(hashes): """Lookup the passed hashes in a single DB connection, using batch processing. Args: An array of {filename: X, sha1: Y}, string X, hex sha1 string Y. Returns: The same array with elements updated with elem['found'] = true if the hash is present in storage, elem['found'] = false if not. """ hashlist = [hashutil.hex_to_hash(elem['sha1']) for elem in hashes] content_missing = backend.content_missing_per_sha1(hashlist) missing = [hashutil.hash_to_hex(x) for x in content_missing] for x in hashes: x.update({'found': True}) for h in hashes: if h['sha1'] in missing: h['found'] = False return hashes def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found containing the hash info if the hash is present, None if not. """ algo, hash = query.parse_hash(q) found = backend.content_find(algo, hash) return {'found': found, 'algo': algo} def search_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ algo, hash = query.parse_hash(q) found = backend.content_find(algo, hash) return {'found': found is not None} def lookup_content_provenance(q): """Return provenance information from a specified content. Args: q: query string of the form Yields: provenance information (dict) list if the content is found. """ algo, hash = query.parse_hash(q) provenances = backend.content_find_provenance(algo, hash) if not provenances: return None return (converters.from_provenance(p) for p in provenances) def lookup_origin(origin): """Return information about the origin matching dict origin. Args: origin: origin's dict with keys either 'id' or ('type' AND 'url') Returns: origin information as dict. """ return backend.origin_get(origin) def lookup_person(person_id): """Return information about the person with id person_id. Args: person_id as string Returns: person information as dict. """ person = backend.person_get(person_id) return converters.from_person(person) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], # HACK: sha1_git really 'Only sha1_git is supported.') dir = backend.directory_get(sha1_git_bin) if not dir: return None directory_entries = backend.directory_ls(sha1_git_bin) return map(converters.from_directory_entry, directory_entries) def lookup_directory_with_path(directory_sha1_git, path_string): """Return directory information for entry with path path_string w.r.t. root directory pointed by directory_sha1_git Args: - directory_sha1_git: sha1_git corresponding to the directory to which we append paths to (hopefully) find the entry - the relative path to the entry starting from the directory pointed by directory_sha1_git Raises: NotFoundExc if the directory entry is not found """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( directory_sha1_git, ['sha1'], 'Only sha1_git is supported.') queried_dir = backend.directory_entry_get_by_path( sha1_git_bin, path_string) if not queried_dir: raise NotFoundExc(('Directory entry with path %s from %s not found') % (path_string, directory_sha1_git)) return converters.from_directory_entry(queried_dir) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( release_sha1_git, ['sha1'], 'Only sha1_git is supported.') res = backend.release_get(sha1_git_bin) return converters.from_release(res) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) return converters.from_revision(revision) def lookup_revision_multiple(sha1_git_list): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ def to_sha1_bin(sha1_hex): _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_hex, ['sha1'], 'Only sha1_git is supported.') return sha1_git_bin sha1_bin_list = (to_sha1_bin(x) for x in sha1_git_list) revisions = backend.revision_get_multiple(sha1_bin_list) return (converters.from_revision(x) for x in revisions) def lookup_revision_message(rev_sha1_git): """Return the raw message of the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Decoded revision message as dict {'message': } Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if the revision is not found, or if it has no message """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) if 'message' not in revision: raise NotFoundExc('No message for revision with sha1_git %s.' % rev_sha1_git) res = {'message': revision['message']} return res def lookup_revision_by(origin_id, branch_name="refs/heads/master", timestamp=None): """Lookup revisions by origin_id, branch_name and timestamp. If: - branch_name is not provided, lookup using 'refs/heads/master' as default. - ts is not provided, use the most recent Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. Yields: The revisions matching the criterions. """ res = backend.revision_get_by(origin_id, branch_name, timestamp) return converters.from_revision(res) def lookup_revision_log(rev_sha1_git, limit): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision_entries = backend.revision_log(sha1_git_bin, limit) return map(converters.from_revision, revision_entries) def lookup_revision_log_by(origin_id, branch_name, timestamp, limit): """Return information about the revision with sha1 revision_sha1_git. Args: origin_id: origin of the revision branch_name: revision's branch timestamp: revision's time frame limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: NotFoundExc if no revision corresponds to the criterion NotFoundExc if the corresponding revision has no log """ revision_entries = backend.revision_log_by(origin_id, branch_name, timestamp, limit) if not revision_entries: return None return map(converters.from_revision, revision_entries) def lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. sha1_git_root being resolved through the lookup of a revision by origin_id, branch_name and ts. In other words, sha1_git is an ancestor of sha1_git_root. Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. - sha1_git: one of sha1_git_root's ancestors. - limit: limit the lookup to 100 revisions back. Returns: Pair of (root_revision, revision). Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ rev_root = backend.revision_get_by(origin_id, branch_name, ts) if not rev_root: raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts)) return (converters.from_revision(rev_root), lookup_revision_with_context(rev_root, sha1_git, limit)) def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision. The type is either a sha1 (as an hex string) or a non converted dict. sha1_git: one of sha1_git_root's ancestors limit: limit the lookup to 100 revisions back Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) if isinstance(sha1_git_root, str): _, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws( sha1_git_root, ['sha1'], 'Only sha1_git is supported.') revision_root = backend.revision_get(sha1_git_root_bin) if not revision_root: raise NotFoundExc('Revision root %s not found' % sha1_git_root) else: sha1_git_root_bin = sha1_git_root['id'] revision_log = backend.revision_log(sha1_git_root_bin, limit) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = rev['id'] parents[rev_id] = [] for parent_id in rev['parents']: parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return converters.from_revision(revision) def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. with_data: boolean that indicates to retrieve the raw data if the path resolves to a content. Default to False (for the api) Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist. NotImplementedError in case of dir_path exists but do not reference a type 'dir' or 'file'. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) dir_sha1_git_bin = revision['directory'] if dir_path: entity = backend.directory_entry_get_by_path(dir_sha1_git_bin, dir_path) if not entity: raise NotFoundExc( "Directory or File '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: entity = {'type': 'dir', 'target': dir_sha1_git_bin} if entity['type'] == 'dir': directory_entries = backend.directory_ls(entity['target']) return {'type': 'dir', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': map(converters.from_directory_entry, directory_entries)} elif entity['type'] == 'file': # content content = backend.content_find('sha1_git', entity['target']) if with_data: content['data'] = backend.content_get(content['sha1'])['data'] return {'type': 'file', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_content(content)} else: raise NotImplementedError('Entity of type %s not implemented.' % entity['type']) def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal """ algo, hash = query.parse_hash(q) c = backend.content_find(algo, hash) return converters.from_content(c) def lookup_content_raw(q): """Lookup the content defined by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. """ algo, hash = query.parse_hash(q) c = backend.content_find(algo, hash) if not c: return None content = backend.content_get(c['sha1']) return converters.from_content(content) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return backend.stat_counters() -def stat_origin_visits(origin_id): +def lookup_origin_visits(origin_id): """Return the dates at which the given origin was scanned for content. Returns: An array of dates in the datetime format """ - for visit in backend.stat_origin_visits(origin_id): + for visit in backend.lookup_origin_visits(origin_id): visit['date'] = visit['date'].timestamp() yield(visit) def lookup_entity_by_uuid(uuid): """Return the entity's hierarchy from its uuid. Args: uuid: entity's identifier. Returns: List of hierarchy entities from the entity with uuid. """ uuid = query.parse_uuid4(uuid) return backend.entity_get(uuid) def lookup_revision_through(revision, limit=100): """Retrieve a revision from the criterion stored in revision dictionary. Args: revision: Dictionary of criterion to lookup the revision with. Here are the supported combination of possible values: - origin_id, branch_name, ts, sha1_git - origin_id, branch_name, ts - sha1_git_root, sha1_git - sha1_git Returns: None if the revision is not found or the actual revision. """ if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context_by(revision['origin_id'], revision['branch_name'], revision['ts'], revision['sha1_git'], limit) if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision: return lookup_revision_by(revision['origin_id'], revision['branch_name'], revision['ts']) if 'sha1_git_root' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context(revision['sha1_git_root'], revision['sha1_git'], limit) if 'sha1_git' in revision: return lookup_revision(revision['sha1_git']) # this should not happen raise NotImplementedError('Should not happen!') def lookup_directory_through_revision(revision, path=None, limit=100, with_data=False): """Retrieve the directory information from the revision. Args: revision: dictionary of criterion representing a revision to lookup path: directory's path to lookup. limit: optional query parameter to limit the revisions log. (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of. with_data: indicate to retrieve the content's raw data if path resolves to a content. Returns: The directory pointing to by the revision criterions at path. """ rev = lookup_revision_through(revision, limit) if not rev: raise NotFoundExc('Revision with criterion %s not found!' % revision) return (rev['id'], lookup_directory_with_revision(rev['id'], path, with_data)) diff --git a/swh/web/ui/static/js/calendar.js b/swh/web/ui/static/js/calendar.js index c86f48fd5..842d12c5c 100644 --- a/swh/web/ui/static/js/calendar.js +++ b/swh/web/ui/static/js/calendar.js @@ -1,373 +1,373 @@ /** * Calendar: * A one-off object that makes an AJAX call to the API's visit stats * endpoint, then displays these statistics in a zoomable timeline-like * format. * Args: * browse_url: the relative URL for browsing a revision via the web ui, * accurate up to the origin * visit_url: the complete relative URL for getting the origin's visit * stats * origin_id: the origin being browsed * zoomw: the element that should contain the zoomable part of the calendar * staticw: the element that should contain the static part of the calendar * reset: the element that should reset the zoom level on click */ var Calendar = function(browse_url, visit_url, origin_id, zoomw, staticw, reset) { /** Constants **/ this.month_names = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']; /** Display **/ this.desiredPxWidth = 7; this.padding = 0.01; /** Object vars **/ this.origin_id = origin_id; this.zoomw = zoomw; this.staticw = staticw; /** Calendar data **/ this.cal_data = null; this.static = { group_factor: 3600 * 1000, group_data: null, plot_data: null }; this.zoom = { group_factor: 3600 * 1000, group_data: null, plot_data: null }; /** * Keep a reference to the Calendar object context. * Otherwise, 'this' changes to represent current function caller scope */ var self = this; /** Start AJAX call **/ $.ajax({ type: 'GET', url: visit_url, success: function(data) { self.calendar(data); } }); /** * Group the plot's base data according to the grouping ratio and the * range required * Args: * groupFactor: the amount the data should be grouped by * range: * Returns: * A dictionary containing timestamps divided by the grouping ratio as * keys, a list of the corresponding complete timestamps as values */ this.dataGroupedRange = function(groupFactor, range) { var group_dict = {}; var start = range.xaxis.from; var end = range.xaxis.to; var range_data = self.cal_data.filter(function(item, index, arr) { return item >= start && item <= end; }); for (var date_idx in range_data) { var date = range_data[date_idx]; var floor = Math.floor(date / groupFactor); if (group_dict[floor] == undefined) group_dict[floor] = [date]; else group_dict[floor].push(date); } return group_dict; }; /** * Update the ratio that governs how the data is grouped based on changes * in the data range or the display size, and regroup the plot's data * according to this value. * * Args: * element: the element in which the plot is displayed * plotprops: the properties corresponding to that plot * range: the range of the data displayed */ this.updateGroupFactorAndData = function(element, plotprops, range) { var milli_length = range.xaxis.to - range.xaxis.from; var px_length = element.width(); plotprops.group_factor = Math.floor( self.desiredPxWidth * (milli_length / px_length)); plotprops.group_data = self.dataGroupedRange( plotprops.group_factor, range); }; /** Get plot data from the group data **/ this.getPlotData = function(grouped_data) { var plot_data = []; if (self.cal_data.length == 1) { plot_data = [[self.cal_data[0] - 3600*1000*24*30, 0], [self.cal_data[0], 1], [self.cal_data[0] + 3600*1000*24*30, 0]]; } else { $.each(grouped_data, function(key, value) { plot_data.push([value[0], value.length]); }); } return [{ label: 'Calendar', data: plot_data }]; }; this.plotZoom = function(zoom_options) { return $.plot(self.zoomw, self.zoom.plot_data, zoom_options); }; this.plotStatic = function(static_options) { return $.plot(self.staticw, self.static.plot_data, static_options); }; /** * Display a zoomable calendar with click-through links to revisions * of the same origin * * Args: * data: the data that the calendar should present, as a list of * POSIX second-since-epoch timestamps */ this.calendar = function(data) { // POSIX timestamps to JS timestamps self.cal_data = data.map(function(e) - { return Math.floor(e * 1000); }); + { return Math.floor(e['date'] * 1000); }); /** Bootstrap the group ratio **/ var cal_data_range = null; if (self.cal_data.length == 1) { var padding_qty = 3600*1000*24*30; cal_data_range = {xaxis: {from: self.cal_data[0] - padding_qty, to: self.cal_data[0] + padding_qty}}; } else cal_data_range = {xaxis: {from: self.cal_data[0], to: self.cal_data[self.cal_data.length -1] } }; self.updateGroupFactorAndData(self.zoomw, self.zoom, cal_data_range); self.updateGroupFactorAndData(self.staticw, self.static, cal_data_range); /** Bootstrap the plot data **/ self.zoom.plot_data = self.getPlotData(self.zoom.group_data); self.static.plot_data = self.getPlotData(self.zoom.group_data); /** * Return the flot-required function for displaying tooltips, according to * the group we want to display the tooltip for * Args: * group_options: the group we want to display the tooltip for (self.static * or self.zoom) */ function tooltip_fn(group_options) { return function (label, x_timestamp, y_hits, item) { var floor_index = Math.floor( item.datapoint[0] / group_options.group_factor); var tooltip_text = group_options.group_data[floor_index].map( function(elem) { var date = new Date(elem); var year = (date.getYear() + 1900).toString(); var month = self.month_names[date.getMonth()]; var day = date.getDate(); var hr = date.getHours(); var min = date.getMinutes(); if (min < 10) min = '0'+min; return [day, month, year + ',', hr+':'+min, 'UTC'].join(' '); } ); return tooltip_text.join('
'); }; } /** Plot options for both graph windows **/ var zoom_options = { legend: { show: false }, series: { clickable: true, bars: { show: true, lineWidth: 1, barWidth: self.zoom.group_factor } }, xaxis: { mode: 'time', minTickSize: [1, 'day'], // monthNames: self.month_names, position: 'top' }, yaxis: { show: false }, selection: { mode: 'x' }, grid: { clickable: true, hoverable: true }, tooltip: { show: true, content: tooltip_fn(self.zoom) } }; var overview_options = { legend: { show: false }, series: { clickable: true, bars: { show: true, lineWidth: 1, barWidth: self.static.group_factor }, shadowSize: 0 }, yaxis: { show: false }, xaxis: { mode: 'time', minTickSize: [1, 'day'] }, grid: { clickable: true, hoverable: true, color: '#999' }, selection: { mode: 'x' }, tooltip: { show: true, content: tooltip_fn(self.static) } }; function addPadding(options, range) { var len = range.xaxis.to - range.xaxis.from; return $.extend(true, {}, options, { xaxis: { min: range.xaxis.from - (self.padding * len), max: range.xaxis.to + (self.padding * len) } }); } /** draw the windows **/ var plot = self.plotZoom(addPadding(zoom_options, cal_data_range)); var overview = self.plotStatic( addPadding(overview_options, cal_data_range)); var current_ranges = $.extend(true, {}, cal_data_range); /** * Zoom to the mouse-selected range in the given window * * Args: * plotzone: the jQuery-selected element the zoomed plot should be * in (usually the same as the original 'zoom plot' element) * range: the data range as a dict {xaxis: {from:, to:}, * yaxis:{from:, to:}} */ function zoom(ranges) { current_ranges.xaxis.from = ranges.xaxis.from; current_ranges.xaxis.to = ranges.xaxis.to; self.updateGroupFactorAndData( self.zoomw, self.zoom, current_ranges); self.zoom.plot_data = self.getPlotData(self.zoom.group_data); var zoomedopts = $.extend(true, {}, zoom_options, { xaxis: { min: ranges.xaxis.from, max: ranges.xaxis.to }, series: { bars: {barWidth: self.zoom.group_factor} } }); return self.plotZoom(zoomedopts); } function resetZoomW(plot_options) { self.zoom.group_data = self.static.group_data; self.zoom.plot_data = self.static.plot_data; self.updateGroupFactorAndData(zoomw, self.zoom, cal_data_range); plot = self.plotZoom(addPadding(plot_options, cal_data_range)); } // now connect the two self.zoomw.bind('plotselected', function (event, ranges) { // clamp the zooming to prevent eternal zoom if (ranges.xaxis.to - ranges.xaxis.from < 0.00001) ranges.xaxis.to = ranges.xaxis.from + 0.00001; // do the zooming plot = zoom(ranges); // don't fire event on the overview to prevent eternal loop overview.setSelection(ranges, true); }); self.staticw.bind('plotselected', function (event, ranges) { plot.setSelection(ranges); }); function unbindClick() { self.zoomw.unbind('plotclick'); self.staticw.unbind('plotclick'); } function bindClick() { self.zoomw.bind('plotclick', redirect_to_revision); self.staticw.bind('plotclick', redirect_to_revision); } function redirect_to_revision(event, pos, item) { if (item) { var ts = Math.floor(item.datapoint[0] / 1000); // POSIX ts var url = browse_url + 'ts/' + ts + '/'; window.location.href = url; } } reset.click(function(event) { plot.clearSelection(); overview.clearSelection(); current_ranges = $.extend(true, {}, cal_data_range); resetZoomW(zoom_options); }); $(window).resize(function(event) { /** Update zoom display **/ self.updateGroupFactorAndData(zoomw, self.zoom, current_ranges); self.zoom.plot_data = self.getPlotData(self.zoom.group_data); /** Update static display **/ self.updateGroupFactorAndData(staticw, self.static, cal_data_range); self.static.plot_data = self.getPlotData(self.static.group_data); /** Replot **/ plot = self.plotZoom( addPadding(zoom_options, current_ranges)); overview = self.plotStatic( addPadding(overview_options, cal_data_range)); }); bindClick(); }; }; diff --git a/swh/web/ui/tests/test_backend.py b/swh/web/ui/tests/test_backend.py index 7c266bc6d..5d7c02ffb 100644 --- a/swh/web/ui/tests/test_backend.py +++ b/swh/web/ui/tests/test_backend.py @@ -1,725 +1,725 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from nose.tools import istest from unittest.mock import MagicMock from swh.core import hashutil from swh.web.ui import backend from swh.web.ui.tests import test_app class BackendTestCase(test_app.SWHApiTestCase): @istest def content_get_ko_not_found_1(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f777') self.storage.content_get = MagicMock(return_value=None) # when actual_content = backend.content_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_get_ko_not_found_empty_result(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_get = MagicMock(return_value=[]) # when actual_content = backend.content_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_get(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') stub_contents = [{ 'sha1': sha1_bin, 'data': b'binary data', }, {}] self.storage.content_get = MagicMock(return_value=stub_contents) # when actual_content = backend.content_get(sha1_bin) # then self.assertEquals(actual_content, stub_contents[0]) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_find_ko_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=None) # when actual_lookup = backend.content_find('sha1_git', sha1_bin) # then self.assertIsNone(actual_lookup) self.storage.content_find.assert_called_once_with( {'sha1_git': sha1_bin}) @istest def content_find(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=(1, 2, 3)) # when actual_content = backend.content_find('sha1', sha1_bin) # then self.assertEquals(actual_content, (1, 2, 3)) # check the function has been called with parameters self.storage.content_find.assert_called_with({'sha1': sha1_bin}) @istest def content_find_provenance_ko_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find_provenance = MagicMock( return_value=(x for x in [])) # when actual_lookup = backend.content_find_provenance('sha1_git', sha1_bin) # then self.assertEquals(list(actual_lookup), []) self.storage.content_find_provenance.assert_called_once_with( {'sha1_git': sha1_bin}) @istest def content_find_provenance(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find_provenance = MagicMock( return_value=(x for x in (1, 2, 3))) # when actual_content = backend.content_find_provenance('sha1', sha1_bin) # then self.assertEquals(list(actual_content), [1, 2, 3]) # check the function has been called with parameters self.storage.content_find_provenance.assert_called_with( {'sha1': sha1_bin}) @istest def content_missing_per_sha1_none(self): # given sha1s_bin = [hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f'), hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )] self.storage.content_missing_per_sha1 = MagicMock(return_value=[]) # when actual_content = backend.content_missing_per_sha1(sha1s_bin) # then self.assertEquals(actual_content, []) self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin) @istest def content_missing_per_sha1_some(self): # given sha1s_bin = [hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f'), hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )] self.storage.content_missing_per_sha1 = MagicMock(return_value=[ hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )]) # when actual_content = backend.content_missing_per_sha1(sha1s_bin) # then self.assertEquals(actual_content, [hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )]) self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin) @istest def origin_get_by_id(self): # given self.storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = backend.origin_get({'id': 'origin-id'}) # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) self.storage.origin_get.assert_called_with({'id': 'origin-id'}) @istest def origin_get_by_type_url(self): # given self.storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = backend.origin_get({'type': 'ftp', 'url': 'ftp://some/url/to/origin'}) # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) self.storage.origin_get.assert_called_with( {'type': 'ftp', 'url': 'ftp://some/url/to/origin'}) @istest def person_get(self): # given self.storage.person_get = MagicMock(return_value=[{ 'id': 'person-id', 'name': 'blah'}]) # when actual_person = backend.person_get('person-id') # then self.assertEqual(actual_person, {'id': 'person-id', 'name': 'blah'}) self.storage.person_get.assert_called_with(['person-id']) @istest def directory_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') self.storage.directory_get = MagicMock(return_value=None) # when actual_directory = backend.directory_get(sha1_bin) # then self.assertEquals(actual_directory, None) self.storage.directory_get.assert_called_with([sha1_bin]) @istest def directory_get(self): # given sha1_bin = hashutil.hex_to_hash( '51f71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') sha1_bin2 = hashutil.hex_to_hash( '62071b8614fcd89ccd17ca2b1d9e66c5b00a6d03') stub_dir = {'id': sha1_bin, 'revision': b'sha1-blah'} stub_dir2 = {'id': sha1_bin2, 'revision': b'sha1-foobar'} self.storage.directory_get = MagicMock(return_value=[stub_dir, stub_dir2]) # when actual_directory = backend.directory_get(sha1_bin) # then self.assertEquals(actual_directory, stub_dir) self.storage.directory_get.assert_called_with([sha1_bin]) @istest def directory_ls_empty_result(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') self.storage.directory_ls = MagicMock(return_value=[]) # when actual_directory = backend.directory_ls(sha1_bin) # then self.assertEquals(actual_directory, []) self.storage.directory_ls.assert_called_with(sha1_bin, False) @istest def directory_ls(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') stub_dir_entries = [{ 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, }] self.storage.directory_ls = MagicMock( return_value=stub_dir_entries) actual_directory = backend.directory_ls(sha1_bin, recursive=True) # then self.assertIsNotNone(actual_directory) self.assertEqual(list(actual_directory), stub_dir_entries) self.storage.directory_ls.assert_called_with(sha1_bin, True) @istest def release_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') self.storage.release_get = MagicMock(return_value=[]) # when actual_release = backend.release_get(sha1_bin) # then self.assertIsNone(actual_release) self.storage.release_get.assert_called_with([sha1_bin]) @istest def release_get(self): # given sha1_bin = hashutil.hex_to_hash( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') stub_releases = [{ 'id': sha1_bin, 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }] self.storage.release_get = MagicMock(return_value=stub_releases) # when actual_release = backend.release_get(sha1_bin) # then self.assertEqual(actual_release, stub_releases[0]) self.storage.release_get.assert_called_with([sha1_bin]) @istest def revision_get_by_not_found(self): # given self.storage.revision_get_by = MagicMock(return_value=[]) # when actual_revision = backend.revision_get_by(10, 'master', 'ts2') # then self.assertIsNone(actual_revision) self.storage.revision_get_by.assert_called_with(10, 'master', timestamp='ts2', limit=1) @istest def revision_get_by(self): # given self.storage.revision_get_by = MagicMock(return_value=[{'id': 1}]) # when actual_revisions = backend.revision_get_by(100, 'dev', 'ts') # then self.assertEquals(actual_revisions, {'id': 1}) self.storage.revision_get_by.assert_called_with(100, 'dev', timestamp='ts', limit=1) @istest def revision_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') self.storage.revision_get = MagicMock(return_value=[]) # when actual_revision = backend.revision_get(sha1_bin) # then self.assertIsNone(actual_revision) self.storage.revision_get.assert_called_with([sha1_bin]) @istest def revision_get(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') stub_revisions = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_get = MagicMock(return_value=stub_revisions) # when actual_revision = backend.revision_get(sha1_bin) # then self.assertEqual(actual_revision, stub_revisions[0]) self.storage.revision_get.assert_called_with([sha1_bin]) @istest def revision_get_multiple(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') sha1_other = hashutil.hex_to_hash( 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc') stub_revisions = [ { 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }, { 'id': sha1_other, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'name', 'email': b'name@surname.org', }, 'committer': { 'name': b'name', 'email': b'name@surname.org', }, 'message': b'ugly fix for bug 42', 'date': datetime.datetime(2000, 1, 12, 5, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 12, 5, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } ] self.storage.revision_get = MagicMock( return_value=stub_revisions) # when actual_revision = backend.revision_get_multiple([sha1_bin, sha1_other]) # then self.assertEqual(actual_revision, stub_revisions) self.storage.revision_get.assert_called_with( [sha1_bin, sha1_other]) @istest def revision_get_multiple_none_found(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') sha1_other = hashutil.hex_to_hash( 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc') self.storage.revision_get = MagicMock( return_value=[]) # when actual_revision = backend.revision_get_multiple([sha1_bin, sha1_other]) # then self.assertEqual(actual_revision, []) self.storage.revision_get.assert_called_with( [sha1_bin, sha1_other]) @istest def revision_log(self): # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') stub_revision_log = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_log = MagicMock(return_value=stub_revision_log) # when actual_revision = backend.revision_log(sha1_bin, limit=1) # then self.assertEqual(list(actual_revision), stub_revision_log) self.storage.revision_log.assert_called_with([sha1_bin], 1) @istest def revision_log_by(self): # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') stub_revision_log = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_log_by = MagicMock( return_value=stub_revision_log) # when actual_log = backend.revision_log_by(1, 'refs/heads/master', None, limit=1) # then self.assertEqual(actual_log, stub_revision_log) self.storage.revision_log.assert_called_with([sha1_bin], 1) @istest def revision_log_by_norev(self): # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') self.storage.revision_log_by = MagicMock(return_value=None) # when actual_log = backend.revision_log_by(1, 'refs/heads/master', None, limit=1) # then self.assertEqual(actual_log, None) self.storage.revision_log.assert_called_with([sha1_bin], 1) @istest def stat_counters(self): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } self.storage.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = backend.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) self.storage.stat_counters.assert_called_with() @istest - def stat_origin_visits(self): + def lookup_origin_visits(self): # given expected_dates = [ { 'date': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 1 }, { 'date': datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 2 }, { 'date': datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 3 } ] self.storage.origin_visit_get = MagicMock(return_value=expected_dates) # when - actual_dates = backend.stat_origin_visits(5) + actual_dates = backend.lookup_origin_visits(5) # then self.assertEqual(actual_dates, expected_dates) self.storage.origin_visit_get.assert_called_with(5) @istest def directory_entry_get_by_path(self): # given stub_dir_entry = {'id': b'dir-id', 'type': 'dir', 'name': b'some/path/foo'} self.storage.directory_entry_get_by_path = MagicMock( return_value=stub_dir_entry) # when actual_dir_entry = backend.directory_entry_get_by_path(b'dir-sha1', 'some/path/foo') self.assertEquals(actual_dir_entry, stub_dir_entry) self.storage.directory_entry_get_by_path.assert_called_once_with( b'dir-sha1', [b'some', b'path', b'foo']) @istest def entity_get(self): # given stub_entities = [{'uuid': 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'}, {'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'parent': None}] self.storage.entity_get = MagicMock(return_value=stub_entities) # when actual_entities = backend.entity_get( 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7') # then self.assertEquals(actual_entities, stub_entities) self.storage.entity_get.assert_called_once_with( 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7') diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py index 21c51ac09..a6d3e5602 100644 --- a/swh/web/ui/tests/test_service.py +++ b/swh/web/ui/tests/test_service.py @@ -1,1736 +1,1736 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from nose.tools import istest from unittest.mock import MagicMock, patch, call from swh.core.hashutil import hex_to_hash, hash_to_hex from swh.web.ui import service from swh.web.ui.exc import BadInputExc, NotFoundExc from swh.web.ui.tests import test_app class ServiceTestCase(test_app.SWHApiTestCase): def setUp(self): self.SHA1_SAMPLE = '18d8be353ed3480476f032475e7c233eff7371d5' self.SHA1_SAMPLE_BIN = hex_to_hash(self.SHA1_SAMPLE) self.SHA256_SAMPLE = ('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') self.SHA256_SAMPLE_BIN = hex_to_hash(self.SHA256_SAMPLE) self.SHA1GIT_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' self.SHA1GIT_SAMPLE_BIN = hex_to_hash(self.SHA1GIT_SAMPLE) self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6' self.DIRECTORY_ID_BIN = hex_to_hash(self.DIRECTORY_ID) self.AUTHOR_ID_BIN = { 'name': b'author', 'email': b'author@company.org', } self.AUTHOR_ID = { 'name': 'author', 'email': 'author@company.org', } self.COMMITTER_ID_BIN = { 'name': b'committer', 'email': b'committer@corp.org', } self.COMMITTER_ID = { 'name': 'committer', 'email': 'committer@corp.org', } self.SAMPLE_DATE_RAW = { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc, ).timestamp(), 'offset': 0, 'negative_utc': False, } self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00' self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957' self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957' self.SAMPLE_REVISION = { 'id': self.SHA1_SAMPLE, 'directory': self.DIRECTORY_ID, 'author': self.AUTHOR_ID, 'committer': self.COMMITTER_ID, 'message': self.SAMPLE_MESSAGE, 'date': self.SAMPLE_DATE, 'committer_date': self.SAMPLE_DATE, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], 'merge': False } self.SAMPLE_REVISION_RAW = { 'id': self.SHA1_SAMPLE_BIN, 'directory': self.DIRECTORY_ID_BIN, 'author': self.AUTHOR_ID_BIN, 'committer': self.COMMITTER_ID_BIN, 'message': self.SAMPLE_MESSAGE_BIN, 'date': self.SAMPLE_DATE_RAW, 'committer_date': self.SAMPLE_DATE_RAW, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } self.SAMPLE_CONTENT = { 'sha1': self.SHA1_SAMPLE, 'sha256': self.SHA256_SAMPLE, 'sha1_git': self.SHA1GIT_SAMPLE, 'length': 190, 'status': 'absent' } self.SAMPLE_CONTENT_RAW = { 'sha1': self.SHA1_SAMPLE_BIN, 'sha256': self.SHA256_SAMPLE_BIN, 'sha1_git': self.SHA1GIT_SAMPLE_BIN, 'length': 190, 'status': 'hidden' } @patch('swh.web.ui.service.backend') @istest def lookup_multiple_hashes_ball_missing(self, mock_backend): # given mock_backend.content_missing_per_sha1 = MagicMock(return_value=[]) # when actual_lookup = service.lookup_multiple_hashes( [{'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f'}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}]) # then self.assertEquals(actual_lookup, [ {'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': True}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', 'found': True} ]) @patch('swh.web.ui.service.backend') @istest def lookup_multiple_hashes_some_missing(self, mock_backend): # given mock_backend.content_missing_per_sha1 = MagicMock(return_value=[ hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') ]) # when actual_lookup = service.lookup_multiple_hashes( [{'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f'}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}]) # then self.assertEquals(actual_lookup, [ {'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': False}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', 'found': True} ]) @patch('swh.web.ui.service.backend') @istest def lookup_hash_does_not_exist(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_lookup = service.lookup_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': None, 'algo': 'sha1_git'}, actual_lookup) # check the function has been called with parameters mock_backend.content_find.assert_called_with( 'sha1_git', hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def lookup_hash_exist(self, mock_backend): # given stub_content = { 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') } mock_backend.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.lookup_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': stub_content, 'algo': 'sha1'}, actual_lookup) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'), ) @patch('swh.web.ui.service.backend') @istest def search_hash_does_not_exist(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_lookup = service.search_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': False}, actual_lookup) # check the function has been called with parameters mock_backend.content_find.assert_called_with( 'sha1_git', hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def search_hash_exist(self, mock_backend): # given stub_content = { 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') } mock_backend.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.search_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': True}, actual_lookup) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'), ) @patch('swh.web.ui.service.backend') @istest def lookup_content_provenance(self, mock_backend): # given mock_backend.content_find_provenance = MagicMock( return_value=(p for p in [{ 'content': hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f'), 'revision': hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f'), 'origin': 100, 'visit': 1, 'path': b'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }])) expected_provenances = [{ 'content': '123caf10e9535160d90e874b45aa426de762f19f', 'revision': '456caf10e9535160d90e874b45aa426de762f19f', 'origin': 100, 'visit': 1, 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }] # when actual_provenances = service.lookup_content_provenance( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(list(actual_provenances), expected_provenances) mock_backend.content_find_provenance.assert_called_with( 'sha1_git', hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def lookup_content_provenance_not_found(self, mock_backend): # given mock_backend.content_find_provenance = MagicMock(return_value=None) # when actual_provenances = service.lookup_content_provenance( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertIsNone(actual_provenances) mock_backend.content_find_provenance.assert_called_with( 'sha1_git', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def stat_counters(self, mock_backend): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_backend.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = service.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) mock_backend.stat_counters.assert_called_with() @patch('swh.web.ui.service.backend') @istest - def stat_origin_visits(self, mock_backend): + def lookup_origin_visits(self, mock_backend): # given stub_result = [ { 'date': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 1 }, { 'date': datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 2 }, { 'date': datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 3 } ] - mock_backend.stat_origin_visits.return_value = stub_result + mock_backend.lookup_origin_visits.return_value = stub_result # when expected_dates = [ { 'date': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'origin': 1, 'visit': 1 }, { 'date': datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'origin': 1, 'visit': 2 }, { 'date': datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'origin': 1, 'visit': 3 } ] - actual_dates = service.stat_origin_visits(6) + actual_dates = service.lookup_origin_visits(6) # then self.assertEqual(expected_dates, list(actual_dates)) - mock_backend.stat_origin_visits.assert_called_once_with(6) + mock_backend.lookup_origin_visits.assert_called_once_with(6) @patch('swh.web.ui.service.backend') @istest def lookup_origin(self, mock_backend): # given mock_backend.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = service.lookup_origin({'id': 'origin-id'}) # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) mock_backend.origin_get.assert_called_with({'id': 'origin-id'}) @patch('swh.web.ui.service.backend') @istest def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self, mock_backend): # given mock_backend.release_get = MagicMock() with self.assertRaises(BadInputExc) as cm: # when service.lookup_release('not-a-sha1') self.assertIn('invalid checksum', cm.exception.args[0]) mock_backend.release_get.called = False @patch('swh.web.ui.service.backend') @istest def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend): # given mock_backend.release_get = MagicMock() # when with self.assertRaises(BadInputExc) as cm: service.lookup_release( '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5' '1aea892abe') self.assertIn('sha1_git supported', cm.exception.args[0]) mock_backend.release_get.called = False @patch('swh.web.ui.service.backend') @istest def lookup_directory_with_path_not_found(self, mock_backend): # given mock_backend.lookup_directory_with_path = MagicMock(return_value=None) sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' # when actual_directory = mock_backend.lookup_directory_with_path( sha1_git, 'some/path/here') self.assertIsNone(actual_directory) @patch('swh.web.ui.service.backend') @istest def lookup_directory_with_path_found(self, mock_backend): # given sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' entry = {'id': 'dir-id', 'type': 'dir', 'name': 'some/path/foo'} mock_backend.lookup_directory_with_path = MagicMock(return_value=entry) # when actual_directory = mock_backend.lookup_directory_with_path( sha1_git, 'some/path/here') self.assertEqual(entry, actual_directory) @patch('swh.web.ui.service.backend') @istest def lookup_release(self, mock_backend): # given mock_backend.release_get = MagicMock(return_value={ 'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'target': None, 'date': { 'timestamp': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': True, }, 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }) # when actual_release = service.lookup_release( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') # then self.assertEqual(actual_release, { 'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'target': None, 'date': '2015-01-01T22:00:00-00:00', 'name': 'v0.0.1', 'message': 'synthetic release', 'synthetic': True, }) mock_backend.release_get.assert_called_with( hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db')) @istest def lookup_revision_with_context_ko_not_a_sha1_1(self): # given sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \ 'daf51aea892abe' sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @istest def lookup_revision_with_context_ko_not_a_sha1_2(self): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \ '2d4daf51aea892abe' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_ko_sha1_git_does_not_exist( self, mock_backend): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_bin = hex_to_hash(sha1_git) mock_backend.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_backend.revision_get.assert_called_once_with( sha1_git_bin) @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, mock_backend): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_root_bin = hex_to_hash(sha1_git_root) sha1_git_bin = hex_to_hash(sha1_git) mock_backend.revision_get.side_effect = ['foo', None] # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_backend.revision_get.assert_has_calls([call(sha1_git_bin), call(sha1_git_root_bin)]) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_revision_with_context(self, mock_query, mock_backend): # given sha1_git_root = '666' sha1_git = '883' sha1_git_root_bin = b'666' sha1_git_bin = b'883' sha1_git_root_dict = { 'id': sha1_git_root_bin, 'parents': [b'999'], } sha1_git_dict = { 'id': sha1_git_bin, 'parents': [], 'directory': b'278', } stub_revisions = [ sha1_git_root_dict, { 'id': b'999', 'parents': [b'777', b'883', b'888'], }, { 'id': b'777', 'parents': [b'883'], }, sha1_git_dict, { 'id': b'888', 'parents': [b'889'], }, { 'id': b'889', 'parents': [], }, ] # inputs ok mock_query.parse_hash_with_algorithms_or_throws.side_effect = [ ('sha1', sha1_git_bin), ('sha1', sha1_git_root_bin) ] # lookup revision first 883, then 666 (both exists) mock_backend.revision_get.side_effect = [ sha1_git_dict, sha1_git_root_dict ] mock_backend.revision_log = MagicMock( return_value=stub_revisions) # when actual_revision = service.lookup_revision_with_context( sha1_git_root, sha1_git) # then self.assertEquals(actual_revision, { 'id': hash_to_hex(sha1_git_bin), 'parents': [], 'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], 'directory': hash_to_hex(b'278'), 'merge': False }) mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls( [call(sha1_git, ['sha1'], 'Only sha1_git is supported.'), call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')]) mock_backend.revision_log.assert_called_with( sha1_git_root_bin, 100) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_revision_with_context_sha1_git_root_already_retrieved_as_dict( self, mock_query, mock_backend): # given sha1_git = '883' sha1_git_root_bin = b'666' sha1_git_bin = b'883' sha1_git_root_dict = { 'id': sha1_git_root_bin, 'parents': [b'999'], } sha1_git_dict = { 'id': sha1_git_bin, 'parents': [], 'directory': b'278', } stub_revisions = [ sha1_git_root_dict, { 'id': b'999', 'parents': [b'777', b'883', b'888'], }, { 'id': b'777', 'parents': [b'883'], }, sha1_git_dict, { 'id': b'888', 'parents': [b'889'], }, { 'id': b'889', 'parents': [], }, ] # inputs ok mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', sha1_git_bin) # lookup only on sha1 mock_backend.revision_get.return_value = sha1_git_dict mock_backend.revision_log.return_value = stub_revisions # when actual_revision = service.lookup_revision_with_context( {'id': sha1_git_root_bin}, sha1_git) # then self.assertEquals(actual_revision, { 'id': hash_to_hex(sha1_git_bin), 'parents': [], 'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], 'directory': hash_to_hex(b'278'), 'merge': False }) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa sha1_git, ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(sha1_git_bin) mock_backend.revision_log.assert_called_with( sha1_git_root_bin, 100) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_ko_revision_not_found(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') mock_backend.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision('123') self.assertIn('Revision 123 not found', cm.exception.args[0]) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_ko_revision_with_path_to_nowhere( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision( '123', 'path/to/something/unknown') self.assertIn("Directory/File 'path/to/something/unknown' " + "pointed to by revision 123 not found", cm.exception.args[0]) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'path/to/something/unknown') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_ko_type_not_implemented( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = { 'type': 'rev', 'name': b'some/path/to/rev', 'target': b'456' } stub_content = { 'id': b'12', 'type': 'file' } mock_backend.content_get.return_value = stub_content # when with self.assertRaises(NotImplementedError) as cm: service.lookup_directory_with_revision( '123', 'some/path/to/rev') self.assertIn("Entity of type 'rev' not implemented.", cm.exception.args[0]) # then mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'some/path/to/rev') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_without_path(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } stub_dir_entries = [{ 'id': b'123', 'type': 'dir' }, { 'id': b'456', 'type': 'file' }] mock_backend.directory_ls.return_value = stub_dir_entries # when actual_directory_entries = service.lookup_directory_with_revision( '123') self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(list(actual_directory_entries['content']), stub_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_ls.assert_called_once_with(dir_id) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_dir(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } stub_dir_entries = [{ 'id': b'12', 'type': 'dir' }, { 'id': b'34', 'type': 'file' }] mock_backend.directory_entry_get_by_path.return_value = { 'type': 'dir', 'name': b'some/path', 'target': b'456' } mock_backend.directory_ls.return_value = stub_dir_entries # when actual_directory_entries = service.lookup_directory_with_revision( '123', 'some/path') self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(actual_directory_entries['revision'], '123') self.assertEqual(actual_directory_entries['path'], 'some/path') self.assertEqual(list(actual_directory_entries['content']), stub_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( dir_id, 'some/path') mock_backend.directory_ls.assert_called_once_with(b'456') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_file_without_data( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = { 'type': 'file', 'name': b'some/path/to/file', 'target': b'789' } stub_content = { 'status': 'visible', } mock_backend.content_find.return_value = stub_content # when actual_content = service.lookup_directory_with_revision( '123', 'some/path/to/file') # then self.assertEqual(actual_content, {'type': 'file', 'revision': '123', 'path': 'some/path/to/file', 'content': stub_content}) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'some/path/to/file') mock_backend.content_find.assert_called_once_with('sha1_git', b'789') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_file_with_data( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = { 'type': 'file', 'name': b'some/path/to/file', 'target': b'789' } stub_content = { 'status': 'visible', 'sha1': b'content-sha1' } mock_backend.content_find.return_value = stub_content mock_backend.content_get.return_value = { 'sha1': b'content-sha1', 'data': b'some raw data' } expected_content = { 'status': 'visible', 'sha1': hash_to_hex(b'content-sha1'), 'data': b'some raw data' } # when actual_content = service.lookup_directory_with_revision( '123', 'some/path/to/file', with_data=True) # then self.assertEqual(actual_content, {'type': 'file', 'revision': '123', 'path': 'some/path/to/file', 'content': expected_content}) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'some/path/to/file') mock_backend.content_find.assert_called_once_with('sha1_git', b'789') mock_backend.content_get.assert_called_once_with(b'content-sha1') @patch('swh.web.ui.service.backend') @istest def lookup_revision(self, mock_backend): # given mock_backend.revision_get = MagicMock( return_value=self.SAMPLE_REVISION_RAW) # when actual_revision = service.lookup_revision( self.SHA1_SAMPLE) # then self.assertEqual(actual_revision, self.SAMPLE_REVISION) mock_backend.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) @patch('swh.web.ui.service.backend') @istest def lookup_revision_invalid_msg(self, mock_backend): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['message'] = b'elegant fix for bug \xff' expected_revision = self.SAMPLE_REVISION expected_revision['message'] = None expected_revision['message_decoding_failed'] = True mock_backend.revision_get = MagicMock(return_value=stub_rev) # when actual_revision = service.lookup_revision( self.SHA1_SAMPLE) # then self.assertEqual(actual_revision, expected_revision) mock_backend.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) @patch('swh.web.ui.service.backend') @istest def lookup_revision_msg_ok(self, mock_backend): # given mock_backend.revision_get.return_value = self.SAMPLE_REVISION_RAW # when rv = service.lookup_revision_message( self.SHA1_SAMPLE) # then self.assertEquals(rv, {'message': self.SAMPLE_MESSAGE_BIN}) mock_backend.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) @patch('swh.web.ui.service.backend') @istest def lookup_revision_msg_absent(self, mock_backend): # given stub_revision = self.SAMPLE_REVISION_RAW del stub_revision['message'] mock_backend.revision_get.return_value = stub_revision # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message( self.SHA1_SAMPLE) # then mock_backend.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) self.assertEqual(cm.exception.args[0], 'No message for revision ' 'with sha1_git ' '18d8be353ed3480476f032475e7c233eff7371d5.') @patch('swh.web.ui.service.backend') @istest def lookup_revision_msg_norev(self, mock_backend): # given mock_backend.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message( self.SHA1_SAMPLE) # then mock_backend.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) self.assertEqual(cm.exception.args[0], 'Revision with sha1_git ' '18d8be353ed3480476f032475e7c233eff7371d5 ' 'not found.') @patch('swh.web.ui.service.backend') @istest def lookup_revision_multiple(self, mock_backend): # given sha1 = self.SHA1_SAMPLE sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' stub_revisions = [ self.SAMPLE_REVISION_RAW, { 'id': hex_to_hash(sha1_other), 'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5', 'author': { 'name': b'name', 'email': b'name@surname.org', }, 'committer': { 'name': b'name', 'email': b'name@surname.org', }, 'message': b'ugly fix for bug 42', 'date': { 'timestamp': datetime.datetime( 2000, 1, 12, 5, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False }, 'date_offset': 0, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 12, 5, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False }, 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } ] mock_backend.revision_get_multiple.return_value = stub_revisions # when actual_revisions = service.lookup_revision_multiple( [sha1, sha1_other]) # then self.assertEqual(list(actual_revisions), [ self.SAMPLE_REVISION, { 'id': sha1_other, 'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5', 'author': { 'name': 'name', 'email': 'name@surname.org', }, 'committer': { 'name': 'name', 'email': 'name@surname.org', }, 'message': 'ugly fix for bug 42', 'date': '2000-01-12T05:23:54+00:00', 'date_offset': 0, 'committer_date': '2000-01-12T05:23:54+00:00', 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], 'merge': False } ]) self.assertEqual( list(mock_backend.revision_get_multiple.call_args[0][0]), [hex_to_hash(sha1), hex_to_hash(sha1_other)]) @patch('swh.web.ui.service.backend') @istest def lookup_revision_multiple_none_found(self, mock_backend): # given sha1_bin = self.SHA1_SAMPLE sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' mock_backend.revision_get_multiple.return_value = [] # then actual_revisions = service.lookup_revision_multiple( [sha1_bin, sha1_other]) self.assertEqual(list(actual_revisions), []) self.assertEqual( list(mock_backend.revision_get_multiple.call_args[0][0]), [hex_to_hash(self.SHA1_SAMPLE), hex_to_hash(sha1_other)]) @patch('swh.web.ui.service.backend') @istest def lookup_revision_log(self, mock_backend): # given stub_revision_log = [self.SAMPLE_REVISION_RAW] mock_backend.revision_log = MagicMock(return_value=stub_revision_log) # when actual_revision = service.lookup_revision_log( 'abcdbe353ed3480476f032475e7c233eff7371d5', limit=25) # then self.assertEqual(list(actual_revision), [self.SAMPLE_REVISION]) mock_backend.revision_log.assert_called_with( hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 25) @patch('swh.web.ui.service.backend') @istest def lookup_revision_log_by(self, mock_backend): # given stub_revision_log = [self.SAMPLE_REVISION_RAW] mock_backend.revision_log_by = MagicMock( return_value=stub_revision_log) # when actual_log = service.lookup_revision_log_by( 1, 'refs/heads/master', None, limit=100) # then self.assertEqual(list(actual_log), [self.SAMPLE_REVISION]) mock_backend.revision_log_by.assert_called_with( 1, 'refs/heads/master', None, 100) @patch('swh.web.ui.service.backend') @istest def lookup_revision_log_by_nolog(self, mock_backend): # given mock_backend.revision_log_by = MagicMock(return_value=None) # when res = service.lookup_revision_log_by( 1, 'refs/heads/master', None, limit=100) # then self.assertEquals(res, None) mock_backend.revision_log_by.assert_called_with( 1, 'refs/heads/master', None, 100) @patch('swh.web.ui.service.backend') @istest def lookup_content_raw_not_found(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content_raw( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash(self.SHA1_SAMPLE)) @patch('swh.web.ui.service.backend') @istest def lookup_content_raw(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value={ 'sha1': self.SHA1_SAMPLE, }) mock_backend.content_get = MagicMock(return_value={ 'data': b'binary data'}) # when actual_content = service.lookup_content_raw( 'sha256:%s' % self.SHA256_SAMPLE) # then self.assertEquals(actual_content, {'data': b'binary data'}) mock_backend.content_find.assert_called_once_with( 'sha256', self.SHA256_SAMPLE_BIN) mock_backend.content_get.assert_called_once_with( self.SHA1_SAMPLE) @patch('swh.web.ui.service.backend') @istest def lookup_content_not_found(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content( 'sha1:%s' % self.SHA1_SAMPLE) # then self.assertIsNone(actual_content) mock_backend.content_find.assert_called_with( 'sha1', self.SHA1_SAMPLE_BIN) @patch('swh.web.ui.service.backend') @istest def lookup_content_with_sha1(self, mock_backend): # given mock_backend.content_find = MagicMock( return_value=self.SAMPLE_CONTENT_RAW) # when actual_content = service.lookup_content( 'sha1:%s' % self.SHA1_SAMPLE) # then self.assertEqual(actual_content, self.SAMPLE_CONTENT) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash(self.SHA1_SAMPLE)) @patch('swh.web.ui.service.backend') @istest def lookup_content_with_sha256(self, mock_backend): # given stub_content = self.SAMPLE_CONTENT_RAW stub_content['status'] = 'visible' expected_content = self.SAMPLE_CONTENT expected_content['status'] = 'visible' mock_backend.content_find = MagicMock( return_value=stub_content) # when actual_content = service.lookup_content( 'sha256:%s' % self.SHA256_SAMPLE) # then self.assertEqual(actual_content, expected_content) mock_backend.content_find.assert_called_with( 'sha256', self.SHA256_SAMPLE_BIN) @patch('swh.web.ui.service.backend') @istest def lookup_person(self, mock_backend): # given mock_backend.person_get = MagicMock(return_value={ 'id': 'person_id', 'name': b'some_name', 'email': b'some-email', }) # when actual_person = service.lookup_person('person_id') # then self.assertEqual(actual_person, { 'id': 'person_id', 'name': 'some_name', 'email': 'some-email', }) mock_backend.person_get.assert_called_with('person_id') @patch('swh.web.ui.service.backend') @istest def lookup_directory_bad_checksum(self, mock_backend): # given mock_backend.directory_ls = MagicMock() # when with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') # then mock_backend.directory_ls.called = False @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_not_found(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', 'directory-id-bin') mock_backend.directory_get.return_value = None # when actual_dir = service.lookup_directory('directory_id') # then self.assertIsNone(actual_dir) mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( 'directory_id', ['sha1'], 'Only sha1_git is supported.') mock_backend.directory_get.assert_called_with('directory-id-bin') mock_backend.directory_ls.called = False @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory(self, mock_query, mock_backend): mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', 'directory-sha1-bin') # something that exists is all that matters here mock_backend.directory_get.return_value = {'id': b'directory-sha1-bin'} # given stub_dir_entries = [{ 'sha1': self.SHA1_SAMPLE_BIN, 'sha256': self.SHA256_SAMPLE_BIN, 'sha1_git': self.SHA1GIT_SAMPLE_BIN, 'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': self.DIRECTORY_ID_BIN, 'name': b'bob', 'type': 10, }] expected_dir_entries = [{ 'sha1': self.SHA1_SAMPLE, 'sha256': self.SHA256_SAMPLE, 'sha1_git': self.SHA1GIT_SAMPLE, 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': self.DIRECTORY_ID, 'name': 'bob', 'type': 10, }] mock_backend.directory_ls.return_value = stub_dir_entries # when actual_directory_ls = list(service.lookup_directory( 'directory-sha1')) # then self.assertEqual(actual_directory_ls, expected_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( 'directory-sha1', ['sha1'], 'Only sha1_git is supported.') mock_backend.directory_ls.assert_called_with( 'directory-sha1-bin') @patch('swh.web.ui.service.backend') @istest def lookup_revision_by_nothing_found(self, mock_backend): # given mock_backend.revision_get_by.return_value = None # when actual_revisions = service.lookup_revision_by(1) # then self.assertIsNone(actual_revisions) mock_backend.revision_get_by(1, 'master', None) @patch('swh.web.ui.service.backend') @istest def lookup_revision_by(self, mock_backend): # given stub_rev = self.SAMPLE_REVISION_RAW expected_rev = self.SAMPLE_REVISION mock_backend.revision_get_by.return_value = stub_rev # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_backend.revision_get_by(1, 'master2', 'some-ts') @patch('swh.web.ui.service.backend') @istest def lookup_revision_by_nomerge(self, mock_backend): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['parents'] = [ hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')] expected_rev = self.SAMPLE_REVISION expected_rev['parents'] = ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'] mock_backend.revision_get_by.return_value = stub_rev # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_backend.revision_get_by(1, 'master2', 'some-ts') @patch('swh.web.ui.service.backend') @istest def lookup_revision_by_merge(self, mock_backend): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['parents'] = [ hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'), hex_to_hash('ffff3b19e793491b1c6db0fd8b46cd9f32e592fc') ] expected_rev = self.SAMPLE_REVISION expected_rev['parents'] = [ 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', 'ffff3b19e793491b1c6db0fd8b46cd9f32e592fc' ] expected_rev['merge'] = True mock_backend.revision_get_by.return_value = stub_rev # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_backend.revision_get_by(1, 'master2', 'some-ts') @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_by_ko(self, mock_backend): # given mock_backend.revision_get_by.return_value = None # when with self.assertRaises(NotFoundExc) as cm: origin_id = 1 branch_name = 'master3' ts = None service.lookup_revision_with_context_by(origin_id, branch_name, ts, 'sha1') # then self.assertIn( 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts), cm.exception.args[0]) mock_backend.revision_get_by.assert_called_once_with( origin_id, branch_name, ts) @patch('swh.web.ui.service.lookup_revision_with_context') @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_by(self, mock_backend, mock_lookup_revision_with_context): # given stub_root_rev = {'id': 'root-rev-id'} mock_backend.revision_get_by.return_value = {'id': 'root-rev-id'} stub_rev = {'id': 'rev-found'} mock_lookup_revision_with_context.return_value = stub_rev # when origin_id = 1 branch_name = 'master3' ts = None sha1_git = 'sha1' actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin_id, branch_name, ts, sha1_git) # then self.assertEquals(actual_root_rev, stub_root_rev) self.assertEquals(actual_rev, stub_rev) mock_backend.revision_get_by.assert_called_once_with( origin_id, branch_name, ts) mock_lookup_revision_with_context.assert_called_once_with( stub_root_rev, sha1_git, 100) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_entity_by_uuid(self, mock_query, mock_backend): # given uuid_test = 'correct-uuid' mock_query.parse_uuid4.return_value = uuid_test stub_entities = [{'uuid': uuid_test}] mock_backend.entity_get.return_value = stub_entities # when actual_entities = service.lookup_entity_by_uuid(uuid_test) # then self.assertEquals(actual_entities, stub_entities) mock_query.parse_uuid4.assert_called_once_with(uuid_test) mock_backend.entity_get.assert_called_once_with(uuid_test) @istest def lookup_revision_through_ko_not_implemented(self): # then with self.assertRaises(NotImplementedError): service.lookup_revision_through({ 'something-unknown': 10, }) @patch('swh.web.ui.service.lookup_revision_with_context_by') @istest def lookup_revision_through_with_context_by(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'origin_id': 1, 'branch_name': 'master', 'ts': None, 'sha1_git': 'sha1-git' }, limit=1000) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 1, 'master', None, 'sha1-git', 1000) @patch('swh.web.ui.service.lookup_revision_by') @istest def lookup_revision_through_with_revision_by(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'origin_id': 2, 'branch_name': 'master2', 'ts': 'some-ts', }, limit=10) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 2, 'master2', 'some-ts') @patch('swh.web.ui.service.lookup_revision_with_context') @istest def lookup_revision_through_with_context(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'sha1_git_root': 'some-sha1-root', 'sha1_git': 'some-sha1', }) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 'some-sha1-root', 'some-sha1', 100) @patch('swh.web.ui.service.lookup_revision') @istest def lookup_revision_through_with_revision(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'sha1_git': 'some-sha1', }) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 'some-sha1') @patch('swh.web.ui.service.lookup_revision_through') @istest def lookup_directory_through_revision_ko_not_found( self, mock_lookup_rev): # given mock_lookup_rev.return_value = None # when with self.assertRaises(NotFoundExc): service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 100) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100) @patch('swh.web.ui.service.lookup_revision_through') @patch('swh.web.ui.service.lookup_directory_with_revision') @istest def lookup_directory_through_revision_ok_with_data( self, mock_lookup_dir, mock_lookup_rev): # given mock_lookup_rev.return_value = {'id': 'rev-id'} mock_lookup_dir.return_value = {'type': 'dir', 'content': []} # when rev_id, dir_result = service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 100) # then self.assertEquals(rev_id, 'rev-id') self.assertEquals(dir_result, {'type': 'dir', 'content': []}) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100) mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', False) @patch('swh.web.ui.service.lookup_revision_through') @patch('swh.web.ui.service.lookup_directory_with_revision') @istest def lookup_directory_through_revision_ok_with_content( self, mock_lookup_dir, mock_lookup_rev): # given mock_lookup_rev.return_value = {'id': 'rev-id'} stub_result = {'type': 'file', 'revision': 'rev-id', 'content': {'data': b'blah', 'sha1': 'sha1'}} mock_lookup_dir.return_value = stub_result # when rev_id, dir_result = service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 10, with_data=True) # then self.assertEquals(rev_id, 'rev-id') self.assertEquals(dir_result, stub_result) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 10) mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', True) diff --git a/swh/web/ui/tests/views/test_api.py b/swh/web/ui/tests/views/test_api.py index 5b808bd7d..945c5bb79 100644 --- a/swh/web/ui/tests/views/test_api.py +++ b/swh/web/ui/tests/views/test_api.py @@ -1,1930 +1,1929 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest import yaml from nose.tools import istest from unittest.mock import patch, MagicMock from swh.web.ui.tests import test_app from swh.web.ui import exc from swh.web.ui.views import api from swh.web.ui.exc import NotFoundExc, BadInputExc from swh.storage.exc import StorageDBError, StorageAPIError class ApiTestCase(test_app.SWHApiTestCase): @istest def generic_api_lookup_nothing_is_found(self): # given def test_generic_lookup_fn(sha1, another_unused_arg): assert another_unused_arg == 'unused arg' assert sha1 == 'sha1' return None # when with self.assertRaises(NotFoundExc) as cm: api._api_lookup('sha1', test_generic_lookup_fn, 'This will be raised because None is returned.', lambda x: x, 'unused arg') self.assertIn('This will be raised because None is returned.', cm.exception.args[0]) @istest def generic_api_map_are_enriched_and_transformed_to_list(self): # given def test_generic_lookup_fn_1(criteria0, param0, param1): assert criteria0 == 'something' return map(lambda x: x + 1, [1, 2, 3]) # when actual_result = api._api_lookup( 'something', test_generic_lookup_fn_1, 'This is not the error message you are looking for. Move along.', lambda x: x * 2, 'some param 0', 'some param 1') self.assertEqual(actual_result, [4, 6, 8]) @istest def generic_api_list_are_enriched_too(self): # given def test_generic_lookup_fn_2(crit): assert crit == 'something' return ['a', 'b', 'c'] # when actual_result = api._api_lookup( 'something', test_generic_lookup_fn_2, 'Not the error message you are looking for, it is. ' 'Along, you move!', lambda x: ''. join(['=', x, '='])) self.assertEqual(actual_result, ['=a=', '=b=', '=c=']) @istest def generic_api_generator_are_enriched_and_returned_as_list(self): # given def test_generic_lookup_fn_3(crit): assert crit == 'crit' return (i for i in [4, 5, 6]) # when actual_result = api._api_lookup( 'crit', test_generic_lookup_fn_3, 'Move!', lambda x: x - 1) self.assertEqual(actual_result, [3, 4, 5]) @istest def generic_api_simple_data_are_enriched_and_returned_too(self): # given def test_generic_lookup_fn_4(crit): assert crit == '123' return {'a': 10} def test_enrich_data(x): x['a'] = x['a'] * 10 return x # when actual_result = api._api_lookup( '123', test_generic_lookup_fn_4, 'Nothing to do', test_enrich_data) self.assertEqual(actual_result, {'a': 100}) @patch('swh.web.ui.views.api.service') @istest def api_content_provenance(self, mock_service): stub_provenances = [{ 'origin': 1, 'visit': 2, 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }] mock_service.lookup_content_provenance.return_value = stub_provenances # when rv = self.app.get( '/api/1/provenance/' 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, [{ 'origin': 1, 'visit': 2, 'origin_url': '/api/1/origin/1/', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'revision_url': '/api/1/revision/' 'b04caf10e9535160d90e874b45aa426de762f19f/', 'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'content_url': '/api/1/content/' 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }]) mock_service.lookup_content_provenance.assert_called_once_with( 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_provenance_sha_not_found(self, mock_service): # given mock_service.lookup_content_provenance.return_value = None # when rv = self.app.get( '/api/1/provenance/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' }) @patch('swh.web.ui.views.api.service') @istest def api_content_metadata(self, mock_service): # given mock_service.lookup_content.return_value = { 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560' 'cde9b067a4f', 'length': 17, 'status': 'visible' } # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'data_url': '/api/1/content/' '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c' 'de9b067a4f', 'length': 17, 'status': 'visible' }) mock_service.lookup_content.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_not_found_as_json(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_content_provenance = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_content_provenance.called = False @patch('swh.web.ui.views.api.service') @istest def api_content_not_found_as_yaml(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_content_provenance = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/', headers={'accept': 'application/yaml'}) self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_content_provenance.called = False @patch('swh.web.ui.views.api.service') @istest def api_content_raw_ko_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' }) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_content_raw(self, mock_service): # given stub_content = {'data': b'some content data'} mock_service.lookup_content_raw.return_value = stub_content # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/', headers={'Content-type': 'application/octet-stream', 'Content-disposition': 'attachment'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/octet-stream') self.assertEquals(rv.data, stub_content['data']) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.views.api.service') @istest def api_search(self, mock_service): # given mock_service.search_hash.return_value = {'found': True} expected_result = { 'search_stats': {'nbfiles': 1, 'pct': 100}, 'search_res': [{'filename': None, 'sha1': 'sha1:blah', 'found': True}] } # when rv = self.app.get('/api/1/content/search/sha1:blah/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.search_hash.assert_called_once_with('sha1:blah') @patch('swh.web.ui.views.api.service') @istest def api_search_as_yaml(self, mock_service): # given mock_service.search_hash.return_value = {'found': True} expected_result = { 'search_stats': {'nbfiles': 1, 'pct': 100}, 'search_res': [{'filename': None, 'sha1': 'sha1:halb', 'found': True}] } # when rv = self.app.get('/api/1/content/search/sha1:halb/', headers={'Accept': 'application/yaml'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.search_hash.assert_called_once_with('sha1:halb') @patch('swh.web.ui.views.api.service') @istest def api_search_not_found(self, mock_service): # given mock_service.search_hash.return_value = {'found': False} expected_result = { 'search_stats': {'nbfiles': 1, 'pct': 0}, 'search_res': [{'filename': None, 'sha1': 'sha1:halb', 'found': False}] } # when rv = self.app.get('/api/1/content/search/sha1:halb/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.search_hash.assert_called_once_with('sha1:halb') @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_error(self, mock_service): # given mock_service.stat_counters.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_swh_storage_error_db(self, mock_service): # given mock_service.stat_counters.side_effect = StorageDBError( 'SWH Storage exploded! Will be back online shortly!') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the backend: ' 'SWH Storage exploded! Will be back online shortly!'}) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters_raise_swh_storage_error_api(self, mock_service): # given mock_service.stat_counters.side_effect = StorageAPIError( 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' ) # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the api backend: ' 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' }) @patch('swh.web.ui.views.api.service') @istest def api_1_stat_counters(self, mock_service): # given stub_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_service.stat_counters.return_value = stub_stats # when rv = self.app.get('/api/1/stat/counters/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_stats) mock_service.stat_counters.assert_called_once_with() @patch('swh.web.ui.views.api.service') @istest - def api_1_stat_origin_visits_raise_error(self, mock_service): + def api_1_lookup_origin_visits_raise_error(self, mock_service): # given - mock_service.stat_origin_visits.side_effect = ValueError( + mock_service.lookup_origin_visits.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when - rv = self.app.get('/api/1/stat/visits/2/') + rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.views.api.service') @istest - def api_1_stat_origin_visits_raise_swh_storage_error_db( + def api_1_lookup_origin_visits_raise_swh_storage_error_db( self, mock_service): # given - mock_service.stat_origin_visits.side_effect = StorageDBError( + mock_service.lookup_origin_visits.side_effect = StorageDBError( 'SWH Storage exploded! Will be back online shortly!') # when - rv = self.app.get('/api/1/stat/visits/2/') + rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the backend: ' 'SWH Storage exploded! Will be back online shortly!'}) @patch('swh.web.ui.views.api.service') @istest - def api_1_stat_origin_visits_raise_swh_storage_error_api( + def api_1_lookup_origin_visits_raise_swh_storage_error_api( self, mock_service): # given - mock_service.stat_origin_visits.side_effect = StorageAPIError( + mock_service.lookup_origin_visits.side_effect = StorageAPIError( 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' ) # when - rv = self.app.get('/api/1/stat/visits/2/') + rv = self.app.get('/api/1/origin/2/visits/') # then self.assertEquals(rv.status_code, 503) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'An unexpected error occurred in the api backend: ' 'SWH Storage API dropped dead! Will resurrect from its ashes asap!' }) @patch('swh.web.ui.views.api.service') @istest - def api_1_stat_origin_visits(self, mock_service): + def api_1_lookup_origin_visits(self, mock_service): # given - stub_stats = [ + stub_visits = [ { - 'date': 1420149600.0, + 'date': 1104616800.0, 'origin': 1, 'visit': 1 }, { - 'date': 1104616800.0, + 'date': 1293919200.0, 'origin': 1, 'visit': 2 }, { - 'date': 1293919200.0, + 'date': 1420149600.0, 'origin': 1, 'visit': 3 } ] - expected_stats = [1104616800.0, 1293919200.0, 1420149600.0] - mock_service.stat_origin_visits.return_value = stub_stats + mock_service.lookup_origin_visits.return_value = stub_visits # when - rv = self.app.get('/api/1/stat/visits/2/') + rv = self.app.get('/api/1/origin/2/visits/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) - self.assertEquals(response_data, expected_stats) + self.assertEquals(response_data, stub_visits) - mock_service.stat_origin_visits.assert_called_once_with(2) + mock_service.lookup_origin_visits.assert_called_once_with(2) @patch('swh.web.ui.views.api.service') @istest def api_origin_by_id(self, mock_service): # given stub_origin = { 'id': 1234, 'lister': 'uuid-lister-0', 'project': 'uuid-project-0', 'url': 'ftp://some/url/to/origin/0', 'type': 'ftp' } mock_service.lookup_origin.return_value = stub_origin # when rv = self.app.get('/api/1/origin/1234/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_origin) mock_service.lookup_origin.assert_called_with({'id': 1234}) @patch('swh.web.ui.views.api.service') @istest def api_origin_by_type_url(self, mock_service): # given stub_origin = { 'id': 1234, 'lister': 'uuid-lister-0', 'project': 'uuid-project-0', 'url': 'ftp://some/url/to/origin/0', 'type': 'ftp' } mock_service.lookup_origin.return_value = stub_origin # when rv = self.app.get('/api/1/origin/ftp/url/ftp://some/url/to/origin/0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_origin) mock_service.lookup_origin.assert_called_with( {'url': 'ftp://some/url/to/origin/0', 'type': 'ftp'}) @patch('swh.web.ui.views.api.service') @istest def api_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.app.get('/api/1/origin/4321/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Origin with id 4321 not found.' }) mock_service.lookup_origin.assert_called_with({'id': 4321}) @patch('swh.web.ui.views.api.service') @istest def api_release(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', 'target_url': '/api/1/revision/revision-sha1/', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.views.api.service') @istest def api_release_target_type_not_a_revision(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.views.api.service') @istest def api_release_not_found(self, mock_service): # given mock_service.lookup_release.return_value = None # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Release with sha1_git release-0 not found.' }) @patch('swh.web.ui.views.api.service') @istest def api_revision(self, mock_service): # given stub_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } mock_service.lookup_revision.return_value = stub_revision expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e' 'ff7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6' 'a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '8734ef7e7c357ce2af928115c6c6a42b7e2a44e7' ], 'parent_urls': [ '/api/1/revision/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } # when rv = self.app.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_revision) mock_service.lookup_revision.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.app.get('/api/1/revision/revision-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git revision-0 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_revision_raw_ok(self, mock_service): # given stub_revision = {'message': 'synthetic revision message'} mock_service.lookup_revision_message.return_value = stub_revision # when rv = self.app.get('/api/1/revision/18d8be353ed3480476f032475e7c2' '33eff7371d5/raw/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/octet-stream') self.assertEquals(rv.data, b'synthetic revision message') mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_raw_ok_no_msg(self, mock_service): # given mock_service.lookup_revision_message.side_effect = NotFoundExc( 'No message for revision') # when rv = self.app.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/raw/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No message for revision'}) self.assertEquals mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_raw_ko_no_rev(self, mock_service): # given mock_service.lookup_revision_message.side_effect = NotFoundExc( 'No revision found') # when rv = self.app.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/raw/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'No revision found'}) mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_revision_with_origin_not_found(self, mock_service): mock_service.lookup_revision_by.return_value = None rv = self.app.get('/api/1/revision/origin/123/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertIn('Revision with (origin_id: 123', response_data['error']) self.assertIn('not found', response_data['error']) mock_service.lookup_revision_by.assert_called_once_with( 123, 'refs/heads/master', None) @patch('swh.web.ui.views.api.service') @istest def api_revision_with_origin(self, mock_service): mock_revision = { 'id': '32', 'directory': '21', 'message': 'message 1', 'type': 'deb', } expected_revision = { 'id': '32', 'url': '/api/1/revision/32/', 'history_url': '/api/1/revision/32/log/', 'directory': '21', 'directory_url': '/api/1/directory/21/', 'message': 'message 1', 'type': 'deb', } mock_service.lookup_revision_by.return_value = mock_revision rv = self.app.get('/api/1/revision/origin/1/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/heads/master', None) @patch('swh.web.ui.views.api.service') @istest def api_revision_with_origin_and_branch_name(self, mock_service): mock_revision = { 'id': '12', 'directory': '23', 'message': 'message 2', 'type': 'tar', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '12', 'url': '/api/1/revision/12/', 'history_url': '/api/1/revision/12/log/', 'directory': '23', 'directory_url': '/api/1/directory/23/', 'message': 'message 2', 'type': 'tar', } rv = self.app.get('/api/1/revision/origin/1/branch/refs/origin/dev/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', None) @patch('swh.web.ui.views.api.service') @patch('swh.web.ui.views.api.utils') @istest def api_revision_with_origin_and_branch_name_and_timestamp(self, mock_utils, mock_service): mock_revision = { 'id': '123', 'directory': '456', 'message': 'message 3', 'type': 'tar', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '123', 'url': '/api/1/revision/123/', 'history_url': '/api/1/revision/123/log/', 'directory': '456', 'directory_url': '/api/1/directory/456/', 'message': 'message 3', 'type': 'tar', } mock_utils.parse_timestamp.return_value = 'parsed-date' mock_utils.enrich_revision.return_value = expected_revision rv = self.app.get('/api/1/revision' '/origin/1' '/branch/refs/origin/dev' '/ts/1452591542/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', 'parsed-date') mock_utils.parse_timestamp.assert_called_once_with('1452591542') mock_utils.enrich_revision.assert_called_once_with( mock_revision) @patch('swh.web.ui.views.api.service') @patch('swh.web.ui.views.api.utils') @istest def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes( self, mock_utils, mock_service): mock_revision = { 'id': '999', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '999', 'url': '/api/1/revision/999/', 'history_url': '/api/1/revision/999/log/', } mock_utils.parse_timestamp.return_value = 'parsed-date' mock_utils.enrich_revision.return_value = expected_revision rv = self.app.get('/api/1/revision' '/origin/1' '/branch/refs%2Forigin%2Fdev' '/ts/Today%20is%20' 'January%201,%202047%20at%208:21:00AM/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( 1, 'refs/origin/dev', 'parsed-date') mock_utils.parse_timestamp.assert_called_once_with( 'Today is January 1, 2047 at 8:21:00AM') mock_utils.enrich_revision.assert_called_once_with( mock_revision) @patch('swh.web.ui.views.api.service') @istest def revision_directory_by_ko_raise(self, mock_service): # given mock_service.lookup_directory_through_revision.side_effect = NotFoundExc('not') # noqa # when with self.assertRaises(NotFoundExc): api._revision_directory_by( {'sha1_git': 'id'}, None, '/api/1/revision/sha1/directory/') # then mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'id'}, None, limit=100, with_data=False) @patch('swh.web.ui.views.api.service') @istest def revision_directory_by_type_dir(self, mock_service): # given mock_service.lookup_directory_through_revision.return_value = ( 'rev-id', { 'type': 'dir', 'revision': 'rev-id', 'path': 'some/path', 'content': [] }) # when actual_dir_content = api._revision_directory_by( {'sha1_git': 'blah-id'}, 'some/path', '/api/1/revision/sha1/directory/') # then self.assertEquals(actual_dir_content, { 'type': 'dir', 'revision': 'rev-id', 'path': 'some/path', 'content': [] }) mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'blah-id'}, 'some/path', limit=100, with_data=False) @patch('swh.web.ui.views.api.service') @istest def revision_directory_by_type_file(self, mock_service): # given mock_service.lookup_directory_through_revision.return_value = ( 'rev-id', { 'type': 'file', 'revision': 'rev-id', 'path': 'some/path', 'content': {'blah': 'blah'} }) # when actual_dir_content = api._revision_directory_by( {'sha1_git': 'sha1'}, 'some/path', '/api/1/revision/origin/2/directory/', limit=1000, with_data=True) # then self.assertEquals(actual_dir_content, { 'type': 'file', 'revision': 'rev-id', 'path': 'some/path', 'content': {'blah': 'blah'} }) mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'sha1'}, 'some/path', limit=1000, with_data=True) @patch('swh.web.ui.views.api.utils') @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_directory_through_revision_origin_ko_not_found(self, mock_rev_dir, mock_utils): mock_rev_dir.side_effect = NotFoundExc('not found') mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00' rv = self.app.get('/api/1/revision' '/origin/10' '/branch/refs/remote/origin/dev' '/ts/2012-10-20' '/directory/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, { 'error': 'not found'}) mock_rev_dir.assert_called_once_with( {'origin_id': 10, 'branch_name': 'refs/remote/origin/dev', 'ts': '2012-10-20 00:00:00'}, None, '/api/1/revision' '/origin/10' '/branch/refs/remote/origin/dev' '/ts/2012-10-20' '/directory/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_directory_through_revision_origin(self, mock_revision_dir): expected_res = [{ 'id': '123' }] mock_revision_dir.return_value = expected_res rv = self.app.get('/api/1/revision/origin/3/directory/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEqual(response_data, expected_res) mock_revision_dir.assert_called_once_with({ 'origin_id': 3, 'branch_name': 'refs/heads/master', 'ts': None}, None, '/api/1/revision/origin/3/directory/', with_data=False) @patch('swh.web.ui.views.api.service') @istest def api_revision_log(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log.return_value = stub_revisions expected_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef' 'f7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a' '42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '7834ef7e7c357ce2af928115c6c6a42b7e2a4345' ], 'parent_urls': [ '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, }] expected_response = { 'revisions': expected_revisions, 'next_revs_url': None } # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42' 'b7e2a44e6/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_response) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_with_next(self, mock_service): # given stub_revisions = [] for i in range(27): stub_revisions.append({'id': i}) mock_service.lookup_revision_log.return_value = stub_revisions[:26] expected_revisions = [x for x in stub_revisions if x['id'] < 25] for e in expected_revisions: e['url'] = '/api/1/revision/%s/' % e['id'] e['history_url'] = '/api/1/revision/%s/log/' % e['id'] expected_response = { 'revisions': expected_revisions, 'next_revs_url': '/api/1/revision/25/log/' } # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42' 'b7e2a44e6/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_response) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_not_found(self, mock_service): # given mock_service.lookup_revision_log.return_value = None # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7' 'e2a44e6/log/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git' ' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'}) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_context(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log.return_value = stub_revisions mock_service.lookup_revision_multiple.return_value = [{ 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory': '18d8be353ed3480476f032475e7c233eff7371d5', 'author_name': 'Name Surname', 'author_email': 'name@surname.com', 'committer_name': 'Name Surname', 'committer_email': 'name@surname.com', 'message': 'amazing revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'], 'type': 'tar', 'synthetic': True, }] expected_revisions = [ { 'url': '/api/1/revision/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/', 'history_url': '/api/1/revision/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/log/', 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory_url': '/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/', 'author_name': 'Name Surname', 'author_email': 'name@surname.com', 'committer_name': 'Name Surname', 'committer_email': 'name@surname.com', 'message': 'amazing revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'], 'parent_urls': [ '/api/1/revision/adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' '/prev/7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/' ], 'type': 'tar', 'synthetic': True, }, { 'url': '/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/log/', 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'parent_urls': [ '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, }] expected_response = { 'revisions': expected_revisions, 'next_revs_url': None } # when rv = self.app.get('/api/1/revision/18d8be353ed3480476f0' '32475e7c233eff7371d5/prev/prev-rev/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_response) mock_service.lookup_revision_log.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5', 26) mock_service.lookup_revision_multiple.assert_called_once_with( ['prev-rev']) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_by(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log_by.return_value = stub_revisions expected_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef' 'f7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a' '42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '7834ef7e7c357ce2af928115c6c6a42b7e2a4345' ], 'parent_urls': [ '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345' '/prev/18d8be353ed3480476f032475e7c233eff7371d5/' ], 'type': 'tar', 'synthetic': True, }] expected_result = { 'revisions': expected_revisions, 'next_revs_url': None } # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_result) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_by_with_next(self, mock_service): # given stub_revisions = [] for i in range(27): stub_revisions.append({'id': i}) mock_service.lookup_revision_log_by.return_value = stub_revisions[:26] expected_revisions = [x for x in stub_revisions if x['id'] < 25] for e in expected_revisions: e['url'] = '/api/1/revision/%s/' % e['id'] e['history_url'] = '/api/1/revision/%s/log/' % e['id'] expected_response = { 'revisions': expected_revisions, 'next_revs_url': '/api/1/revision/25/log/' } # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_response) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_log_by_norev(self, mock_service): # given mock_service.lookup_revision_log_by.side_effect = NotFoundExc( 'No revision') # when rv = self.app.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'error': 'No revision'}) mock_service.lookup_revision_log_by.assert_called_once_with( 1, 'refs/heads/master', None, 26) @patch('swh.web.ui.views.api.service') @istest def api_revision_history(self, mock_service): # for readability purposes, we use: # - sha1 as 3 letters (url are way too long otherwise to respect pep8) # - only keys with modification steps (all other keys are kept as is) # given stub_revision = { 'id': '883', 'children': ['777', '999'], 'parents': [], 'directory': '272' } mock_service.lookup_revision.return_value = stub_revision # then rv = self.app.get('/api/1/revision/883/prev/999/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'id': '883', 'url': '/api/1/revision/883/', 'history_url': '/api/1/revision/883/log/', 'history_context_url': '/api/1/revision/883/prev/999/log/', 'children': ['777', '999'], 'children_urls': ['/api/1/revision/777/', '/api/1/revision/999/'], 'parents': [], 'parent_urls': [], 'directory': '272', 'directory_url': '/api/1/directory/272/' }) mock_service.lookup_revision.assert_called_once_with('883') @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ko_not_found(self, mock_rev_dir): # given mock_rev_dir.side_effect = NotFoundExc('Not found') # then rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Not found'}) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path/to/dir', '/api/1/revision/999/directory/some/path/to/dir/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir): stub_dir = { 'type': 'dir', 'revision': '999', 'content': [ { 'sha1_git': '789', 'type': 'file', 'target': '101', 'target_url': '/api/1/content/sha1_git:101/', 'name': 'somefile', 'file_url': '/api/1/revision/999/directory/some/path/' 'somefile/' }, { 'sha1_git': '123', 'type': 'dir', 'target': '456', 'target_url': '/api/1/directory/456/', 'name': 'to-subdir', 'dir_url': '/api/1/revision/999/directory/some/path/' 'to-subdir/', }] } # given mock_rev_dir.return_value = stub_dir # then rv = self.app.get('/api/1/revision/999/directory/some/path/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_dir) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path', '/api/1/revision/999/directory/some/path/', with_data=False) @patch('swh.web.ui.views.api._revision_directory_by') @istest def api_revision_directory_ok_returns_content(self, mock_rev_dir): stub_content = { 'type': 'file', 'revision': '999', 'content': { 'sha1_git': '789', 'sha1': '101', 'data_url': '/api/1/content/101/raw/', } } # given mock_rev_dir.return_value = stub_content # then url = '/api/1/revision/666/directory/some/other/path/' rv = self.app.get(url) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_content) mock_rev_dir.assert_called_once_with( {'sha1_git': '666'}, 'some/other/path', url, with_data=False) @patch('swh.web.ui.views.api.service') @istest def api_person(self, mock_service): # given stub_person = { 'id': '198003', 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', } mock_service.lookup_person.return_value = stub_person # when rv = self.app.get('/api/1/person/198003/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_person) @patch('swh.web.ui.views.api.service') @istest def api_person_not_found(self, mock_service): # given mock_service.lookup_person.return_value = None # when rv = self.app.get('/api/1/person/666/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Person with id 666 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_directory(self, mock_service): # given stub_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', }] expected_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', 'target_url': '/api/1/directory/8be353ed3480476f032475e7c233eff737123456/', }] mock_service.lookup_directory.return_value = stub_directories # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_directories) mock_service.lookup_directory.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.views.api.service') @istest def api_directory_not_found(self, mock_service): # given mock_service.lookup_directory.return_value = [] # when rv = self.app.get('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.'}) @patch('swh.web.ui.views.api.service') @istest def api_directory_with_path_found(self, mock_service): # given expected_dir = { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'name': 'bla', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', } mock_service.lookup_directory_with_path.return_value = expected_dir # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/bla/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_dir) mock_service.lookup_directory_with_path.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5', 'bla') @patch('swh.web.ui.views.api.service') @istest def api_directory_with_path_not_found(self, mock_service): # given mock_service.lookup_directory_with_path.return_value = None path = 'some/path/to/dir/' # when rv = self.app.get(('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/%s') % path) path = path.strip('/') # Path stripped of lead/trail separators # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': (('Entry with path %s relative to ' 'directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.') % path)}) @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid_not_found(self, mock_service): # when mock_service.lookup_entity_by_uuid.return_value = [] # when rv = self.app.get('/api/1/entity/' '5f4d4c51-498a-4e28-88b3-b3e4e8396cba/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': "Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " + "found."}) mock_service.lookup_entity_by_uuid.assert_called_once_with( '5f4d4c51-498a-4e28-88b3-b3e4e8396cba') @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid_bad_request(self, mock_service): # when mock_service.lookup_entity_by_uuid.side_effect = BadInputExc( 'bad input: uuid malformed!') # when rv = self.app.get('/api/1/entity/uuid malformed/') self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'bad input: uuid malformed!'}) mock_service.lookup_entity_by_uuid.assert_called_once_with( 'uuid malformed') @patch('swh.web.ui.views.api.service') @istest def api_lookup_entity_by_uuid(self, mock_service): # when stub_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' } ] mock_service.lookup_entity_by_uuid.return_value = stub_entities expected_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-' '785107f598e4/', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' } ] # when rv = self.app.get('/api/1/entity' '/34bd6b1b-463f-43e5-a697-785107f598e4/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_entities) mock_service.lookup_entity_by_uuid.assert_called_once_with( '34bd6b1b-463f-43e5-a697-785107f598e4') class ApiUtils(unittest.TestCase): @istest def api_lookup_not_found(self): # when with self.assertRaises(exc.NotFoundExc) as e: api._api_lookup('something', lambda x: None, 'this is the error message raised as it is None') self.assertEqual(e.exception.args[0], 'this is the error message raised as it is None') @istest def api_lookup_with_result(self): # when actual_result = api._api_lookup('something', lambda x: x + '!', 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, 'something!') @istest def api_lookup_with_result_as_map(self): # when actual_result = api._api_lookup([1, 2, 3], lambda x: map(lambda y: y+1, x), 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, [2, 3, 4]) diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py index 6c2d0dc98..d3e8befaa 100644 --- a/swh/web/ui/views/api.py +++ b/swh/web/ui/views/api.py @@ -1,770 +1,780 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from types import GeneratorType from flask import request, url_for from swh.web.ui import service, utils, apidoc as doc from swh.web.ui.exc import NotFoundExc from swh.web.ui.main import app @app.route('/api/1/stat/counters/') @doc.route('/api/1/stat/counters/', noargs=True) @doc.returns(rettype=doc.rettypes.dict, retdoc="A dictionary of SWH's most important statistics") def api_stats(): """Return statistics on SWH storage. """ return service.stat_counters() -@app.route('/api/1/stat/visits//') -@doc.route('/api/1/stat/visits/') +@app.route('/api/1/origin//visits/') +@app.route('/api/1/origin//visits/') +@doc.route('/api/1/origin/visits/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin identifier') +@doc.arg('visit_id', + default=None, + argtype=doc.argtypes.int, + argdoc='The requested SWH origin visit identifier') @doc.returns(rettype=doc.rettypes.list, retdoc="""All instances of visits of the origin pointed by - origin_id as POSIX time since epoch""") -def api_origin_visits(origin_id): + origin_id as POSIX time since epoch (if visit_id is not defined). + Only the instance of visit pointed by origin_id and visit_id.""") +def api_origin_visits(origin_id, visit_id=None): """Return a list of visit dates as POSIX timestamps for the given revision. """ - date_gen = (item['date'] for item in service.stat_origin_visits(origin_id)) - return sorted(date_gen) + if not visit_id: + return _api_lookup( + origin_id, + service.lookup_origin_visits, + error_msg_if_not_found='No origin %s found' % origin_id) + # TODO @app.route('/api/1/content/search/', methods=['POST']) @app.route('/api/1/content/search//') @doc.route('/api/1/content/search/') @doc.arg('q', default='sha1:adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH""") @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.returns(rettype=doc.rettypes.dict, retdoc="""A dict with keys: - search_res: a list of dicts corresponding to queried content with key 'found' to True if found, 'False' if not - search_stats: a dict containing number of files searched and percentage of files found """) def api_search(q=None): """Search a content per hash. This may take the form of: - a GET request with a single checksum - a POST request with many hashes, with the request body containing identifiers (typically filenames) as keys and corresponding hashes as values. """ response = {'search_res': None, 'search_stats': None} search_stats = {'nbfiles': 0, 'pct': 0} search_res = None # Single hash request route if q: r = service.search_hash(q) search_res = [{'filename': None, 'sha1': q, 'found': r['found']}] search_stats['nbfiles'] = 1 search_stats['pct'] = 100 if r['found'] else 0 # Post form submission with many hash requests elif request.method == 'POST': data = request.form queries = [] # Remove potential inputs with no associated value for k, v in data.items(): if v is not None: if k == 'q' and len(v) > 0: queries.append({'filename': None, 'sha1': v}) elif v != '': queries.append({'filename': k, 'sha1': v}) if len(queries) > 0: lookup = service.lookup_multiple_hashes(queries) result = [] for el in lookup: result.append({'filename': el['filename'], 'sha1': el['sha1'], 'found': el['found']}) search_res = result nbfound = len([x for x in lookup if x['found']]) search_stats['nbfiles'] = len(queries) search_stats['pct'] = (nbfound / len(queries))*100 response['search_res'] = search_res response['search_stats'] = search_stats return response def _api_lookup(criteria, lookup_fn, error_msg_if_not_found, enrich_fn=lambda x: x, *args): """Capture a redundant behavior of: - looking up the backend with a criteria (be it an identifier or checksum) passed to the function lookup_fn - if nothing is found, raise an NotFoundExc exception with error message error_msg_if_not_found. - Otherwise if something is returned: - either as list, map or generator, map the enrich_fn function to it and return the resulting data structure as list. - either as dict and pass to enrich_fn and return the dict enriched. Args: - criteria: discriminating criteria to lookup - lookup_fn: function expects one criteria and optional supplementary *args. - error_msg_if_not_found: if nothing matching the criteria is found, raise NotFoundExc with this error message. - enrich_fn: Function to use to enrich the result returned by lookup_fn. Default to the identity function if not provided. - *args: supplementary arguments to pass to lookup_fn. Raises: NotFoundExp or whatever `lookup_fn` raises. """ res = lookup_fn(criteria, *args) if not res: raise NotFoundExc(error_msg_if_not_found) if isinstance(res, (map, list, GeneratorType)): return [enrich_fn(x) for x in res] return enrich_fn(res) @app.route('/api/1/origin//') @app.route('/api/1/origin//url//') @doc.route('/api/1/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The origin's SWH origin_id.") @doc.arg('origin_type', default='git', argtype=doc.argtypes.str, argdoc="The origin's type (git, svn..)") @doc.arg('origin_url', default='https://github.com/hylang/hy', argtype=doc.argtypes.path, argdoc="The origin's URL.") @doc.raises(exc=doc.excs.notfound, doc='Raised if origin_id does not correspond to an origin in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the origin identified by origin_id') def api_origin(origin_id=None, origin_type=None, origin_url=None): """Return information about the origin matching the passed criteria. Criteria may be: - An SWH-specific ID, if you already know it - An origin type and its URL, if you do not have the origin's SWH identifier """ ori_dict = { 'id': origin_id, 'type': origin_type, 'url': origin_url } ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]} if 'id' in ori_dict: error_msg = 'Origin with id %s not found.' % ori_dict['id'] else: error_msg = 'Origin with type %s and URL %s not found' % ( ori_dict['type'], ori_dict['url']) return _api_lookup( ori_dict, lookup_fn=service.lookup_origin, error_msg_if_not_found=error_msg) @app.route('/api/1/person//') @doc.route('/api/1/person/') @doc.arg('person_id', default=1, argtype=doc.argtypes.int, argdoc="The person's SWH identifier") @doc.raises(exc=doc.excs.notfound, doc='Raised if person_id does not correspond to an origin in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the person identified by person_id') def api_person(person_id): """Return information about person with identifier person_id. """ return _api_lookup( person_id, lookup_fn=service.lookup_person, error_msg_if_not_found='Person with id %s not found.' % person_id) @app.route('/api/1/release//') @doc.route('/api/1/release/') @doc.arg('sha1_git', default='8b137891791fe96927ad78e64b0aad7bded08bdc', argtype=doc.argtypes.sha1_git, argdoc="The release's sha1_git identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if the argument is not a sha1') @doc.raises(exc=doc.excs.notfound, doc='Raised if sha1_git does not correspond to a release in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the release identified by sha1_git') def api_release(sha1_git): """Return information about release with id sha1_git. """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_release, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_release) def _revision_directory_by(revision, path, request_path, limit=100, with_data=False): """Compute the revision matching criterion's directory or content data. Args: revision: dictionary of criterions representing a revision to lookup path: directory's path to lookup request_path: request path which holds the original context to limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of with_data: indicate to retrieve the content's raw data if path resolves to a content. """ def enrich_directory_local(dir, context_url=request_path): return utils.enrich_directory(dir, context_url) rev_id, result = service.lookup_directory_through_revision( revision, path, limit=limit, with_data=with_data) content = result['content'] if result['type'] == 'dir': # dir_entries result['content'] = list(map(enrich_directory_local, content)) else: # content result['content'] = utils.enrich_content(content) return result @app.route('/api/1/revision' '/origin/' '/directory/') @app.route('/api/1/revision' '/origin/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory//') @doc.route('/api/1/revision/origin/directory/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The revision's origin's SWH identifier") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="""Optional timestamp (default to the nearest time crawl of timestamp)""") @doc.arg('path', default='.', argtype=doc.argtypes.path, argdoc='The path to the directory or file to display') @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching the passed criteria was not found""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision corresponding to the passed criteria""") def api_directory_through_revision_origin(origin_id, branch_name="refs/heads/master", ts=None, path=None, with_data=False): """Display directory or content information through a revision identified by origin/branch/timestamp. """ if ts: ts = utils.parse_timestamp(ts) return _revision_directory_by( { 'origin_id': origin_id, 'branch_name': branch_name, 'ts': ts }, path, request.path, with_data=with_data) @app.route('/api/1/revision' '/origin//') @app.route('/api/1/revision' '/origin/' '/branch//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//') @app.route('/api/1/revision' '/origin/' '/ts//') @doc.route('/api/1/revision/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The queried revision's origin identifier in SWH") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master)""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="The time at which the queried revision should be constrained") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching given criteria was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision identified by the given criteria""") def api_revision_with_origin(origin_id, branch_name="refs/heads/master", ts=None): """Display revision information through its identification by origin/branch/timestamp. """ if ts: ts = utils.parse_timestamp(ts) return _api_lookup( origin_id, service.lookup_revision_by, 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts), utils.enrich_revision, branch_name, ts) @app.route('/api/1/revision//') @app.route('/api/1/revision//prev//') @doc.route('/api/1/revision/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier") @doc.arg('context', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=doc.argtypes.path, argdoc='The navigation breadcrumbs -- use at your own risk') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the revision identified by sha1_git') def api_revision(sha1_git, context=None): """Return information about revision with id sha1_git. """ def _enrich_revision(revision, context=context): return utils.enrich_revision(revision, context) return _api_lookup( sha1_git, service.lookup_revision, 'Revision with sha1_git %s not found.' % sha1_git, _enrich_revision) @app.route('/api/1/revision//raw/') @doc.route('/api/1/revision/raw/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The queried revision's sha1_git identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.octet_stream, retdoc="""The message of the revision identified by sha1_git as a downloadable octet stream""") def api_revision_raw_message(sha1_git): """Return the raw data of the message of revision identified by sha1_git """ raw = service.lookup_revision_message(sha1_git) return app.response_class(raw['message'], headers={'Content-disposition': 'attachment;' 'filename=rev_%s_raw' % sha1_git}, mimetype='application/octet-stream') @app.route('/api/1/revision//directory/') @app.route('/api/1/revision//directory//') @doc.route('/api/1/revision/directory/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier.") @doc.arg('dir_path', default='.', argtype=doc.argtypes.path, argdoc='The path from the top level directory') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching sha1_git was not found in SWH , or if the path specified does not exist""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the directory pointed by revision id sha1-git and dir_path""") def api_revision_directory(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). """ return _revision_directory_by( { 'sha1_git': sha1_git }, dir_path, request.path, with_data=with_data) @app.route('/api/1/revision//log/') @app.route('/api/1/revision//prev//log/') @doc.route('/api/1/revision/log/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc='The sha1_git of the revision queried') @doc.arg('prev_sha1s', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=doc.argtypes.path, argdoc='The navigation breadcrumbs -- use at your own risk!') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git or prev_sha1s is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The log data starting at the revision identified by sha1_git, completed with the navigation breadcrumbs, if any""") def api_revision_log(sha1_git, prev_sha1s=None): """Show all revisions (~git log) starting from sha1_git. The first element returned is the given sha1_git, or the first breadcrumb, if any. """ limit = app.config['conf']['max_log_revs'] response = {'revisions': None, 'next_revs_url': None} revisions = None next_revs_url = None def lookup_revision_log_with_limit(s, limit=limit+1): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' % sha1_git rev_get = _api_lookup(sha1_git, lookup_fn=lookup_revision_log_with_limit, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) if len(rev_get) == limit+1: rev_backward = rev_get[:-1] next_revs_url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) else: rev_backward = rev_get if not prev_sha1s: # no nav breadcrumbs, so we're done revisions = rev_backward else: rev_forward_ids = prev_sha1s.split('/') rev_forward = _api_lookup(rev_forward_ids, lookup_fn=service.lookup_revision_multiple, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) revisions = rev_forward + rev_backward response['revisions'] = revisions response['next_revs_url'] = next_revs_url return response @app.route('/api/1/revision' '/origin//log/') @app.route('/api/1/revision' '/origin/' '/branch//log/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//log/') @app.route('/api/1/revision' '/origin/' '/ts//log/') @doc.route('/api/1/revision/origin/log/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The revision's SWH origin identifier") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="The revision's branch name within the origin specified") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="""A time or timestamp string to parse""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching the given criteria was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision log starting at the revision matching the given criteria.""") def api_revision_log_by(origin_id, branch_name='refs/heads/master', ts=None): """Show all revisions (~git log) starting from the revision described by its origin_id, optional branch name and timestamp. The first element returned is the described revision. """ limit = app.config['conf']['max_log_revs'] response = {'revisions': None, 'next_revs_url': None} next_revs_url = None if ts: ts = utils.parse_timestamp(ts) def lookup_revision_log_by_with_limit(o_id, br, ts, limit=limit+1): return service.lookup_revision_log_by(o_id, br, ts, limit) error_msg = 'No revision matching origin %s ' % origin_id error_msg += ', branch name %s' % branch_name error_msg += (' and time stamp %s.' % ts) if ts else '.' rev_get = _api_lookup(origin_id, lookup_revision_log_by_with_limit, error_msg, utils.enrich_revision, branch_name, ts) if len(rev_get) == limit+1: revisions = rev_get[:-1] next_revs_url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) else: revisions = rev_get response['revisions'] = revisions response['next_revs_url'] = next_revs_url return response @app.route('/api/1/directory//') @app.route('/api/1/directory///') @doc.route('/api/1/directory/') @doc.arg('sha1_git', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.sha1_git, argdoc="The queried directory's corresponding sha1_git hash") @doc.arg('path', default='.', argtype=doc.argtypes.path, argdoc="A path relative to the queried directory's top level") @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a directory matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata and contents of the release identified by sha1_git""") def api_directory(sha1_git, path=None): """Return information about release with id sha1_git. """ if path: error_msg_path = ('Entry with path %s relative to directory ' 'with sha1_git %s not found.') % (path, sha1_git) return _api_lookup( sha1_git, service.lookup_directory_with_path, error_msg_path, utils.enrich_directory, path) else: error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, service.lookup_directory, error_msg_nopath, utils.enrich_directory) @app.route('/api/1/provenance//') @doc.route('/api/1/provenance/') @doc.arg('q', default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242', argtype=doc.argtypes.algo_and_hash, argdoc="""The queried content's corresponding hash (supported hash algorithms: sha1_git, sha1, sha256)""") @doc.raises(exc=doc.excs.badinput, doc="""Raised if hash algorithm is incorrect or if the hash value is badly formatted.""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a content matching the hash was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""List of provenance information (dict) for the matched content.""") def api_content_provenance(q): """Return content's provenance information if any. """ def _enrich_revision(provenance): p = provenance.copy() p['revision_url'] = url_for('api_revision', sha1_git=provenance['revision']) p['content_url'] = url_for('api_content_metadata', q='sha1_git:%s' % provenance['content']) p['origin_url'] = url_for('api_origin', origin_id=provenance['origin']) return p return _api_lookup( q, lookup_fn=service.lookup_content_provenance, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=_enrich_revision) @app.route('/api/1/content//raw/') @doc.route('/api/1/content/raw/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """) @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a content matching q was not found in SWH') @doc.returns(rettype=doc.rettypes.octet_stream, retdoc='The raw content data as an octet stream') def api_content_raw(q): """Return content's raw data if content is found. """ def generate(content): yield content['data'] content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' % q) return app.response_class(generate(content), headers={'Content-disposition': 'attachment;' 'filename=content_%s_raw' % q}, mimetype='application/octet-stream') @app.route('/api/1/content//') @doc.route('/api/1/content/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """) @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a content matching q was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the content identified by q. If content decoding was successful, it also returns the data""") def api_content_metadata(q): """Return content information if content is found. """ return _api_lookup( q, lookup_fn=service.lookup_content, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=utils.enrich_content) @app.route('/api/1/entity//') @doc.route('/api/1/entity/') @doc.arg('uuid', default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba', argtype=doc.argtypes.uuid, argdoc="The entity's uuid identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if uuid is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if an entity matching uuid was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the entity identified by uuid') def api_entity_by_uuid(uuid): """Return content information if content is found. """ return _api_lookup( uuid, lookup_fn=service.lookup_entity_by_uuid, error_msg_if_not_found="Entity with uuid '%s' not found." % uuid, enrich_fn=utils.enrich_entity)