diff --git a/sql/upgrades/134.sql b/sql/upgrades/134.sql
new file mode 100644
index 00000000..ca2fcbf3
--- /dev/null
+++ b/sql/upgrades/134.sql
@@ -0,0 +1,46 @@
+-- SWH DB schema upgrade
+-- from_version: 133
+-- to_version: 134
+-- description: Make swh_snapshot_add delete the temporary table at the end.
+
+insert into dbversion(version, release, description)
+ values(134, now(), 'Work In Progress');
+
+create or replace function swh_snapshot_add(snapshot_id snapshot.id%type)
+ returns void
+ language plpgsql
+as $$
+declare
+ snapshot_object_id snapshot.object_id%type;
+begin
+ select object_id from snapshot where id = snapshot_id into snapshot_object_id;
+ if snapshot_object_id is null then
+ insert into snapshot (id) values (snapshot_id) returning object_id into snapshot_object_id;
+ insert into snapshot_branch (name, target_type, target)
+ select name, target_type, target from tmp_snapshot_branch tmp
+ where not exists (
+ select 1
+ from snapshot_branch sb
+ where sb.name = tmp.name
+ and sb.target = tmp.target
+ and sb.target_type = tmp.target_type
+ )
+ on conflict do nothing;
+ insert into snapshot_branches (snapshot_id, branch_id)
+ select snapshot_object_id, sb.object_id as branch_id
+ from tmp_snapshot_branch tmp
+ join snapshot_branch sb
+ using (name, target, target_type)
+ where tmp.target is not null and tmp.target_type is not null
+ union
+ select snapshot_object_id, sb.object_id as branch_id
+ from tmp_snapshot_branch tmp
+ join snapshot_branch sb
+ using (name)
+ where tmp.target is null and tmp.target_type is null
+ and sb.target is null and sb.target_type is null;
+ end if;
+ truncate table tmp_snapshot_branch;
+end;
+$$;
+
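For context, swh_snapshot_add is driven through the temporary-table pattern used elsewhere in this schema (create tmp_snapshot_branch with swh_mktemp_snapshot_branch, load the branches into it, then call the function); with this upgrade the function truncates tmp_snapshot_branch itself instead of leaving cleanup to the caller. A minimal Python sketch of that pattern, assuming a direct psycopg2 connection; the DSN, ids and branches below are placeholders, not taken from the patch:

    import psycopg2

    snapshot_id = b'\x00' * 20  # placeholder sha1_git, for illustration only
    branches = [
        # (name, target, target_type); an alias targets another branch by name
        (b'refs/heads/master', b'\x01' * 20, 'revision'),
        (b'HEAD', b'refs/heads/master', 'alias'),
    ]

    with psycopg2.connect('dbname=softwareheritage-dev') as db:  # placeholder DSN
        with db.cursor() as cur:
            cur.execute('select swh_mktemp_snapshot_branch()')
            cur.executemany(
                'insert into tmp_snapshot_branch (name, target, target_type)'
                ' values (%s, %s, %s)', branches)
            # After this upgrade the function empties tmp_snapshot_branch itself,
            # so several snapshots can be loaded in the same transaction.
            cur.execute('select swh_snapshot_add(%s)', (snapshot_id,))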
diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py
index 933667df..65e87e78 100644
--- a/swh/storage/api/client.py
+++ b/swh/storage/api/client.py
@@ -1,280 +1,280 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import warnings
from swh.core.api import SWHRemoteAPI
from ..exc import StorageAPIError
class RemoteStorage(SWHRemoteAPI):
"""Proxy to a remote storage API"""
api_exception = StorageAPIError
def check_config(self, *, check_write):
return self.post('check_config', {'check_write': check_write})
def content_add(self, content):
return self.post('content/add', {'content': content})
def content_update(self, content, keys=[]):
return self.post('content/update', {'content': content,
'keys': keys})
def content_missing(self, content, key_hash='sha1'):
return self.post('content/missing', {'content': content,
'key_hash': key_hash})
def content_missing_per_sha1(self, contents):
return self.post('content/missing/sha1', {'contents': contents})
def content_get(self, content):
return self.post('content/data', {'content': content})
def content_get_metadata(self, content):
return self.post('content/metadata', {'content': content})
def content_get_range(self, start, end, limit=1000):
return self.post('content/range', {'start': start,
'end': end,
'limit': limit})
def content_find(self, content):
return self.post('content/present', {'content': content})
def directory_add(self, directories):
return self.post('directory/add', {'directories': directories})
def directory_missing(self, directories):
return self.post('directory/missing', {'directories': directories})
def directory_ls(self, directory, recursive=False):
return self.post('directory/ls', {'directory': directory,
'recursive': recursive})
def revision_get(self, revisions):
return self.post('revision', {'revisions': revisions})
def revision_log(self, revisions, limit=None):
return self.post('revision/log', {'revisions': revisions,
'limit': limit})
def revision_shortlog(self, revisions, limit=None):
return self.post('revision/shortlog', {'revisions': revisions,
'limit': limit})
def revision_add(self, revisions):
return self.post('revision/add', {'revisions': revisions})
def revision_missing(self, revisions):
return self.post('revision/missing', {'revisions': revisions})
def release_add(self, releases):
return self.post('release/add', {'releases': releases})
def release_get(self, releases):
return self.post('release', {'releases': releases})
def release_missing(self, releases):
return self.post('release/missing', {'releases': releases})
def object_find_by_sha1_git(self, ids):
return self.post('object/find_by_sha1_git', {'ids': ids})
- def snapshot_add(self, snapshot, origin=None, visit=None):
+ def snapshot_add(self, snapshots, origin=None, visit=None):
if origin:
assert visit
- (origin, visit, snapshot) = (snapshot, origin, visit)
+ (origin, visit, snapshots) = (snapshots, origin, visit)
warnings.warn("arguments 'origin' and 'visit' of snapshot_add "
"are deprecated since v0.0.131, please use "
- "snapshot_add(snapshot) + "
+ "snapshot_add([snapshot]) + "
"origin_visit_update(origin, visit, "
"snapshot=snapshot['id']) instead.",
DeprecationWarning)
return self.post('snapshot/add', {
- 'origin': origin, 'visit': visit, 'snapshot': snapshot,
+ 'origin': origin, 'visit': visit, 'snapshots': snapshots,
})
else:
assert not visit
return self.post('snapshot/add', {
- 'snapshot': snapshot,
+ 'snapshots': snapshots,
})
def snapshot_get(self, snapshot_id):
return self.post('snapshot', {
'snapshot_id': snapshot_id
})
def snapshot_get_by_origin_visit(self, origin, visit):
return self.post('snapshot/by_origin_visit', {
'origin': origin,
'visit': visit
})
def snapshot_get_latest(self, origin, allowed_statuses=None):
return self.post('snapshot/latest', {
'origin': origin,
'allowed_statuses': allowed_statuses
})
def snapshot_count_branches(self, snapshot_id):
return self.post('snapshot/count_branches', {
'snapshot_id': snapshot_id
})
def snapshot_get_branches(self, snapshot_id, branches_from=b'',
branches_count=1000, target_types=None):
return self.post('snapshot/get_branches', {
'snapshot_id': snapshot_id,
'branches_from': branches_from,
'branches_count': branches_count,
'target_types': target_types
})
def origin_get(self, origins=None, *, origin=None):
if origin is None:
if origins is None:
raise TypeError('origin_get expected 1 argument')
else:
assert origins is None
origins = origin
warnings.warn("argument 'origin' of origin_get was renamed "
"to 'origins' in v0.0.123.",
DeprecationWarning)
return self.post('origin/get', {'origins': origins})
def origin_search(self, url_pattern, offset=0, limit=50, regexp=False,
with_visit=False):
return self.post('origin/search', {'url_pattern': url_pattern,
'offset': offset,
'limit': limit,
'regexp': regexp,
'with_visit': with_visit})
def origin_count(self, url_pattern, regexp=False, with_visit=False):
return self.post('origin/count', {'url_pattern': url_pattern,
'regexp': regexp,
'with_visit': with_visit})
def origin_get_range(self, origin_from=1, origin_count=100):
return self.post('origin/get_range', {'origin_from': origin_from,
'origin_count': origin_count})
def origin_add(self, origins):
return self.post('origin/add_multi', {'origins': origins})
def origin_add_one(self, origin):
return self.post('origin/add', {'origin': origin})
def origin_visit_add(self, origin, date, *, ts=None):
if ts is None:
if date is None:
raise TypeError('origin_visit_add expected 2 arguments.')
else:
assert date is None
warnings.warn("argument 'ts' of origin_visit_add was renamed "
"to 'date' in v0.0.109.",
DeprecationWarning)
date = ts
return self.post('origin/visit/add', {'origin': origin, 'date': date})
def origin_visit_update(self, origin, visit_id, status=None,
metadata=None, snapshot=None):
return self.post('origin/visit/update', {'origin': origin,
'visit_id': visit_id,
'status': status,
'metadata': metadata,
'snapshot': snapshot})
def origin_visit_get(self, origin, last_visit=None, limit=None):
return self.post('origin/visit/get', {
'origin': origin, 'last_visit': last_visit, 'limit': limit})
def origin_visit_get_by(self, origin, visit):
return self.post('origin/visit/getby', {'origin': origin,
'visit': visit})
def person_get(self, person):
return self.post('person', {'person': person})
def fetch_history_start(self, origin_id):
return self.post('fetch_history/start', {'origin_id': origin_id})
def fetch_history_end(self, fetch_history_id, data):
return self.post('fetch_history/end',
{'fetch_history_id': fetch_history_id,
'data': data})
def fetch_history_get(self, fetch_history_id):
return self.get('fetch_history', {'id': fetch_history_id})
def entity_add(self, entities):
return self.post('entity/add', {'entities': entities})
def entity_get(self, uuid):
return self.post('entity/get', {'uuid': uuid})
def entity_get_one(self, uuid):
return self.get('entity', {'uuid': uuid})
def entity_get_from_lister_metadata(self, entities):
return self.post('entity/from_lister_metadata', {'entities': entities})
def stat_counters(self):
return self.get('stat/counters')
def directory_entry_get_by_path(self, directory, paths):
return self.post('directory/path', dict(directory=directory,
paths=paths))
def tool_add(self, tools):
return self.post('tool/add', {'tools': tools})
def tool_get(self, tool):
return self.post('tool/data', {'tool': tool})
def origin_metadata_add(self, origin_id, ts, provider, tool, metadata):
return self.post('origin/metadata/add', {'origin_id': origin_id,
'ts': ts,
'provider': provider,
'tool': tool,
'metadata': metadata})
def origin_metadata_get_by(self, origin_id, provider_type=None):
return self.post('origin/metadata/get', {
'origin_id': origin_id,
'provider_type': provider_type
})
def metadata_provider_add(self, provider_name, provider_type, provider_url,
metadata):
return self.post('provider/add', {'provider_name': provider_name,
'provider_type': provider_type,
'provider_url': provider_url,
'metadata': metadata})
def metadata_provider_get(self, provider_id):
return self.post('provider/get', {'provider_id': provider_id})
def metadata_provider_get_by(self, provider):
return self.post('provider/getby', {'provider': provider})
def diff_directories(self, from_dir, to_dir, track_renaming=False):
return self.post('algos/diff_directories',
{'from_dir': from_dir,
'to_dir': to_dir,
'track_renaming': track_renaming})
def diff_revisions(self, from_rev, to_rev, track_renaming=False):
return self.post('algos/diff_revisions',
{'from_rev': from_rev,
'to_rev': to_rev,
'track_renaming': track_renaming})
def diff_revision(self, revision, track_renaming=False):
return self.post('algos/diff_revision',
{'revision': revision,
'track_renaming': track_renaming})
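The DeprecationWarning added to snapshot_add above spells out the migration path for callers: pass a list of snapshots, then attach the snapshot to the visit with origin_visit_update. A short sketch of that call pattern; the helper name and its arguments are illustrative, only snapshot_add and origin_visit_update come from this client:

    def record_visit_snapshot(storage, origin_id, visit_id, snapshot):
        # Hypothetical helper, not part of the patch.
        # Deprecated form (still accepted, emits a DeprecationWarning):
        #     storage.snapshot_add(snapshot, origin_id, visit_id)
        # Preferred form after this change:
        storage.snapshot_add([snapshot])
        storage.origin_visit_update(origin_id, visit_id,
                                    snapshot=snapshot['id'])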
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
index ad25916c..40e23a1e 100644
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -1,1392 +1,1395 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
import bisect
import dateutil
import collections
from collections import defaultdict
import copy
import datetime
import itertools
import random
import warnings
from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.model.identifiers import normalize_timestamp
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
from .journal_writer import get_journal_writer
# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000
def now():
return datetime.datetime.now(tz=datetime.timezone.utc)
class Storage:
def __init__(self, journal_writer=None):
self._contents = {}
self._content_indexes = defaultdict(lambda: defaultdict(set))
self._directories = {}
self._revisions = {}
self._releases = {}
self._snapshots = {}
self._origins = []
self._origin_visits = []
self._persons = []
self._origin_metadata = defaultdict(list)
self._tools = {}
self._metadata_providers = {}
self._objects = defaultdict(list)
# ideally we would want a skip list for both fast inserts and searches
self._sorted_sha1s = []
self.objstorage = get_objstorage('memory', {})
if journal_writer:
self.journal_writer = get_journal_writer(**journal_writer)
else:
self.journal_writer = None
def check_config(self, *, check_write):
"""Check that the storage is configured and ready to go."""
return True
def content_add(self, contents):
"""Add content blobs to the storage
Args:
content (iterable): iterable of dictionaries representing
individual pieces of content to add. Each dictionary has the
following keys:
- data (bytes): the actual content
- length (int): content length (default: -1)
- one key for each checksum algorithm in
:data:`swh.model.hashutil.DEFAULT_ALGORITHMS`, mapped to the
corresponding checksum
- status (str): one of visible, hidden, absent
- reason (str): if status = absent, the reason why
- origin (int): if status = absent, the origin we saw the
content in
"""
if self.journal_writer:
for content in contents:
if 'data' in content:
content = content.copy()
del content['data']
self.journal_writer.write_addition('content', content)
for content in contents:
key = self._content_key(content)
if key in self._contents:
continue
for algorithm in DEFAULT_ALGORITHMS:
if content[algorithm] in self._content_indexes[algorithm]:
from . import HashCollision
raise HashCollision(algorithm, content[algorithm], key)
for algorithm in DEFAULT_ALGORITHMS:
self._content_indexes[algorithm][content[algorithm]].add(key)
self._objects[content['sha1_git']].append(
('content', content['sha1']))
self._contents[key] = copy.deepcopy(content)
self._contents[key]['ctime'] = now()
bisect.insort(self._sorted_sha1s, content['sha1'])
if self._contents[key]['status'] == 'visible':
content_data = self._contents[key].pop('data')
self.objstorage.add(content_data, content['sha1'])
def content_get(self, ids):
"""Retrieve in bulk contents and their data.
This function may yield more blobs than provided sha1 identifiers,
in case they collide.
Args:
content: iterables of sha1
Yields:
Dict[str, bytes]: Generates streams of contents as dict with their
raw data:
- sha1 (bytes): content id
- data (bytes): content's raw data
Raises:
ValueError in case of too much contents are required.
cf. BULK_BLOCK_CONTENT_LEN_MAX
"""
# FIXME: Make this method support slicing the `data`.
if len(ids) > BULK_BLOCK_CONTENT_LEN_MAX:
raise ValueError(
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
for obj_id in ids:
try:
data = self.objstorage.get(obj_id)
except ObjNotFoundError:
yield None
continue
yield {'sha1': obj_id, 'data': data}
def content_get_range(self, start, end, limit=1000, db=None, cur=None):
"""Retrieve contents within range [start, end] bound by limit.
Note that this function may return more than one blob per hash. The
limit is enforced with multiplicity (ie. two blobs with the same hash
will count twice toward the limit).
Args:
**start** (bytes): Starting identifier range (expected smaller
than end)
**end** (bytes): Ending identifier range (expected larger
than start)
**limit** (int): Limit result (default to 1000)
Returns:
a dict with keys:
- contents [dict]: iterable of contents in between the range.
- next (bytes): There remains content in the range
starting from this next sha1
"""
if limit is None:
raise ValueError('Development error: limit should not be None')
from_index = bisect.bisect_left(self._sorted_sha1s, start)
sha1s = itertools.islice(self._sorted_sha1s, from_index, None)
sha1s = ((sha1, content_key)
for sha1 in sha1s
for content_key in self._content_indexes['sha1'][sha1])
matched = []
next_content = None
for sha1, key in sha1s:
if sha1 > end:
break
if len(matched) >= limit:
next_content = sha1
break
matched.append({
**self._contents[key],
})
return {
'contents': matched,
'next': next_content,
}
def content_get_metadata(self, sha1s):
"""Retrieve content metadata in bulk
Args:
content: iterable of content identifiers (sha1)
Returns:
an iterable with content metadata corresponding to the given ids
"""
# FIXME: the return value should be a mapping from search key to found
# content*s*
for sha1 in sha1s:
if sha1 in self._content_indexes['sha1']:
objs = self._content_indexes['sha1'][sha1]
# FIXME: rather than selecting one of the objects with that
# hash, we should return all of them. See:
# https://forge.softwareheritage.org/D645?id=1994#inline-3389
key = random.sample(objs, 1)[0]
data = copy.deepcopy(self._contents[key])
data.pop('ctime')
yield data
else:
# FIXME: should really be None
yield {
'sha1': sha1,
'sha1_git': None,
'sha256': None,
'blake2s256': None,
'length': None,
'status': None,
}
def content_find(self, content):
if not set(content).intersection(DEFAULT_ALGORITHMS):
raise ValueError('content keys must contain at least one of: '
'%s' % ', '.join(sorted(DEFAULT_ALGORITHMS)))
found = []
for algo in DEFAULT_ALGORITHMS:
hash = content.get(algo)
if hash and hash in self._content_indexes[algo]:
found.append(self._content_indexes[algo][hash])
if not found:
return
keys = list(set.intersection(*found))
# FIXME: should really be a list of all the objects found
return copy.deepcopy(self._contents[keys[0]])
def content_missing(self, contents, key_hash='sha1'):
"""List content missing from storage
Args:
contents ([dict]): iterable of dictionaries whose keys are
either 'length' or an item of
:data:`swh.model.hashutil.ALGORITHMS`;
mapped to the corresponding checksum
(or length).
key_hash (str): name of the column to use as hash id
result (default: 'sha1')
Returns:
iterable ([bytes]): missing content ids (as per the
key_hash column)
"""
for content in contents:
for (algo, hash_) in content.items():
if algo not in DEFAULT_ALGORITHMS:
continue
if hash_ not in self._content_indexes.get(algo, []):
yield content[key_hash]
break
else:
# content_find cannot return None here, because we checked
# above that there is a content with matching hashes.
if self.content_find(content)['status'] == 'missing':
yield content[key_hash]
def content_missing_per_sha1(self, contents):
"""List content missing from storage based only on sha1.
Args:
contents: Iterable of sha1 to check for absence.
Returns:
iterable: missing ids
Raises:
TODO: an exception when we get a hash collision.
"""
for content in contents:
if content not in self._content_indexes['sha1']:
yield content
def directory_add(self, directories):
"""Add directories to the storage
Args:
directories (iterable): iterable of dictionaries representing the
individual directories to add. Each dict has the following
keys:
- id (sha1_git): the id of the directory to add
- entries (list): list of dicts for each entry in the
directory. Each dict has the following keys:
- name (bytes)
- type (one of 'file', 'dir', 'rev'): type of the
directory entry (file, directory, revision)
- target (sha1_git): id of the object pointed at by the
directory entry
- perms (int): entry permissions
"""
if self.journal_writer:
self.journal_writer.write_additions('directory', directories)
for directory in directories:
if directory['id'] not in self._directories:
self._directories[directory['id']] = copy.deepcopy(directory)
self._objects[directory['id']].append(
('directory', directory['id']))
def directory_missing(self, directory_ids):
"""List directories missing from storage
Args:
directories (iterable): an iterable of directory ids
Yields:
missing directory ids
"""
for id in directory_ids:
if id not in self._directories:
yield id
def _join_dentry_to_content(self, dentry):
keys = (
'status',
'sha1',
'sha1_git',
'sha256',
'length',
)
ret = dict.fromkeys(keys)
ret.update(dentry)
if ret['type'] == 'file':
content = self.content_find({'sha1_git': ret['target']})
if content:
for key in keys:
ret[key] = content[key]
return ret
def _directory_ls(self, directory_id, recursive, prefix=b''):
if directory_id in self._directories:
for entry in self._directories[directory_id]['entries']:
ret = self._join_dentry_to_content(entry)
ret['name'] = prefix + ret['name']
ret['dir_id'] = directory_id
yield ret
if recursive and ret['type'] == 'dir':
yield from self._directory_ls(
ret['target'], True, prefix + ret['name'] + b'/')
def directory_ls(self, directory_id, recursive=False):
"""Get entries for one directory.
Args:
- directory: the directory to list entries from.
- recursive: if flag on, this list recursively from this directory.
Returns:
List of entries for such directory.
If `recursive=True`, names in the path of a dir/file not at the
root are concatenated with a slash (`/`).
"""
yield from self._directory_ls(directory_id, recursive)
def directory_entry_get_by_path(self, directory, paths):
"""Get the directory entry (either file or dir) from directory with path.
Args:
- directory: sha1 of the top level directory
- paths: path to lookup from the top level directory. From left
(top) to right (bottom).
Returns:
The corresponding directory entry if found, None otherwise.
"""
if not paths:
return
contents = list(self.directory_ls(directory))
if not contents:
return
def _get_entry(entries, name):
for entry in entries:
if entry['name'] == name:
return entry
first_item = _get_entry(contents, paths[0])
if len(paths) == 1:
return first_item
if not first_item or first_item['type'] != 'dir':
return
return self.directory_entry_get_by_path(
first_item['target'], paths[1:])
def revision_add(self, revisions):
"""Add revisions to the storage
Args:
revisions (Iterable[dict]): iterable of dictionaries representing
the individual revisions to add. Each dict has the following
keys:
- **id** (:class:`sha1_git`): id of the revision to add
- **date** (:class:`dict`): date the revision was written
- **committer_date** (:class:`dict`): date the revision got
added to the origin
- **type** (one of 'git', 'tar'): type of the
revision added
- **directory** (:class:`sha1_git`): the directory the
revision points at
- **message** (:class:`bytes`): the message associated with
the revision
- **author** (:class:`Dict[str, bytes]`): dictionary with
keys: name, fullname, email
- **committer** (:class:`Dict[str, bytes]`): dictionary with
keys: name, fullname, email
- **metadata** (:class:`jsonb`): extra information as
dictionary
- **synthetic** (:class:`bool`): revision's nature (tarball,
directory creates synthetic revision`)
- **parents** (:class:`list[sha1_git]`): the parents of
this revision
date dictionaries have the form defined in :mod:`swh.model`.
"""
if self.journal_writer:
self.journal_writer.write_additions('revision', revisions)
for revision in revisions:
if revision['id'] not in self._revisions:
self._revisions[revision['id']] = rev = copy.deepcopy(revision)
self._person_add(rev['committer'])
self._person_add(rev['author'])
rev['date'] = normalize_timestamp(rev.get('date'))
rev['committer_date'] = normalize_timestamp(
rev.get('committer_date'))
self._objects[revision['id']].append(
('revision', revision['id']))
def revision_missing(self, revision_ids):
"""List revisions missing from storage
Args:
revisions (iterable): revision ids
Yields:
missing revision ids
"""
for id in revision_ids:
if id not in self._revisions:
yield id
def revision_get(self, revision_ids):
for id in revision_ids:
yield copy.deepcopy(self._revisions.get(id))
def _get_parent_revs(self, rev_id, seen, limit):
if limit and len(seen) >= limit:
return
if rev_id in seen or rev_id not in self._revisions:
return
seen.add(rev_id)
yield self._revisions[rev_id]
for parent in self._revisions[rev_id]['parents']:
yield from self._get_parent_revs(parent, seen, limit)
def revision_log(self, revision_ids, limit=None):
"""Fetch revision entry from the given root revisions.
Args:
revisions: array of root revision to lookup
limit: limitation on the output result. Default to None.
Yields:
List of revision log from such revisions root.
"""
seen = set()
for rev_id in revision_ids:
yield from self._get_parent_revs(rev_id, seen, limit)
def revision_shortlog(self, revisions, limit=None):
"""Fetch the shortlog for the given revisions
Args:
revisions: list of root revisions to lookup
limit: depth limitation for the output
Yields:
a list of (id, parents) tuples.
"""
yield from ((rev['id'], rev['parents'])
for rev in self.revision_log(revisions, limit))
def release_add(self, releases):
"""Add releases to the storage
Args:
releases (Iterable[dict]): iterable of dictionaries representing
the individual releases to add. Each dict has the following
keys:
- **id** (:class:`sha1_git`): id of the release to add
- **revision** (:class:`sha1_git`): id of the revision the
release points to
- **date** (:class:`dict`): the date the release was made
- **name** (:class:`bytes`): the name of the release
- **comment** (:class:`bytes`): the comment associated with
the release
- **author** (:class:`Dict[str, bytes]`): dictionary with
keys: name, fullname, email
the date dictionary has the form defined in :mod:`swh.model`.
"""
if self.journal_writer:
self.journal_writer.write_additions('release', releases)
for rel in releases:
rel = copy.deepcopy(rel)
rel['date'] = normalize_timestamp(rel['date'])
self._person_add(rel['author'])
self._objects[rel['id']].append(
('release', rel['id']))
self._releases[rel['id']] = rel
def release_missing(self, releases):
"""List releases missing from storage
Args:
releases: an iterable of release ids
Returns:
a list of missing release ids
"""
yield from (rel for rel in releases if rel not in self._releases)
def release_get(self, releases):
"""Given a list of sha1, return the releases's information
Args:
releases: list of sha1s
Yields:
dicts with the same keys as those given to `release_add`
(or ``None`` if a release does not exist)
"""
for rel_id in releases:
yield copy.deepcopy(self._releases.get(rel_id))
- def snapshot_add(self, snapshot, legacy_arg1=None, legacy_arg2=None):
- """Add a snapshot for the given origin/visit couple
+ def snapshot_add(self, snapshots, legacy_arg1=None, legacy_arg2=None):
+ """Add a snapshot to the storage
Args:
- snapshot (dict): the snapshot to add to the visit, containing the
+ snapshot ([dict]): the snapshots to add, containing the
following keys:
- **id** (:class:`bytes`): id of the snapshot
- **branches** (:class:`dict`): branches the snapshot contains,
mapping the branch name (:class:`bytes`) to the branch target,
itself a :class:`dict` (or ``None`` if the branch points to an
unknown object)
- **target_type** (:class:`str`): one of ``content``,
``directory``, ``revision``, ``release``,
``snapshot``, ``alias``
- **target** (:class:`bytes`): identifier of the target
(currently a ``sha1_git`` for all object kinds, or the name
of the target branch for aliases)
Raises:
ValueError: if the origin's or visit's identifier does not exist.
"""
if legacy_arg1:
assert legacy_arg2
- (origin, visit, snapshot) = \
- (snapshot, legacy_arg1, legacy_arg2)
+ (origin, visit, snapshots) = \
+ (snapshots, legacy_arg1, [legacy_arg2])
else:
origin = visit = None
- snapshot_id = snapshot['id']
- if self.journal_writer:
- self.journal_writer.write_addition(
- 'snapshot', snapshot)
- if snapshot_id not in self._snapshots:
- self._snapshots[snapshot_id] = {
- 'id': snapshot_id,
- 'branches': copy.deepcopy(snapshot['branches']),
- '_sorted_branch_names': sorted(snapshot['branches'])
- }
- self._objects[snapshot_id].append(('snapshot', snapshot_id))
+ for snapshot in snapshots:
+ snapshot_id = snapshot['id']
+ if snapshot_id not in self._snapshots:
+ if self.journal_writer:
+ self.journal_writer.write_addition('snapshot', snapshot)
+
+ self._snapshots[snapshot_id] = {
+ 'id': snapshot_id,
+ 'branches': copy.deepcopy(snapshot['branches']),
+ '_sorted_branch_names': sorted(snapshot['branches'])
+ }
+ self._objects[snapshot_id].append(('snapshot', snapshot_id))
if origin:
- self.origin_visit_update(origin, visit, snapshot=snapshot_id)
+ # Legacy API, there can be only one snapshot
+ self.origin_visit_update(
+ origin, visit, snapshot=snapshots[0]['id'])
def snapshot_get(self, snapshot_id):
"""Get the content, possibly partial, of a snapshot with the given id
The branches of the snapshot are iterated in the lexicographical
order of their names.
.. warning:: At most 1000 branches contained in the snapshot will be
returned for performance reasons. In order to browse the whole
set of branches, the method :meth:`snapshot_get_branches`
should be used instead.
Args:
snapshot_id (bytes): identifier of the snapshot
Returns:
dict: a dict with three keys:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has less than 1000
branches.
"""
return self.snapshot_get_branches(snapshot_id)
def snapshot_get_by_origin_visit(self, origin, visit):
"""Get the content, possibly partial, of a snapshot for the given origin visit
The branches of the snapshot are iterated in the lexicographical
order of their names.
.. warning:: At most 1000 branches contained in the snapshot will be
returned for performance reasons. In order to browse the whole
set of branches, the method :meth:`snapshot_get_branches`
should be used instead.
Args:
origin (int): the origin's identifier
visit (int): the visit's identifier
Returns:
dict: None if the snapshot does not exist;
a dict with three keys otherwise:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has less than 1000
branches.
"""
if origin > len(self._origins) or \
visit > len(self._origin_visits[origin-1]):
return None
snapshot_id = self._origin_visits[origin-1][visit-1]['snapshot']
if snapshot_id:
return self.snapshot_get(snapshot_id)
else:
return None
def snapshot_get_latest(self, origin, allowed_statuses=None):
"""Get the content, possibly partial, of the latest snapshot for the
given origin, optionally only from visits that have one of the given
allowed_statuses
The branches of the snapshot are iterated in the lexicographical
order of their names.
.. warning:: At most 1000 branches contained in the snapshot will be
returned for performance reasons. In order to browse the whole
set of branches, the method :meth:`snapshot_get_branches`
should be used instead.
Args:
origin (int): the origin's identifier
allowed_statuses (list of str): list of visit statuses considered
to find the latest snapshot for the visit. For instance,
``allowed_statuses=['full']`` will only consider visits that
have successfully run to completion.
Returns:
dict: a dict with three keys:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has less than 1000
branches.
"""
visits = self._origin_visits[origin-1]
if allowed_statuses is not None:
visits = [visit for visit in visits
if visit['status'] in allowed_statuses]
snapshot = None
for visit in sorted(visits, key=lambda v: (v['date'], v['visit']),
reverse=True):
snapshot_id = visit['snapshot']
snapshot = self.snapshot_get(snapshot_id)
if snapshot:
break
return snapshot
def snapshot_count_branches(self, snapshot_id, db=None, cur=None):
"""Count the number of branches in the snapshot with the given id
Args:
snapshot_id (bytes): identifier of the snapshot
Returns:
dict: A dict whose keys are the target types of branches and
values their corresponding amount
"""
branches = list(self._snapshots[snapshot_id]['branches'].values())
return collections.Counter(branch['target_type'] if branch else None
for branch in branches)
def snapshot_get_branches(self, snapshot_id, branches_from=b'',
branches_count=1000, target_types=None):
"""Get the content, possibly partial, of a snapshot with the given id
The branches of the snapshot are iterated in the lexicographical
order of their names.
Args:
snapshot_id (bytes): identifier of the snapshot
branches_from (bytes): optional parameter used to skip branches
whose name is lesser than it before returning them
branches_count (int): optional parameter used to restrain
the amount of returned branches
target_types (list): optional parameter used to filter the
target types of branch to return (possible values that can be
contained in that list are `'content', 'directory',
'revision', 'release', 'snapshot', 'alias'`)
Returns:
dict: None if the snapshot does not exist;
a dict with three keys otherwise:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has less than
`branches_count` branches after `branches_from` included.
"""
snapshot = self._snapshots.get(snapshot_id)
if snapshot is None:
return None
sorted_branch_names = snapshot['_sorted_branch_names']
from_index = bisect.bisect_left(
sorted_branch_names, branches_from)
if target_types:
next_branch = None
branches = {}
for branch_name in sorted_branch_names[from_index:]:
branch = snapshot['branches'][branch_name]
if branch and branch['target_type'] in target_types:
if len(branches) < branches_count:
branches[branch_name] = branch
else:
next_branch = branch_name
break
else:
# As there is no 'target_types', we can do that much faster
to_index = from_index + branches_count
returned_branch_names = sorted_branch_names[from_index:to_index]
branches = {branch_name: snapshot['branches'][branch_name]
for branch_name in returned_branch_names}
if to_index >= len(sorted_branch_names):
next_branch = None
else:
next_branch = sorted_branch_names[to_index]
return {
'id': snapshot_id,
'branches': branches,
'next_branch': next_branch,
}
def object_find_by_sha1_git(self, ids, db=None, cur=None):
"""Return the objects found with the given ids.
Args:
ids: a generator of sha1_gits
Returns:
dict: a mapping from id to the list of objects found. Each object
found is itself a dict with keys:
- sha1_git: the input id
- type: the type of object found
- id: the id of the object found
- object_id: the numeric id of the object found.
"""
ret = {}
for id_ in ids:
objs = self._objects.get(id_, [])
ret[id_] = [{
'sha1_git': id_,
'type': obj[0],
'id': obj[1],
'object_id': id_,
} for obj in objs]
return ret
def origin_get(self, origins):
"""Return origins, either all identified by their ids or all
identified by tuples (type, url).
Args:
origin: a list of dictionaries representing the individual
origins to find.
These dicts have either the keys type and url:
- type (FIXME: enum TBD): the origin type ('git', 'wget', ...)
- url (bytes): the url the origin points to
or the id:
- id (int): the origin's identifier
Returns:
dict: the origin dictionary with the keys:
- id: origin's id
- type: origin's type
- url: origin's url
Raises:
ValueError: if the keys does not match (url and type) nor id.
"""
if isinstance(origins, dict):
# Old API
return_single = True
origins = [origins]
else:
return_single = False
# Sanity check to be error-compatible with the pgsql backend
if any('id' in origin for origin in origins) \
and not all('id' in origin for origin in origins):
raise ValueError(
'Either all origins or none at all should have an "id".')
if any('type' in origin and 'url' in origin for origin in origins) \
and not all('type' in origin and 'url' in origin
for origin in origins):
raise ValueError(
'Either all origins or none at all should have a '
'"type" and an "url".')
results = []
for origin in origins:
if 'id' in origin:
origin_id = origin['id']
elif 'type' in origin and 'url' in origin:
origin_id = self._origin_id(origin)
else:
raise ValueError(
'Origin must have either id or (type and url).')
origin = None
# self._origin_id can return None
if origin_id is not None and origin_id <= len(self._origins):
origin = copy.deepcopy(self._origins[origin_id-1])
origin['id'] = origin_id
results.append(origin)
if return_single:
assert len(results) == 1
return results[0]
else:
return results
def origin_get_range(self, origin_from=1, origin_count=100):
"""Retrieve ``origin_count`` origins whose ids are greater
or equal than ``origin_from``.
Origins are sorted by id before retrieving them.
Args:
origin_from (int): the minimum id of origins to retrieve
origin_count (int): the maximum number of origins to retrieve
Yields:
dicts containing origin information as returned
by :meth:`swh.storage.in_memory.Storage.origin_get`.
"""
origin_from = max(origin_from, 1)
if origin_from <= len(self._origins):
max_idx = origin_from + origin_count - 1
if max_idx > len(self._origins):
max_idx = len(self._origins)
for idx in range(origin_from-1, max_idx):
yield copy.deepcopy(self._origins[idx])
def origin_search(self, url_pattern, offset=0, limit=50,
regexp=False, with_visit=False, db=None, cur=None):
"""Search for origins whose urls contain a provided string pattern
or match a provided regular expression.
The search is performed in a case insensitive way.
Args:
url_pattern (str): the string pattern to search for in origin urls
offset (int): number of found origins to skip before returning
results
limit (int): the maximum number of found origins to return
regexp (bool): if True, consider the provided pattern as a regular
expression and return origins whose urls match it
with_visit (bool): if True, filter out origins with no visit
Returns:
An iterable of dict containing origin information as returned
by :meth:`swh.storage.storage.Storage.origin_get`.
"""
origins = self._origins
if regexp:
pat = re.compile(url_pattern)
origins = [orig for orig in origins if pat.match(orig['url'])]
else:
origins = [orig for orig in origins if url_pattern in orig['url']]
if with_visit:
origins = [orig for orig in origins
if len(self._origin_visits[orig['id']-1]) > 0]
origins = copy.deepcopy(origins[offset:offset+limit])
return origins
def origin_count(self, url_pattern, regexp=False, with_visit=False,
db=None, cur=None):
"""Count origins whose urls contain a provided string pattern
or match a provided regular expression.
The pattern search in origin urls is performed in a case insensitive
way.
Args:
url_pattern (str): the string pattern to search for in origin urls
regexp (bool): if True, consider the provided pattern as a regular
expression and return origins whose urls match it
with_visit (bool): if True, filter out origins with no visit
Returns:
int: The number of origins matching the search criterion.
"""
return len(self.origin_search(url_pattern, regexp=regexp,
with_visit=with_visit,
limit=len(self._origins)))
def origin_add(self, origins):
"""Add origins to the storage
Args:
origins: list of dictionaries representing the individual origins,
with the following keys:
- type: the origin type ('git', 'svn', 'deb', ...)
- url (bytes): the url the origin points to
Returns:
list: given origins as dict updated with their id
"""
origins = copy.deepcopy(origins)
for origin in origins:
origin['id'] = self.origin_add_one(origin)
return origins
def origin_add_one(self, origin):
"""Add origin to the storage
Args:
origin: dictionary representing the individual origin to add. This
dict has the following keys:
- type (FIXME: enum TBD): the origin type ('git', 'wget', ...)
- url (bytes): the url the origin points to
Returns:
the id of the added origin, or of the identical one that already
exists.
"""
origin = copy.deepcopy(origin)
assert 'id' not in origin
origin_id = self._origin_id(origin)
if origin_id is None:
# origin ids are in the range [1, +inf[
origin_id = len(self._origins) + 1
origin['id'] = origin_id
self._origins.append(origin)
self._origin_visits.append([])
key = (origin['type'], origin['url'])
self._objects[key].append(('origin', origin_id))
else:
origin['id'] = origin_id
if self.journal_writer:
self.journal_writer.write_addition('origin', origin)
return origin_id
def fetch_history_start(self, origin_id):
"""Add an entry for origin origin_id in fetch_history. Returns the id
of the added fetch_history entry
"""
pass
def fetch_history_end(self, fetch_history_id, data):
"""Close the fetch_history entry with id `fetch_history_id`, replacing
its data with `data`.
"""
pass
def fetch_history_get(self, fetch_history_id):
"""Get the fetch_history entry with id `fetch_history_id`.
"""
raise NotImplementedError('fetch_history_get is deprecated, use '
'origin_visit_get instead.')
def origin_visit_add(self, origin, date=None, *, ts=None):
"""Add an origin_visit for the origin at date with status 'ongoing'.
Args:
origin (int): visited origin's identifier
date: timestamp of such visit
Returns:
dict: dictionary with keys origin and visit where:
- origin: origin's identifier
- visit: the visit's identifier for the new visit occurrence
"""
if ts is None:
if date is None:
raise TypeError('origin_visit_add expected 2 arguments.')
else:
assert date is None
warnings.warn("argument 'ts' of origin_visit_add was renamed "
"to 'date' in v0.0.109.",
DeprecationWarning)
date = ts
origin_id = origin # TODO: rename the argument
if isinstance(date, str):
date = dateutil.parser.parse(date)
visit_ret = None
if origin_id <= len(self._origin_visits):
# visit ids are in the range [1, +inf[
visit_id = len(self._origin_visits[origin_id-1]) + 1
status = 'ongoing'
visit = {
'origin': origin_id,
'date': date,
'status': status,
'snapshot': None,
'metadata': None,
'visit': visit_id
}
self._origin_visits[origin_id-1].append(visit)
visit_ret = {
'origin': origin_id,
'visit': visit_id,
}
if self.journal_writer:
origin = self.origin_get([{'id': origin_id}])[0]
self.journal_writer.write_addition('origin_visit', {
**visit, 'origin': origin})
return visit_ret
def origin_visit_update(self, origin, visit_id, status=None,
metadata=None, snapshot=None):
"""Update an origin_visit's status.
Args:
origin (int): visited origin's identifier
visit_id (int): visit's identifier
status: visit's new status
metadata: data associated to the visit
snapshot (sha1_git): identifier of the snapshot to add to
the visit
Returns:
None
"""
origin_id = origin # TODO: rename the argument
try:
visit = self._origin_visits[origin_id-1][visit_id-1]
except IndexError:
raise ValueError('Invalid origin_id or visit_id') from None
if self.journal_writer:
origin = self.origin_get([{'id': origin_id}])[0]
self.journal_writer.write_update('origin_visit', {
'origin': origin, 'visit': visit_id,
'status': status or visit['status'],
'date': visit['date'],
'metadata': metadata or visit['metadata'],
'snapshot': snapshot or visit['snapshot']})
if origin_id > len(self._origin_visits) or \
visit_id > len(self._origin_visits[origin_id-1]):
return
if status:
visit['status'] = status
if metadata:
visit['metadata'] = metadata
if snapshot:
visit['snapshot'] = snapshot
def origin_visit_get(self, origin, last_visit=None, limit=None):
"""Retrieve all the origin's visit's information.
Args:
origin (int): the origin's identifier
last_visit (int): visit's id from which listing the next ones,
default to None
limit (int): maximum number of results to return,
default to None
Yields:
List of visits.
"""
if origin <= len(self._origin_visits):
visits = self._origin_visits[origin-1]
if last_visit is not None:
visits = visits[last_visit:]
if limit is not None:
visits = visits[:limit]
for visit in visits:
visit_id = visit['visit']
yield copy.deepcopy(self._origin_visits[origin-1][visit_id-1])
def origin_visit_get_by(self, origin, visit):
"""Retrieve origin visit's information.
Args:
origin (int): the origin's identifier
Returns:
The information on that particular (origin, visit) or None if
it does not exist
"""
origin_visit = None
if origin <= len(self._origin_visits) and \
visit <= len(self._origin_visits[origin-1]):
origin_visit = self._origin_visits[origin-1][visit-1]
return copy.deepcopy(origin_visit)
def person_get(self, person):
"""Return the persons identified by their ids.
Args:
person: array of ids.
Returns:
The array of persons corresponding of the ids.
"""
for p in person:
if 0 <= (p - 1) < len(self._persons):
yield dict(self._persons[p - 1], id=p)
else:
yield None
def stat_counters(self):
"""compute statistics about the number of tuples in various tables
Returns:
dict: a dictionary mapping textual labels (e.g., content) to
integer values (e.g., the number of tuples in table content)
"""
keys = (
'content',
'directory',
'origin',
'origin_visit',
'person',
'release',
'revision',
'skipped_content',
'snapshot'
)
stats = {key: 0 for key in keys}
stats.update(collections.Counter(
obj_type
for (obj_type, obj_id)
in itertools.chain(*self._objects.values())))
return stats
def refresh_stat_counters(self):
"""Recomputes the statistics for `stat_counters`."""
pass
def origin_metadata_add(self, origin_id, ts, provider, tool, metadata,
db=None, cur=None):
""" Add an origin_metadata for the origin at ts with provenance and
metadata.
Args:
origin_id (int): the origin's id for which the metadata is added
ts (datetime): timestamp of the found metadata
provider: id of the provider of metadata (ex:'hal')
tool: id of the tool used to extract metadata
metadata (jsonb): the metadata retrieved at the time and location
"""
if isinstance(ts, str):
ts = dateutil.parser.parse(ts)
origin_metadata = {
'origin_id': origin_id,
'discovery_date': ts,
'tool_id': tool,
'metadata': metadata,
'provider_id': provider,
}
self._origin_metadata[origin_id].append(origin_metadata)
return None
def origin_metadata_get_by(self, origin_id, provider_type=None, db=None,
cur=None):
"""Retrieve list of all origin_metadata entries for the origin_id
Args:
origin_id (int): the unique origin's identifier
provider_type (str): (optional) type of provider
Returns:
list of dicts: the origin_metadata dictionary with the keys:
- origin_id (int): origin's identifier
- discovery_date (datetime): timestamp of discovery
- tool_id (int): metadata's extracting tool
- metadata (jsonb)
- provider_id (int): metadata's provider
- provider_name (str)
- provider_type (str)
- provider_url (str)
"""
metadata = []
for item in self._origin_metadata[origin_id]:
item = copy.deepcopy(item)
provider = self.metadata_provider_get(item['provider_id'])
for attr in ('name', 'type', 'url'):
item['provider_' + attr] = provider[attr]
metadata.append(item)
return metadata
def tool_add(self, tools):
"""Add new tools to the storage.
Args:
tools (iterable of :class:`dict`): Tool information to add to
storage. Each tool is a :class:`dict` with the following keys:
- name (:class:`str`): name of the tool
- version (:class:`str`): version of the tool
- configuration (:class:`dict`): configuration of the tool,
must be json-encodable
Yields:
:class:`dict`: All the tools inserted in storage
(including the internal ``id``). The order of the list is not
guaranteed to match the order of the initial list.
"""
inserted = []
for tool in tools:
key = self._tool_key(tool)
assert 'id' not in tool
record = copy.deepcopy(tool)
record['id'] = key # TODO: remove this
if key not in self._tools:
self._tools[key] = record
inserted.append(copy.deepcopy(self._tools[key]))
yield from inserted
def tool_get(self, tool):
"""Retrieve tool information.
Args:
tool (dict): Tool information we want to retrieve from storage.
The dicts have the same keys as those used in :func:`tool_add`.
Returns:
dict: The full tool information if it exists (``id`` included),
None otherwise.
"""
return self._tools.get(self._tool_key(tool))
def metadata_provider_add(self, provider_name, provider_type, provider_url,
metadata):
"""Add a metadata provider.
Args:
provider_name (str): Its name
provider_type (str): Its type
provider_url (str): Its URL
metadata: JSON-encodable object
Returns:
an identifier of the provider
"""
provider = {
'name': provider_name,
'type': provider_type,
'url': provider_url,
'metadata': metadata,
}
key = self._metadata_provider_key(provider)
provider['id'] = key
self._metadata_providers[key] = provider
return key
def metadata_provider_get(self, provider_id, db=None, cur=None):
"""Get a metadata provider
Args:
provider_id: Its identifier, as given by `metadata_provider_add`.
Returns:
dict: same as `metadata_provider_add`;
or None if it does not exist.
"""
return self._metadata_providers.get(provider_id)
def metadata_provider_get_by(self, provider, db=None, cur=None):
"""Get a metadata provider
Args:
provider_name: Its name
provider_url: Its URL
Returns:
dict: same as `metadata_provider_add`;
or None if it does not exist.
"""
key = self._metadata_provider_key({
'name': provider['provider_name'],
'url': provider['provider_url']})
return self._metadata_providers.get(key)
def _origin_id(self, origin):
origin_id = None
for stored_origin in self._origins:
if stored_origin['type'] == origin['type'] and \
stored_origin['url'] == origin['url']:
origin_id = stored_origin['id']
break
return origin_id
def _person_add(self, person):
"""Add a person in storage.
Note: Private method, do not use outside of this class.
Args:
person: dictionary with keys fullname, name and email.
"""
key = ('person', person['fullname'])
if key not in self._objects:
person_id = len(self._persons) + 1
self._persons.append(dict(person))
self._objects[key].append(('person', person_id))
else:
person_id = self._objects[key][0][1]
p = next(self.person_get([person_id]))
person.update(p.items())
person['id'] = person_id
@staticmethod
def _content_key(content):
"""A stable key for a content"""
return tuple(content.get(key) for key in sorted(DEFAULT_ALGORITHMS))
@staticmethod
def _tool_key(tool):
return (tool['name'], tool['version'],
tuple(sorted(tool['configuration'].items())))
@staticmethod
def _metadata_provider_key(provider):
return (provider['name'], provider['url'])
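The in-memory backend now accepts the same list-based snapshot_add as the remote client. A rough usage sketch against this backend, with no journal writer configured and illustrative origin/snapshot values (none of them come from the patch):

    import datetime

    from swh.storage.in_memory import Storage

    storage = Storage()
    origin_id = storage.origin_add_one(
        {'type': 'git', 'url': 'https://example.org/repo.git'})  # placeholder origin
    visit = storage.origin_visit_add(
        origin_id, datetime.datetime.now(tz=datetime.timezone.utc))

    empty_snapshot = {'id': b'\x1a' * 20, 'branches': {}}  # placeholder snapshot
    storage.snapshot_add([empty_snapshot])  # new API: an iterable of snapshots
    storage.origin_visit_update(origin_id, visit['visit'],
                                snapshot=empty_snapshot['id'])
    assert storage.snapshot_get_latest(origin_id)['id'] == empty_snapshot['id']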
diff --git a/swh/storage/sql/40-swh-func.sql b/swh/storage/sql/40-swh-func.sql
index aab1d8cd..941efea1 100644
--- a/swh/storage/sql/40-swh-func.sql
+++ b/swh/storage/sql/40-swh-func.sql
@@ -1,1117 +1,1118 @@
create or replace function hash_sha1(text)
returns text
as $$
select encode(digest($1, 'sha1'), 'hex')
$$ language sql strict immutable;
comment on function hash_sha1(text) is 'Compute SHA1 hash as text';
-- create a temporary table called tmp_TBLNAME, mimicking existing table
-- TBLNAME
--
-- Args:
-- tblname: name of the table to mimick
create or replace function swh_mktemp(tblname regclass)
returns void
language plpgsql
as $$
begin
execute format('
create temporary table tmp_%1$I
(like %1$I including defaults)
on commit drop;
alter table tmp_%1$I drop column if exists object_id;
', tblname);
return;
end
$$;
-- create a temporary table for directory entries called tmp_TBLNAME,
-- mimicking existing table TBLNAME with an extra dir_id (sha1_git)
-- column, and dropping the id column.
--
-- This is used to create the tmp_directory_entry_<foo> tables.
--
-- Args:
-- tblname: name of the table to mimick
create or replace function swh_mktemp_dir_entry(tblname regclass)
returns void
language plpgsql
as $$
begin
execute format('
create temporary table tmp_%1$I
(like %1$I including defaults, dir_id sha1_git)
on commit drop;
alter table tmp_%1$I drop column id;
', tblname);
return;
end
$$;
-- create a temporary table for revisions called tmp_revisions,
-- mimicking existing table revision, replacing the foreign keys to
-- people with an email and name field
--
create or replace function swh_mktemp_revision()
returns void
language sql
as $$
create temporary table tmp_revision (
like revision including defaults,
author_fullname bytea,
author_name bytea,
author_email bytea,
committer_fullname bytea,
committer_name bytea,
committer_email bytea
) on commit drop;
alter table tmp_revision drop column author;
alter table tmp_revision drop column committer;
alter table tmp_revision drop column object_id;
$$;
-- create a temporary table for releases called tmp_release,
-- mimicking existing table release, replacing the foreign keys to
-- people with an email and name field
--
create or replace function swh_mktemp_release()
returns void
language sql
as $$
create temporary table tmp_release (
like release including defaults,
author_fullname bytea,
author_name bytea,
author_email bytea
) on commit drop;
alter table tmp_release drop column author;
alter table tmp_release drop column object_id;
$$;
-- create a temporary table for the branches of a snapshot
create or replace function swh_mktemp_snapshot_branch()
returns void
language sql
as $$
create temporary table tmp_snapshot_branch (
name bytea not null,
target bytea,
target_type snapshot_target
) on commit drop;
$$;
create or replace function swh_mktemp_tool()
returns void
language sql
as $$
create temporary table tmp_tool (
like tool including defaults
) on commit drop;
alter table tmp_tool drop column id;
$$;
-- a content signature is a set of cryptographic checksums that we use to
-- uniquely identify content, for the purpose of verifying if we already have
-- some content or not during content injection
create type content_signature as (
sha1 sha1,
sha1_git sha1_git,
sha256 sha256,
blake2s256 blake2s256
);
-- check which entries of tmp_skipped_content are missing from skipped_content
--
-- operates in bulk: 0. swh_mktemp(skipped_content), 1. COPY to tmp_skipped_content,
-- 2. call this function
create or replace function swh_skipped_content_missing()
returns setof content_signature
language plpgsql
as $$
begin
return query
select sha1, sha1_git, sha256, blake2s256 from tmp_skipped_content t
where not exists
(select 1 from skipped_content s where
s.sha1 is not distinct from t.sha1 and
s.sha1_git is not distinct from t.sha1_git and
s.sha256 is not distinct from t.sha256);
return;
end
$$;
-- Look up content based on one or several different checksums. Return all
-- content information if the content is found; a NULL row otherwise.
--
-- At least one checksum should be not NULL. If several are not NULL, they will
-- be AND-ed together in the lookup query.
--
-- Note: this function is meant to be used to look up individual contents
-- (e.g., for the web app), for batch lookup of missing content (e.g., to be
-- added) see swh_content_missing
create or replace function swh_content_find(
sha1 sha1 default NULL,
sha1_git sha1_git default NULL,
sha256 sha256 default NULL,
blake2s256 blake2s256 default NULL
)
returns content
language plpgsql
as $$
declare
con content;
filters text[] := array[] :: text[]; -- AND-clauses used to filter content
q text;
begin
if sha1 is not null then
filters := filters || format('sha1 = %L', sha1);
end if;
if sha1_git is not null then
filters := filters || format('sha1_git = %L', sha1_git);
end if;
if sha256 is not null then
filters := filters || format('sha256 = %L', sha256);
end if;
if blake2s256 is not null then
filters := filters || format('blake2s256 = %L', blake2s256);
end if;
if cardinality(filters) = 0 then
return null;
else
q = format('select * from content where %s',
array_to_string(filters, ' and '));
execute q into con;
return con;
end if;
end
$$;
-- add tmp_content entries to content, skipping duplicates
--
-- operates in bulk: 0. swh_mktemp(content), 1. COPY to tmp_content,
-- 2. call this function
create or replace function swh_content_add()
returns void
language plpgsql
as $$
begin
insert into content (sha1, sha1_git, sha256, blake2s256, length, status)
select distinct sha1, sha1_git, sha256, blake2s256, length, status from tmp_content;
return;
end
$$;
-- add tmp_skipped_content entries to skipped_content, skipping duplicates
--
-- operates in bulk: 0. swh_mktemp(skipped_content), 1. COPY to tmp_skipped_content,
-- 2. call this function
create or replace function swh_skipped_content_add()
returns void
language plpgsql
as $$
begin
insert into skipped_content (sha1, sha1_git, sha256, blake2s256, length, status, reason, origin)
select distinct sha1, sha1_git, sha256, blake2s256, length, status, reason, origin
from tmp_skipped_content
where (coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, '')) in (
select coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, '')
from swh_skipped_content_missing()
);
-- TODO XXX use postgres 9.5 "UPSERT" support here, when available.
-- Specifically, using "INSERT .. ON CONFLICT IGNORE" we can avoid
-- the extra swh_content_missing() query here.
return;
end
$$;
-- Update content entries from temporary table.
-- (columns are potential new columns added to the schema, this cannot be empty)
--
create or replace function swh_content_update(columns_update text[])
returns void
language plpgsql
as $$
declare
query text;
tmp_array text[];
begin
if array_length(columns_update, 1) = 0 then
raise exception 'Please, provide the list of column names to update.';
end if;
tmp_array := array(select format('%1$s=t.%1$s', unnest) from unnest(columns_update));
query = format('update content set %s
from tmp_content t where t.sha1 = content.sha1',
array_to_string(tmp_array, ', '));
execute query;
return;
end
$$;
comment on function swh_content_update(text[]) IS 'Update existing content''s columns';
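-- Illustrative usage (a sketch, not part of the schema): after COPYing rows
-- carrying, e.g., a newly computed blake2s256 column into tmp_content, update
-- the matching content rows in place:
--
--   select swh_content_update(ARRAY['blake2s256']);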
-- check which entries of tmp_directory are missing from directory
--
-- operates in bulk: 0. swh_mktemp(directory), 1. COPY to tmp_directory,
-- 2. call this function
create or replace function swh_directory_missing()
returns setof sha1_git
language plpgsql
as $$
begin
return query
select id from tmp_directory t
where not exists (
select 1 from directory d
where d.id = t.id);
return;
end
$$;
create type directory_entry_type as enum('file', 'dir', 'rev');
-- Add tmp_directory_entry_* entries to directory_entry_* and directory,
-- skipping duplicates in directory_entry_*. This is a generic function that
-- works on all kinds of directory entries.
--
-- operates in bulk: 0. swh_mktemp_dir_entry('directory_entry_*'), 1. COPY to
-- tmp_directory_entry_*, 2. call this function
--
-- Assumption: this function is used in the same transaction that inserts the
-- context directory in table "directory".
create or replace function swh_directory_entry_add(typ directory_entry_type)
returns void
language plpgsql
as $$
begin
execute format('
insert into directory_entry_%1$s (target, name, perms)
select distinct t.target, t.name, t.perms
from tmp_directory_entry_%1$s t
where not exists (
select 1
from directory_entry_%1$s i
where t.target = i.target and t.name = i.name and t.perms = i.perms)
', typ);
execute format('
with new_entries as (
select t.dir_id, array_agg(i.id) as entries
from tmp_directory_entry_%1$s t
inner join directory_entry_%1$s i
using (target, name, perms)
group by t.dir_id
)
update tmp_directory as d
set %1$s_entries = new_entries.entries
from new_entries
where d.id = new_entries.dir_id
', typ);
return;
end
$$;
-- Insert the data from tmp_directory, tmp_directory_entry_file,
-- tmp_directory_entry_dir, tmp_directory_entry_rev into their final
-- tables.
--
-- Prerequisites:
-- directory ids in tmp_directory
-- entries in tmp_directory_entry_{file,dir,rev}
--
create or replace function swh_directory_add()
returns void
language plpgsql
as $$
begin
perform swh_directory_entry_add('file');
perform swh_directory_entry_add('dir');
perform swh_directory_entry_add('rev');
insert into directory
select * from tmp_directory t
where not exists (
select 1 from directory d
where d.id = t.id);
return;
end
$$;
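-- Illustrative bulk-loading sequence for directories (a sketch, not part of
-- the schema; it follows the prerequisites listed above, within a single
-- transaction, using the swh_mktemp* helpers referenced in the comments):
--
--   select swh_mktemp('directory');
--   select swh_mktemp_dir_entry('directory_entry_file');  -- likewise for dir/rev
--   copy tmp_directory (id) from stdin;
--   copy tmp_directory_entry_file (target, name, perms, dir_id) from stdin;
--   select swh_directory_add();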
-- a directory listing entry with all the metadata
--
-- can be used to list a directory, and retrieve all the data in one go.
create type directory_entry as
(
dir_id sha1_git, -- id of the parent directory
type directory_entry_type, -- type of entry
target sha1_git, -- id of target
name unix_path, -- path name, relative to containing dir
perms file_perms, -- unix-like permissions
status content_status, -- visible or absent
sha1 sha1, -- content's sha1 if type is not dir
sha1_git sha1_git, -- content's sha1 git if type is not dir
sha256 sha256, -- content's sha256 if type is not dir
length bigint -- content length if type is not dir
);
-- List a single level of directory walked_dir_id
-- FIXME: ordering by name is not correct. For git, we need to order
-- lexicographically, but as if a trailing / were appended to directory
-- names
create or replace function swh_directory_walk_one(walked_dir_id sha1_git)
returns setof directory_entry
language sql
stable
as $$
with dir as (
select id as dir_id, dir_entries, file_entries, rev_entries
from directory
where id = walked_dir_id),
ls_d as (select dir_id, unnest(dir_entries) as entry_id from dir),
ls_f as (select dir_id, unnest(file_entries) as entry_id from dir),
ls_r as (select dir_id, unnest(rev_entries) as entry_id from dir)
(select dir_id, 'dir'::directory_entry_type as type,
e.target, e.name, e.perms, NULL::content_status,
NULL::sha1, NULL::sha1_git, NULL::sha256, NULL::bigint
from ls_d
left join directory_entry_dir e on ls_d.entry_id = e.id)
union
(select dir_id, 'file'::directory_entry_type as type,
e.target, e.name, e.perms, c.status,
c.sha1, c.sha1_git, c.sha256, c.length
from ls_f
left join directory_entry_file e on ls_f.entry_id = e.id
left join content c on e.target = c.sha1_git)
union
(select dir_id, 'rev'::directory_entry_type as type,
e.target, e.name, e.perms, NULL::content_status,
NULL::sha1, NULL::sha1_git, NULL::sha256, NULL::bigint
from ls_r
left join directory_entry_rev e on ls_r.entry_id = e.id)
order by name;
$$;
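-- Illustrative usage (a sketch; the directory id below is an arbitrary
-- example value): list one level of a directory, with content metadata
-- joined in for file entries:
--
--   select type, name, target, perms, length
--   from swh_directory_walk_one(
--     '\x4b825dc642cb6eb9a060e54bf8d69288fbee4904'::sha1_git);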
-- Recursively list the directory arborescence rooted at walked_dir_id
create or replace function swh_directory_walk(walked_dir_id sha1_git)
returns setof directory_entry
language sql
stable
as $$
with recursive entries as (
select dir_id, type, target, name, perms, status, sha1, sha1_git,
sha256, length
from swh_directory_walk_one(walked_dir_id)
union all
select dir_id, type, target, (dirname || '/' || name)::unix_path as name,
perms, status, sha1, sha1_git, sha256, length
from (select (swh_directory_walk_one(dirs.target)).*, dirs.name as dirname
from (select target, name from entries where type = 'dir') as dirs) as with_parent
)
select dir_id, type, target, name, perms, status, sha1, sha1_git, sha256, length
from entries
$$;
create or replace function swh_revision_walk(revision_id sha1_git)
returns setof directory_entry
language sql
stable
as $$
select dir_id, type, target, name, perms, status, sha1, sha1_git, sha256, length
from swh_directory_walk((select directory from revision where id=revision_id))
$$;
COMMENT ON FUNCTION swh_revision_walk(sha1_git) IS 'Recursively list the revision targeted directory arborescence';
-- Find a directory entry by its path
create or replace function swh_find_directory_entry_by_path(
walked_dir_id sha1_git,
dir_or_content_path bytea[])
returns directory_entry
language plpgsql
as $$
declare
end_index integer;
paths bytea default '';
path bytea;
res bytea[];
r record;
begin
end_index := array_upper(dir_or_content_path, 1);
res[1] := walked_dir_id;
for i in 1..end_index
loop
path := dir_or_content_path[i];
-- concatenate path for patching the name in the result record (if we found it)
if i = 1 then
paths = path;
else
paths := paths || '/' || path; -- concatenate paths
end if;
if i <> end_index then
select *
from swh_directory_walk_one(res[i] :: sha1_git)
where name=path
and type = 'dir'
limit 1 into r;
else
select *
from swh_directory_walk_one(res[i] :: sha1_git)
where name=path
limit 1 into r;
end if;
-- find the path
if r is null then
return null;
else
-- store the next dir to lookup the next local path from
res[i+1] := r.target;
end if;
end loop;
-- at this moment, r is the result. Patch its 'name' with the full path before returning it.
r.name := paths;
return r;
end
$$;
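-- Illustrative usage (a sketch; the directory id and path components are
-- arbitrary example values). The path is passed one component per array
-- element, encoded as bytea:
--
--   select * from swh_find_directory_entry_by_path(
--     '\x4b825dc642cb6eb9a060e54bf8d69288fbee4904'::sha1_git,
--     array['docs', 'index.rst']::bytea[]);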
-- List all revision IDs starting from a given revision, going back in time
--
-- TODO ordering: should be breadth-first right now (what do we want?)
-- TODO ordering: ORDER BY parent_rank somewhere?
create or replace function swh_revision_list(root_revisions bytea[], num_revs bigint default NULL)
returns table (id sha1_git, parents bytea[])
language sql
stable
as $$
with recursive full_rev_list(id) as (
(select id from revision where id = ANY(root_revisions))
union
(select h.parent_id
from revision_history as h
join full_rev_list on h.id = full_rev_list.id)
),
rev_list as (select id from full_rev_list limit num_revs)
select rev_list.id as id,
array(select rh.parent_id::bytea
from revision_history rh
where rh.id = rev_list.id
order by rh.parent_rank
) as parent
from rev_list;
$$;
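-- Illustrative usage (a sketch; the revision id is an arbitrary example
-- value): list at most 10 revisions reachable from a single root:
--
--   select * from swh_revision_list(
--     array['\x9968c9cca9cf426fbee0f26d31a11b4a08ab717e'::bytea], 10);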
-- List all the children of a given revision
create or replace function swh_revision_list_children(root_revisions bytea[], num_revs bigint default NULL)
returns table (id sha1_git, parents bytea[])
language sql
stable
as $$
with recursive full_rev_list(id) as (
(select id from revision where id = ANY(root_revisions))
union
(select h.id
from revision_history as h
join full_rev_list on h.parent_id = full_rev_list.id)
),
rev_list as (select id from full_rev_list limit num_revs)
select rev_list.id as id,
array(select rh.parent_id::bytea
from revision_history rh
where rh.id = rev_list.id
order by rh.parent_rank
) as parent
from rev_list;
$$;
-- Detailed entry for a revision
create type revision_entry as
(
id sha1_git,
date timestamptz,
date_offset smallint,
date_neg_utc_offset boolean,
committer_date timestamptz,
committer_date_offset smallint,
committer_date_neg_utc_offset boolean,
type revision_type,
directory sha1_git,
message bytea,
author_id bigint,
author_fullname bytea,
author_name bytea,
author_email bytea,
committer_id bigint,
committer_fullname bytea,
committer_name bytea,
committer_email bytea,
metadata jsonb,
synthetic boolean,
parents bytea[],
object_id bigint
);
-- "git style" revision log. Similar to swh_revision_list(), but returning all
-- information associated to each revision, and expanding authors/committers
create or replace function swh_revision_log(root_revisions bytea[], num_revs bigint default NULL)
returns setof revision_entry
language sql
stable
as $$
select t.id, r.date, r.date_offset, r.date_neg_utc_offset,
r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset,
r.type, r.directory, r.message,
a.id, a.fullname, a.name, a.email,
c.id, c.fullname, c.name, c.email,
r.metadata, r.synthetic, t.parents, r.object_id
from swh_revision_list(root_revisions, num_revs) as t
left join revision r on t.id = r.id
left join person a on a.id = r.author
left join person c on c.id = r.committer;
$$;
-- Detailed entry for a release
create type release_entry as
(
id sha1_git,
target sha1_git,
target_type object_type,
date timestamptz,
date_offset smallint,
date_neg_utc_offset boolean,
name bytea,
comment bytea,
synthetic boolean,
author_id bigint,
author_fullname bytea,
author_name bytea,
author_email bytea,
object_id bigint
);
-- Create entries in person from tmp_revision
create or replace function swh_person_add_from_revision()
returns void
language plpgsql
as $$
begin
with t as (
select author_fullname as fullname, author_name as name, author_email as email from tmp_revision
union
select committer_fullname as fullname, committer_name as name, committer_email as email from tmp_revision
) insert into person (fullname, name, email)
select distinct fullname, name, email from t
where not exists (
select 1
from person p
where t.fullname = p.fullname
);
return;
end
$$;
-- Create entries in revision from tmp_revision
create or replace function swh_revision_add()
returns void
language plpgsql
as $$
begin
perform swh_person_add_from_revision();
insert into revision (id, date, date_offset, date_neg_utc_offset, committer_date, committer_date_offset, committer_date_neg_utc_offset, type, directory, message, author, committer, metadata, synthetic)
select t.id, t.date, t.date_offset, t.date_neg_utc_offset, t.committer_date, t.committer_date_offset, t.committer_date_neg_utc_offset, t.type, t.directory, t.message, a.id, c.id, t.metadata, t.synthetic
from tmp_revision t
left join person a on a.fullname = t.author_fullname
left join person c on c.fullname = t.committer_fullname;
return;
end
$$;
-- Create entries in person from tmp_release
create or replace function swh_person_add_from_release()
returns void
language plpgsql
as $$
begin
with t as (
select distinct author_fullname as fullname, author_name as name, author_email as email from tmp_release
) insert into person (fullname, name, email)
select fullname, name, email from t
where not exists (
select 1
from person p
where t.fullname = p.fullname
);
return;
end
$$;
-- Create entries in release from tmp_release
create or replace function swh_release_add()
returns void
language plpgsql
as $$
begin
perform swh_person_add_from_release();
insert into release (id, target, target_type, date, date_offset, date_neg_utc_offset, name, comment, author, synthetic)
select t.id, t.target, t.target_type, t.date, t.date_offset, t.date_neg_utc_offset, t.name, t.comment, a.id, t.synthetic
from tmp_release t
left join person a on a.fullname = t.author_fullname;
return;
end
$$;
-- add a new origin_visit for origin origin_id at date.
--
-- Returns the new visit id.
create or replace function swh_origin_visit_add(origin_id bigint, date timestamptz)
returns bigint
language sql
as $$
with last_known_visit as (
select coalesce(max(visit), 0) as visit
from origin_visit
where origin = origin_id
)
insert into origin_visit (origin, date, visit, status)
values (origin_id, date, (select visit from last_known_visit) + 1, 'ongoing')
returning visit;
$$;
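-- Illustrative usage (a sketch; origin id 42 is an arbitrary example value):
-- record a new 'ongoing' visit and get its per-origin visit number back:
--
--   select swh_origin_visit_add(42, now());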
create or replace function swh_snapshot_add(snapshot_id snapshot.id%type)
returns void
language plpgsql
as $$
declare
snapshot_object_id snapshot.object_id%type;
begin
select object_id from snapshot where id = snapshot_id into snapshot_object_id;
if snapshot_object_id is null then
insert into snapshot (id) values (snapshot_id) returning object_id into snapshot_object_id;
insert into snapshot_branch (name, target_type, target)
select name, target_type, target from tmp_snapshot_branch tmp
where not exists (
select 1
from snapshot_branch sb
where sb.name = tmp.name
and sb.target = tmp.target
and sb.target_type = tmp.target_type
)
on conflict do nothing;
insert into snapshot_branches (snapshot_id, branch_id)
select snapshot_object_id, sb.object_id as branch_id
from tmp_snapshot_branch tmp
join snapshot_branch sb
using (name, target, target_type)
where tmp.target is not null and tmp.target_type is not null
union
select snapshot_object_id, sb.object_id as branch_id
from tmp_snapshot_branch tmp
join snapshot_branch sb
using (name)
where tmp.target is null and tmp.target_type is null
and sb.target is null and sb.target_type is null;
end if;
+ truncate table tmp_snapshot_branch;
end;
$$;
create type snapshot_result as (
snapshot_id sha1_git,
name bytea,
target bytea,
target_type snapshot_target
);
create or replace function swh_snapshot_get_by_id(id snapshot.id%type,
branches_from bytea default '', branches_count bigint default null,
target_types snapshot_target[] default NULL)
returns setof snapshot_result
language sql
stable
as $$
select
swh_snapshot_get_by_id.id as snapshot_id, name, target, target_type
from snapshot_branches
inner join snapshot_branch on snapshot_branches.branch_id = snapshot_branch.object_id
where snapshot_id = (select object_id from snapshot where snapshot.id = swh_snapshot_get_by_id.id)
and (target_types is null or target_type = any(target_types))
and name >= branches_from
order by name limit branches_count
$$;
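-- Illustrative usage (a sketch; the id below reuses the empty-snapshot value
-- mentioned elsewhere in this diff as an arbitrary example): fetch up to 1000
-- branches, then resume from the last returned name to paginate:
--
--   select * from swh_snapshot_get_by_id(
--     '\x1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'::sha1_git,
--     branches_from := '', branches_count := 1000);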
create type snapshot_size as (
target_type snapshot_target,
count bigint
);
create or replace function swh_snapshot_count_branches(id snapshot.id%type)
returns setof snapshot_size
language sql
stable
as $$
SELECT target_type, count(name)
from swh_snapshot_get_by_id(swh_snapshot_count_branches.id)
group by target_type;
$$;
-- Absolute path: directory reference + complete path relative to it
create type content_dir as (
directory sha1_git,
path unix_path
);
-- Find the containing directory of a given content, specified by sha1
-- (note: *not* sha1_git).
--
-- Return a pair (dir_id, path) where path is a UNIX path that, from the
-- directory root, reaches down to a file with the desired content. Return NULL
-- if no match is found.
--
-- In case of multiple paths (i.e., pretty much always), an arbitrary one is
-- chosen.
create or replace function swh_content_find_directory(content_id sha1)
returns content_dir
language sql
stable
as $$
with recursive path as (
-- Recursively build a path from the requested content to a root
-- directory. Each iteration returns a pair (dir_id, filename) where
-- filename is relative to dir_id. Stops when no parent directory can
-- be found.
(select dir.id as dir_id, dir_entry_f.name as name, 0 as depth
from directory_entry_file as dir_entry_f
join content on content.sha1_git = dir_entry_f.target
join directory as dir on dir.file_entries @> array[dir_entry_f.id]
where content.sha1 = content_id
limit 1)
union all
(select dir.id as dir_id,
(dir_entry_d.name || '/' || path.name)::unix_path as name,
path.depth + 1
from path
join directory_entry_dir as dir_entry_d on dir_entry_d.target = path.dir_id
join directory as dir on dir.dir_entries @> array[dir_entry_d.id]
limit 1)
)
select dir_id, name from path order by depth desc limit 1;
$$;
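-- Illustrative usage (a sketch; the sha1 is an arbitrary example value):
--
--   select * from swh_content_find_directory(
--     '\x34973274ccef6ab4dfaaf86599792fa9c3fe4689'::sha1);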
-- Find the visit of origin id closest to date visit_date
create or replace function swh_visit_find_by_date(origin bigint, visit_date timestamptz default NOW())
returns origin_visit
language sql
stable
as $$
with closest_two_visits as ((
select ov, (date - visit_date) as interval
from origin_visit ov
where ov.origin = origin
and ov.date >= visit_date
order by ov.date asc
limit 1
) union (
select ov, (visit_date - date) as interval
from origin_visit ov
where ov.origin = origin
and ov.date < visit_date
order by ov.date desc
limit 1
)) select (ov).* from closest_two_visits order by interval limit 1
$$;
-- Retrieve the visits of a given origin, most recent first
create or replace function swh_visit_get(origin bigint)
returns origin_visit
language sql
stable
as $$
select *
from origin_visit
where origin_visit.origin = swh_visit_get.origin
order by date desc
$$;
-- Object listing by object_id
create or replace function swh_content_list_by_object_id(
min_excl bigint,
max_incl bigint
)
returns setof content
language sql
stable
as $$
select * from content
where object_id > min_excl and object_id <= max_incl
order by object_id;
$$;
create or replace function swh_revision_list_by_object_id(
min_excl bigint,
max_incl bigint
)
returns setof revision_entry
language sql
stable
as $$
with revs as (
select * from revision
where object_id > min_excl and object_id <= max_incl
)
select r.id, r.date, r.date_offset, r.date_neg_utc_offset,
r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset,
r.type, r.directory, r.message,
a.id, a.fullname, a.name, a.email, c.id, c.fullname, c.name, c.email, r.metadata, r.synthetic,
array(select rh.parent_id::bytea from revision_history rh where rh.id = r.id order by rh.parent_rank)
as parents, r.object_id
from revs r
left join person a on a.id = r.author
left join person c on c.id = r.committer
order by r.object_id;
$$;
create or replace function swh_release_list_by_object_id(
min_excl bigint,
max_incl bigint
)
returns setof release_entry
language sql
stable
as $$
with rels as (
select * from release
where object_id > min_excl and object_id <= max_incl
)
select r.id, r.target, r.target_type, r.date, r.date_offset, r.date_neg_utc_offset, r.name, r.comment,
r.synthetic, p.id as author_id, p.fullname as author_fullname, p.name as author_name, p.email as author_email, r.object_id
from rels r
left join person p on p.id = r.author
order by r.object_id;
$$;
-- end revision_metadata functions
-- origin_metadata functions
create type origin_metadata_signature as (
id bigint,
origin_id bigint,
discovery_date timestamptz,
tool_id bigint,
metadata jsonb,
provider_id integer,
provider_name text,
provider_type text,
provider_url text
);
create or replace function swh_origin_metadata_get_by_origin(
origin integer)
returns setof origin_metadata_signature
language sql
stable
as $$
select om.id as id, origin_id, discovery_date, tool_id, om.metadata,
mp.id as provider_id, provider_name, provider_type, provider_url
from origin_metadata as om
inner join metadata_provider mp on om.provider_id = mp.id
where om.origin_id = origin
order by discovery_date desc;
$$;
create or replace function swh_origin_metadata_get_by_provider_type(
origin integer,
type text)
returns setof origin_metadata_signature
language sql
stable
as $$
select om.id as id, origin_id, discovery_date, tool_id, om.metadata,
mp.id as provider_id, provider_name, provider_type, provider_url
from origin_metadata as om
inner join metadata_provider mp on om.provider_id = mp.id
where om.origin_id = origin
and mp.provider_type = type
order by discovery_date desc;
$$;
-- end origin_metadata functions
-- add tmp_tool entries to tool,
-- skipping duplicates if any.
--
-- operates in bulk: 0. create temporary tmp_tool, 1. COPY to
-- it, 2. call this function to insert them, filtering out duplicates
create or replace function swh_tool_add()
returns setof tool
language plpgsql
as $$
begin
insert into tool(name, version, configuration)
select name, version, configuration from tmp_tool tmp
on conflict(name, version, configuration) do nothing;
return query
select id, name, version, configuration
from tmp_tool join tool
using(name, version, configuration);
return;
end
$$;
-- simple counter mapping a textual label to an integer value
create type counter as (
label text,
value bigint
);
-- return statistics about the number of tuples in various SWH tables
--
-- Note: the returned values come from the object_counts table, which is
-- refreshed asynchronously (see swh_update_counter*), so they may lag behind
-- the actual table sizes
create or replace function swh_stat_counters()
returns setof counter
language sql
stable
as $$
select object_type as label, value as value
from object_counts
where object_type in (
'content',
'directory',
'directory_entry_dir',
'directory_entry_file',
'directory_entry_rev',
'origin',
'origin_visit',
'person',
'release',
'revision',
'revision_history',
'skipped_content',
'snapshot'
);
$$;
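-- Illustrative usage (a sketch): read the cached object counts, then refresh
-- the counter of a single table:
--
--   select * from swh_stat_counters();
--   select swh_update_counter('content');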
create or replace function swh_update_counter(object_type text)
returns void
language plpgsql
as $$
begin
execute format('
insert into object_counts
(value, last_update, object_type)
values
((select count(*) from %1$I), NOW(), %1$L)
on conflict (object_type) do update set
value = excluded.value,
last_update = excluded.last_update',
object_type);
return;
end;
$$;
create or replace function swh_update_counter_bucketed()
returns void
language plpgsql
as $$
declare
query text;
line_to_update int;
new_value bigint;
begin
select
object_counts_bucketed.line,
format(
'select count(%I) from %I where %s',
coalesce(identifier, '*'),
object_type,
coalesce(
concat_ws(
' and ',
case when bucket_start is not null then
format('%I >= %L', identifier, bucket_start) -- lower bound condition, inclusive
end,
case when bucket_end is not null then
format('%I < %L', identifier, bucket_end) -- upper bound condition, exclusive
end
),
'true'
)
)
from object_counts_bucketed
order by coalesce(last_update, now() - '1 month'::interval) asc
limit 1
into line_to_update, query;
execute query into new_value;
update object_counts_bucketed
set value = new_value,
last_update = now()
where object_counts_bucketed.line = line_to_update;
END
$$;
create or replace function swh_update_counters_from_buckets()
returns trigger
language plpgsql
as $$
begin
with to_update as (
select object_type, sum(value) as value, max(last_update) as last_update
from object_counts_bucketed ob1
where not exists (
select 1 from object_counts_bucketed ob2
where ob1.object_type = ob2.object_type
and value is null
)
group by object_type
) update object_counts
set
value = to_update.value,
last_update = to_update.last_update
from to_update
where
object_counts.object_type = to_update.object_type
and object_counts.value != to_update.value;
return null;
end
$$;
create trigger update_counts_from_bucketed
after insert or update
on object_counts_bucketed
for each row
when (NEW.line % 256 = 0)
execute procedure swh_update_counters_from_buckets();
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
index 5406d19d..6850141a 100644
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -1,1629 +1,1638 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import datetime
import itertools
import json
import warnings
import dateutil.parser
import psycopg2
import psycopg2.pool
from . import converters
from .common import db_transaction_generator, db_transaction
from .db import Db
from .exc import StorageDBError
from .algos import diff
from .journal_writer import get_journal_writer
from swh.model.hashutil import ALGORITHMS, hash_to_bytes
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000
EMPTY_SNAPSHOT_ID = hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e')
"""Identifier for the empty snapshot"""
class Storage():
"""SWH storage proxy, encompassing DB and object storage
"""
def __init__(self, db, objstorage, min_pool_conns=1, max_pool_conns=10,
journal_writer=None):
"""
Args:
db: either a libpq connection string, or a psycopg2 connection
objstorage: configuration dict for the object storage
"""
try:
if isinstance(db, psycopg2.extensions.connection):
self._pool = None
self._db = Db(db)
else:
self._pool = psycopg2.pool.ThreadedConnectionPool(
min_pool_conns, max_pool_conns, db
)
self._db = None
except psycopg2.OperationalError as e:
raise StorageDBError(e)
self.objstorage = get_objstorage(**objstorage)
if journal_writer:
self.journal_writer = get_journal_writer(**journal_writer)
else:
self.journal_writer = None
def get_db(self):
if self._db:
return self._db
else:
return Db.from_pool(self._pool)
def check_config(self, *, check_write):
"""Check that the storage is configured and ready to go."""
if not self.objstorage.check_config(check_write=check_write):
return False
# Check permissions on one of the tables
with self.get_db().transaction() as cur:
if check_write:
check = 'INSERT'
else:
check = 'SELECT'
cur.execute(
"select has_table_privilege(current_user, 'content', %s)",
(check,)
)
return cur.fetchone()[0]
return True
def content_add(self, content):
"""Add content blobs to the storage
Note: in case of DB errors, objects might have already been added to
the object storage and will not be removed. Since addition to the
object storage is idempotent, that should not be a problem.
Args:
content (iterable): iterable of dictionaries representing
individual pieces of content to add. Each dictionary has the
following keys:
- data (bytes): the actual content
- length (int): content length (default: -1)
- one key for each checksum algorithm in
:data:`swh.model.hashutil.ALGORITHMS`, mapped to the
corresponding checksum
- status (str): one of visible, hidden, absent
- reason (str): if status = absent, the reason why
- origin (int): if status = absent, the origin we saw the
content in
"""
if self.journal_writer:
for item in content:
if 'data' in item:
item = item.copy()
del item['data']
self.journal_writer.write_addition('content', item)
db = self.get_db()
def _unique_key(hash, keys=db.content_hash_keys):
"""Given a hash (tuple or dict), return a unique key from the
aggregation of keys.
"""
if isinstance(hash, tuple):
return hash
return tuple([hash[k] for k in keys])
content_by_status = defaultdict(list)
for d in content:
if 'status' not in d:
d['status'] = 'visible'
if 'length' not in d:
d['length'] = -1
content_by_status[d['status']].append(d)
content_with_data = content_by_status['visible']
content_without_data = content_by_status['absent']
missing_content = set(self.content_missing(content_with_data))
missing_skipped = set(_unique_key(hashes) for hashes
in self.skipped_content_missing(
content_without_data))
def add_to_objstorage():
data = {
cont['sha1']: cont['data']
for cont in content_with_data
if cont['sha1'] in missing_content
}
self.objstorage.add_batch(data)
with db.transaction() as cur:
with ThreadPoolExecutor(max_workers=1) as executor:
added_to_objstorage = executor.submit(add_to_objstorage)
if missing_content:
# create temporary table for metadata injection
db.mktemp('content', cur)
content_filtered = (cont for cont in content_with_data
if cont['sha1'] in missing_content)
db.copy_to(content_filtered, 'tmp_content',
db.content_get_metadata_keys, cur)
# move metadata in place
try:
db.content_add_from_temp(cur)
except psycopg2.IntegrityError as e:
from . import HashCollision
if e.diag.sqlstate == '23505' and \
e.diag.table_name == 'content':
constraint_to_hash_name = {
'content_pkey': 'sha1',
'content_sha1_git_idx': 'sha1_git',
'content_sha256_idx': 'sha256',
}
colliding_hash_name = constraint_to_hash_name \
.get(e.diag.constraint_name)
raise HashCollision(colliding_hash_name)
else:
raise
if missing_skipped:
missing_filtered = (
cont for cont in content_without_data
if _unique_key(cont) in missing_skipped
)
db.mktemp('skipped_content', cur)
db.copy_to(missing_filtered, 'tmp_skipped_content',
db.skipped_content_keys, cur)
# move metadata in place
db.skipped_content_add_from_temp(cur)
# Wait for objstorage addition before returning from the
# transaction, bubbling up any exception
added_to_objstorage.result()
@db_transaction()
def content_update(self, content, keys=[], db=None, cur=None):
"""Update content blobs to the storage. Does nothing for unknown
contents or skipped ones.
Args:
content (iterable): iterable of dictionaries representing
individual pieces of content to update. Each dictionary has the
following keys:
- data (bytes): the actual content
- length (int): content length (default: -1)
- one key for each checksum algorithm in
:data:`swh.model.hashutil.ALGORITHMS`, mapped to the
corresponding checksum
- status (str): one of visible, hidden, absent
keys (list): List of keys (str) whose values needs an update, e.g.,
new hash column
"""
# TODO: Add a check on input keys. How to properly implement
# this? We don't know yet the new columns.
if self.journal_writer:
raise NotImplementedError(
'content_update is not yet support with a journal_writer.')
db.mktemp('content', cur)
select_keys = list(set(db.content_get_metadata_keys).union(set(keys)))
db.copy_to(content, 'tmp_content', select_keys, cur)
db.content_update_from_temp(keys_to_update=keys,
cur=cur)
def content_get(self, content):
"""Retrieve in bulk contents and their data.
This generator yields exactly as many items as there are provided sha1
identifiers, but callers should not assume this will always be true.
It may also yield `None` values in case an object was not found.
Args:
content: iterables of sha1
Yields:
Dict[str, bytes]: Generates streams of contents as dict with their
raw data:
- sha1 (bytes): content id
- data (bytes): content's raw data
Raises:
ValueError: in case too many contents are requested;
cf. BULK_BLOCK_CONTENT_LEN_MAX
"""
# FIXME: Make this method support slicing the `data`.
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
raise ValueError(
"Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
for obj_id in content:
try:
data = self.objstorage.get(obj_id)
except ObjNotFoundError:
yield None
continue
yield {'sha1': obj_id, 'data': data}
@db_transaction()
def content_get_range(self, start, end, limit=1000, db=None, cur=None):
"""Retrieve contents within range [start, end] bound by limit.
Note that this function may return more than one blob per hash. The
limit is enforced with multiplicity (i.e. two blobs with the same hash
will count twice toward the limit).
Args:
**start** (bytes): Starting identifier range (expected smaller
than end)
**end** (bytes): Ending identifier range (expected larger
than start)
**limit** (int): Limit result (default to 1000)
Returns:
a dict with keys:
- contents [dict]: iterable of contents in between the range.
- next (bytes): if not None, there remains content in the range,
starting from this next sha1
"""
if limit is None:
raise ValueError('Development error: limit should not be None')
contents = []
next_content = None
for counter, content_row in enumerate(
db.content_get_range(start, end, limit+1, cur)):
content = dict(zip(db.content_get_metadata_keys, content_row))
if counter >= limit:
# keep the last content as the starting point of the next page
next_content = content['sha1']
break
contents.append(content)
return {
'contents': contents,
'next': next_content,
}
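# Illustrative pagination loop (a sketch, not part of this module; `storage`
# is assumed to be an instance of this class and the bounds are arbitrary
# example values):
#
#   page = storage.content_get_range(start=b'\x00' * 20, end=b'\xff' * 20)
#   while True:
#       for content in page['contents']:
#           ...  # use the content metadata
#       if page['next'] is None:
#           break
#       page = storage.content_get_range(start=page['next'],
#                                        end=b'\xff' * 20)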
@db_transaction_generator(statement_timeout=500)
def content_get_metadata(self, content, db=None, cur=None):
"""Retrieve content metadata in bulk
Args:
content: iterable of content identifiers (sha1)
Returns:
an iterable with content metadata corresponding to the given ids
"""
for metadata in db.content_get_metadata_from_sha1s(content, cur):
yield dict(zip(db.content_get_metadata_keys, metadata))
@db_transaction_generator()
def content_missing(self, content, key_hash='sha1', db=None, cur=None):
"""List content missing from storage
Args:
content ([dict]): iterable of dictionaries whose keys are
either 'length' or an item of
:data:`swh.model.hashutil.ALGORITHMS`;
mapped to the corresponding checksum
(or length).
key_hash (str): name of the column to use as the hash id in the
result (default: 'sha1')
Returns:
iterable ([bytes]): missing content ids (as per the
key_hash column)
Raises:
TODO: an exception when we get a hash collision.
"""
keys = db.content_hash_keys
if key_hash not in keys:
raise ValueError("key_hash should be one of %s" % keys)
key_hash_idx = keys.index(key_hash)
if not content:
return
for obj in db.content_missing_from_list(content, cur):
yield obj[key_hash_idx]
@db_transaction_generator()
def content_missing_per_sha1(self, contents, db=None, cur=None):
"""List content missing from storage based only on sha1.
Args:
contents: Iterable of sha1 to check for absence.
Returns:
iterable: missing ids
Raises:
TODO: an exception when we get a hash collision.
"""
for obj in db.content_missing_per_sha1(contents, cur):
yield obj[0]
@db_transaction_generator()
def skipped_content_missing(self, content, db=None, cur=None):
"""List skipped_content missing from storage
Args:
content: iterable of dictionaries containing the data for each
checksum algorithm.
Returns:
iterable: missing signatures
"""
keys = db.content_hash_keys
db.mktemp('skipped_content', cur)
db.copy_to(content, 'tmp_skipped_content',
keys + ['length', 'reason'], cur)
yield from db.skipped_content_missing_from_temp(cur)
@db_transaction()
def content_find(self, content, db=None, cur=None):
"""Find a content hash in db.
Args:
content: a dictionary representing one content hash, mapping
checksum algorithm names (see swh.model.hashutil.ALGORITHMS) to
checksum values
Returns:
a dictionary with the content's hashes and metadata if the content
exists, or None otherwise.
Raises:
ValueError: in case none of the dictionary keys is one of sha1,
sha1_git, sha256 or blake2s256.
"""
if not set(content).intersection(ALGORITHMS):
raise ValueError('content keys must contain at least one of: '
'sha1, sha1_git, sha256, blake2s256')
c = db.content_find(sha1=content.get('sha1'),
sha1_git=content.get('sha1_git'),
sha256=content.get('sha256'),
blake2s256=content.get('blake2s256'),
cur=cur)
if c:
return dict(zip(db.content_find_cols, c))
return None
def directory_add(self, directories):
"""Add directories to the storage
Args:
directories (iterable): iterable of dictionaries representing the
individual directories to add. Each dict has the following
keys:
- id (sha1_git): the id of the directory to add
- entries (list): list of dicts for each entry in the
directory. Each dict has the following keys:
- name (bytes)
- type (one of 'file', 'dir', 'rev'): type of the
directory entry (file, directory, revision)
- target (sha1_git): id of the object pointed at by the
directory entry
- perms (int): entry permissions
"""
if self.journal_writer:
self.journal_writer.write_additions('directory', directories)
dirs = set()
dir_entries = {
'file': defaultdict(list),
'dir': defaultdict(list),
'rev': defaultdict(list),
}
for cur_dir in directories:
dir_id = cur_dir['id']
dirs.add(dir_id)
for src_entry in cur_dir['entries']:
entry = src_entry.copy()
entry['dir_id'] = dir_id
dir_entries[entry['type']][dir_id].append(entry)
dirs_missing = set(self.directory_missing(dirs))
if not dirs_missing:
return
db = self.get_db()
with db.transaction() as cur:
# Copy directory ids
dirs_missing_dict = ({'id': dir} for dir in dirs_missing)
db.mktemp('directory', cur)
db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur)
# Copy entries
for entry_type, entry_list in dir_entries.items():
entries = itertools.chain.from_iterable(
entries_for_dir
for dir_id, entries_for_dir
in entry_list.items()
if dir_id in dirs_missing)
db.mktemp_dir_entry(entry_type)
db.copy_to(
entries,
'tmp_directory_entry_%s' % entry_type,
['target', 'name', 'perms', 'dir_id'],
cur,
)
# Do the final copy
db.directory_add_from_temp(cur)
@db_transaction_generator()
def directory_missing(self, directories, db=None, cur=None):
"""List directories missing from storage
Args:
directories (iterable): an iterable of directory ids
Yields:
missing directory ids
"""
for obj in db.directory_missing_from_list(directories, cur):
yield obj[0]
@db_transaction_generator(statement_timeout=20000)
def directory_ls(self, directory, recursive=False, db=None, cur=None):
"""Get entries for one directory.
Args:
- directory: the directory to list entries from.
- recursive: if set, list entries recursively from this directory.
Returns:
List of entries for such directory.
If `recursive=True`, names in the path of a dir/file not at the
root are concatenated with a slash (`/`).
"""
if recursive:
res_gen = db.directory_walk(directory, cur=cur)
else:
res_gen = db.directory_walk_one(directory, cur=cur)
for line in res_gen:
yield dict(zip(db.directory_ls_cols, line))
@db_transaction(statement_timeout=2000)
def directory_entry_get_by_path(self, directory, paths, db=None, cur=None):
"""Get the directory entry (either file or dir) from directory with path.
Args:
- directory: sha1 of the top level directory
- paths: path to lookup from the top level directory. From left
(top) to right (bottom).
Returns:
The corresponding directory entry if found, None otherwise.
"""
res = db.directory_entry_get_by_path(directory, paths, cur)
if res:
return dict(zip(db.directory_ls_cols, res))
def revision_add(self, revisions):
"""Add revisions to the storage
Args:
revisions (Iterable[dict]): iterable of dictionaries representing
the individual revisions to add. Each dict has the following
keys:
- **id** (:class:`sha1_git`): id of the revision to add
- **date** (:class:`dict`): date the revision was written
- **committer_date** (:class:`dict`): date the revision got
added to the origin
- **type** (one of 'git', 'tar'): type of the
revision added
- **directory** (:class:`sha1_git`): the directory the
revision points at
- **message** (:class:`bytes`): the message associated with
the revision
- **author** (:class:`Dict[str, bytes]`): dictionary with
keys: name, fullname, email
- **committer** (:class:`Dict[str, bytes]`): dictionary with
keys: name, fullname, email
- **metadata** (:class:`jsonb`): extra information as
dictionary
- **synthetic** (:class:`bool`): whether the revision is synthetic
(e.g. created from a tarball or a plain directory)
- **parents** (:class:`list[sha1_git]`): the parents of
this revision
date dictionaries have the form defined in :mod:`swh.model`.
"""
if self.journal_writer:
self.journal_writer.write_additions('revision', revisions)
db = self.get_db()
revisions_missing = set(self.revision_missing(
set(revision['id'] for revision in revisions)))
if not revisions_missing:
return
with db.transaction() as cur:
db.mktemp_revision(cur)
revisions_filtered = (
converters.revision_to_db(revision) for revision in revisions
if revision['id'] in revisions_missing)
parents_filtered = []
db.copy_to(
revisions_filtered, 'tmp_revision', db.revision_add_cols,
cur,
lambda rev: parents_filtered.extend(rev['parents']))
db.revision_add_from_temp(cur)
db.copy_to(parents_filtered, 'revision_history',
['id', 'parent_id', 'parent_rank'], cur)
@db_transaction_generator()
def revision_missing(self, revisions, db=None, cur=None):
"""List revisions missing from storage
Args:
revisions (iterable): revision ids
Yields:
missing revision ids
"""
if not revisions:
return
for obj in db.revision_missing_from_list(revisions, cur):
yield obj[0]
@db_transaction_generator(statement_timeout=1000)
def revision_get(self, revisions, db=None, cur=None):
"""Get all revisions from storage
Args:
revisions: an iterable of revision ids
Returns:
iterable: an iterable of revisions as dictionaries (or None if the
revision doesn't exist)
"""
for line in db.revision_get_from_list(revisions, cur):
data = converters.db_to_revision(
dict(zip(db.revision_get_cols, line))
)
if not data['type']:
yield None
continue
yield data
@db_transaction_generator(statement_timeout=2000)
def revision_log(self, revisions, limit=None, db=None, cur=None):
"""Fetch revision entry from the given root revisions.
Args:
revisions: array of root revisions to lookup
limit: limitation on the output result. Default to None.
Yields:
Revision log entries reachable from the given root revisions.
"""
for line in db.revision_log(revisions, limit, cur):
data = converters.db_to_revision(
dict(zip(db.revision_get_cols, line))
)
if not data['type']:
yield None
continue
yield data
@db_transaction_generator(statement_timeout=2000)
def revision_shortlog(self, revisions, limit=None, db=None, cur=None):
"""Fetch the shortlog for the given revisions
Args:
revisions: list of root revisions to lookup
limit: depth limitation for the output
Yields:
a list of (id, parents) tuples.
"""
yield from db.revision_shortlog(revisions, limit, cur)
def release_add(self, releases):
"""Add releases to the storage
Args:
releases (Iterable[dict]): iterable of dictionaries representing
the individual releases to add. Each dict has the following
keys:
- **id** (:class:`sha1_git`): id of the release to add
- **revision** (:class:`sha1_git`): id of the revision the
release points to
- **date** (:class:`dict`): the date the release was made
- **name** (:class:`bytes`): the name of the release
- **comment** (:class:`bytes`): the comment associated with
the release
- **author** (:class:`Dict[str, bytes]`): dictionary with
keys: name, fullname, email
the date dictionary has the form defined in :mod:`swh.model`.
"""
if self.journal_writer:
self.journal_writer.write_additions('release', releases)
db = self.get_db()
release_ids = set(release['id'] for release in releases)
releases_missing = set(self.release_missing(release_ids))
if not releases_missing:
return
with db.transaction() as cur:
db.mktemp_release(cur)
releases_filtered = (
converters.release_to_db(release) for release in releases
if release['id'] in releases_missing
)
db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols,
cur)
db.release_add_from_temp(cur)
@db_transaction_generator()
def release_missing(self, releases, db=None, cur=None):
"""List releases missing from storage
Args:
releases: an iterable of release ids
Returns:
a list of missing release ids
"""
if not releases:
return
for obj in db.release_missing_from_list(releases, cur):
yield obj[0]
@db_transaction_generator(statement_timeout=500)
def release_get(self, releases, db=None, cur=None):
"""Given a list of sha1, return the releases's information
Args:
releases: list of sha1s
Yields:
dicts with the same keys as those given to `release_add`
(or ``None`` if a release does not exist)
"""
for release in db.release_get_from_list(releases, cur):
data = converters.db_to_release(
dict(zip(db.release_get_cols, release))
)
yield data if data['target_type'] else None
@db_transaction()
- def snapshot_add(self, snapshot, origin=None, visit=None,
+ def snapshot_add(self, snapshots, origin=None, visit=None,
db=None, cur=None):
- """Add a snapshot for the given origin/visit couple
+ """Add snapshots to the storage.
Args:
- snapshot (dict): the snapshot to add to the visit, containing the
+ snapshots ([dict]): the snapshots to add, each containing the
following keys:
- **id** (:class:`bytes`): id of the snapshot
- **branches** (:class:`dict`): branches the snapshot contains,
mapping the branch name (:class:`bytes`) to the branch target,
itself a :class:`dict` (or ``None`` if the branch points to an
unknown object)
- **target_type** (:class:`str`): one of ``content``,
``directory``, ``revision``, ``release``,
``snapshot``, ``alias``
- **target** (:class:`bytes`): identifier of the target
(currently a ``sha1_git`` for all object kinds, or the name
of the target branch for aliases)
origin (int): legacy argument for backward compatibility
visit (int): legacy argument for backward compatibility
Raises:
ValueError: if the origin or visit id does not exist.
"""
if origin:
if not visit:
raise TypeError(
'snapshot_add expects one argument (or, as a legacy '
'behavior, three arguments), not two')
- if isinstance(snapshot, int):
+ if isinstance(snapshots, int):
# Called by legacy code that uses the new api/client.py
- (origin_id, visit_id, snapshot) = \
- (snapshot, origin, visit)
+ (origin_id, visit_id, snapshots) = \
+ (snapshots, origin, [visit])
else:
# Called by legacy code that uses the old api/client.py
origin_id = origin
visit_id = visit
+ snapshots = [snapshots]
else:
# Called by new code that uses the new api/client.py
origin_id = visit_id = None
- if not db.snapshot_exists(snapshot['id'], cur):
- db.mktemp_snapshot_branch(cur)
- db.copy_to(
- (
- {
- 'name': name,
- 'target': info['target'] if info else None,
- 'target_type': info['target_type'] if info else None,
- }
- for name, info in snapshot['branches'].items()
- ),
- 'tmp_snapshot_branch',
- ['name', 'target', 'target_type'],
- cur,
- )
+ created_temp_table = False
- if self.journal_writer:
- self.journal_writer.write_addition('snapshot', snapshot)
+ for snapshot in snapshots:
+ if not db.snapshot_exists(snapshot['id'], cur):
+ if not created_temp_table:
+ db.mktemp_snapshot_branch(cur)
+ created_temp_table = True
+
+ db.copy_to(
+ (
+ {
+ 'name': name,
+ 'target': info['target'] if info else None,
+ 'target_type': (info['target_type']
+ if info else None),
+ }
+ for name, info in snapshot['branches'].items()
+ ),
+ 'tmp_snapshot_branch',
+ ['name', 'target', 'target_type'],
+ cur,
+ )
+
+ if self.journal_writer:
+ self.journal_writer.write_addition('snapshot', snapshot)
- db.snapshot_add(snapshot['id'], cur)
+ db.snapshot_add(snapshot['id'], cur)
if visit_id:
+ # Legacy API, there can be only one snapshot
self.origin_visit_update(
- origin_id, visit_id, snapshot=snapshot['id'],
+ origin_id, visit_id, snapshot=snapshots[0]['id'],
db=db, cur=cur)
@db_transaction(statement_timeout=2000)
def snapshot_get(self, snapshot_id, db=None, cur=None):
"""Get the content, possibly partial, of a snapshot with the given id
The branches of the snapshot are iterated in the lexicographical
order of their names.
.. warning:: At most 1000 branches contained in the snapshot will be
returned for performance reasons. In order to browse the whole
set of branches, the method :meth:`snapshot_get_branches`
should be used instead.
Args:
snapshot_id (bytes): identifier of the snapshot
Returns:
dict: a dict with three keys:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has less than 1000
branches.
"""
return self.snapshot_get_branches(snapshot_id, db=db, cur=cur)
@db_transaction(statement_timeout=2000)
def snapshot_get_by_origin_visit(self, origin, visit, db=None, cur=None):
"""Get the content, possibly partial, of a snapshot for the given origin visit
The branches of the snapshot are iterated in the lexicographical
order of their names.
.. warning:: At most 1000 branches contained in the snapshot will be
returned for performance reasons. In order to browse the whole
set of branches, the method :meth:`snapshot_get_branches`
should be used instead.
Args:
origin (int): the origin identifier
visit (int): the visit identifier
Returns:
dict: None if the snapshot does not exist;
a dict with three keys otherwise:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has less than 1000
branches.
"""
snapshot_id = db.snapshot_get_by_origin_visit(origin, visit, cur)
if snapshot_id:
return self.snapshot_get(snapshot_id, db=db, cur=cur)
return None
@db_transaction(statement_timeout=4000)
def snapshot_get_latest(self, origin, allowed_statuses=None, db=None,
cur=None):
"""Get the content, possibly partial, of the latest snapshot for the
given origin, optionally only from visits that have one of the given
allowed_statuses
The branches of the snapshot are iterated in the lexicographical
order of their names.
.. warning:: At most 1000 branches contained in the snapshot will be
returned for performance reasons. In order to browse the whole
set of branches, the method :meth:`snapshot_get_branches`
should be used instead.
Args:
origin (int): the origin identifier
allowed_statuses (list of str): list of visit statuses considered
to find the latest snapshot for the visit. For instance,
``allowed_statuses=['full']`` will only consider visits that
have successfully run to completion.
Returns:
dict: a dict with three keys:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has less than 1000
branches.
"""
origin_visit = db.origin_visit_get_latest_snapshot(
origin, allowed_statuses=allowed_statuses, cur=cur)
if origin_visit:
origin_visit = dict(zip(db.origin_visit_get_cols, origin_visit))
return self.snapshot_get(origin_visit['snapshot'], db=db, cur=cur)
@db_transaction(statement_timeout=2000)
def snapshot_count_branches(self, snapshot_id, db=None, cur=None):
"""Count the number of branches in the snapshot with the given id
Args:
snapshot_id (bytes): identifier of the snapshot
Returns:
dict: A dict whose keys are the target types of branches and
whose values are their corresponding counts
"""
return dict([bc for bc in
db.snapshot_count_branches(snapshot_id, cur)])
@db_transaction(statement_timeout=2000)
def snapshot_get_branches(self, snapshot_id, branches_from=b'',
branches_count=1000, target_types=None,
db=None, cur=None):
"""Get the content, possibly partial, of a snapshot with the given id
The branches of the snapshot are iterated in the lexicographical
order of their names.
Args:
snapshot_id (bytes): identifier of the snapshot
branches_from (bytes): optional parameter used to skip branches
whose name is less than it before returning them
branches_count (int): optional parameter used to restrain
the amount of returned branches
target_types (list): optional parameter used to filter the
target types of branch to return (possible values that can be
contained in that list are `'content', 'directory',
'revision', 'release', 'snapshot', 'alias'`)
Returns:
dict: None if the snapshot does not exist;
a dict with three keys otherwise:
* **id**: identifier of the snapshot
* **branches**: a dict of branches contained in the snapshot
whose keys are the branches' names.
* **next_branch**: the name of the first branch not returned
or :const:`None` if the snapshot has less than
`branches_count` branches after `branches_from` included.
"""
if snapshot_id == EMPTY_SNAPSHOT_ID:
return {
'id': snapshot_id,
'branches': {},
'next_branch': None,
}
branches = {}
next_branch = None
fetched_branches = list(db.snapshot_get_by_id(
snapshot_id, branches_from=branches_from,
branches_count=branches_count+1, target_types=target_types,
cur=cur,
))
for branch in fetched_branches[:branches_count]:
branch = dict(zip(db.snapshot_get_cols, branch))
del branch['snapshot_id']
name = branch.pop('name')
if branch == {'target': None, 'target_type': None}:
branch = None
branches[name] = branch
if len(fetched_branches) > branches_count:
branch = dict(zip(db.snapshot_get_cols, fetched_branches[-1]))
next_branch = branch['name']
if branches:
return {
'id': snapshot_id,
'branches': branches,
'next_branch': next_branch,
}
return None
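# Illustrative branch pagination (a sketch, not part of this module;
# `storage` and `snapshot_id` are assumed to exist and the snapshot to be
# present in storage):
#
#   branches = {}
#   next_branch = b''
#   while next_branch is not None:
#       part = storage.snapshot_get_branches(
#           snapshot_id, branches_from=next_branch, branches_count=1000)
#       branches.update(part['branches'])
#       next_branch = part['next_branch']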
@db_transaction()
def origin_visit_add(self, origin, date=None, db=None, cur=None, *,
ts=None):
"""Add an origin_visit for the origin at ts with status 'ongoing'.
Args:
origin: Visited Origin id
date: timestamp of such visit
Returns:
dict: dictionary with keys origin and visit where:
- origin: origin identifier
- visit: the visit identifier for the new visit occurrence
"""
if ts is None:
if date is None:
raise TypeError('origin_visit_add expected 2 arguments.')
else:
assert date is None
warnings.warn("argument 'ts' of origin_visit_add was renamed "
"to 'date' in v0.0.109.",
DeprecationWarning)
date = ts
origin_id = origin # TODO: rename the argument
if isinstance(date, str):
date = dateutil.parser.parse(date)
visit = db.origin_visit_add(origin, date, cur)
if self.journal_writer:
# We can write to the journal only after inserting to the
# DB, because we want the id of the visit
origin = self.origin_get([{'id': origin_id}], db=db, cur=cur)[0]
self.journal_writer.write_addition('origin_visit', {
'origin': origin, 'date': date, 'visit': visit,
'status': 'ongoing', 'metadata': None, 'snapshot': None})
return {
'origin': origin_id,
'visit': visit,
}
@db_transaction()
def origin_visit_update(self, origin, visit_id, status=None,
metadata=None, snapshot=None,
db=None, cur=None):
"""Update an origin_visit's status.
Args:
origin: Visited Origin id
visit_id: Visit's id
status: Visit's new status
metadata: Data associated to the visit
snapshot (sha1_git): identifier of the snapshot to add to
the visit
Returns:
None
"""
origin_id = origin # TODO: rename the argument
visit = db.origin_visit_get(origin_id, visit_id, cur=cur)
if not visit:
raise ValueError('Invalid visit_id for this origin.')
visit = dict(zip(db.origin_visit_get_cols, visit))
updates = {}
if status and status != visit['status']:
updates['status'] = status
if metadata and metadata != visit['metadata']:
updates['metadata'] = metadata
if snapshot and snapshot != visit['snapshot']:
updates['snapshot'] = snapshot
if updates:
if self.journal_writer:
origin = self.origin_get(
[{'id': origin_id}], db=db, cur=cur)[0]
self.journal_writer.write_update('origin_visit', {
**visit, **updates, 'origin': origin})
db.origin_visit_update(origin_id, visit_id, updates, cur)
@db_transaction_generator(statement_timeout=500)
def origin_visit_get(self, origin, last_visit=None, limit=None, db=None,
cur=None):
"""Retrieve all the origin's visit's information.
Args:
origin (int): The occurrence's origin (identifier).
last_visit: Starting point from which to list the next visits.
Default to None
limit (int): Number of results to return from the last visit.
Default to None
Yields:
List of visits.
"""
for line in db.origin_visit_get_all(
origin, last_visit=last_visit, limit=limit, cur=cur):
data = dict(zip(db.origin_visit_get_cols, line))
yield data
@db_transaction(statement_timeout=500)
def origin_visit_get_by(self, origin, visit, db=None, cur=None):
"""Retrieve origin visit's information.
Args:
origin: The occurrence's origin (identifier).
Returns:
The information on that particular (origin, visit) or None if
it does not exist
"""
ori_visit = db.origin_visit_get(origin, visit, cur)
if not ori_visit:
return None
return dict(zip(db.origin_visit_get_cols, ori_visit))
@db_transaction(statement_timeout=2000)
def object_find_by_sha1_git(self, ids, db=None, cur=None):
"""Return the objects found with the given ids.
Args:
ids: a generator of sha1_gits
Returns:
dict: a mapping from id to the list of objects found. Each object
found is itself a dict with keys:
- sha1_git: the input id
- type: the type of object found
- id: the id of the object found
- object_id: the numeric id of the object found.
"""
ret = {id: [] for id in ids}
for retval in db.object_find_by_sha1_git(ids, cur=cur):
if retval[1]:
ret[retval[0]].append(dict(zip(db.object_find_by_sha1_git_cols,
retval)))
return ret
origin_keys = ['id', 'type', 'url']
@db_transaction(statement_timeout=500)
def origin_get(self, origins, db=None, cur=None):
"""Return origins, either all identified by their ids or all
identified by tuples (type, url).
Args:
origins: a list of dictionaries representing the individual
origins to find.
These dicts have either the keys type and url:
- type (FIXME: enum TBD): the origin type ('git', 'wget', ...)
- url (bytes): the url the origin points to
or the id:
- id: the origin id
Returns:
dict: the origin dictionary with the keys:
- id: origin's id
- type: origin's type
- url: origin's url
Raises:
ValueError: if the keys match neither (type and url) nor id.
"""
if isinstance(origins, dict):
# Old API
return_single = True
origins = [origins]
elif len(origins) == 0:
return []
else:
return_single = False
origin_ids = [origin.get('id') for origin in origins]
origin_types_and_urls = [(origin.get('type'), origin.get('url'))
for origin in origins]
if any(origin_ids):
# Lookup per ID
if all(origin_ids):
results = db.origin_get(origin_ids, cur)
else:
raise ValueError(
'Either all origins or none at all should have an "id".')
elif any(type_ and url for (type_, url) in origin_types_and_urls):
# Lookup per type + URL
if all(type_ and url for (type_, url) in origin_types_and_urls):
results = db.origin_get_with(origin_types_and_urls, cur)
else:
raise ValueError(
'Either all origins or none at all should have a '
'"type" and an "url".')
else: # unsupported lookup
raise ValueError('Origin must have either id or (type and url).')
results = [dict(zip(self.origin_keys, result))
for result in results]
if return_single:
assert len(results) == 1
if results[0]['id'] is not None:
return results[0]
else:
return None
else:
return [None if res['id'] is None else res for res in results]
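# Illustrative usage (hypothetical origin data): lookups are either all
# id-based or all (type, url)-based; mixing the two raises ValueError.
#
#     storage.origin_get([{'id': 42}])
#     storage.origin_get([{'type': 'git', 'url': 'https://example.org/repo'}])
#     # storage.origin_get([{'id': 42},
#     #                     {'type': 'git', 'url': '...'}])  # ValueError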
@db_transaction_generator()
def origin_get_range(self, origin_from=1, origin_count=100,
db=None, cur=None):
"""Retrieve ``origin_count`` origins whose ids are greater
or equal than ``origin_from``.
Origins are sorted by id before retrieving them.
Args:
origin_from (int): the minimum id of origins to retrieve
origin_count (int): the maximum number of origins to retrieve
Yields:
dicts containing origin information as returned
by :meth:`swh.storage.storage.Storage.origin_get`.
"""
for origin in db.origin_get_range(origin_from, origin_count, cur):
yield dict(zip(self.origin_keys, origin))
@db_transaction_generator()
def origin_search(self, url_pattern, offset=0, limit=50,
regexp=False, with_visit=False, db=None, cur=None):
"""Search for origins whose urls contain a provided string pattern
or match a provided regular expression.
The search is performed in a case-insensitive way.
Args:
url_pattern (str): the string pattern to search for in origin urls
offset (int): number of found origins to skip before returning
results
limit (int): the maximum number of found origins to return
regexp (bool): if True, consider the provided pattern as a regular
expression and return origins whose urls match it
with_visit (bool): if True, filter out origins with no visit
Yields:
dicts containing origin information as returned
by :meth:`swh.storage.storage.Storage.origin_get`.
"""
for origin in db.origin_search(url_pattern, offset, limit,
regexp, with_visit, cur):
yield dict(zip(self.origin_keys, origin))
@db_transaction()
def origin_count(self, url_pattern, regexp=False,
with_visit=False, db=None, cur=None):
"""Count origins whose urls contain a provided string pattern
or match a provided regular expression.
The pattern search in origin urls is performed in a case-insensitive
way.
Args:
url_pattern (str): the string pattern to search for in origin urls
regexp (bool): if True, consider the provided pattern as a regular
expression and return origins whose urls match it
with_visit (bool): if True, filter out origins with no visit
Returns:
int: The number of origins matching the search criterion.
"""
return db.origin_count(url_pattern, regexp, with_visit, cur)
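# Illustrative usage (hypothetical patterns): origin_search and origin_count
# accept the same pattern arguments, either as plain substrings or, with
# regexp=True, as regular expressions.
#
#     list(storage.origin_search('github.com/', limit=10))
#     storage.origin_count('^https://github\\.com/', regexp=True,
#                          with_visit=True)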
@db_transaction_generator(statement_timeout=500)
def person_get(self, person, db=None, cur=None):
"""Return the persons identified by their ids.
Args:
person: array of ids.
Yields:
The persons corresponding to the given ids, as dicts.
"""
for person in db.person_get(person):
yield dict(zip(db.person_get_cols, person))
@db_transaction()
def origin_add(self, origins, db=None, cur=None):
"""Add origins to the storage
Args:
origins: list of dictionaries representing the individual origins,
with the following keys:
- type: the origin type ('git', 'svn', 'deb', ...)
- url (bytes): the url the origin points to
Returns:
list: the given origins, as dicts updated with their id
"""
for origin in origins:
origin['id'] = self.origin_add_one(origin, db=db, cur=cur)
return origins
@db_transaction()
def origin_add_one(self, origin, db=None, cur=None):
"""Add origin to the storage
Args:
origin: dictionary representing the individual origin to add. This
dict has the following keys:
- type (FIXME: enum TBD): the origin type ('git', 'wget', ...)
- url (bytes): the url the origin points to
Returns:
the id of the added origin, or of the identical one that already
exists.
"""
origin_id = list(db.origin_get_with(
[(origin['type'], origin['url'])], cur))[0][0]
if origin_id:
return origin_id
origin['id'] = db.origin_add(origin['type'], origin['url'], cur)
if self.journal_writer:
self.journal_writer.write_addition('origin', origin)
return origin['id']
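# Illustrative usage (hypothetical URL): adding the same origin twice is
# idempotent; the second call returns the id of the existing row.
#
#     origin_id = storage.origin_add_one(
#         {'type': 'git', 'url': 'https://example.org/repo'})
#     assert storage.origin_add_one(
#         {'type': 'git', 'url': 'https://example.org/repo'}) == origin_id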
@db_transaction()
def fetch_history_start(self, origin_id, db=None, cur=None):
"""Add an entry for origin origin_id in fetch_history. Returns the id
of the added fetch_history entry
"""
fetch_history = {
'origin': origin_id,
'date': datetime.datetime.now(tz=datetime.timezone.utc),
}
return db.create_fetch_history(fetch_history, cur)
@db_transaction()
def fetch_history_end(self, fetch_history_id, data, db=None, cur=None):
"""Close the fetch_history entry with id `fetch_history_id`, replacing
its data with `data`.
"""
now = datetime.datetime.now(tz=datetime.timezone.utc)
fetch_history = db.get_fetch_history(fetch_history_id, cur)
if not fetch_history:
raise ValueError('No fetch_history with id %d' % fetch_history_id)
fetch_history['duration'] = now - fetch_history['date']
fetch_history.update(data)
db.update_fetch_history(fetch_history, cur)
@db_transaction()
def fetch_history_get(self, fetch_history_id, db=None, cur=None):
"""Get the fetch_history entry with id `fetch_history_id`.
"""
return db.get_fetch_history(fetch_history_id, cur)
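# Illustrative lifecycle (hypothetical data): a fetch_history entry is
# opened with fetch_history_start and closed with fetch_history_end, which
# records the elapsed duration together with the provided result data.
#
#     fh_id = storage.fetch_history_start(origin_id)
#     # ... perform the fetch ...
#     storage.fetch_history_end(fh_id, {'status': True, 'stdout': 'ok'})
#     entry = storage.fetch_history_get(fh_id)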
@db_transaction(statement_timeout=500)
def stat_counters(self, db=None, cur=None):
"""compute statistics about the number of tuples in various tables
Returns:
dict: a dictionary mapping textual labels (e.g., content) to
integer values (e.g., the number of tuples in table content)
"""
return {k: v for (k, v) in db.stat_counters()}
@db_transaction()
def refresh_stat_counters(self, db=None, cur=None):
"""Recomputes the statistics for `stat_counters`."""
keys = [
'content',
'directory',
'directory_entry_dir',
'directory_entry_file',
'directory_entry_rev',
'origin',
'origin_visit',
'person',
'release',
'revision',
'revision_history',
'skipped_content',
'snapshot']
for key in keys:
cur.execute('select * from swh_update_counter(%s)', (key,))
@db_transaction()
def origin_metadata_add(self, origin_id, ts, provider, tool, metadata,
db=None, cur=None):
""" Add an origin_metadata for the origin at ts with provenance and
metadata.
Args:
origin_id (int): the origin's id for which the metadata is added
ts (datetime): timestamp of the found metadata
provider (int): the id of the provider of metadata (e.g. 'hal')
tool (int): tool used to extract metadata
metadata (jsonb): the metadata retrieved at the time and location
Returns:
id (int): the origin_metadata unique id
"""
if isinstance(ts, str):
ts = dateutil.parser.parse(ts)
return db.origin_metadata_add(origin_id, ts, provider, tool,
metadata, cur)
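# Illustrative usage (hypothetical provider/tool ids and metadata): both a
# provider id and a tool id are expected; the timestamp may be passed as a
# datetime or as a string that dateutil can parse.
#
#     storage.origin_metadata_add(origin_id, '2015-01-01T23:00:00+00:00',
#                                 provider_id, tool_id,
#                                 {'name': 'test_origin_metadata'})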
@db_transaction_generator(statement_timeout=500)
def origin_metadata_get_by(self, origin_id, provider_type=None, db=None,
cur=None):
"""Retrieve list of all origin_metadata entries for the origin_id
Args:
origin_id (int): the unique origin identifier
provider_type (str): (optional) type of provider
Yields:
dicts: the origin_metadata dictionaries with the keys:
- origin_id (int): origin's id
- discovery_date (datetime): timestamp of discovery
- tool_id (int): metadata's extracting tool
- metadata (jsonb)
- provider_id (int): metadata's provider
- provider_name (str)
- provider_type (str)
- provider_url (str)
"""
for line in db.origin_metadata_get_by(origin_id, provider_type, cur):
yield dict(zip(db.origin_metadata_get_cols, line))
@db_transaction_generator()
def tool_add(self, tools, db=None, cur=None):
"""Add new tools to the storage.
Args:
tools (iterable of :class:`dict`): Tool information to add to
storage. Each tool is a :class:`dict` with the following keys:
- name (:class:`str`): name of the tool
- version (:class:`str`): version of the tool
- configuration (:class:`dict`): configuration of the tool,
must be json-encodable
Yields:
:class:`dict`: All the tools inserted in storage
(including the internal ``id``). The order of the list is not
guaranteed to match the order of the initial list.
"""
db.mktemp_tool(cur)
db.copy_to(tools, 'tmp_tool',
['name', 'version', 'configuration'],
cur)
tools = db.tool_add_from_temp(cur)
for line in tools:
yield dict(zip(db.tool_cols, line))
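# Illustrative usage (hypothetical tool): tool_add yields the stored tools
# back, including their internal id, which tool_get can later retrieve.
#
#     tools = list(storage.tool_add([{'name': 'swh-deposit',
#                                     'version': '0.0.1',
#                                     'configuration': {'sword_version': '2'}}]))
#     same = storage.tool_get(tools[0])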
@db_transaction(statement_timeout=500)
def tool_get(self, tool, db=None, cur=None):
"""Retrieve tool information.
Args:
tool (dict): Tool information we want to retrieve from storage.
The dicts have the same keys as those used in :func:`tool_add`.
Returns:
dict: The full tool information if it exists (``id`` included),
None otherwise.
"""
tool_conf = tool['configuration']
if isinstance(tool_conf, dict):
tool_conf = json.dumps(tool_conf)
idx = db.tool_get(tool['name'],
tool['version'],
tool_conf)
if not idx:
return None
return dict(zip(db.tool_cols, idx))
@db_transaction()
def metadata_provider_add(self, provider_name, provider_type, provider_url,
metadata, db=None, cur=None):
"""Add a metadata provider.
Args:
provider_name (str): Its name
provider_type (str): Its type (e.g. `'deposit-client'`)
provider_url (str): Its URL
metadata: JSON-encodable object
Returns:
int: an identifier of the provider
"""
return db.metadata_provider_add(provider_name, provider_type,
provider_url, metadata, cur)
@db_transaction()
def metadata_provider_get(self, provider_id, db=None, cur=None):
"""Get a metadata provider
Args:
provider_id: Its identifier, as given by `metadata_provider_add`.
Returns:
dict: the metadata provider information;
or None if it does not exist.
"""
result = db.metadata_provider_get(provider_id)
if not result:
return None
return dict(zip(db.metadata_provider_cols, result))
@db_transaction()
def metadata_provider_get_by(self, provider, db=None, cur=None):
"""Get a metadata provider
Args:
provider (dict): A dictionary with keys:
* provider_name: Its name
* provider_url: Its URL
Returns:
dict: the metadata provider information;
or None if it does not exist.
"""
result = db.metadata_provider_get_by(provider['provider_name'],
provider['provider_url'])
if not result:
return None
return dict(zip(db.metadata_provider_cols, result))
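# Illustrative usage (hypothetical provider): metadata_provider_add returns
# an id that metadata_provider_get resolves back to the full record, while
# metadata_provider_get_by looks the provider up by name and URL.
#
#     provider_id = storage.metadata_provider_add('hal', 'deposit-client',
#                                                 'http://hal.example.org',
#                                                 {'location': 'France'})
#     storage.metadata_provider_get(provider_id)
#     storage.metadata_provider_get_by({'provider_name': 'hal',
#                                       'provider_url': 'http://hal.example.org'})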
def diff_directories(self, from_dir, to_dir, track_renaming=False):
"""Compute the list of file changes introduced between two arbitrary
directories (insertion / deletion / modification / renaming of files).
Args:
from_dir (bytes): identifier of the directory to compare from
to_dir (bytes): identifier of the directory to compare to
track_renaming (bool): whether or not to track file renaming
Returns:
A list of dict describing the introduced file changes
(see :func:`swh.storage.algos.diff.diff_directories`
for more details).
"""
return diff.diff_directories(self, from_dir, to_dir, track_renaming)
def diff_revisions(self, from_rev, to_rev, track_renaming=False):
"""Compute the list of file changes introduced between two arbitrary
revisions (insertion / deletion / modification / renaming of files).
Args:
from_rev (bytes): identifier of the revision to compare from
to_rev (bytes): identifier of the revision to compare to
track_renaming (bool): whether or not to track file renaming
Returns:
A list of dict describing the introduced file changes
(see :func:`swh.storage.algos.diff.diff_directories`
for more details).
"""
return diff.diff_revisions(self, from_rev, to_rev, track_renaming)
def diff_revision(self, revision, track_renaming=False):
"""Compute the list of file changes introduced by a specific revision
(insertion / deletion / modification / renaming of files) by comparing
it against its first parent.
Args:
revision (bytes): identifier of the revision from which to
compute the list of files changes
track_renaming (bool): whether or not to track file renaming
Returns:
A list of dict describing the introduced file changes
(see :func:`swh.storage.algos.diff.diff_directories`
for more details).
"""
return diff.diff_revision(self, revision, track_renaming)
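# Illustrative usage (hypothetical identifiers): the three diff helpers
# share the same output format, a list of dicts describing file changes.
#
#     changes = storage.diff_revision(revision_id, track_renaming=True)
#     changes = storage.diff_revisions(rev_from, rev_to)
#     changes = storage.diff_directories(dir_from, dir_to)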
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
index bdb6aefb..64d431f4 100644
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -1,2814 +1,2835 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
import itertools
import random
import unittest
from collections import defaultdict
from unittest.mock import Mock, patch
import pytest
from hypothesis import given, strategies
from swh.model import from_disk, identifiers
from swh.model.hashutil import hash_to_bytes
from swh.storage.tests.storage_testing import StorageTestFixture
from swh.storage import HashCollision
from .generate_data_test import gen_contents, gen_origins
@pytest.mark.db
class StorageTestDbFixture(StorageTestFixture):
def setUp(self):
super().setUp()
db = self.test_db[self.TEST_DB_NAME]
self.conn = db.conn
self.cursor = db.cursor
self.maxDiff = None
def tearDown(self):
self.reset_storage_tables()
super().tearDown()
class TestStorageData:
def setUp(self):
super().setUp()
self.cont = {
'data': b'42\n',
'length': 3,
'sha1': hash_to_bytes(
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
'sha1_git': hash_to_bytes(
'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
'sha256': hash_to_bytes(
'673650f936cb3b0a2f93ce09d81be107'
'48b1b203c19e8176b4eefc1964a0cf3a'),
'blake2s256': hash_to_bytes('d5fe1939576527e42cfd76a9455a2'
'432fe7f56669564577dd93c4280e76d661d'),
'status': 'visible',
}
self.cont2 = {
'data': b'4242\n',
'length': 5,
'sha1': hash_to_bytes(
'61c2b3a30496d329e21af70dd2d7e097046d07b7'),
'sha1_git': hash_to_bytes(
'36fade77193cb6d2bd826161a0979d64c28ab4fa'),
'sha256': hash_to_bytes(
'859f0b154fdb2d630f45e1ecae4a8629'
'15435e663248bb8461d914696fc047cd'),
'blake2s256': hash_to_bytes('849c20fad132b7c2d62c15de310adfe87be'
'94a379941bed295e8141c6219810d'),
'status': 'visible',
}
self.cont3 = {
'data': b'424242\n',
'length': 7,
'sha1': hash_to_bytes(
'3e21cc4942a4234c9e5edd8a9cacd1670fe59f13'),
'sha1_git': hash_to_bytes(
'c932c7649c6dfa4b82327d121215116909eb3bea'),
'sha256': hash_to_bytes(
'92fb72daf8c6818288a35137b72155f5'
'07e5de8d892712ab96277aaed8cf8a36'),
'blake2s256': hash_to_bytes('76d0346f44e5a27f6bafdd9c2befd304af'
'f83780f93121d801ab6a1d4769db11'),
'status': 'visible',
}
self.missing_cont = {
'data': b'missing\n',
'length': 8,
'sha1': hash_to_bytes(
'f9c24e2abb82063a3ba2c44efd2d3c797f28ac90'),
'sha1_git': hash_to_bytes(
'33e45d56f88993aae6a0198013efa80716fd8919'),
'sha256': hash_to_bytes(
'6bbd052ab054ef222c1c87be60cd191a'
'ddedd24cc882d1f5f7f7be61dc61bb3a'),
'blake2s256': hash_to_bytes('306856b8fd879edb7b6f1aeaaf8db9bbecc9'
'93cd7f776c333ac3a782fa5c6eba'),
'status': 'absent',
}
self.skipped_cont = {
'length': 1024 * 1024 * 200,
'sha1_git': hash_to_bytes(
'33e45d56f88993aae6a0198013efa80716fd8920'),
'sha1': hash_to_bytes(
'43e45d56f88993aae6a0198013efa80716fd8920'),
'sha256': hash_to_bytes(
'7bbd052ab054ef222c1c87be60cd191a'
'ddedd24cc882d1f5f7f7be61dc61bb3a'),
'blake2s256': hash_to_bytes(
'ade18b1adecb33f891ca36664da676e1'
'2c772cc193778aac9a137b8dc5834b9b'),
'reason': 'Content too long',
'status': 'absent',
}
self.skipped_cont2 = {
'length': 1024 * 1024 * 300,
'sha1_git': hash_to_bytes(
'44e45d56f88993aae6a0198013efa80716fd8921'),
'sha1': hash_to_bytes(
'54e45d56f88993aae6a0198013efa80716fd8920'),
'sha256': hash_to_bytes(
'8cbd052ab054ef222c1c87be60cd191a'
'ddedd24cc882d1f5f7f7be61dc61bb3a'),
'blake2s256': hash_to_bytes(
'9ce18b1adecb33f891ca36664da676e1'
'2c772cc193778aac9a137b8dc5834b9b'),
'reason': 'Content too long',
'status': 'absent',
}
self.dir = {
'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa90',
'entries': [
{
'name': b'foo',
'type': 'file',
'target': self.cont['sha1_git'],
'perms': from_disk.DentryPerms.content,
},
{
'name': b'bar\xc3',
'type': 'dir',
'target': b'12345678901234567890',
'perms': from_disk.DentryPerms.directory,
},
],
}
self.dir2 = {
'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa95',
'entries': [
{
'name': b'oof',
'type': 'file',
'target': self.cont2['sha1_git'],
'perms': from_disk.DentryPerms.content,
}
],
}
self.dir3 = {
'id': hash_to_bytes('33e45d56f88993aae6a0198013efa80716fd8921'),
'entries': [
{
'name': b'foo',
'type': 'file',
'target': self.cont['sha1_git'],
'perms': from_disk.DentryPerms.content,
},
{
'name': b'subdir',
'type': 'dir',
'target': self.dir['id'],
'perms': from_disk.DentryPerms.directory,
},
{
'name': b'hello',
'type': 'file',
'target': b'12345678901234567890',
'perms': from_disk.DentryPerms.content,
},
],
}
self.minus_offset = datetime.timezone(datetime.timedelta(minutes=-120))
self.plus_offset = datetime.timezone(datetime.timedelta(minutes=120))
self.revision = {
'id': b'56789012345678901234',
'message': b'hello',
'author': {
'name': b'Nicolas Dandrimont',
'email': b'nicolas@example.com',
'fullname': b'Nicolas Dandrimont <nicolas@example.com> ',
},
'date': {
'timestamp': 1234567890,
'offset': 120,
'negative_utc': None,
},
'committer': {
'name': b'St\xc3fano Zacchiroli',
'email': b'stefano@example.com',
'fullname': b'St\xc3fano Zacchiroli <stefano@example.com>'
},
'committer_date': {
'timestamp': 1123456789,
'offset': 0,
'negative_utc': True,
},
'parents': [b'01234567890123456789', b'23434512345123456789'],
'type': 'git',
'directory': self.dir['id'],
'metadata': {
'checksums': {
'sha1': 'tarball-sha1',
'sha256': 'tarball-sha256',
},
'signed-off-by': 'some-dude',
'extra_headers': [
['gpgsig', b'test123'],
['mergetags', [b'foo\\bar', b'\x22\xaf\x89\x80\x01\x00']],
],
},
'synthetic': True
}
self.revision2 = {
'id': b'87659012345678904321',
'message': b'hello again',
'author': {
'name': b'Roberto Dicosmo',
'email': b'roberto@example.com',
'fullname': b'Roberto Dicosmo <roberto@example.com>',
},
'date': {
'timestamp': {
'seconds': 1234567843,
'microseconds': 220000,
},
'offset': -720,
'negative_utc': None,
},
'committer': {
'name': b'tony',
'email': b'ar@dumont.fr',
'fullname': b'tony <ar@dumont.fr>',
},
'committer_date': {
'timestamp': 1123456789,
'offset': 0,
'negative_utc': False,
},
'parents': [b'01234567890123456789'],
'type': 'git',
'directory': self.dir2['id'],
'metadata': None,
'synthetic': False
}
self.revision3 = {
'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
'message': b'a simple revision with no parents this time',
'author': {
'name': b'Roberto Dicosmo',
'email': b'roberto@example.com',
'fullname': b'Roberto Dicosmo <roberto@example.com>',
},
'date': {
'timestamp': {
'seconds': 1234567843,
'microseconds': 220000,
},
'offset': -720,
'negative_utc': None,
},
'committer': {
'name': b'tony',
'email': b'ar@dumont.fr',
'fullname': b'tony <ar@dumont.fr>',
},
'committer_date': {
'timestamp': 1127351742,
'offset': 0,
'negative_utc': False,
},
'parents': [],
'type': 'git',
'directory': self.dir2['id'],
'metadata': None,
'synthetic': True
}
self.revision4 = {
'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
'message': b'parent of self.revision2',
'author': {
'name': b'me',
'email': b'me@soft.heri',
'fullname': b'me <me@soft.heri>',
},
'date': {
'timestamp': {
'seconds': 1244567843,
'microseconds': 220000,
},
'offset': -720,
'negative_utc': None,
},
'committer': {
'name': b'committer-dude',
'email': b'committer@dude.com',
'fullname': b'committer-dude <committer@dude.com>',
},
'committer_date': {
'timestamp': {
'seconds': 1244567843,
'microseconds': 220000,
},
'offset': -720,
'negative_utc': None,
},
'parents': [self.revision3['id']],
'type': 'git',
'directory': self.dir['id'],
'metadata': None,
'synthetic': False
}
self.origin = {
'url': 'file:///dev/null',
'type': 'git',
}
self.origin2 = {
'url': 'file:///dev/zero',
'type': 'git',
}
self.provider = {
'name': 'hal',
'type': 'deposit-client',
'url': 'http:///hal/inria',
'metadata': {
'location': 'France'
}
}
self.metadata_tool = {
'name': 'swh-deposit',
'version': '0.0.1',
'configuration': {
'sword_version': '2'
}
}
self.origin_metadata = {
'origin': self.origin,
'discovery_date': datetime.datetime(2015, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc),
'provider': self.provider,
'tool': 'swh-deposit',
'metadata': {
'name': 'test_origin_metadata',
'version': '0.0.1'
}
}
self.origin_metadata2 = {
'origin': self.origin,
'discovery_date': datetime.datetime(2017, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc),
'provider': self.provider,
'tool': 'swh-deposit',
'metadata': {
'name': 'test_origin_metadata',
'version': '0.0.1'
}
}
self.date_visit1 = datetime.datetime(2015, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc)
self.date_visit2 = datetime.datetime(2017, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc)
self.date_visit3 = datetime.datetime(2018, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc)
self.release = {
'id': b'87659012345678901234',
'name': b'v0.0.1',
'author': {
'name': b'olasd',
'email': b'nic@olasd.fr',
'fullname': b'olasd <nic@olasd.fr>',
},
'date': {
'timestamp': 1234567890,
'offset': 42,
'negative_utc': None,
},
'target': b'43210987654321098765',
'target_type': 'revision',
'message': b'synthetic release',
'synthetic': True,
}
self.release2 = {
'id': b'56789012348765901234',
'name': b'v0.0.2',
'author': {
'name': b'tony',
'email': b'ar@dumont.fr',
'fullname': b'tony <ar@dumont.fr>',
},
'date': {
'timestamp': 1634366813,
'offset': -120,
'negative_utc': None,
},
'target': b'432109\xa9765432\xc309\x00765',
'target_type': 'revision',
'message': b'v0.0.2\nMisc performance improvements + bug fixes',
'synthetic': False
}
self.release3 = {
'id': b'87659012345678904321',
'name': b'v0.0.2',
'author': {
'name': b'tony',
'email': b'tony@ardumont.fr',
'fullname': b'tony <tony@ardumont.fr>',
},
'date': {
'timestamp': 1634336813,
'offset': 0,
'negative_utc': False,
},
'target': self.revision2['id'],
'target_type': 'revision',
'message': b'yet another synthetic release',
'synthetic': True,
}
self.fetch_history_date = datetime.datetime(
2015, 1, 2, 21, 0, 0,
tzinfo=datetime.timezone.utc)
self.fetch_history_end = datetime.datetime(
2015, 1, 2, 23, 0, 0,
tzinfo=datetime.timezone.utc)
self.fetch_history_duration = (self.fetch_history_end -
self.fetch_history_date)
self.fetch_history_data = {
'status': True,
'result': {'foo': 'bar'},
'stdout': 'blabla',
'stderr': 'blablabla',
}
self.snapshot = {
'id': hash_to_bytes('2498dbf535f882bc7f9a18fb16c9ad27fda7bab7'),
'branches': {
b'master': {
'target': self.revision['id'],
'target_type': 'revision',
},
},
'next_branch': None
}
self.empty_snapshot = {
'id': hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'),
'branches': {},
'next_branch': None
}
self.complete_snapshot = {
'id': hash_to_bytes('6e65b86363953b780d92b0a928f3e8fcdd10db36'),
'branches': {
b'directory': {
'target': hash_to_bytes(
'1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8'),
'target_type': 'directory',
},
b'content': {
'target': hash_to_bytes(
'fe95a46679d128ff167b7c55df5d02356c5a1ae1'),
'target_type': 'content',
},
b'alias': {
'target': b'revision',
'target_type': 'alias',
},
b'revision': {
'target': hash_to_bytes(
'aafb16d69fd30ff58afdd69036a26047f3aebdc6'),
'target_type': 'revision',
},
b'release': {
'target': hash_to_bytes(
'7045404f3d1c54e6473c71bbb716529fbad4be24'),
'target_type': 'release',
},
b'snapshot': {
'target': hash_to_bytes(
'1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'),
'target_type': 'snapshot',
},
b'dangling': None,
},
'next_branch': None
}
class CommonTestStorage(TestStorageData):
"""Base class for Storage testing.
This class is used as-is to test local storage (see TestLocalStorage
below) and remote storage (see TestRemoteStorage in
test_remote_storage.py).
We need to have the two classes inherit from this base class
separately to avoid nosetests running the tests from the base
class twice.
"""
@staticmethod
def normalize_entity(entity):
entity = copy.deepcopy(entity)
for key in ('date', 'committer_date'):
if key in entity:
entity[key] = identifiers.normalize_timestamp(entity[key])
return entity
def test_check_config(self):
self.assertTrue(self.storage.check_config(check_write=True))
self.assertTrue(self.storage.check_config(check_write=False))
def test_content_add(self):
cont = self.cont
self.storage.content_add([cont])
self.assertEqual(list(self.storage.content_get([cont['sha1']])),
[{'sha1': cont['sha1'], 'data': cont['data']}])
expected_cont = cont.copy()
del expected_cont['data']
self.assertEqual(list(self.journal_writer.objects),
[('content', expected_cont)])
def test_content_add_db(self):
cont = self.cont
self.storage.content_add([cont])
if hasattr(self.storage, 'objstorage'):
self.assertIn(cont['sha1'], self.storage.objstorage)
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status'
' FROM content WHERE sha1 = %s',
(cont['sha1'],))
datum = self.cursor.fetchone()
self.assertEqual(
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(),
datum[3], datum[4]),
(cont['sha1'], cont['sha1_git'], cont['sha256'],
cont['length'], 'visible'))
expected_cont = cont.copy()
del expected_cont['data']
self.assertEqual(list(self.journal_writer.objects),
[('content', expected_cont)])
def test_content_add_collision(self):
cont1 = self.cont
# create (corrupted) content with same sha1{,_git} but != sha256
cont1b = cont1.copy()
sha256_array = bytearray(cont1b['sha256'])
sha256_array[0] += 1
cont1b['sha256'] = bytes(sha256_array)
with self.assertRaises(HashCollision) as cm:
self.storage.content_add([cont1, cont1b])
self.assertIn(cm.exception.args[0], ['sha1', 'sha1_git', 'blake2s256'])
def test_skipped_content_add(self):
cont = self.skipped_cont.copy()
cont2 = self.skipped_cont2.copy()
cont2['blake2s256'] = None
self.storage.content_add([cont, cont, cont2])
self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, '
'length, status, reason '
'FROM skipped_content ORDER BY sha1_git')
datums = self.cursor.fetchall()
self.assertEqual(2, len(datums))
datum = datums[0]
self.assertEqual(
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(),
datum[3].tobytes(), datum[4], datum[5], datum[6]),
(cont['sha1'], cont['sha1_git'], cont['sha256'],
cont['blake2s256'], cont['length'], 'absent',
'Content too long')
)
datum2 = datums[1]
self.assertEqual(
(datum2[0].tobytes(), datum2[1].tobytes(), datum2[2].tobytes(),
datum2[3], datum2[4], datum2[5], datum2[6]),
(cont2['sha1'], cont2['sha1_git'], cont2['sha256'],
cont2['blake2s256'], cont2['length'], 'absent',
'Content too long')
)
@pytest.mark.property_based
@given(strategies.sets(
elements=strategies.sampled_from(
['sha256', 'sha1_git', 'blake2s256']),
min_size=0))
def test_content_missing(self, algos):
algos |= {'sha1'}
cont2 = self.cont2
missing_cont = self.missing_cont
self.storage.content_add([cont2])
test_contents = [cont2]
missing_per_hash = defaultdict(list)
for i in range(256):
test_content = missing_cont.copy()
for hash in algos:
test_content[hash] = bytes([i]) + test_content[hash][1:]
missing_per_hash[hash].append(test_content[hash])
test_contents.append(test_content)
self.assertCountEqual(
self.storage.content_missing(test_contents),
missing_per_hash['sha1']
)
for hash in algos:
self.assertCountEqual(
self.storage.content_missing(test_contents, key_hash=hash),
missing_per_hash[hash]
)
def test_content_missing_per_sha1(self):
# given
cont2 = self.cont2
missing_cont = self.missing_cont
self.storage.content_add([cont2])
# when
gen = self.storage.content_missing_per_sha1([cont2['sha1'],
missing_cont['sha1']])
# then
self.assertEqual(list(gen), [missing_cont['sha1']])
def test_content_get_metadata(self):
cont1 = self.cont.copy()
cont2 = self.cont2.copy()
self.storage.content_add([cont1, cont2])
gen = self.storage.content_get_metadata([cont1['sha1'], cont2['sha1']])
# we only retrieve the metadata
cont1.pop('data')
cont2.pop('data')
self.assertCountEqual(list(gen), [cont1, cont2])
def test_content_get_metadata_missing_sha1(self):
cont1 = self.cont.copy()
cont2 = self.cont2.copy()
missing_cont = self.missing_cont.copy()
self.storage.content_add([cont1, cont2])
gen = self.storage.content_get_metadata([missing_cont['sha1']])
# All the metadata keys are None
missing_cont.pop('data')
for key in list(missing_cont):
if key != 'sha1':
missing_cont[key] = None
self.assertEqual(list(gen), [missing_cont])
@staticmethod
def _transform_entries(dir_, *, prefix=b''):
for ent in dir_['entries']:
yield {
'dir_id': dir_['id'],
'type': ent['type'],
'target': ent['target'],
'name': prefix + ent['name'],
'perms': ent['perms'],
'status': None,
'sha1': None,
'sha1_git': None,
'sha256': None,
'length': None,
}
def test_directory_add(self):
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
self.storage.directory_add([self.dir])
self.assertEqual(list(self.journal_writer.objects),
[('directory', self.dir)])
actual_data = list(self.storage.directory_ls(self.dir['id']))
expected_data = list(self._transform_entries(self.dir))
self.assertCountEqual(expected_data, actual_data)
after_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([], after_missing)
def test_directory_get_recursive(self):
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
self.storage.directory_add([self.dir, self.dir2, self.dir3])
self.assertEqual(list(self.journal_writer.objects),
[('directory', self.dir),
('directory', self.dir2),
('directory', self.dir3)])
actual_data = list(self.storage.directory_ls(
self.dir['id'], recursive=True))
expected_data = list(self._transform_entries(self.dir))
self.assertCountEqual(expected_data, actual_data)
actual_data = list(self.storage.directory_ls(
self.dir2['id'], recursive=True))
expected_data = list(self._transform_entries(self.dir2))
self.assertCountEqual(expected_data, actual_data)
actual_data = list(self.storage.directory_ls(
self.dir3['id'], recursive=True))
expected_data = list(itertools.chain(
self._transform_entries(self.dir3),
self._transform_entries(self.dir, prefix=b'subdir/')))
self.assertCountEqual(expected_data, actual_data)
def test_directory_entry_get_by_path(self):
# given
init_missing = list(self.storage.directory_missing([self.dir3['id']]))
self.assertEqual([self.dir3['id']], init_missing)
self.storage.directory_add([self.dir3])
expected_entries = [
{
'dir_id': self.dir3['id'],
'name': b'foo',
'type': 'file',
'target': self.cont['sha1_git'],
'sha1': None,
'sha1_git': None,
'sha256': None,
'status': None,
'perms': from_disk.DentryPerms.content,
'length': None,
},
{
'dir_id': self.dir3['id'],
'name': b'subdir',
'type': 'dir',
'target': self.dir['id'],
'sha1': None,
'sha1_git': None,
'sha256': None,
'status': None,
'perms': from_disk.DentryPerms.directory,
'length': None,
},
{
'dir_id': self.dir3['id'],
'name': b'hello',
'type': 'file',
'target': b'12345678901234567890',
'sha1': None,
'sha1_git': None,
'sha256': None,
'status': None,
'perms': from_disk.DentryPerms.content,
'length': None,
},
]
# when (all must be found here)
for entry, expected_entry in zip(self.dir3['entries'],
expected_entries):
actual_entry = self.storage.directory_entry_get_by_path(
self.dir3['id'],
[entry['name']])
self.assertEqual(actual_entry, expected_entry)
# when (nothing should be found here since self.dir is not persisted.)
for entry in self.dir['entries']:
actual_entry = self.storage.directory_entry_get_by_path(
self.dir['id'],
[entry['name']])
self.assertIsNone(actual_entry)
def test_revision_add(self):
init_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([self.revision['id']], list(init_missing))
self.storage.revision_add([self.revision])
end_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([], list(end_missing))
self.assertEqual(list(self.journal_writer.objects),
[('revision', self.revision)])
def test_revision_log(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
# when
actual_results = list(self.storage.revision_log(
[self.revision4['id']]))
# hack: ids generated
for actual_result in actual_results:
if 'id' in actual_result['author']:
del actual_result['author']['id']
if 'id' in actual_result['committer']:
del actual_result['committer']['id']
self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3
self.assertEqual(actual_results[0],
self.normalize_entity(self.revision4))
self.assertEqual(actual_results[1],
self.normalize_entity(self.revision3))
self.assertEqual(list(self.journal_writer.objects),
[('revision', self.revision3),
('revision', self.revision4)])
def test_revision_log_with_limit(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
actual_results = list(self.storage.revision_log(
[self.revision4['id']], 1))
# hack: ids generated
for actual_result in actual_results:
if 'id' in actual_result['author']:
del actual_result['author']['id']
if 'id' in actual_result['committer']:
del actual_result['committer']['id']
self.assertEqual(len(actual_results), 1)
self.assertEqual(actual_results[0], self.revision4)
def test_revision_log_unknown_revision(self):
rev_log = list(self.storage.revision_log([self.revision['id']]))
self.assertEqual(rev_log, [])
@staticmethod
def _short_revision(revision):
return [revision['id'], revision['parents']]
def test_revision_shortlog(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
# when
actual_results = list(self.storage.revision_shortlog(
[self.revision4['id']]))
self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3
self.assertEqual(list(actual_results[0]),
self._short_revision(self.revision4))
self.assertEqual(list(actual_results[1]),
self._short_revision(self.revision3))
def test_revision_shortlog_with_limit(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
actual_results = list(self.storage.revision_shortlog(
[self.revision4['id']], 1))
self.assertEqual(len(actual_results), 1)
self.assertEqual(list(actual_results[0]),
self._short_revision(self.revision4))
def test_revision_get(self):
self.storage.revision_add([self.revision])
actual_revisions = list(self.storage.revision_get(
[self.revision['id'], self.revision2['id']]))
# when
if 'id' in actual_revisions[0]['author']:
del actual_revisions[0]['author']['id'] # hack: ids are generated
if 'id' in actual_revisions[0]['committer']:
del actual_revisions[0]['committer']['id']
self.assertEqual(len(actual_revisions), 2)
self.assertEqual(actual_revisions[0],
self.normalize_entity(self.revision))
self.assertIsNone(actual_revisions[1])
def test_revision_get_no_parents(self):
self.storage.revision_add([self.revision3])
get = list(self.storage.revision_get([self.revision3['id']]))
self.assertEqual(len(get), 1)
self.assertEqual(get[0]['parents'], []) # no parents on this one
def test_release_add(self):
init_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
self.assertEqual([self.release['id'], self.release2['id']],
list(init_missing))
self.storage.release_add([self.release, self.release2])
end_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
self.assertEqual([], list(end_missing))
self.assertEqual(list(self.journal_writer.objects),
[('release', self.release),
('release', self.release2)])
def test_release_get(self):
# given
self.storage.release_add([self.release, self.release2])
# when
actual_releases = list(self.storage.release_get([self.release['id'],
self.release2['id']]))
# then
for actual_release in actual_releases:
if 'id' in actual_release['author']:
del actual_release['author']['id'] # hack: ids are generated
self.assertEqual([self.normalize_entity(self.release),
self.normalize_entity(self.release2)],
[actual_releases[0], actual_releases[1]])
unknown_releases = \
list(self.storage.release_get([self.release3['id']]))
self.assertIsNone(unknown_releases[0])
def test_origin_add_one(self):
origin0 = self.storage.origin_get(self.origin)
self.assertIsNone(origin0)
id = self.storage.origin_add_one(self.origin)
actual_origin = self.storage.origin_get({'url': self.origin['url'],
'type': self.origin['type']})
self.assertEqual(actual_origin['id'], id)
id2 = self.storage.origin_add_one(self.origin)
self.assertEqual(id, id2)
def test_origin_add(self):
origin0 = self.storage.origin_get([self.origin])[0]
self.assertIsNone(origin0)
origin1, origin2 = self.storage.origin_add([self.origin, self.origin2])
actual_origin = self.storage.origin_get([{
'url': self.origin['url'],
'type': self.origin['type'],
}])[0]
self.assertEqual(actual_origin['id'], origin1['id'])
actual_origin2 = self.storage.origin_get([{
'url': self.origin2['url'],
'type': self.origin2['type'],
}])[0]
self.assertEqual(actual_origin2['id'], origin2['id'])
self.assertEqual(list(self.journal_writer.objects),
[('origin', actual_origin),
('origin', actual_origin2)])
def test_origin_add_twice(self):
add1 = self.storage.origin_add([self.origin, self.origin2])
add2 = self.storage.origin_add([self.origin, self.origin2])
self.assertEqual(add1, add2)
def test_origin_get_legacy(self):
self.assertIsNone(self.storage.origin_get(self.origin))
id = self.storage.origin_add_one(self.origin)
# lookup per type and url (returns id)
actual_origin0 = self.storage.origin_get(
{'url': self.origin['url'], 'type': self.origin['type']})
self.assertEqual(actual_origin0['id'], id)
# lookup per id (returns dict)
actual_origin1 = self.storage.origin_get({'id': id})
self.assertEqual(actual_origin1, {'id': id,
'type': self.origin['type'],
'url': self.origin['url']})
def test_origin_get(self):
self.assertIsNone(self.storage.origin_get(self.origin))
origin_id = self.storage.origin_add_one(self.origin)
# lookup per type and url (returns id)
actual_origin0 = self.storage.origin_get(
[{'url': self.origin['url'], 'type': self.origin['type']}])
self.assertEqual(len(actual_origin0), 1, actual_origin0)
self.assertEqual(actual_origin0[0]['id'], origin_id)
# lookup per id (returns dict)
actual_origin1 = self.storage.origin_get([{'id': origin_id}])
self.assertEqual(len(actual_origin1), 1, actual_origin1)
self.assertEqual(actual_origin1[0], {'id': origin_id,
'type': self.origin['type'],
'url': self.origin['url']})
def test_origin_get_consistency(self):
self.assertIsNone(self.storage.origin_get(self.origin))
id = self.storage.origin_add_one(self.origin)
with self.assertRaises(ValueError):
self.storage.origin_get([
{'url': self.origin['url'], 'type': self.origin['type']},
{'id': id}])
def test_origin_search(self):
found_origins = list(self.storage.origin_search(self.origin['url']))
self.assertEqual(len(found_origins), 0)
found_origins = list(self.storage.origin_search(self.origin['url'],
regexp=True))
self.assertEqual(len(found_origins), 0)
id = self.storage.origin_add_one(self.origin)
origin_data = {'id': id,
'type': self.origin['type'],
'url': self.origin['url']}
found_origins = list(self.storage.origin_search(self.origin['url']))
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin_data)
found_origins = list(self.storage.origin_search(
'.' + self.origin['url'][1:-1] + '.', regexp=True))
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin_data)
id2 = self.storage.origin_add_one(self.origin2)
origin2_data = {'id': id2,
'type': self.origin2['type'],
'url': self.origin2['url']}
found_origins = list(self.storage.origin_search(self.origin2['url']))
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin2_data)
found_origins = list(self.storage.origin_search(
'.' + self.origin2['url'][1:-1] + '.', regexp=True))
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin2_data)
found_origins = list(self.storage.origin_search('/'))
self.assertEqual(len(found_origins), 2)
found_origins = list(self.storage.origin_search('.*/.*', regexp=True))
self.assertEqual(len(found_origins), 2)
found_origins = list(self.storage.origin_search('/', offset=0, limit=1)) # noqa
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin_data)
found_origins = list(self.storage.origin_search('.*/.*', offset=0, limit=1, regexp=True)) # noqa
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin_data)
found_origins = list(self.storage.origin_search('/', offset=1, limit=1)) # noqa
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin2_data)
found_origins = list(self.storage.origin_search('.*/.*', offset=1, limit=1, regexp=True)) # noqa
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin2_data)
def test_origin_visit_add(self):
# given
self.assertIsNone(self.storage.origin_get([self.origin2])[0])
origin_id = self.storage.origin_add_one(self.origin2)
self.assertIsNotNone(origin_id)
# when
origin_visit1 = self.storage.origin_visit_add(
origin_id,
date=self.date_visit2)
# then
self.assertEqual(origin_visit1['origin'], origin_id)
self.assertIsNotNone(origin_visit1['visit'])
actual_origin_visits = list(self.storage.origin_visit_get(origin_id))
self.assertEqual(actual_origin_visits,
[{
'origin': origin_id,
'date': self.date_visit2,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}])
expected_origin = self.origin2.copy()
expected_origin['id'] = origin_id
data = {
'origin': expected_origin,
'date': self.date_visit2,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}
self.assertEqual(list(self.journal_writer.objects),
[('origin', expected_origin),
('origin_visit', data)])
def test_origin_visit_update(self):
# given
origin_id = self.storage.origin_add_one(self.origin2)
origin_id2 = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(
origin_id,
date=self.date_visit2)
origin_visit2 = self.storage.origin_visit_add(
origin_id,
date=self.date_visit3)
origin_visit3 = self.storage.origin_visit_add(
origin_id2,
date=self.date_visit3)
# when
visit1_metadata = {
'contents': 42,
'directories': 22,
}
self.storage.origin_visit_update(
origin_id, origin_visit1['visit'], status='full',
metadata=visit1_metadata)
self.storage.origin_visit_update(origin_id2, origin_visit3['visit'],
status='partial')
# then
actual_origin_visits = list(self.storage.origin_visit_get(origin_id))
self.assertEqual(actual_origin_visits, [{
'origin': origin_visit2['origin'],
'date': self.date_visit2,
'visit': origin_visit1['visit'],
'status': 'full',
'metadata': visit1_metadata,
'snapshot': None,
}, {
'origin': origin_visit2['origin'],
'date': self.date_visit3,
'visit': origin_visit2['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}])
actual_origin_visits_bis = list(self.storage.origin_visit_get(
origin_id, limit=1))
self.assertEqual(actual_origin_visits_bis,
[{
'origin': origin_visit2['origin'],
'date': self.date_visit2,
'visit': origin_visit1['visit'],
'status': 'full',
'metadata': visit1_metadata,
'snapshot': None,
}])
actual_origin_visits_ter = list(self.storage.origin_visit_get(
origin_id, last_visit=origin_visit1['visit']))
self.assertEqual(actual_origin_visits_ter,
[{
'origin': origin_visit2['origin'],
'date': self.date_visit3,
'visit': origin_visit2['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}])
actual_origin_visits2 = list(self.storage.origin_visit_get(origin_id2))
self.assertEqual(actual_origin_visits2,
[{
'origin': origin_visit3['origin'],
'date': self.date_visit3,
'visit': origin_visit3['visit'],
'status': 'partial',
'metadata': None,
'snapshot': None,
}])
expected_origin = self.origin2.copy()
expected_origin['id'] = origin_id
expected_origin2 = self.origin.copy()
expected_origin2['id'] = origin_id2
data1 = {
'origin': expected_origin,
'date': self.date_visit2,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}
data2 = {
'origin': expected_origin,
'date': self.date_visit3,
'visit': origin_visit2['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}
data3 = {
'origin': expected_origin2,
'date': self.date_visit3,
'visit': origin_visit3['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}
data4 = {
'origin': expected_origin,
'date': self.date_visit2,
'visit': origin_visit1['visit'],
'metadata': visit1_metadata,
'status': 'full',
'snapshot': None,
}
data5 = {
'origin': expected_origin2,
'date': self.date_visit3,
'visit': origin_visit3['visit'],
'status': 'partial',
'metadata': None,
'snapshot': None,
}
self.assertEqual(list(self.journal_writer.objects),
[('origin', expected_origin),
('origin', expected_origin2),
('origin_visit', data1),
('origin_visit', data2),
('origin_visit', data3),
('origin_visit', data4),
('origin_visit', data5)])
def test_origin_visit_update_missing_snapshot(self):
# given
origin_id = self.storage.origin_add_one(self.origin)
origin_visit = self.storage.origin_visit_add(
origin_id,
date=self.date_visit1)
# when
self.storage.origin_visit_update(
origin_id, origin_visit['visit'],
snapshot=self.snapshot['id'])
# then
actual_origin_visit = self.storage.origin_visit_get_by(
origin_visit['origin'], origin_visit['visit'])
self.assertEqual(actual_origin_visit['snapshot'], self.snapshot['id'])
# when
- self.storage.snapshot_add(self.snapshot)
+ self.storage.snapshot_add([self.snapshot])
self.assertEqual(actual_origin_visit['snapshot'], self.snapshot['id'])
def test_origin_visit_get_by(self):
origin_id = self.storage.origin_add_one(self.origin2)
origin_id2 = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(
origin_id,
date=self.date_visit2)
self.storage.snapshot_add(origin_id, origin_visit1['visit'],
self.snapshot)
# Add some other {origin, visit} entries
self.storage.origin_visit_add(origin_id, date=self.date_visit3)
self.storage.origin_visit_add(origin_id2, date=self.date_visit3)
# when
visit1_metadata = {
'contents': 42,
'directories': 22,
}
self.storage.origin_visit_update(
origin_id, origin_visit1['visit'], status='full',
metadata=visit1_metadata)
expected_origin_visit = origin_visit1.copy()
expected_origin_visit.update({
'origin': origin_id,
'visit': origin_visit1['visit'],
'date': self.date_visit2,
'metadata': visit1_metadata,
'status': 'full',
'snapshot': self.snapshot['id'],
})
# when
actual_origin_visit1 = self.storage.origin_visit_get_by(
origin_visit1['origin'], origin_visit1['visit'])
# then
self.assertEqual(actual_origin_visit1, expected_origin_visit)
def test_origin_visit_get_by_no_result(self):
# No result
actual_origin_visit = self.storage.origin_visit_get_by(
10, 999)
self.assertIsNone(actual_origin_visit)
def test_person_get(self):
# given (person injection through revision for example)
self.storage.revision_add([self.revision])
rev = list(self.storage.revision_get([self.revision['id']]))[0]
id0 = rev['committer']['id']
person0 = self.revision['committer']
id1 = rev['author']['id']
person1 = self.revision['author']
# when
actual_persons = self.storage.person_get([id0, id1])
# then
self.assertEqual(
list(actual_persons), [
{
'id': id0,
'fullname': person0['fullname'],
'name': person0['name'],
'email': person0['email'],
},
{
'id': id1,
'fullname': person1['fullname'],
'name': person1['name'],
'email': person1['email'],
}
])
def test_person_get_fullname_unicity(self):
# given (person injection through revisions for example)
revision = self.revision
# create a revision with same committer fullname but wo name and email
revision2 = copy.deepcopy(self.revision2)
revision2['committer'] = dict(revision['committer'])
revision2['committer']['email'] = None
revision2['committer']['name'] = None
self.storage.revision_add([revision])
self.storage.revision_add([revision2])
# when getting added revisions
revisions = list(
self.storage.revision_get([revision['id'], revision2['id']]))
# then
# check committers are the same
self.assertEqual(revisions[0]['committer'],
revisions[1]['committer'])
# check person_get return same result
person0 = list(
self.storage.person_get([revisions[0]['committer']['id']]))[0]
person1 = list(
self.storage.person_get([revisions[1]['committer']['id']]))[0]
self.assertEqual(person0, person1)
def test_snapshot_add_get_empty(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
- self.storage.snapshot_add(self.empty_snapshot)
+ self.storage.snapshot_add([self.empty_snapshot])
self.storage.origin_visit_update(
origin_id, visit_id, snapshot=self.empty_snapshot['id'])
by_id = self.storage.snapshot_get(self.empty_snapshot['id'])
self.assertEqual(by_id, self.empty_snapshot)
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id)
self.assertEqual(by_ov, self.empty_snapshot)
expected_origin = self.origin.copy()
expected_origin['id'] = origin_id
data1 = {
'origin': expected_origin,
'date': self.date_visit1,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}
data2 = {
'origin': expected_origin,
'date': self.date_visit1,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': self.empty_snapshot['id'],
}
self.assertEqual(list(self.journal_writer.objects),
[('origin', expected_origin),
('origin_visit', data1),
('snapshot', self.empty_snapshot),
('origin_visit', data2)])
def test_snapshot_add_get_empty__legacy_add(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.empty_snapshot)
by_id = self.storage.snapshot_get(self.empty_snapshot['id'])
self.assertEqual(by_id, self.empty_snapshot)
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id)
self.assertEqual(by_ov, self.empty_snapshot)
expected_origin = self.origin.copy()
expected_origin['id'] = origin_id
data1 = {
'origin': expected_origin,
'date': self.date_visit1,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}
data2 = {
'origin': expected_origin,
'date': self.date_visit1,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': self.empty_snapshot['id'],
}
self.assertEqual(list(self.journal_writer.objects),
[('origin', expected_origin),
('origin_visit', data1),
('snapshot', self.empty_snapshot),
('origin_visit', data2)])
def test_snapshot_add_get_complete(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
by_id = self.storage.snapshot_get(self.complete_snapshot['id'])
self.assertEqual(by_id, self.complete_snapshot)
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id)
self.assertEqual(by_ov, self.complete_snapshot)
+ def test_snapshot_add_many(self):
+ self.storage.snapshot_add([self.snapshot, self.complete_snapshot])
+
+ self.assertEqual(
+ self.complete_snapshot,
+ self.storage.snapshot_get(self.complete_snapshot['id']))
+
+ self.assertEqual(
+ self.snapshot,
+ self.storage.snapshot_get(self.snapshot['id']))
+
+ def test_snapshot_add_many_incremental(self):
+ self.storage.snapshot_add([self.complete_snapshot])
+ self.storage.snapshot_add([self.snapshot, self.complete_snapshot])
+
+ self.assertEqual(
+ self.complete_snapshot,
+ self.storage.snapshot_get(self.complete_snapshot['id']))
+
+ self.assertEqual(
+ self.snapshot,
+ self.storage.snapshot_get(self.snapshot['id']))
+
def test_snapshot_add_count_branches(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
snp_id = self.complete_snapshot['id']
snp_size = self.storage.snapshot_count_branches(snp_id)
expected_snp_size = {
'alias': 1,
'content': 1,
'directory': 1,
'release': 1,
'revision': 1,
'snapshot': 1,
None: 1
}
self.assertEqual(snp_size, expected_snp_size)
def test_snapshot_add_get_paginated(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
snp_id = self.complete_snapshot['id']
branches = self.complete_snapshot['branches']
branch_names = list(sorted(branches))
snapshot = self.storage.snapshot_get_branches(snp_id,
branches_from=b'release')
rel_idx = branch_names.index(b'release')
expected_snapshot = {
'id': snp_id,
'branches': {
name: branches[name]
for name in branch_names[rel_idx:]
},
'next_branch': None,
}
self.assertEqual(snapshot, expected_snapshot)
snapshot = self.storage.snapshot_get_branches(snp_id,
branches_count=1)
expected_snapshot = {
'id': snp_id,
'branches': {
branch_names[0]: branches[branch_names[0]],
},
'next_branch': b'content',
}
self.assertEqual(snapshot, expected_snapshot)
snapshot = self.storage.snapshot_get_branches(
snp_id, branches_from=b'directory', branches_count=3)
dir_idx = branch_names.index(b'directory')
expected_snapshot = {
'id': snp_id,
'branches': {
name: branches[name]
for name in branch_names[dir_idx:dir_idx + 3]
},
'next_branch': branch_names[dir_idx + 3],
}
self.assertEqual(snapshot, expected_snapshot)
def test_snapshot_add_get_filtered(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
snp_id = self.complete_snapshot['id']
branches = self.complete_snapshot['branches']
snapshot = self.storage.snapshot_get_branches(
snp_id, target_types=['release', 'revision'])
expected_snapshot = {
'id': snp_id,
'branches': {
name: tgt
for name, tgt in branches.items()
if tgt and tgt['target_type'] in ['release', 'revision']
},
'next_branch': None,
}
self.assertEqual(snapshot, expected_snapshot)
snapshot = self.storage.snapshot_get_branches(snp_id,
target_types=['alias'])
expected_snapshot = {
'id': snp_id,
'branches': {
name: tgt
for name, tgt in branches.items()
if tgt and tgt['target_type'] == 'alias'
},
'next_branch': None,
}
self.assertEqual(snapshot, expected_snapshot)
def test_snapshot_add_get(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.snapshot)
by_id = self.storage.snapshot_get(self.snapshot['id'])
self.assertEqual(by_id, self.snapshot)
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id)
self.assertEqual(by_ov, self.snapshot)
origin_visit_info = self.storage.origin_visit_get_by(origin_id,
visit_id)
self.assertEqual(origin_visit_info['snapshot'], self.snapshot['id'])
def test_snapshot_add_nonexistent_visit(self):
origin_id = self.storage.origin_add_one(self.origin)
visit_id = 54164461156
self.journal_writer.objects[:] = []
- self.storage.snapshot_add(self.snapshot)
+ self.storage.snapshot_add([self.snapshot])
with self.assertRaises(ValueError):
self.storage.origin_visit_update(
origin_id, visit_id, snapshot=self.snapshot['id'])
self.assertEqual(list(self.journal_writer.objects), [
('snapshot', self.snapshot)])
def test_snapshot_add_nonexistent_visit__legacy_add(self):
origin_id = self.storage.origin_add_one(self.origin)
visit_id = 54164461156
self.journal_writer.objects[:] = []
with self.assertRaises(ValueError):
self.storage.snapshot_add(origin_id, visit_id, self.snapshot)
# Note: the actual legacy behavior was to abort before adding
# the snapshot; but delaying non-existence checks makes the
# compatibility code simpler
self.assertEqual(list(self.journal_writer.objects), [
('snapshot', self.snapshot)])
def test_snapshot_add_twice(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit1_id = origin_visit1['visit']
- self.storage.snapshot_add(self.snapshot)
+ self.storage.snapshot_add([self.snapshot])
self.storage.origin_visit_update(
origin_id, visit1_id, snapshot=self.snapshot['id'])
by_ov1 = self.storage.snapshot_get_by_origin_visit(origin_id,
visit1_id)
self.assertEqual(by_ov1, self.snapshot)
origin_visit2 = self.storage.origin_visit_add(origin_id,
self.date_visit2)
visit2_id = origin_visit2['visit']
- self.storage.snapshot_add(self.snapshot)
+ self.storage.snapshot_add([self.snapshot])
self.storage.origin_visit_update(
origin_id, visit2_id, snapshot=self.snapshot['id'])
by_ov2 = self.storage.snapshot_get_by_origin_visit(origin_id,
visit2_id)
self.assertEqual(by_ov2, self.snapshot)
expected_origin = self.origin.copy()
expected_origin['id'] = origin_id
data1 = {
'origin': expected_origin,
'date': self.date_visit1,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}
data2 = {
'origin': expected_origin,
'date': self.date_visit1,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': self.snapshot['id'],
}
data3 = {
'origin': expected_origin,
'date': self.date_visit2,
'visit': origin_visit2['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}
data4 = {
'origin': expected_origin,
'date': self.date_visit2,
'visit': origin_visit2['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': self.snapshot['id'],
}
self.assertEqual(list(self.journal_writer.objects),
[('origin', expected_origin),
('origin_visit', data1),
('snapshot', self.snapshot),
('origin_visit', data2),
('origin_visit', data3),
- ('snapshot', self.snapshot),
('origin_visit', data4)])
def test_snapshot_add_twice__legacy_add(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit1_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit1_id, self.snapshot)
by_ov1 = self.storage.snapshot_get_by_origin_visit(origin_id,
visit1_id)
self.assertEqual(by_ov1, self.snapshot)
origin_visit2 = self.storage.origin_visit_add(origin_id,
self.date_visit2)
visit2_id = origin_visit2['visit']
self.storage.snapshot_add(origin_id, visit2_id, self.snapshot)
by_ov2 = self.storage.snapshot_get_by_origin_visit(origin_id,
visit2_id)
self.assertEqual(by_ov2, self.snapshot)
expected_origin = self.origin.copy()
expected_origin['id'] = origin_id
data1 = {
'origin': expected_origin,
'date': self.date_visit1,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}
data2 = {
'origin': expected_origin,
'date': self.date_visit1,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': self.snapshot['id'],
}
data3 = {
'origin': expected_origin,
'date': self.date_visit2,
'visit': origin_visit2['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}
data4 = {
'origin': expected_origin,
'date': self.date_visit2,
'visit': origin_visit2['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': self.snapshot['id'],
}
self.assertEqual(list(self.journal_writer.objects),
[('origin', expected_origin),
('origin_visit', data1),
('snapshot', self.snapshot),
('origin_visit', data2),
('origin_visit', data3),
- ('snapshot', self.snapshot),
('origin_visit', data4)])
def test_snapshot_get_nonexistent(self):
bogus_snapshot_id = b'bogus snapshot id 00'
bogus_origin_id = 1
bogus_visit_id = 1
by_id = self.storage.snapshot_get(bogus_snapshot_id)
self.assertIsNone(by_id)
by_ov = self.storage.snapshot_get_by_origin_visit(bogus_origin_id,
bogus_visit_id)
self.assertIsNone(by_ov)
def test_snapshot_get_latest(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit1_id = origin_visit1['visit']
origin_visit2 = self.storage.origin_visit_add(origin_id,
self.date_visit2)
visit2_id = origin_visit2['visit']
# Add a visit with the same date as the previous one
origin_visit3 = self.storage.origin_visit_add(origin_id,
self.date_visit2)
visit3_id = origin_visit3['visit']
# Two visits, both with no snapshot: latest snapshot is None
self.assertIsNone(self.storage.snapshot_get_latest(origin_id))
# Add snapshot to visit1, latest snapshot = visit 1 snapshot
- self.storage.snapshot_add(self.complete_snapshot)
+ self.storage.snapshot_add([self.complete_snapshot])
self.storage.origin_visit_update(
origin_id, visit1_id, snapshot=self.complete_snapshot['id'])
self.assertEqual(self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id))
# Status filter: both visits are status=ongoing, so no snapshot
# returned
self.assertIsNone(
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full'])
)
# Mark the first visit as completed and check status filter again
self.storage.origin_visit_update(origin_id, visit1_id, status='full')
self.assertEqual(
self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full']),
)
# Add snapshot to visit2 and check that the new snapshot is returned
- self.storage.snapshot_add(self.empty_snapshot)
+ self.storage.snapshot_add([self.empty_snapshot])
self.storage.origin_visit_update(
origin_id, visit2_id, snapshot=self.empty_snapshot['id'])
self.assertEqual(self.empty_snapshot,
self.storage.snapshot_get_latest(origin_id))
# Check that the status filter is still working
self.assertEqual(
self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full']),
)
# Add snapshot to visit3 (same date as visit2) and check that
# the new snapshot is returned
- self.storage.snapshot_add(self.complete_snapshot)
+ self.storage.snapshot_add([self.complete_snapshot])
self.storage.origin_visit_update(
origin_id, visit3_id, snapshot=self.complete_snapshot['id'])
self.assertEqual(self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id))
def test_snapshot_get_latest__missing_snapshot(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit1_id = origin_visit1['visit']
origin_visit2 = self.storage.origin_visit_add(origin_id,
self.date_visit2)
visit2_id = origin_visit2['visit']
# Two visits, both with no snapshot: latest snapshot is None
self.assertIsNone(self.storage.snapshot_get_latest(origin_id))
# Add unknown snapshot to visit1, latest snapshot = None
self.storage.origin_visit_update(
origin_id, visit1_id, snapshot=self.complete_snapshot['id'])
self.assertIsNone(self.storage.snapshot_get_latest(origin_id))
# Status filter: both visits are status=ongoing, so no snapshot
# returned
self.assertIsNone(
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full'])
)
# Mark the first visit as completed and check status filter again
self.storage.origin_visit_update(origin_id, visit1_id, status='full')
self.assertIsNone(
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full']),
)
# Actually add the snapshot and check status filter again
- self.storage.snapshot_add(self.complete_snapshot)
+ self.storage.snapshot_add([self.complete_snapshot])
self.assertEqual(
self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id)
)
# Add unknown snapshot to visit2 and check that the old snapshot
# is still returned
self.storage.origin_visit_update(
origin_id, visit2_id, snapshot=self.empty_snapshot['id'])
self.assertEqual(
self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id))
# Actually add that snapshot and check that the new one is returned
- self.storage.snapshot_add(self.empty_snapshot)
+ self.storage.snapshot_add([self.empty_snapshot])
self.assertEqual(
self.empty_snapshot,
self.storage.snapshot_get_latest(origin_id)
)
def test_snapshot_get_latest__legacy_add(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit1_id = origin_visit1['visit']
origin_visit2 = self.storage.origin_visit_add(origin_id,
self.date_visit2)
visit2_id = origin_visit2['visit']
# Add a visit with the same date as the previous one
origin_visit3 = self.storage.origin_visit_add(origin_id,
self.date_visit2)
visit3_id = origin_visit3['visit']
# Two visits, both with no snapshot: latest snapshot is None
self.assertIsNone(self.storage.snapshot_get_latest(origin_id))
# Add snapshot to visit1, latest snapshot = visit 1 snapshot
self.storage.snapshot_add(origin_id, visit1_id, self.complete_snapshot)
self.assertEqual(self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id))
# Status filter: both visits are status=ongoing, so no snapshot
# returned
self.assertIsNone(
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full'])
)
# Mark the first visit as completed and check status filter again
self.storage.origin_visit_update(origin_id, visit1_id, status='full')
self.assertEqual(
self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full']),
)
# Add snapshot to visit2 and check that the new snapshot is returned
self.storage.snapshot_add(origin_id, visit2_id, self.empty_snapshot)
self.assertEqual(self.empty_snapshot,
self.storage.snapshot_get_latest(origin_id))
# Check that the status filter is still working
self.assertEqual(
self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full']),
)
# Add snapshot to visit3 (same date as visit2) and check that
# the new snapshot is returned
self.storage.snapshot_add(origin_id, visit3_id, self.complete_snapshot)
self.assertEqual(self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id))
def test_stat_counters(self):
expected_keys = ['content', 'directory',
'origin', 'person', 'revision']
# Initially, all counters are 0
self.storage.refresh_stat_counters()
counters = self.storage.stat_counters()
self.assertTrue(set(expected_keys) <= set(counters))
for key in expected_keys:
self.assertEqual(counters[key], 0)
# Add a content. Only the content counter should increase.
self.storage.content_add([self.cont])
self.storage.refresh_stat_counters()
counters = self.storage.stat_counters()
self.assertTrue(set(expected_keys) <= set(counters))
for key in expected_keys:
if key != 'content':
self.assertEqual(counters[key], 0)
self.assertEqual(counters['content'], 1)
# Add other objects. Check their counter increased as well.
origin_id = self.storage.origin_add_one(self.origin2)
origin_visit1 = self.storage.origin_visit_add(
origin_id,
date=self.date_visit2)
self.storage.snapshot_add(origin_id, origin_visit1['visit'],
self.snapshot)
self.storage.directory_add([self.dir])
self.storage.revision_add([self.revision])
self.storage.refresh_stat_counters()
counters = self.storage.stat_counters()
self.assertEqual(counters['content'], 1)
self.assertEqual(counters['directory'], 1)
self.assertEqual(counters['snapshot'], 1)
self.assertEqual(counters['origin'], 1)
self.assertEqual(counters['revision'], 1)
self.assertEqual(counters['person'], 2)
def test_content_find_with_present_content(self):
# 1. with something to find
cont = self.cont
self.storage.content_add([cont])
actually_present = self.storage.content_find({'sha1': cont['sha1']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'blake2s256': cont['blake2s256'],
'length': cont['length'],
'status': 'visible'
})
# 2. with something to find
actually_present = self.storage.content_find(
{'sha1_git': cont['sha1_git']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'blake2s256': cont['blake2s256'],
'length': cont['length'],
'status': 'visible'
})
# 3. with something to find
actually_present = self.storage.content_find(
{'sha256': cont['sha256']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'blake2s256': cont['blake2s256'],
'length': cont['length'],
'status': 'visible'
})
# 4. with something to find
actually_present = self.storage.content_find({
'sha1': cont['sha1'],
'sha1_git': cont['sha1_git'],
'sha256': cont['sha256'],
'blake2s256': cont['blake2s256'],
})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'blake2s256': cont['blake2s256'],
'length': cont['length'],
'status': 'visible'
})
def test_content_find_with_non_present_content(self):
# 1. with something that does not exist
missing_cont = self.missing_cont
actually_present = self.storage.content_find(
{'sha1': missing_cont['sha1']})
self.assertIsNone(actually_present)
# 2. with something that does not exist
actually_present = self.storage.content_find(
{'sha1_git': missing_cont['sha1_git']})
self.assertIsNone(actually_present)
# 3. with something that does not exist
actually_present = self.storage.content_find(
{'sha256': missing_cont['sha256']})
self.assertIsNone(actually_present)
def test_content_find_bad_input(self):
# 1. with bad input
with self.assertRaises(ValueError):
self.storage.content_find({}) # empty is bad
# 2. with bad input
with self.assertRaises(ValueError):
self.storage.content_find(
{'unknown-sha1': 'something'}) # not the right key
def test_object_find_by_sha1_git(self):
sha1_gits = [b'00000000000000000000']
expected = {
b'00000000000000000000': [],
}
self.storage.content_add([self.cont])
sha1_gits.append(self.cont['sha1_git'])
expected[self.cont['sha1_git']] = [{
'sha1_git': self.cont['sha1_git'],
'type': 'content',
'id': self.cont['sha1'],
}]
self.storage.directory_add([self.dir])
sha1_gits.append(self.dir['id'])
expected[self.dir['id']] = [{
'sha1_git': self.dir['id'],
'type': 'directory',
'id': self.dir['id'],
}]
self.storage.revision_add([self.revision])
sha1_gits.append(self.revision['id'])
expected[self.revision['id']] = [{
'sha1_git': self.revision['id'],
'type': 'revision',
'id': self.revision['id'],
}]
self.storage.release_add([self.release])
sha1_gits.append(self.release['id'])
expected[self.release['id']] = [{
'sha1_git': self.release['id'],
'type': 'release',
'id': self.release['id'],
}]
ret = self.storage.object_find_by_sha1_git(sha1_gits)
for val in ret.values():
for obj in val:
del obj['object_id']
self.assertEqual(expected, ret)
def test_tool_add(self):
tool = {
'name': 'some-unknown-tool',
'version': 'some-version',
'configuration': {"debian-package": "some-package"},
}
actual_tool = self.storage.tool_get(tool)
self.assertIsNone(actual_tool) # does not exist
# add it
actual_tools = list(self.storage.tool_add([tool]))
self.assertEqual(len(actual_tools), 1)
actual_tool = actual_tools[0]
self.assertIsNotNone(actual_tool) # now it exists
new_id = actual_tool.pop('id')
self.assertEqual(actual_tool, tool)
actual_tools2 = list(self.storage.tool_add([tool]))
actual_tool2 = actual_tools2[0]
self.assertIsNotNone(actual_tool2) # now it exists
new_id2 = actual_tool2.pop('id')
self.assertEqual(new_id, new_id2)
self.assertEqual(actual_tool, actual_tool2)
def test_tool_add_multiple(self):
tool = {
'name': 'some-unknown-tool',
'version': 'some-version',
'configuration': {"debian-package": "some-package"},
}
actual_tools = list(self.storage.tool_add([tool]))
self.assertEqual(len(actual_tools), 1)
new_tools = [tool, {
'name': 'yet-another-tool',
'version': 'version',
'configuration': {},
}]
actual_tools = list(self.storage.tool_add(new_tools))
self.assertEqual(len(actual_tools), 2)
# order not guaranteed, so we iterate over results to check
for tool in actual_tools:
_id = tool.pop('id')
self.assertIsNotNone(_id)
self.assertIn(tool, new_tools)
def test_tool_get_missing(self):
tool = {
'name': 'unknown-tool',
'version': '3.1.0rc2-31-ga2cbb8c',
'configuration': {"command_line": "nomossa <filepath>"},
}
actual_tool = self.storage.tool_get(tool)
self.assertIsNone(actual_tool)
def test_tool_metadata_get_missing_context(self):
tool = {
'name': 'swh-metadata-translator',
'version': '0.0.1',
'configuration': {"context": "unknown-context"},
}
actual_tool = self.storage.tool_get(tool)
self.assertIsNone(actual_tool)
def test_tool_metadata_get(self):
tool = {
'name': 'swh-metadata-translator',
'version': '0.0.1',
'configuration': {"type": "local", "context": "npm"},
}
tools = list(self.storage.tool_add([tool]))
expected_tool = tools[0]
# when
actual_tool = self.storage.tool_get(tool)
# then
self.assertEqual(expected_tool, actual_tool)
def test_metadata_provider_get(self):
# given
no_provider = self.storage.metadata_provider_get(6459456445615)
self.assertIsNone(no_provider)
# when
provider_id = self.storage.metadata_provider_add(
self.provider['name'],
self.provider['type'],
self.provider['url'],
self.provider['metadata'])
actual_provider = self.storage.metadata_provider_get(provider_id)
expected_provider = {
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
}
# then
del actual_provider['id']
self.assertTrue(actual_provider, expected_provider)
def test_metadata_provider_get_by(self):
# given
no_provider = self.storage.metadata_provider_get_by({
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
})
self.assertIsNone(no_provider)
# when
provider_id = self.storage.metadata_provider_add(
self.provider['name'],
self.provider['type'],
self.provider['url'],
self.provider['metadata'])
actual_provider = self.storage.metadata_provider_get_by({
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
})
# then
self.assertEqual(provider_id, actual_provider['id'])
def test_origin_metadata_add(self):
# given
origin_id = self.storage.origin_add([self.origin])[0]['id']
origin_metadata0 = list(self.storage.origin_metadata_get_by(origin_id))
self.assertEqual(len(origin_metadata0), 0)
tools = list(self.storage.tool_add([self.metadata_tool]))
tool = tools[0]
self.storage.metadata_provider_add(
self.provider['name'],
self.provider['type'],
self.provider['url'],
self.provider['metadata'])
provider = self.storage.metadata_provider_get_by({
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
})
# when adding metadata for the origin
self.storage.origin_metadata_add(
origin_id,
self.origin_metadata['discovery_date'],
provider['id'],
tool['id'],
self.origin_metadata['metadata'])
actual_om1 = list(self.storage.origin_metadata_get_by(origin_id))
# then
self.assertEqual(len(actual_om1), 1)
self.assertEqual(actual_om1[0]['origin_id'], origin_id)
def test_origin_metadata_get(self):
# given
origin_id = self.storage.origin_add([self.origin])[0]['id']
origin_id2 = self.storage.origin_add([self.origin2])[0]['id']
self.storage.metadata_provider_add(self.provider['name'],
self.provider['type'],
self.provider['url'],
self.provider['metadata'])
provider = self.storage.metadata_provider_get_by({
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
})
tool = list(self.storage.tool_add([self.metadata_tool]))[0]
# when adding 2 metadata entries for the same origin
self.storage.origin_metadata_add(
origin_id,
self.origin_metadata['discovery_date'],
provider['id'],
tool['id'],
self.origin_metadata['metadata'])
self.storage.origin_metadata_add(
origin_id2,
self.origin_metadata2['discovery_date'],
provider['id'],
tool['id'],
self.origin_metadata2['metadata'])
self.storage.origin_metadata_add(
origin_id,
self.origin_metadata2['discovery_date'],
provider['id'],
tool['id'],
self.origin_metadata2['metadata'])
all_metadatas = list(self.storage.origin_metadata_get_by(origin_id))
metadatas_for_origin2 = list(self.storage.origin_metadata_get_by(
origin_id2))
expected_results = [{
'origin_id': origin_id,
'discovery_date': datetime.datetime(
2017, 1, 1, 23, 0,
tzinfo=datetime.timezone.utc),
'metadata': {
'name': 'test_origin_metadata',
'version': '0.0.1'
},
'provider_id': provider['id'],
'provider_name': 'hal',
'provider_type': 'deposit-client',
'provider_url': 'http:///hal/inria',
'tool_id': tool['id']
}, {
'origin_id': origin_id,
'discovery_date': datetime.datetime(
2015, 1, 1, 23, 0,
tzinfo=datetime.timezone.utc),
'metadata': {
'name': 'test_origin_metadata',
'version': '0.0.1'
},
'provider_id': provider['id'],
'provider_name': 'hal',
'provider_type': 'deposit-client',
'provider_url': 'http:///hal/inria',
'tool_id': tool['id']
}]
# then
self.assertEqual(len(all_metadatas), 2)
self.assertEqual(len(metadatas_for_origin2), 1)
self.assertCountEqual(all_metadatas, expected_results)
def test_origin_metadata_get_by_provider_type(self):
# given
origin_id = self.storage.origin_add([self.origin])[0]['id']
origin_id2 = self.storage.origin_add([self.origin2])[0]['id']
provider1_id = self.storage.metadata_provider_add(
self.provider['name'],
self.provider['type'],
self.provider['url'],
self.provider['metadata'])
provider1 = self.storage.metadata_provider_get_by({
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
})
self.assertEqual(provider1,
self.storage.metadata_provider_get(provider1_id))
provider2_id = self.storage.metadata_provider_add(
'swMATH',
'registry',
'http://www.swmath.org/',
{'email': 'contact@swmath.org',
'license': 'All rights reserved'})
provider2 = self.storage.metadata_provider_get_by({
'provider_name': 'swMATH',
'provider_url': 'http://www.swmath.org/'
})
self.assertEqual(provider2,
self.storage.metadata_provider_get(provider2_id))
# using the only tool now inserted in the data.sql, but for this
# provider it should be a crawler tool (not yet implemented)
tool = list(self.storage.tool_add([self.metadata_tool]))[0]
# when adding metadata for each origin, with a different provider
self.storage.origin_metadata_add(
origin_id,
self.origin_metadata['discovery_date'],
provider1['id'],
tool['id'],
self.origin_metadata['metadata'])
self.storage.origin_metadata_add(
origin_id2,
self.origin_metadata2['discovery_date'],
provider2['id'],
tool['id'],
self.origin_metadata2['metadata'])
provider_type = 'registry'
m_by_provider = list(self.storage.
origin_metadata_get_by(
origin_id2,
provider_type))
for item in m_by_provider:
if 'id' in item:
del item['id']
expected_results = [{
'origin_id': origin_id2,
'discovery_date': datetime.datetime(
2017, 1, 1, 23, 0,
tzinfo=datetime.timezone.utc),
'metadata': {
'name': 'test_origin_metadata',
'version': '0.0.1'
},
'provider_id': provider2['id'],
'provider_name': 'swMATH',
'provider_type': provider_type,
'provider_url': 'http://www.swmath.org/',
'tool_id': tool['id']
}]
# then
self.assertEqual(len(m_by_provider), 1)
self.assertEqual(m_by_provider, expected_results)
class CommonPropTestStorage:
def assert_contents_ok(self, expected_contents, actual_contents,
keys_to_check={'sha1', 'data'}):
"""Assert that a given list of contents matches on a given set of keys.
"""
for k in keys_to_check:
expected_list = sorted([c[k] for c in expected_contents])
actual_list = sorted([c[k] for c in actual_contents])
self.assertEqual(actual_list, expected_list)
@given(gen_contents(min_size=1, max_size=4))
def test_generate_content_get(self, contents):
# add contents to storage
self.storage.content_add(contents)
# input the list of sha1s we want from storage
get_sha1s = [c['sha1'] for c in contents]
# retrieve contents
actual_contents = list(self.storage.content_get(get_sha1s))
self.assert_contents_ok(contents, actual_contents)
@given(gen_contents(min_size=1, max_size=4))
def test_generate_content_get_metadata(self, contents):
# add contents to storage
self.storage.content_add(contents)
# input the list of sha1s we want from storage
get_sha1s = [c['sha1'] for c in contents]
# retrieve contents
actual_contents = list(self.storage.content_get_metadata(get_sha1s))
self.assertEqual(len(actual_contents), len(contents))
# will check that all contents are retrieved correctly
one_content = contents[0]
# content_get_metadata does not return data
keys_to_check = set(one_content.keys()) - {'data'}
self.assert_contents_ok(contents, actual_contents,
keys_to_check=keys_to_check)
def test_generate_content_get_range_limit_none(self):
"""content_get_range call with wrong limit input should fail"""
with self.assertRaises(ValueError) as e:
self.storage.content_get_range(start=None, end=None, limit=None)
self.assertEqual(e.exception.args, (
'Development error: limit should not be None',))
@given(gen_contents(min_size=1, max_size=4))
def test_generate_content_get_range_no_limit(self, contents):
"""content_get_range returns contents within range provided"""
self.reset_storage_tables()
# add contents to storage
self.storage.content_add(contents)
# input the list of sha1s we want from storage
get_sha1s = sorted([c['sha1'] for c in contents])
start = get_sha1s[0]
end = get_sha1s[-1]
# retrieve contents
actual_result = self.storage.content_get_range(start, end)
actual_contents = actual_result['contents']
actual_next = actual_result['next']
self.assertEqual(len(contents), len(actual_contents))
self.assertIsNone(actual_next)
one_content = contents[0]
keys_to_check = set(one_content.keys()) - {'data'}
self.assert_contents_ok(contents, actual_contents, keys_to_check)
@given(gen_contents(min_size=4, max_size=4))
def test_generate_content_get_range_limit(self, contents):
"""content_get_range paginates results if limit exceeded"""
self.reset_storage_tables()
contents_map = {c['sha1']: c for c in contents}
# add contents to storage
self.storage.content_add(contents)
# input the list of sha1s we want from storage
get_sha1s = sorted([c['sha1'] for c in contents])
start = get_sha1s[0]
end = get_sha1s[-1]
# retrieve contents limited to 3 results
limited_results = len(contents) - 1
actual_result = self.storage.content_get_range(start, end,
limit=limited_results)
actual_contents = actual_result['contents']
actual_next = actual_result['next']
self.assertEqual(limited_results, len(actual_contents))
self.assertIsNotNone(actual_next)
self.assertEqual(actual_next, get_sha1s[-1])
expected_contents = [contents_map[sha1] for sha1 in get_sha1s[:-1]]
keys_to_check = set(contents[0].keys()) - {'data'}
self.assert_contents_ok(expected_contents, actual_contents,
keys_to_check)
# retrieve next part
actual_results2 = self.storage.content_get_range(start=end, end=end)
actual_contents2 = actual_results2['contents']
actual_next2 = actual_results2['next']
self.assertEqual(1, len(actual_contents2))
self.assertIsNone(actual_next2)
self.assert_contents_ok([contents_map[actual_next]], actual_contents2,
keys_to_check)
def test_origin_get_invalid_id_legacy(self):
invalid_origin_id = 1
origin_info = self.storage.origin_get({'id': invalid_origin_id})
self.assertIsNone(origin_info)
origin_visits = list(self.storage.origin_visit_get(
invalid_origin_id))
self.assertEqual(origin_visits, [])
def test_origin_get_invalid_id(self):
origin_info = self.storage.origin_get([{'id': 1}, {'id': 2}])
self.assertEqual(origin_info, [None, None])
origin_visits = list(self.storage.origin_visit_get(1))
self.assertEqual(origin_visits, [])
@given(gen_origins(min_size=100, max_size=100))
def test_origin_get_range(self, new_origins):
nb_origins = len(new_origins)
self.storage.origin_add(new_origins)
origin_from = random.randint(1, nb_origins)
origin_count = random.randint(1, nb_origins - origin_from)
expected_origins = []
for i in range(origin_from, origin_from + origin_count):
expected_origins.append(self.storage.origin_get({'id': i}))
actual_origins = list(
self.storage.origin_get_range(origin_from=origin_from,
origin_count=origin_count))
self.assertEqual(actual_origins, expected_origins)
origin_from = -1
origin_count = 10
origins = list(
self.storage.origin_get_range(origin_from=origin_from,
origin_count=origin_count))
self.assertEqual(len(origins), origin_count)
origin_from = 10000
origins = list(
self.storage.origin_get_range(origin_from=origin_from,
origin_count=origin_count))
self.assertEqual(len(origins), 0)
def test_origin_count(self):
new_origins = [
{
'type': 'git',
'url': 'https://github.com/user1/repo1'
},
{
'type': 'git',
'url': 'https://github.com/user2/repo1'
},
{
'type': 'git',
'url': 'https://github.com/user3/repo1'
},
{
'type': 'git',
'url': 'https://gitlab.com/user1/repo1'
},
{
'type': 'git',
'url': 'https://gitlab.com/user2/repo1'
}
]
self.storage.origin_add(new_origins)
self.assertEqual(self.storage.origin_count('github'), 3)
self.assertEqual(self.storage.origin_count('gitlab'), 2)
self.assertEqual(
self.storage.origin_count('.*user.*', regexp=True), 5)
self.assertEqual(
self.storage.origin_count('.*user.*', regexp=False), 0)
self.assertEqual(
self.storage.origin_count('.*user1.*', regexp=True), 2)
self.assertEqual(
self.storage.origin_count('.*user1.*', regexp=False), 0)
@pytest.mark.db
class TestLocalStorage(CommonTestStorage, StorageTestDbFixture,
unittest.TestCase):
"""Test the local storage"""
# Can only be tested with local storage as you can't mock
# datetimes for the remote server
def test_fetch_history(self):
origin = self.storage.origin_add_one(self.origin)
with patch('datetime.datetime'):
datetime.datetime.now.return_value = self.fetch_history_date
fetch_history_id = self.storage.fetch_history_start(origin)
datetime.datetime.now.assert_called_with(tz=datetime.timezone.utc)
with patch('datetime.datetime'):
datetime.datetime.now.return_value = self.fetch_history_end
self.storage.fetch_history_end(fetch_history_id,
self.fetch_history_data)
fetch_history = self.storage.fetch_history_get(fetch_history_id)
expected_fetch_history = self.fetch_history_data.copy()
expected_fetch_history['id'] = fetch_history_id
expected_fetch_history['origin'] = origin
expected_fetch_history['date'] = self.fetch_history_date
expected_fetch_history['duration'] = self.fetch_history_duration
self.assertEqual(expected_fetch_history, fetch_history)
# This test is only relevant on the local storage, with an actual
# objstorage raising an exception
def test_content_add_objstorage_exception(self):
self.storage.objstorage.add = Mock(
side_effect=Exception('mocked broken objstorage')
)
with self.assertRaises(Exception) as e:
self.storage.content_add([self.cont])
self.assertEqual(e.exception.args, ('mocked broken objstorage',))
missing = list(self.storage.content_missing([self.cont]))
self.assertEqual(missing, [self.cont['sha1']])
@pytest.mark.db
@pytest.mark.property_based
class PropTestLocalStorage(CommonPropTestStorage, StorageTestDbFixture,
unittest.TestCase):
pass
class AlteringSchemaTest(TestStorageData, StorageTestDbFixture,
unittest.TestCase):
"""This class is dedicated for the rare case where the schema needs to
be altered dynamically.
Otherwise, the tests could be blocking when ran altogether.
"""
def test_content_update(self):
self.storage.journal_writer = None # TODO, not supported
cont = copy.deepcopy(self.cont)
self.storage.content_add([cont])
# alter the sha1_git for example
cont['sha1_git'] = hash_to_bytes(
'3a60a5275d0333bf13468e8b3dcab90f4046e654')
self.storage.content_update([cont], keys=['sha1_git'])
with self.storage.get_db().transaction() as cur:
cur.execute('SELECT sha1, sha1_git, sha256, length, status'
' FROM content WHERE sha1 = %s',
(cont['sha1'],))
datum = cur.fetchone()
self.assertEqual(
(datum[0], datum[1], datum[2],
datum[3], datum[4]),
(cont['sha1'], cont['sha1_git'], cont['sha256'],
cont['length'], 'visible'))
def test_content_update_with_new_cols(self):
self.storage.journal_writer = None # TODO, not supported
with self.storage.get_db().transaction() as cur:
cur.execute("""alter table content
add column test text default null,
add column test2 text default null""")
cont = copy.deepcopy(self.cont2)
self.storage.content_add([cont])
cont['test'] = 'value-1'
cont['test2'] = 'value-2'
self.storage.content_update([cont], keys=['test', 'test2'])
with self.storage.get_db().transaction() as cur:
cur.execute(
'SELECT sha1, sha1_git, sha256, length, status, test, test2'
' FROM content WHERE sha1 = %s',
(cont['sha1'],))
datum = cur.fetchone()
self.assertEqual(
(datum[0], datum[1], datum[2],
datum[3], datum[4], datum[5], datum[6]),
(cont['sha1'], cont['sha1_git'], cont['sha256'],
cont['length'], 'visible', cont['test'], cont['test2']))
with self.storage.get_db().transaction() as cur:
cur.execute("""alter table content drop column test,
drop column test2""")
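The updated tests above all follow the new calling convention: snapshot_add() takes a list of snapshots and no visit information, and a snapshot is linked to a visit through a separate origin_visit_update() call. A minimal sketch of that pattern, assuming an already-configured storage instance and a snapshot dict like the ones used in the tests (the storage, snapshot and URL below are illustrative placeholders, not part of the diff):

import datetime

# Sketch only: `storage` is assumed to be a configured swh.storage instance
# and `snapshot` a dict with 'id' and 'branches' keys, as in the tests above.
origin_id = storage.origin_add_one({'type': 'git',
                                    'url': 'https://example.org/repo'})
visit = storage.origin_visit_add(
    origin_id, datetime.datetime.now(tz=datetime.timezone.utc))

# New API: the snapshot is added on its own, as a list...
storage.snapshot_add([snapshot])

# ...and then attached to the visit in a separate call.
storage.origin_visit_update(origin_id, visit['visit'],
                            snapshot=snapshot['id'])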