diff --git a/swh/web/ui/backend.py b/swh/web/ui/backend.py index 819d9d15..14a914f1 100644 --- a/swh/web/ui/backend.py +++ b/swh/web/ui/backend.py @@ -1,253 +1,242 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from swh.web.ui import main def content_get(sha1_bin): """Lookup the content designed by {algo: hash_bin}. Args: sha1_bin: content's binary sha1. Returns: Content as dict with 'sha1' and 'data' keys. data representing its raw data. """ contents = main.storage().content_get([sha1_bin]) if contents and len(contents) >= 1: return contents[0] return None def content_find(algo, hash_bin): """Retrieve the content with binary hash hash_bin Args: algo: nature of the hash hash_bin. hash_bin: content's hash searched for. Returns: A triplet (sha1, sha1_git, sha256) if the content exist or None otherwise. """ return main.storage().content_find({algo: hash_bin}) def content_find_occurrence(algo, hash_bin): """Find the content's occurrence. Args: algo: nature of the hash hash_bin. hash_bin: content's hash searched for. Returns: The occurrence of the content. """ return main.storage().content_find_occurrence({algo: hash_bin}) def content_missing_per_sha1(sha1list): """List content missing from storage based on sha1 Args: sha1s: Iterable of sha1 to check for absence Returns: an iterable of sha1s missing from the storage """ return main.storage().content_missing_per_sha1(sha1list) def directory_get(sha1_bin): """Retrieve information on one directory. Args: sha1_bin: Directory's identifier Returns: The directory's information. """ res = main.storage().directory_get([sha1_bin]) if res and len(res) >= 1: return res[0] def origin_get(origin_id): """Return information about the origin with id origin_id. Args: origin_id: origin's identifier Returns: Origin information as dict. """ return main.storage().origin_get({'id': origin_id}) def person_get(person_id): """Return information about the person with id person_id. Args: person_id: person's identifier.v Returns: Person information as dict. """ res = main.storage().person_get([person_id]) if res and len(res) >= 1: return res[0] def directory_ls(sha1_git_bin, recursive=False): """Return information about the directory with id sha1_git. Args: sha1_git: directory's identifier. recursive: Optional recursive flag default to False Returns: Directory information as dict. """ directory_entries = main.storage().directory_ls(sha1_git_bin, recursive) if not directory_entries: return [] return directory_entries def release_get(sha1_git_bin): """Return information about the release with sha1 sha1_git_bin. Args: sha1_git_bin: The release's sha1 as bytes. Returns: Release information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ res = main.storage().release_get([sha1_git_bin]) if res and len(res) >= 1: return res[0] return None def revision_get(sha1_git_bin): """Return information about the revision with sha1 sha1_git_bin. Args: sha1_git_bin: The revision's sha1 as bytes. Returns: Revision information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ res = main.storage().revision_get([sha1_git_bin]) if res and len(res) >= 1: return res[0] return None def revision_log(sha1_git_bin, limit=100): """Return information about the revision with sha1 sha1_git_bin. Args: sha1_git_bin: The revision's sha1 as bytes. limit: the maximum number of revisions returned. Returns: Revision information as dict if found, None otherwise. Raises: ValueError if the identifier provided is not of sha1 nature. """ return main.storage().revision_log([sha1_git_bin], limit) def revision_log_by(origin_id, branch_name, ts, limit=100): """Return information about the revision matching the timestamp ts, from origin origin_id, in branch branch_name. Args: origin_id: origin of the revision - branch_name: revision's branch. - timestamp: revision's time frame. Returns: Information for the revision matching the criterions. """ - # Disable pending RemoteStorage opening revision_log_by - """ - if not ts and branch_name == 'refs/heads/master': - return main.storage().revision_log_by(origin_id) - """ - - rev = main.storage().revision_get_by(origin_id, - branch_name, - timestamp=ts, - limit=1) - if not rev: - return None - rev_sha1s_bin = [revision['id'] for revision in rev] - return main.storage().revision_log(rev_sha1s_bin, limit) + return main.storage().revision_log_by(origin_id, + branch_name, + ts) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return main.storage().stat_counters() def revision_get_by(origin_id, branch_name, timestamp): """Return occurrence information matching the criterions origin_id, branch_name, ts. """ res = main.storage().revision_get_by(origin_id, branch_name, timestamp=timestamp, limit=1) if not res: return None return res[0] def directory_entry_get_by_path(directory, path): """Return a directory entry by its path. """ paths = path.strip(os.path.sep).split(os.path.sep) return main.storage().directory_entry_get_by_path( directory, list(map(lambda p: p.encode('utf-8'), paths))) def entity_get(uuid): """Retrieve the entity per its uuid. """ return main.storage().entity_get(uuid) diff --git a/swh/web/ui/templates/upload_and_search.html b/swh/web/ui/templates/upload_and_search.html index 1c90f4b0..ff6254ae 100644 --- a/swh/web/ui/templates/upload_and_search.html +++ b/swh/web/ui/templates/upload_and_search.html @@ -1,102 +1,98 @@ {% extends "layout.html" %} {% block title %}Search SWH{% endblock %} {% block content %}

Drag and drop or click here to hash files and search for them. Your files will NOT be uploaded, hashing is done locally. Filesizes over 20Mb may be slow to process, use with care.
- {% if search_stats is not none and search_stats %} + {% if search_res is not none %} + {% if search_stats is not none %} {% endif %} - {% if search_res is not none %} {% for res in search_res %} {% if res['filename'] is not none %} {% else %} {% endif %} {% if res['found'] %} {% else %} {% endif %} {% endfor %}
File name SHA1 checksum Result
{{ res['filename'] }}From text input{{ res['sha1'] }} {{ res['sha1'] }}
{% endif %} - {% if messages is not none and messages %} -
- {% for message in messages %} -
+ {% if message is not none %}
{{ message | safe }}
- {% endfor %}
{% endif %}
{% endblock %} diff --git a/swh/web/ui/tests/test_backend.py b/swh/web/ui/tests/test_backend.py index 7b1b017e..aba91cfd 100644 --- a/swh/web/ui/tests/test_backend.py +++ b/swh/web/ui/tests/test_backend.py @@ -1,578 +1,575 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from nose.tools import istest from unittest.mock import MagicMock from swh.core import hashutil from swh.web.ui import backend from swh.web.ui.tests import test_app class BackendTestCase(test_app.SWHApiTestCase): @istest def content_get_ko_not_found_1(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f777') self.storage.content_get = MagicMock(return_value=None) # when actual_content = backend.content_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_get_ko_not_found_empty_result(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_get = MagicMock(return_value=[]) # when actual_content = backend.content_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_get(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') stub_contents = [{ 'sha1': sha1_bin, 'data': b'binary data', }, {}] self.storage.content_get = MagicMock(return_value=stub_contents) # when actual_content = backend.content_get(sha1_bin) # then self.assertEquals(actual_content, stub_contents[0]) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_find_ko_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=None) # when actual_lookup = backend.content_find('sha1_git', sha1_bin) # then self.assertIsNone(actual_lookup) self.storage.content_find.assert_called_once_with( {'sha1_git': sha1_bin}) @istest def content_find(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=(1, 2, 3)) # when actual_content = backend.content_find('sha1', sha1_bin) # then self.assertEquals(actual_content, (1, 2, 3)) # check the function has been called with parameters self.storage.content_find.assert_called_with({'sha1': sha1_bin}) @istest def content_find_occurrence_ko_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find_occurrence = MagicMock(return_value=None) # when actual_lookup = backend.content_find_occurrence('sha1_git', sha1_bin) # then self.assertIsNone(actual_lookup) self.storage.content_find_occurrence.assert_called_once_with( {'sha1_git': sha1_bin}) @istest def content_find_occurrence(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find_occurrence = MagicMock( return_value=(1, 2, 3)) # when actual_content = backend.content_find_occurrence('sha1', sha1_bin) # then self.assertEquals(actual_content, (1, 2, 3)) # check the function has been called with parameters self.storage.content_find_occurrence.assert_called_with( {'sha1': sha1_bin}) @istest def content_missing_per_sha1_none(self): # given sha1s_bin = [hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f'), hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )] self.storage.content_missing_per_sha1 = MagicMock(return_value=[]) # when actual_content = backend.content_missing_per_sha1(sha1s_bin) # then self.assertEquals(actual_content, []) self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin) @istest def content_missing_per_sha1_some(self): # given sha1s_bin = [hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f'), hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )] self.storage.content_missing_per_sha1 = MagicMock(return_value=[ hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )]) # when actual_content = backend.content_missing_per_sha1(sha1s_bin) # then self.assertEquals(actual_content, [hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )]) self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin) @istest def origin_get(self): # given self.storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = backend.origin_get('origin-id') # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) self.storage.origin_get.assert_called_with({'id': 'origin-id'}) @istest def person_get(self): # given self.storage.person_get = MagicMock(return_value=[{ 'id': 'person-id', 'name': 'blah'}]) # when actual_person = backend.person_get('person-id') # then self.assertEqual(actual_person, {'id': 'person-id', 'name': 'blah'}) self.storage.person_get.assert_called_with(['person-id']) @istest def directory_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') self.storage.directory_get = MagicMock(return_value=None) # when actual_directory = backend.directory_get(sha1_bin) # then self.assertEquals(actual_directory, None) self.storage.directory_get.assert_called_with([sha1_bin]) @istest def directory_get(self): # given sha1_bin = hashutil.hex_to_hash( '51f71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') sha1_bin2 = hashutil.hex_to_hash( '62071b8614fcd89ccd17ca2b1d9e66c5b00a6d03') stub_dir = {'id': sha1_bin, 'revision': b'sha1-blah'} stub_dir2 = {'id': sha1_bin2, 'revision': b'sha1-foobar'} self.storage.directory_get = MagicMock(return_value=[stub_dir, stub_dir2]) # when actual_directory = backend.directory_get(sha1_bin) # then self.assertEquals(actual_directory, stub_dir) self.storage.directory_get.assert_called_with([sha1_bin]) @istest def directory_ls_empty_result(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') self.storage.directory_ls = MagicMock(return_value=[]) # when actual_directory = backend.directory_ls(sha1_bin) # then self.assertEquals(actual_directory, []) self.storage.directory_ls.assert_called_with(sha1_bin, False) @istest def directory_ls(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') stub_dir_entries = [{ 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, }] self.storage.directory_ls = MagicMock( return_value=stub_dir_entries) actual_directory = backend.directory_ls(sha1_bin, recursive=True) # then self.assertIsNotNone(actual_directory) self.assertEqual(list(actual_directory), stub_dir_entries) self.storage.directory_ls.assert_called_with(sha1_bin, True) @istest def release_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') self.storage.release_get = MagicMock(return_value=[]) # when actual_release = backend.release_get(sha1_bin) # then self.assertIsNone(actual_release) self.storage.release_get.assert_called_with([sha1_bin]) @istest def release_get(self): # given sha1_bin = hashutil.hex_to_hash( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') stub_releases = [{ 'id': sha1_bin, 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }] self.storage.release_get = MagicMock(return_value=stub_releases) # when actual_release = backend.release_get(sha1_bin) # then self.assertEqual(actual_release, stub_releases[0]) self.storage.release_get.assert_called_with([sha1_bin]) @istest def revision_get_by_not_found(self): # given self.storage.revision_get_by = MagicMock(return_value=[]) # when actual_revision = backend.revision_get_by(10, 'master', 'ts2') # then self.assertIsNone(actual_revision) self.storage.revision_get_by.assert_called_with(10, 'master', timestamp='ts2', limit=1) @istest def revision_get_by(self): # given self.storage.revision_get_by = MagicMock(return_value=[{'id': 1}]) # when actual_revisions = backend.revision_get_by(100, 'dev', 'ts') # then self.assertEquals(actual_revisions, {'id': 1}) self.storage.revision_get_by.assert_called_with(100, 'dev', timestamp='ts', limit=1) @istest def revision_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') self.storage.revision_get = MagicMock(return_value=[]) # when actual_revision = backend.revision_get(sha1_bin) # then self.assertIsNone(actual_revision) self.storage.revision_get.assert_called_with([sha1_bin]) @istest def revision_get(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') stub_revisions = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_get = MagicMock(return_value=stub_revisions) # when actual_revision = backend.revision_get(sha1_bin) # then self.assertEqual(actual_revision, stub_revisions[0]) self.storage.revision_get.assert_called_with([sha1_bin]) @istest def revision_log(self): # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') stub_revision_log = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_log = MagicMock(return_value=stub_revision_log) # when actual_revision = backend.revision_log(sha1_bin) # then self.assertEqual(list(actual_revision), stub_revision_log) self.storage.revision_log.assert_called_with([sha1_bin], 100) @istest def revision_log_by(self): - # given # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') stub_revision_log = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] - self.storage.revision_get_by = MagicMock(return_value=[ - {'id': sha1_bin}]) - self.storage.revision_log = MagicMock(return_value=stub_revision_log) + self.storage.revision_log_by = MagicMock( + return_value=stub_revision_log) # when actual_log = backend.revision_log_by(1, 'refs/heads/master', None) # then self.assertEqual(actual_log, stub_revision_log) self.storage.revision_log.assert_called_with([sha1_bin], 100) @istest def revision_log_by_norev(self): - # given # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') - self.storage.revision_get_by = MagicMock(return_value=None) + self.storage.revision_log_by = MagicMock(return_value=None) # when actual_log = backend.revision_log_by(1, 'refs/heads/master', None) # then self.assertEqual(actual_log, None) self.storage.revision_log.assert_called_with([sha1_bin], 100) @istest def stat_counters(self): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } self.storage.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = backend.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) self.storage.stat_counters.assert_called_with() @istest def directory_entry_get_by_path(self): # given stub_dir_entry = {'id': b'dir-id', 'type': 'dir', 'name': b'some/path/foo'} self.storage.directory_entry_get_by_path = MagicMock( return_value=stub_dir_entry) # when actual_dir_entry = backend.directory_entry_get_by_path(b'dir-sha1', 'some/path/foo') self.assertEquals(actual_dir_entry, stub_dir_entry) self.storage.directory_entry_get_by_path.assert_called_once_with( b'dir-sha1', [b'some', b'path', b'foo']) @istest def entity_get(self): # given stub_entities = [{'uuid': 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'}, {'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'parent': None}] self.storage.entity_get = MagicMock(return_value=stub_entities) # when actual_entities = backend.entity_get( 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7') # then self.assertEquals(actual_entities, stub_entities) self.storage.entity_get.assert_called_once_with( 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7')