Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/storage/db.py b/swh/storage/db.py
index 7dd0c1da..0c534f65 100644
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -1,1049 +1,1049 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import binascii
import datetime
import enum
import functools
import json
import os
import select
import threading
from contextlib import contextmanager
import psycopg2
import psycopg2.extras
from .db_utils import execute_values_generator
TMP_CONTENT_TABLE = 'tmp_content'
psycopg2.extras.register_uuid()
def stored_procedure(stored_proc):
"""decorator to execute remote stored procedure, specified as argument
Generally, the body of the decorated function should be empty. If it is
not, the stored procedure will be executed first; the function body then.
"""
def wrap(meth):
@functools.wraps(meth)
def _meth(self, *args, **kwargs):
cur = kwargs.get('cur', None)
self._cursor(cur).execute('SELECT %s()' % stored_proc)
meth(self, *args, **kwargs)
return _meth
return wrap
def jsonize(value):
"""Convert a value to a psycopg2 JSON object if necessary"""
if isinstance(value, dict):
return psycopg2.extras.Json(value)
return value
def entry_to_bytes(entry):
"""Convert an entry coming from the database to bytes"""
if isinstance(entry, memoryview):
return entry.tobytes()
if isinstance(entry, list):
return [entry_to_bytes(value) for value in entry]
return entry
def line_to_bytes(line):
"""Convert a line coming from the database to bytes"""
if not line:
return line
if isinstance(line, dict):
return {k: entry_to_bytes(v) for k, v in line.items()}
return line.__class__(entry_to_bytes(entry) for entry in line)
def cursor_to_bytes(cursor):
"""Yield all the data from a cursor as bytes"""
yield from (line_to_bytes(line) for line in cursor)
def execute_values_to_bytes(*args, **kwargs):
for line in execute_values_generator(*args, **kwargs):
yield line_to_bytes(line)
class BaseDb:
"""Base class for swh.storage.*Db.
cf. swh.storage.db.Db, swh.archiver.db.ArchiverDb
"""
@classmethod
def connect(cls, *args, **kwargs):
"""factory method to create a DB proxy
Accepts all arguments of psycopg2.connect; only some specific
possibilities are reported below.
Args:
connstring: libpq2 connection string
"""
conn = psycopg2.connect(*args, **kwargs)
return cls(conn)
@classmethod
def from_pool(cls, pool):
return cls(pool.getconn(), pool=pool)
def _cursor(self, cur_arg):
"""get a cursor: from cur_arg if given, or a fresh one otherwise
meant to avoid boilerplate if/then/else in methods that proxy stored
procedures
"""
if cur_arg is not None:
return cur_arg
# elif self.cur is not None:
# return self.cur
else:
return self.conn.cursor()
def __init__(self, conn, pool=None):
"""create a DB proxy
Args:
conn: psycopg2 connection to the SWH DB
pool: psycopg2 pool of connections
"""
self.conn = conn
self.pool = pool
def __del__(self):
if self.pool:
self.pool.putconn(self.conn)
@contextmanager
def transaction(self):
"""context manager to execute within a DB transaction
Yields:
a psycopg2 cursor
"""
with self.conn.cursor() as cur:
try:
yield cur
self.conn.commit()
except Exception:
if not self.conn.closed:
self.conn.rollback()
raise
def copy_to(self, items, tblname, columns, cur=None, item_cb=None):
"""Copy items' entries to table tblname with columns information.
Args:
items (dict): dictionary of data to copy over tblname
tblname (str): Destination table's name
columns ([str]): keys to access data in items and also the
column names in the destination table.
item_cb (fn): optional function to apply to items's entry
"""
def escape(data):
if data is None:
return ''
if isinstance(data, bytes):
return '\\x%s' % binascii.hexlify(data).decode('ascii')
elif isinstance(data, str):
return '"%s"' % data.replace('"', '""')
elif isinstance(data, datetime.datetime):
# We escape twice to make sure the string generated by
# isoformat gets escaped
return escape(data.isoformat())
elif isinstance(data, dict):
return escape(json.dumps(data))
elif isinstance(data, list):
return escape("{%s}" % ','.join(escape(d) for d in data))
elif isinstance(data, psycopg2.extras.Range):
# We escape twice here too, so that we make sure
# everything gets passed to copy properly
return escape(
'%s%s,%s%s' % (
'[' if data.lower_inc else '(',
'-infinity' if data.lower_inf else escape(data.lower),
'infinity' if data.upper_inf else escape(data.upper),
']' if data.upper_inc else ')',
)
)
elif isinstance(data, enum.IntEnum):
return escape(int(data))
else:
# We don't escape here to make sure we pass literals properly
return str(data)
read_file, write_file = os.pipe()
def writer():
cursor = self._cursor(cur)
with open(read_file, 'r') as f:
cursor.copy_expert('COPY %s (%s) FROM STDIN CSV' % (
tblname, ', '.join(columns)), f)
write_thread = threading.Thread(target=writer)
write_thread.start()
try:
with open(write_file, 'w') as f:
for d in items:
if item_cb is not None:
item_cb(d)
line = [escape(d.get(k)) for k in columns]
f.write(','.join(line))
f.write('\n')
finally:
# No problem bubbling up exceptions, but we still need to make sure
# we finish copying, even though we're probably going to cancel the
# transaction.
write_thread.join()
def mktemp(self, tblname, cur=None):
self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,))
class Db(BaseDb):
"""Proxy to the SWH DB, with wrappers around stored procedures
"""
def mktemp_dir_entry(self, entry_type, cur=None):
self._cursor(cur).execute('SELECT swh_mktemp_dir_entry(%s)',
(('directory_entry_%s' % entry_type),))
@stored_procedure('swh_mktemp_revision')
def mktemp_revision(self, cur=None): pass
@stored_procedure('swh_mktemp_release')
def mktemp_release(self, cur=None): pass
@stored_procedure('swh_mktemp_snapshot_branch')
def mktemp_snapshot_branch(self, cur=None): pass
def register_listener(self, notify_queue, cur=None):
"""Register a listener for NOTIFY queue `notify_queue`"""
self._cursor(cur).execute("LISTEN %s" % notify_queue)
def listen_notifies(self, timeout):
"""Listen to notifications for `timeout` seconds"""
if select.select([self.conn], [], [], timeout) == ([], [], []):
return
else:
self.conn.poll()
while self.conn.notifies:
yield self.conn.notifies.pop(0)
@stored_procedure('swh_content_add')
def content_add_from_temp(self, cur=None): pass
@stored_procedure('swh_directory_add')
def directory_add_from_temp(self, cur=None): pass
@stored_procedure('swh_skipped_content_add')
def skipped_content_add_from_temp(self, cur=None): pass
@stored_procedure('swh_revision_add')
def revision_add_from_temp(self, cur=None): pass
@stored_procedure('swh_release_add')
def release_add_from_temp(self, cur=None): pass
def content_update_from_temp(self, keys_to_update, cur=None):
cur = self._cursor(cur)
cur.execute("""select swh_content_update(ARRAY[%s] :: text[])""" %
keys_to_update)
content_get_metadata_keys = [
'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status']
skipped_content_keys = [
'sha1', 'sha1_git', 'sha256', 'blake2s256',
'length', 'reason', 'status', 'origin']
def content_get_metadata_from_sha1s(self, sha1s, cur=None):
cur = self._cursor(cur)
yield from execute_values_to_bytes(
cur, """
select t.sha1, %s from (values %%s) as t (sha1)
left join content using (sha1)
""" % ', '.join(self.content_get_metadata_keys[1:]),
((sha1,) for sha1 in sha1s),
)
def content_get_range(self, start, end, limit=None, cur=None):
"""Retrieve contents within range [start, end].
"""
cur = self._cursor(cur)
query = """select %s from content
where %%s <= sha1 and sha1 <= %%s
order by sha1
limit %%s""" % ', '.join(self.content_get_metadata_keys)
cur.execute(query, (start, end, limit))
yield from cursor_to_bytes(cur)
content_hash_keys = ['sha1', 'sha1_git', 'sha256', 'blake2s256']
def content_missing_from_list(self, contents, cur=None):
cur = self._cursor(cur)
keys = ', '.join(self.content_hash_keys)
equality = ' AND '.join(
('t.%s = c.%s' % (key, key))
for key in self.content_hash_keys
)
yield from execute_values_to_bytes(
cur, """
SELECT %s
FROM (VALUES %%s) as t(%s)
WHERE NOT EXISTS (
SELECT 1 FROM content c
WHERE %s
)
""" % (keys, keys, equality),
(tuple(c[key] for key in self.content_hash_keys) for c in contents)
)
def content_missing_per_sha1(self, sha1s, cur=None):
cur = self._cursor(cur)
yield from execute_values_to_bytes(cur, """
SELECT t.sha1 FROM (VALUES %s) AS t(sha1)
WHERE NOT EXISTS (
SELECT 1 FROM content c WHERE c.sha1 = t.sha1
)""", ((sha1,) for sha1 in sha1s))
def skipped_content_missing_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("""SELECT sha1, sha1_git, sha256, blake2s256
FROM swh_skipped_content_missing()""")
yield from cursor_to_bytes(cur)
def snapshot_exists(self, snapshot_id, cur=None):
"""Check whether a snapshot with the given id exists"""
cur = self._cursor(cur)
cur.execute("""SELECT 1 FROM snapshot where id=%s""", (snapshot_id,))
return bool(cur.fetchone())
def snapshot_add(self, origin, visit, snapshot_id, cur=None):
"""Add a snapshot for origin/visit from the temporary table"""
cur = self._cursor(cur)
cur.execute("""SELECT swh_snapshot_add(%s, %s, %s)""",
(origin, visit, snapshot_id))
snapshot_count_cols = ['target_type', 'count']
def snapshot_count_branches(self, snapshot_id, cur=None):
cur = self._cursor(cur)
query = """\
SELECT %s FROM swh_snapshot_count_branches(%%s)
""" % ', '.join(self.snapshot_count_cols)
cur.execute(query, (snapshot_id,))
yield from cursor_to_bytes(cur)
snapshot_get_cols = ['snapshot_id', 'name', 'target', 'target_type']
def snapshot_get_by_id(self, snapshot_id, branches_from=b'',
branches_count=None, target_types=None,
cur=None):
cur = self._cursor(cur)
query = """\
SELECT %s
FROM swh_snapshot_get_by_id(%%s, %%s, %%s, %%s :: snapshot_target[])
""" % ', '.join(self.snapshot_get_cols)
cur.execute(query, (snapshot_id, branches_from, branches_count,
target_types))
yield from cursor_to_bytes(cur)
def snapshot_get_by_origin_visit(self, origin_id, visit_id, cur=None):
cur = self._cursor(cur)
query = """\
SELECT swh_snapshot_get_by_origin_visit(%s, %s)
"""
cur.execute(query, (origin_id, visit_id))
ret = cur.fetchone()
if ret:
return line_to_bytes(ret)[0]
content_find_cols = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length',
'ctime', 'status']
def content_find(self, sha1=None, sha1_git=None, sha256=None,
blake2s256=None, cur=None):
"""Find the content optionally on a combination of the following
checksums sha1, sha1_git, sha256 or blake2s256.
Args:
sha1: sha1 content
git_sha1: the sha1 computed `a la git` sha1 of the content
sha256: sha256 content
blake2s256: blake2s256 content
Returns:
The tuple (sha1, sha1_git, sha256, blake2s256) if found or None.
"""
cur = self._cursor(cur)
cur.execute("""SELECT %s
FROM swh_content_find(%%s, %%s, %%s, %%s)
LIMIT 1""" % ','.join(self.content_find_cols),
(sha1, sha1_git, sha256, blake2s256))
content = line_to_bytes(cur.fetchone())
if set(content) == {None}:
return None
else:
return content
def directory_missing_from_list(self, directories, cur=None):
cur = self._cursor(cur)
yield from execute_values_to_bytes(
cur, """
SELECT id FROM (VALUES %s) as t(id)
WHERE NOT EXISTS (
SELECT 1 FROM directory d WHERE d.id = t.id
)
""", ((id,) for id in directories))
directory_ls_cols = ['dir_id', 'type', 'target', 'name', 'perms',
'status', 'sha1', 'sha1_git', 'sha256', 'length']
def directory_walk_one(self, directory, cur=None):
cur = self._cursor(cur)
cols = ', '.join(self.directory_ls_cols)
query = 'SELECT %s FROM swh_directory_walk_one(%%s)' % cols
cur.execute(query, (directory,))
yield from cursor_to_bytes(cur)
def directory_walk(self, directory, cur=None):
cur = self._cursor(cur)
cols = ', '.join(self.directory_ls_cols)
query = 'SELECT %s FROM swh_directory_walk(%%s)' % cols
cur.execute(query, (directory,))
yield from cursor_to_bytes(cur)
def directory_entry_get_by_path(self, directory, paths, cur=None):
"""Retrieve a directory entry by path.
"""
cur = self._cursor(cur)
cols = ', '.join(self.directory_ls_cols)
query = (
'SELECT %s FROM swh_find_directory_entry_by_path(%%s, %%s)' % cols)
cur.execute(query, (directory, paths))
data = cur.fetchone()
if set(data) == {None}:
return None
return line_to_bytes(data)
def revision_missing_from_list(self, revisions, cur=None):
cur = self._cursor(cur)
yield from execute_values_to_bytes(
cur, """
SELECT id FROM (VALUES %s) as t(id)
WHERE NOT EXISTS (
SELECT 1 FROM revision r WHERE r.id = t.id
)
""", ((id,) for id in revisions))
revision_add_cols = [
'id', 'date', 'date_offset', 'date_neg_utc_offset', 'committer_date',
'committer_date_offset', 'committer_date_neg_utc_offset', 'type',
'directory', 'message', 'author_fullname', 'author_name',
'author_email', 'committer_fullname', 'committer_name',
'committer_email', 'metadata', 'synthetic',
]
revision_get_cols = revision_add_cols + [
'author_id', 'committer_id', 'parents']
def origin_visit_add(self, origin, ts, cur=None):
"""Add a new origin_visit for origin origin at timestamp ts with
status 'ongoing'.
Args:
origin: origin concerned by the visit
ts: the date of the visit
Returns:
The new visit index step for that origin
"""
cur = self._cursor(cur)
self._cursor(cur).execute('SELECT swh_origin_visit_add(%s, %s)',
(origin, ts))
return cur.fetchone()[0]
def origin_visit_update(self, origin, visit_id, status,
metadata, cur=None):
"""Update origin_visit's status."""
cur = self._cursor(cur)
update = """UPDATE origin_visit
SET status=%s, metadata=%s
WHERE origin=%s AND visit=%s"""
cur.execute(update, (status, jsonize(metadata), origin, visit_id))
origin_visit_get_cols = ['origin', 'visit', 'date', 'status', 'metadata',
'snapshot']
def origin_visit_get_all(self, origin_id,
last_visit=None, limit=None, cur=None):
"""Retrieve all visits for origin with id origin_id.
Args:
origin_id: The occurrence's origin
Yields:
The occurrence's history visits
"""
cur = self._cursor(cur)
if last_visit:
extra_condition = 'and visit > %s'
args = (origin_id, last_visit, limit)
else:
extra_condition = ''
args = (origin_id, limit)
query = """\
SELECT %s,
(select id from snapshot where object_id = snapshot_id) as snapshot
FROM origin_visit
WHERE origin=%%s %s
order by visit asc
limit %%s""" % (
', '.join(self.origin_visit_get_cols[:-1]), extra_condition
)
cur.execute(query, args)
yield from cursor_to_bytes(cur)
def origin_visit_get(self, origin_id, visit_id, cur=None):
"""Retrieve information on visit visit_id of origin origin_id.
Args:
origin_id: the origin concerned
visit_id: The visit step for that origin
Returns:
The origin_visit information
"""
cur = self._cursor(cur)
query = """\
SELECT %s,
(select id from snapshot where object_id = snapshot_id)
as snapshot
FROM origin_visit
WHERE origin = %%s AND visit = %%s
""" % (', '.join(self.origin_visit_get_cols[:-1]))
cur.execute(query, (origin_id, visit_id))
r = cur.fetchall()
if not r:
return None
return line_to_bytes(r[0])
def origin_visit_get_latest_snapshot(self, origin_id,
allowed_statuses=None,
cur=None):
"""Retrieve the most recent origin_visit which references a snapshot
Args:
origin_id: the origin concerned
allowed_statuses: the visit statuses allowed for the returned visit
Returns:
The origin_visit information, or None if no visit matches.
"""
cur = self._cursor(cur)
extra_clause = ""
if allowed_statuses:
extra_clause = cur.mogrify("AND status IN %s",
(tuple(allowed_statuses),)).decode()
query = """\
SELECT %s,
(select id from snapshot where object_id = snapshot_id)
as snapshot
FROM origin_visit
WHERE
origin = %%s AND snapshot_id is not null %s
ORDER BY date DESC, visit DESC
LIMIT 1
""" % (', '.join(self.origin_visit_get_cols[:-1]), extra_clause)
cur.execute(query, (origin_id,))
r = cur.fetchone()
if not r:
return None
return line_to_bytes(r)
@staticmethod
def mangle_query_key(key, main_table):
if key == 'id':
return 't.id'
if key == 'parents':
return '''
ARRAY(
SELECT rh.parent_id::bytea
FROM revision_history rh
WHERE rh.id = t.id
ORDER BY rh.parent_rank
)'''
if '_' not in key:
return '%s.%s' % (main_table, key)
head, tail = key.split('_', 1)
if (head in ('author', 'committer')
and tail in ('name', 'email', 'id', 'fullname')):
return '%s.%s' % (head, tail)
return '%s.%s' % (main_table, key)
def revision_get_from_list(self, revisions, cur=None):
cur = self._cursor(cur)
query_keys = ', '.join(
self.mangle_query_key(k, 'revision')
for k in self.revision_get_cols
)
yield from execute_values_to_bytes(
cur, """
SELECT %s FROM (VALUES %%s) as t(id)
LEFT JOIN revision ON t.id = revision.id
LEFT JOIN person author ON revision.author = author.id
LEFT JOIN person committer ON revision.committer = committer.id
""" % query_keys,
((id,) for id in revisions))
def revision_log(self, root_revisions, limit=None, cur=None):
cur = self._cursor(cur)
query = """SELECT %s
FROM swh_revision_log(%%s, %%s)
""" % ', '.join(self.revision_get_cols)
cur.execute(query, (root_revisions, limit))
yield from cursor_to_bytes(cur)
revision_shortlog_cols = ['id', 'parents']
def revision_shortlog(self, root_revisions, limit=None, cur=None):
cur = self._cursor(cur)
query = """SELECT %s
FROM swh_revision_list(%%s, %%s)
""" % ', '.join(self.revision_shortlog_cols)
cur.execute(query, (root_revisions, limit))
yield from cursor_to_bytes(cur)
def release_missing_from_list(self, releases, cur=None):
cur = self._cursor(cur)
yield from execute_values_to_bytes(
cur, """
SELECT id FROM (VALUES %s) as t(id)
WHERE NOT EXISTS (
SELECT 1 FROM release r WHERE r.id = t.id
)
""", ((id,) for id in releases))
object_find_by_sha1_git_cols = ['sha1_git', 'type', 'id', 'object_id']
def object_find_by_sha1_git(self, ids, cur=None):
cur = self._cursor(cur)
yield from execute_values_to_bytes(
cur, """
WITH t (id) AS (VALUES %s),
known_objects as ((
select
id as sha1_git,
'release'::object_type as type,
id,
object_id
from release r
where exists (select 1 from t where t.id = r.id)
) union all (
select
id as sha1_git,
'revision'::object_type as type,
id,
object_id
from revision r
where exists (select 1 from t where t.id = r.id)
) union all (
select
id as sha1_git,
'directory'::object_type as type,
id,
object_id
from directory d
where exists (select 1 from t where t.id = d.id)
) union all (
select
sha1_git as sha1_git,
'content'::object_type as type,
sha1 as id,
object_id
from content c
where exists (select 1 from t where t.id = c.sha1_git)
))
select t.id as sha1_git, k.type, k.id, k.object_id
from t
left join known_objects k on t.id = k.sha1_git
""",
((id,) for id in ids)
)
def stat_counters(self, cur=None):
cur = self._cursor(cur)
cur.execute('SELECT * FROM swh_stat_counters()')
yield from cur
fetch_history_cols = ['origin', 'date', 'status', 'result', 'stdout',
'stderr', 'duration']
def create_fetch_history(self, fetch_history, cur=None):
"""Create a fetch_history entry with the data in fetch_history"""
cur = self._cursor(cur)
query = '''INSERT INTO fetch_history (%s)
VALUES (%s) RETURNING id''' % (
','.join(self.fetch_history_cols),
','.join(['%s'] * len(self.fetch_history_cols))
)
cur.execute(query, [fetch_history.get(col) for col in
self.fetch_history_cols])
return cur.fetchone()[0]
def get_fetch_history(self, fetch_history_id, cur=None):
"""Get a fetch_history entry with the given id"""
cur = self._cursor(cur)
query = '''SELECT %s FROM fetch_history WHERE id=%%s''' % (
', '.join(self.fetch_history_cols),
)
cur.execute(query, (fetch_history_id,))
data = cur.fetchone()
if not data:
return None
ret = {'id': fetch_history_id}
for i, col in enumerate(self.fetch_history_cols):
ret[col] = data[i]
return ret
def update_fetch_history(self, fetch_history, cur=None):
"""Update the fetch_history entry from the data in fetch_history"""
cur = self._cursor(cur)
query = '''UPDATE fetch_history
SET %s
WHERE id=%%s''' % (
','.join('%s=%%s' % col for col in self.fetch_history_cols)
)
cur.execute(query, [jsonize(fetch_history.get(col)) for col in
self.fetch_history_cols + ['id']])
def origin_add(self, type, url, cur=None):
"""Insert a new origin and return the new identifier."""
insert = """INSERT INTO origin (type, url) values (%s, %s)
RETURNING id"""
cur.execute(insert, (type, url))
return cur.fetchone()[0]
origin_cols = ['id', 'type', 'url']
def origin_get_with(self, type, url, cur=None):
"""Retrieve the origin id from its type and url if found."""
cur = self._cursor(cur)
query = """SELECT %s
FROM origin
WHERE type=%%s AND url=%%s
""" % ','.join(self.origin_cols)
cur.execute(query, (type, url))
data = cur.fetchone()
if data:
return line_to_bytes(data)
return None
def origin_get(self, id, cur=None):
"""Retrieve the origin per its identifier.
"""
cur = self._cursor(cur)
query = """SELECT %s
FROM origin WHERE id=%%s
""" % ','.join(self.origin_cols)
cur.execute(query, (id,))
data = cur.fetchone()
if data:
return line_to_bytes(data)
return None
def origin_search(self, url_pattern, offset=0, limit=50,
regexp=False, with_visit=False, cur=None):
"""Search for origins whose urls contain a provided string pattern
or match a provided regular expression.
The search is performed in a case insensitive way.
Args:
url_pattern (str): the string pattern to search for in origin urls
offset (int): number of found origins to skip before returning
results
limit (int): the maximum number of found origins to return
regexp (bool): if True, consider the provided pattern as a regular
expression and returns origins whose urls match it
with_visit (bool): if True, filter out origins with no visit
"""
cur = self._cursor(cur)
origin_cols = ','.join(self.origin_cols)
query = """SELECT %s
FROM origin
WHERE """
if with_visit:
query += """
EXISTS (SELECT 1 from origin_visit WHERE origin=origin.id)
AND """
query += """
url %s %%s
ORDER BY id
OFFSET %%s LIMIT %%s"""
if not regexp:
query = query % (origin_cols, 'ILIKE')
query_params = ('%'+url_pattern+'%', offset, limit)
else:
query = query % (origin_cols, '~*')
query_params = (url_pattern, offset, limit)
cur.execute(query, query_params)
yield from cursor_to_bytes(cur)
person_cols = ['fullname', 'name', 'email']
person_get_cols = person_cols + ['id']
def person_add(self, person, cur=None):
"""Add a person identified by its name and email.
Returns:
The new person's id
"""
cur = self._cursor(cur)
query_new_person = '''\
INSERT INTO person(%s)
VALUES (%s)
RETURNING id''' % (
', '.join(self.person_cols),
', '.join('%s' for i in range(len(self.person_cols)))
)
cur.execute(query_new_person,
[person[col] for col in self.person_cols])
return cur.fetchone()[0]
def person_get(self, ids, cur=None):
"""Retrieve the persons identified by the list of ids.
"""
cur = self._cursor(cur)
query = """SELECT %s
FROM person
WHERE id IN %%s""" % ', '.join(self.person_get_cols)
cur.execute(query, (tuple(ids),))
yield from cursor_to_bytes(cur)
release_add_cols = [
'id', 'target', 'target_type', 'date', 'date_offset',
'date_neg_utc_offset', 'name', 'comment', 'synthetic',
'author_fullname', 'author_name', 'author_email',
]
release_get_cols = release_add_cols + ['author_id']
def release_get_from_list(self, releases, cur=None):
cur = self._cursor(cur)
query_keys = ', '.join(
self.mangle_query_key(k, 'release')
for k in self.release_get_cols
)
yield from execute_values_to_bytes(
cur, """
SELECT %s FROM (VALUES %%s) as t(id)
LEFT JOIN release ON t.id = release.id
LEFT JOIN person author ON release.author = author.id
""" % query_keys,
((id,) for id in releases))
def origin_metadata_add(self, origin, ts, provider, tool,
metadata, cur=None):
""" Add an origin_metadata for the origin at ts with provider, tool and
metadata.
Args:
origin (int): the origin's id for which the metadata is added
ts (datetime): time when the metadata was found
provider (int): the metadata provider identifier
tool (int): the tool's identifier used to extract metadata
metadata (jsonb): the metadata retrieved at the time and location
Returns:
id (int): the origin_metadata unique id
"""
cur = self._cursor(cur)
insert = """INSERT INTO origin_metadata (origin_id, discovery_date,
provider_id, tool_id, metadata) values (%s, %s, %s, %s, %s)
RETURNING id"""
cur.execute(insert, (origin, ts, provider, tool, jsonize(metadata)))
return cur.fetchone()[0]
origin_metadata_get_cols = ['origin_id', 'discovery_date',
'tool_id', 'metadata', 'provider_id',
'provider_name', 'provider_type',
'provider_url']
def origin_metadata_get_by(self, origin_id, provider_type=None, cur=None):
"""Retrieve all origin_metadata entries for one origin_id
"""
cur = self._cursor(cur)
if not provider_type:
query = '''SELECT %s
FROM swh_origin_metadata_get_by_origin(
%%s)''' % (','.join(
self.origin_metadata_get_cols))
cur.execute(query, (origin_id, ))
else:
query = '''SELECT %s
FROM swh_origin_metadata_get_by_provider_type(
%%s, %%s)''' % (','.join(
self.origin_metadata_get_cols))
cur.execute(query, (origin_id, provider_type))
yield from cursor_to_bytes(cur)
tool_cols = ['id', 'name', 'version', 'configuration']
@stored_procedure('swh_mktemp_tool')
def mktemp_tool(self, cur=None):
pass
def tool_add_from_temp(self, cur=None):
cur = self._cursor(cur)
cur.execute("SELECT %s from swh_tool_add()" % (
','.join(self.tool_cols), ))
yield from cursor_to_bytes(cur)
def tool_get(self, name, version, configuration, cur=None):
cur = self._cursor(cur)
cur.execute('''select %s
from tool
where name=%%s and
version=%%s and
configuration=%%s''' % (
','.join(self.tool_cols)),
(name, version, configuration))
data = cur.fetchone()
if not data:
return None
return line_to_bytes(data)
metadata_provider_cols = ['id', 'provider_name', 'provider_type',
'provider_url', 'metadata']
def metadata_provider_add(self, provider_name, provider_type,
provider_url, metadata, cur=None):
"""Insert a new provider and return the new identifier."""
cur = self._cursor(cur)
insert = """INSERT INTO metadata_provider (provider_name, provider_type,
provider_url, metadata) values (%s, %s, %s, %s)
RETURNING id"""
cur.execute(insert, (provider_name, provider_type, provider_url,
jsonize(metadata)))
return cur.fetchone()[0]
def metadata_provider_get(self, provider_id, cur=None):
cur = self._cursor(cur)
cur.execute('''select %s
from metadata_provider
- where provider_id=%%s ''' % (
+ where id=%%s ''' % (
','.join(self.metadata_provider_cols)),
(provider_id, ))
data = cur.fetchone()
if not data:
return None
return line_to_bytes(data)
def metadata_provider_get_by(self, provider_name, provider_url,
cur=None):
cur = self._cursor(cur)
cur.execute('''select %s
from metadata_provider
where provider_name=%%s and
provider_url=%%s''' % (
','.join(self.metadata_provider_cols)),
(provider_name, provider_url))
data = cur.fetchone()
if not data:
return None
return line_to_bytes(data)
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
index e04ed395..7a0c9a02 100644
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -1,2161 +1,2181 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
import datetime
import unittest
from collections import defaultdict
from unittest.mock import Mock, patch
import pytest
from hypothesis import given, strategies
from swh.model import from_disk, identifiers
from swh.model.hashutil import hash_to_bytes
from swh.storage.tests.storage_testing import StorageTestFixture
from swh.storage import HashCollision
from .generate_data_test import gen_contents
@pytest.mark.db
class StorageTestDbFixture(StorageTestFixture):
def setUp(self):
super().setUp()
db = self.test_db[self.TEST_DB_NAME]
self.conn = db.conn
self.cursor = db.cursor
self.maxDiff = None
def tearDown(self):
self.reset_storage_tables()
super().tearDown()
class TestStorageData:
def setUp(self):
super().setUp()
self.cont = {
'data': b'42\n',
'length': 3,
'sha1': hash_to_bytes(
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
'sha1_git': hash_to_bytes(
'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
'sha256': hash_to_bytes(
'673650f936cb3b0a2f93ce09d81be107'
'48b1b203c19e8176b4eefc1964a0cf3a'),
'blake2s256': hash_to_bytes('d5fe1939576527e42cfd76a9455a2'
'432fe7f56669564577dd93c4280e76d661d'),
'status': 'visible',
}
self.cont2 = {
'data': b'4242\n',
'length': 5,
'sha1': hash_to_bytes(
'61c2b3a30496d329e21af70dd2d7e097046d07b7'),
'sha1_git': hash_to_bytes(
'36fade77193cb6d2bd826161a0979d64c28ab4fa'),
'sha256': hash_to_bytes(
'859f0b154fdb2d630f45e1ecae4a8629'
'15435e663248bb8461d914696fc047cd'),
'blake2s256': hash_to_bytes('849c20fad132b7c2d62c15de310adfe87be'
'94a379941bed295e8141c6219810d'),
'status': 'visible',
}
self.cont3 = {
'data': b'424242\n',
'length': 7,
'sha1': hash_to_bytes(
'3e21cc4942a4234c9e5edd8a9cacd1670fe59f13'),
'sha1_git': hash_to_bytes(
'c932c7649c6dfa4b82327d121215116909eb3bea'),
'sha256': hash_to_bytes(
'92fb72daf8c6818288a35137b72155f5'
'07e5de8d892712ab96277aaed8cf8a36'),
'blake2s256': hash_to_bytes('76d0346f44e5a27f6bafdd9c2befd304af'
'f83780f93121d801ab6a1d4769db11'),
'status': 'visible',
}
self.missing_cont = {
'data': b'missing\n',
'length': 8,
'sha1': hash_to_bytes(
'f9c24e2abb82063a3ba2c44efd2d3c797f28ac90'),
'sha1_git': hash_to_bytes(
'33e45d56f88993aae6a0198013efa80716fd8919'),
'sha256': hash_to_bytes(
'6bbd052ab054ef222c1c87be60cd191a'
'ddedd24cc882d1f5f7f7be61dc61bb3a'),
'blake2s256': hash_to_bytes('306856b8fd879edb7b6f1aeaaf8db9bbecc9'
'93cd7f776c333ac3a782fa5c6eba'),
'status': 'absent',
}
self.skipped_cont = {
'length': 1024 * 1024 * 200,
'sha1_git': hash_to_bytes(
'33e45d56f88993aae6a0198013efa80716fd8920'),
'sha1': hash_to_bytes(
'43e45d56f88993aae6a0198013efa80716fd8920'),
'sha256': hash_to_bytes(
'7bbd052ab054ef222c1c87be60cd191a'
'ddedd24cc882d1f5f7f7be61dc61bb3a'),
'blake2s256': hash_to_bytes(
'ade18b1adecb33f891ca36664da676e1'
'2c772cc193778aac9a137b8dc5834b9b'),
'reason': 'Content too long',
'status': 'absent',
}
self.skipped_cont2 = {
'length': 1024 * 1024 * 300,
'sha1_git': hash_to_bytes(
'44e45d56f88993aae6a0198013efa80716fd8921'),
'sha1': hash_to_bytes(
'54e45d56f88993aae6a0198013efa80716fd8920'),
'sha256': hash_to_bytes(
'8cbd052ab054ef222c1c87be60cd191a'
'ddedd24cc882d1f5f7f7be61dc61bb3a'),
'blake2s256': hash_to_bytes(
'9ce18b1adecb33f891ca36664da676e1'
'2c772cc193778aac9a137b8dc5834b9b'),
'reason': 'Content too long',
'status': 'absent',
}
self.dir = {
'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa90',
'entries': [
{
'name': b'foo',
'type': 'file',
'target': self.cont['sha1_git'],
'perms': from_disk.DentryPerms.content,
},
{
'name': b'bar\xc3',
'type': 'dir',
'target': b'12345678901234567890',
'perms': from_disk.DentryPerms.directory,
},
],
}
self.dir2 = {
'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa95',
'entries': [
{
'name': b'oof',
'type': 'file',
'target': self.cont2['sha1_git'],
'perms': from_disk.DentryPerms.content,
}
],
}
self.dir3 = {
'id': hash_to_bytes('33e45d56f88993aae6a0198013efa80716fd8921'),
'entries': [
{
'name': b'foo',
'type': 'file',
'target': self.cont['sha1_git'],
'perms': from_disk.DentryPerms.content,
},
{
'name': b'bar',
'type': 'dir',
'target': b'12345678901234560000',
'perms': from_disk.DentryPerms.directory,
},
{
'name': b'hello',
'type': 'file',
'target': b'12345678901234567890',
'perms': from_disk.DentryPerms.content,
},
],
}
self.minus_offset = datetime.timezone(datetime.timedelta(minutes=-120))
self.plus_offset = datetime.timezone(datetime.timedelta(minutes=120))
self.revision = {
'id': b'56789012345678901234',
'message': b'hello',
'author': {
'name': b'Nicolas Dandrimont',
'email': b'nicolas@example.com',
'fullname': b'Nicolas Dandrimont <nicolas@example.com> ',
},
'date': {
'timestamp': 1234567890,
'offset': 120,
'negative_utc': None,
},
'committer': {
'name': b'St\xc3fano Zacchiroli',
'email': b'stefano@example.com',
'fullname': b'St\xc3fano Zacchiroli <stefano@example.com>'
},
'committer_date': {
'timestamp': 1123456789,
'offset': 0,
'negative_utc': True,
},
'parents': [b'01234567890123456789', b'23434512345123456789'],
'type': 'git',
'directory': self.dir['id'],
'metadata': {
'checksums': {
'sha1': 'tarball-sha1',
'sha256': 'tarball-sha256',
},
'signed-off-by': 'some-dude',
'extra_headers': [
['gpgsig', b'test123'],
['mergetags', [b'foo\\bar', b'\x22\xaf\x89\x80\x01\x00']],
],
},
'synthetic': True
}
self.revision2 = {
'id': b'87659012345678904321',
'message': b'hello again',
'author': {
'name': b'Roberto Dicosmo',
'email': b'roberto@example.com',
'fullname': b'Roberto Dicosmo <roberto@example.com>',
},
'date': {
'timestamp': {
'seconds': 1234567843,
'microseconds': 220000,
},
'offset': -720,
'negative_utc': None,
},
'committer': {
'name': b'tony',
'email': b'ar@dumont.fr',
'fullname': b'tony <ar@dumont.fr>',
},
'committer_date': {
'timestamp': 1123456789,
'offset': 0,
'negative_utc': False,
},
'parents': [b'01234567890123456789'],
'type': 'git',
'directory': self.dir2['id'],
'metadata': None,
'synthetic': False
}
self.revision3 = {
'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
'message': b'a simple revision with no parents this time',
'author': {
'name': b'Roberto Dicosmo',
'email': b'roberto@example.com',
'fullname': b'Roberto Dicosmo <roberto@example.com>',
},
'date': {
'timestamp': {
'seconds': 1234567843,
'microseconds': 220000,
},
'offset': -720,
'negative_utc': None,
},
'committer': {
'name': b'tony',
'email': b'ar@dumont.fr',
'fullname': b'tony <ar@dumont.fr>',
},
'committer_date': {
'timestamp': 1127351742,
'offset': 0,
'negative_utc': False,
},
'parents': [],
'type': 'git',
'directory': self.dir2['id'],
'metadata': None,
'synthetic': True
}
self.revision4 = {
'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
'message': b'parent of self.revision2',
'author': {
'name': b'me',
'email': b'me@soft.heri',
'fullname': b'me <me@soft.heri>',
},
'date': {
'timestamp': {
'seconds': 1244567843,
'microseconds': 220000,
},
'offset': -720,
'negative_utc': None,
},
'committer': {
'name': b'committer-dude',
'email': b'committer@dude.com',
'fullname': b'committer-dude <committer@dude.com>',
},
'committer_date': {
'timestamp': {
'seconds': 1244567843,
'microseconds': 220000,
},
'offset': -720,
'negative_utc': None,
},
'parents': [self.revision3['id']],
'type': 'git',
'directory': self.dir['id'],
'metadata': None,
'synthetic': False
}
self.origin = {
'url': 'file:///dev/null',
'type': 'git',
}
self.origin2 = {
'url': 'file:///dev/zero',
'type': 'git',
}
self.provider = {
'name': 'hal',
'type': 'deposit-client',
'url': 'http:///hal/inria',
'metadata': {
'location': 'France'
}
}
self.metadata_tool = {
'name': 'swh-deposit',
'version': '0.0.1',
'configuration': {
'sword_version': '2'
}
}
self.origin_metadata = {
'origin': self.origin,
'discovery_date': datetime.datetime(2015, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc),
'provider': self.provider,
'tool': 'swh-deposit',
'metadata': {
'name': 'test_origin_metadata',
'version': '0.0.1'
}
}
self.origin_metadata2 = {
'origin': self.origin,
'discovery_date': datetime.datetime(2017, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc),
'provider': self.provider,
'tool': 'swh-deposit',
'metadata': {
'name': 'test_origin_metadata',
'version': '0.0.1'
}
}
self.date_visit1 = datetime.datetime(2015, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc)
self.date_visit2 = datetime.datetime(2017, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc)
self.date_visit3 = datetime.datetime(2018, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc)
self.release = {
'id': b'87659012345678901234',
'name': b'v0.0.1',
'author': {
'name': b'olasd',
'email': b'nic@olasd.fr',
'fullname': b'olasd <nic@olasd.fr>',
},
'date': {
'timestamp': 1234567890,
'offset': 42,
'negative_utc': None,
},
'target': b'43210987654321098765',
'target_type': 'revision',
'message': b'synthetic release',
'synthetic': True,
}
self.release2 = {
'id': b'56789012348765901234',
'name': b'v0.0.2',
'author': {
'name': b'tony',
'email': b'ar@dumont.fr',
'fullname': b'tony <ar@dumont.fr>',
},
'date': {
'timestamp': 1634366813,
'offset': -120,
'negative_utc': None,
},
'target': b'432109\xa9765432\xc309\x00765',
'target_type': 'revision',
'message': b'v0.0.2\nMisc performance improvements + bug fixes',
'synthetic': False
}
self.release3 = {
'id': b'87659012345678904321',
'name': b'v0.0.2',
'author': {
'name': b'tony',
'email': b'tony@ardumont.fr',
'fullname': b'tony <tony@ardumont.fr>',
},
'date': {
'timestamp': 1634336813,
'offset': 0,
'negative_utc': False,
},
'target': self.revision2['id'],
'target_type': 'revision',
'message': b'yet another synthetic release',
'synthetic': True,
}
self.fetch_history_date = datetime.datetime(
2015, 1, 2, 21, 0, 0,
tzinfo=datetime.timezone.utc)
self.fetch_history_end = datetime.datetime(
2015, 1, 2, 23, 0, 0,
tzinfo=datetime.timezone.utc)
self.fetch_history_duration = (self.fetch_history_end -
self.fetch_history_date)
self.fetch_history_data = {
'status': True,
'result': {'foo': 'bar'},
'stdout': 'blabla',
'stderr': 'blablabla',
}
self.snapshot = {
'id': hash_to_bytes('2498dbf535f882bc7f9a18fb16c9ad27fda7bab7'),
'branches': {
b'master': {
'target': self.revision['id'],
'target_type': 'revision',
},
},
'next_branch': None
}
self.empty_snapshot = {
'id': hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'),
'branches': {},
'next_branch': None
}
self.complete_snapshot = {
'id': hash_to_bytes('6e65b86363953b780d92b0a928f3e8fcdd10db36'),
'branches': {
b'directory': {
'target': hash_to_bytes(
'1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8'),
'target_type': 'directory',
},
b'content': {
'target': hash_to_bytes(
'fe95a46679d128ff167b7c55df5d02356c5a1ae1'),
'target_type': 'content',
},
b'alias': {
'target': b'revision',
'target_type': 'alias',
},
b'revision': {
'target': hash_to_bytes(
'aafb16d69fd30ff58afdd69036a26047f3aebdc6'),
'target_type': 'revision',
},
b'release': {
'target': hash_to_bytes(
'7045404f3d1c54e6473c71bbb716529fbad4be24'),
'target_type': 'release',
},
b'snapshot': {
'target': hash_to_bytes(
'1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'),
'target_type': 'snapshot',
},
b'dangling': None,
},
'next_branch': None
}
class CommonTestStorage(TestStorageData):
"""Base class for Storage testing.
This class is used as-is to test local storage (see TestLocalStorage
below) and remote storage (see TestRemoteStorage in
test_remote_storage.py.
We need to have the two classes inherit from this base class
separately to avoid nosetests running the tests from the base
class twice.
"""
@staticmethod
def normalize_entity(entity):
entity = copy.deepcopy(entity)
for key in ('date', 'committer_date'):
if key in entity:
entity[key] = identifiers.normalize_timestamp(entity[key])
return entity
def test_check_config(self):
self.assertTrue(self.storage.check_config(check_write=True))
self.assertTrue(self.storage.check_config(check_write=False))
def test_content_add(self):
cont = self.cont
self.storage.content_add([cont])
if hasattr(self.storage, 'objstorage'):
self.assertIn(cont['sha1'], self.storage.objstorage)
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status'
' FROM content WHERE sha1 = %s',
(cont['sha1'],))
datum = self.cursor.fetchone()
self.assertEqual(
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(),
datum[3], datum[4]),
(cont['sha1'], cont['sha1_git'], cont['sha256'],
cont['length'], 'visible'))
def test_content_add_collision(self):
cont1 = self.cont
# create (corrupted) content with same sha1{,_git} but != sha256
cont1b = cont1.copy()
sha256_array = bytearray(cont1b['sha256'])
sha256_array[0] += 1
cont1b['sha256'] = bytes(sha256_array)
with self.assertRaises(HashCollision) as cm:
self.storage.content_add([cont1, cont1b])
self.assertIn(cm.exception.args[0], ['sha1', 'sha1_git', 'blake2s256'])
def test_skipped_content_add(self):
cont = self.skipped_cont.copy()
cont2 = self.skipped_cont2.copy()
cont2['blake2s256'] = None
self.storage.content_add([cont, cont, cont2])
self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, '
'length, status, reason '
'FROM skipped_content ORDER BY sha1_git')
datums = self.cursor.fetchall()
self.assertEqual(2, len(datums))
datum = datums[0]
self.assertEqual(
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(),
datum[3].tobytes(), datum[4], datum[5], datum[6]),
(cont['sha1'], cont['sha1_git'], cont['sha256'],
cont['blake2s256'], cont['length'], 'absent',
'Content too long')
)
datum2 = datums[1]
self.assertEqual(
(datum2[0].tobytes(), datum2[1].tobytes(), datum2[2].tobytes(),
datum2[3], datum2[4], datum2[5], datum2[6]),
(cont2['sha1'], cont2['sha1_git'], cont2['sha256'],
cont2['blake2s256'], cont2['length'], 'absent',
'Content too long')
)
@pytest.mark.property_based
@given(strategies.sets(
elements=strategies.sampled_from(
['sha256', 'sha1_git', 'blake2s256']),
min_size=0))
def test_content_missing(self, algos):
algos |= {'sha1'}
cont2 = self.cont2
missing_cont = self.missing_cont
self.storage.content_add([cont2])
test_contents = [cont2]
missing_per_hash = defaultdict(list)
for i in range(256):
test_content = missing_cont.copy()
for hash in algos:
test_content[hash] = bytes([i]) + test_content[hash][1:]
missing_per_hash[hash].append(test_content[hash])
test_contents.append(test_content)
self.assertCountEqual(
self.storage.content_missing(test_contents),
missing_per_hash['sha1']
)
for hash in algos:
self.assertCountEqual(
self.storage.content_missing(test_contents, key_hash=hash),
missing_per_hash[hash]
)
def test_content_missing_per_sha1(self):
# given
cont2 = self.cont2
missing_cont = self.missing_cont
self.storage.content_add([cont2])
# when
gen = self.storage.content_missing_per_sha1([cont2['sha1'],
missing_cont['sha1']])
# then
self.assertEqual(list(gen), [missing_cont['sha1']])
def test_content_get_metadata(self):
cont1 = self.cont.copy()
cont2 = self.cont2.copy()
self.storage.content_add([cont1, cont2])
gen = self.storage.content_get_metadata([cont1['sha1'], cont2['sha1']])
# we only retrieve the metadata
cont1.pop('data')
cont2.pop('data')
self.assertCountEqual(list(gen), [cont1, cont2])
def test_content_get_metadata_missing_sha1(self):
cont1 = self.cont.copy()
cont2 = self.cont2.copy()
missing_cont = self.missing_cont.copy()
self.storage.content_add([cont1, cont2])
gen = self.storage.content_get_metadata([missing_cont['sha1']])
# All the metadata keys are None
missing_cont.pop('data')
for key in list(missing_cont):
if key != 'sha1':
missing_cont[key] = None
self.assertEqual(list(gen), [missing_cont])
def test_directory_add(self):
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
self.storage.directory_add([self.dir])
stored_data = list(self.storage.directory_ls(self.dir['id']))
data_to_store = []
for ent in self.dir['entries']:
data_to_store.append({
'dir_id': self.dir['id'],
'type': ent['type'],
'target': ent['target'],
'name': ent['name'],
'perms': ent['perms'],
'status': None,
'sha1': None,
'sha1_git': None,
'sha256': None,
'length': None,
})
self.assertCountEqual(data_to_store, stored_data)
after_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([], after_missing)
def test_directory_entry_get_by_path(self):
# given
init_missing = list(self.storage.directory_missing([self.dir3['id']]))
self.assertEqual([self.dir3['id']], init_missing)
self.storage.directory_add([self.dir3])
expected_entries = [
{
'dir_id': self.dir3['id'],
'name': b'foo',
'type': 'file',
'target': self.cont['sha1_git'],
'sha1': None,
'sha1_git': None,
'sha256': None,
'status': None,
'perms': from_disk.DentryPerms.content,
'length': None,
},
{
'dir_id': self.dir3['id'],
'name': b'bar',
'type': 'dir',
'target': b'12345678901234560000',
'sha1': None,
'sha1_git': None,
'sha256': None,
'status': None,
'perms': from_disk.DentryPerms.directory,
'length': None,
},
{
'dir_id': self.dir3['id'],
'name': b'hello',
'type': 'file',
'target': b'12345678901234567890',
'sha1': None,
'sha1_git': None,
'sha256': None,
'status': None,
'perms': from_disk.DentryPerms.content,
'length': None,
},
]
# when (all must be found here)
for entry, expected_entry in zip(self.dir3['entries'],
expected_entries):
actual_entry = self.storage.directory_entry_get_by_path(
self.dir3['id'],
[entry['name']])
self.assertEqual(actual_entry, expected_entry)
# when (nothing should be found here since self.dir is not persisted.)
for entry in self.dir['entries']:
actual_entry = self.storage.directory_entry_get_by_path(
self.dir['id'],
[entry['name']])
self.assertIsNone(actual_entry)
def test_revision_add(self):
init_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([self.revision['id']], list(init_missing))
self.storage.revision_add([self.revision])
end_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([], list(end_missing))
def test_revision_log(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
# when
actual_results = list(self.storage.revision_log(
[self.revision4['id']]))
# hack: ids generated
for actual_result in actual_results:
if 'id' in actual_result['author']:
del actual_result['author']['id']
if 'id' in actual_result['committer']:
del actual_result['committer']['id']
self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3
self.assertEqual(actual_results[0],
self.normalize_entity(self.revision4))
self.assertEqual(actual_results[1],
self.normalize_entity(self.revision3))
def test_revision_log_with_limit(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
actual_results = list(self.storage.revision_log(
[self.revision4['id']], 1))
# hack: ids generated
for actual_result in actual_results:
if 'id' in actual_result['author']:
del actual_result['author']['id']
if 'id' in actual_result['committer']:
del actual_result['committer']['id']
self.assertEqual(len(actual_results), 1)
self.assertEqual(actual_results[0], self.revision4)
@staticmethod
def _short_revision(revision):
return [revision['id'], revision['parents']]
def test_revision_shortlog(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
# when
actual_results = list(self.storage.revision_shortlog(
[self.revision4['id']]))
self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3
self.assertEqual(list(actual_results[0]),
self._short_revision(self.revision4))
self.assertEqual(list(actual_results[1]),
self._short_revision(self.revision3))
def test_revision_shortlog_with_limit(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
actual_results = list(self.storage.revision_shortlog(
[self.revision4['id']], 1))
self.assertEqual(len(actual_results), 1)
self.assertEqual(list(actual_results[0]),
self._short_revision(self.revision4))
def test_revision_get(self):
self.storage.revision_add([self.revision])
actual_revisions = list(self.storage.revision_get(
[self.revision['id'], self.revision2['id']]))
# when
if 'id' in actual_revisions[0]['author']:
del actual_revisions[0]['author']['id'] # hack: ids are generated
if 'id' in actual_revisions[0]['committer']:
del actual_revisions[0]['committer']['id']
self.assertEqual(len(actual_revisions), 2)
self.assertEqual(actual_revisions[0],
self.normalize_entity(self.revision))
self.assertIsNone(actual_revisions[1])
def test_revision_get_no_parents(self):
self.storage.revision_add([self.revision3])
get = list(self.storage.revision_get([self.revision3['id']]))
self.assertEqual(len(get), 1)
self.assertEqual(get[0]['parents'], []) # no parents on this one
def test_release_add(self):
init_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
self.assertEqual([self.release['id'], self.release2['id']],
list(init_missing))
self.storage.release_add([self.release, self.release2])
end_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
self.assertEqual([], list(end_missing))
def test_release_get(self):
# given
self.storage.release_add([self.release, self.release2])
# when
actual_releases = list(self.storage.release_get([self.release['id'],
self.release2['id']]))
# then
for actual_release in actual_releases:
if 'id' in actual_release['author']:
del actual_release['author']['id'] # hack: ids are generated
self.assertEqual([self.normalize_entity(self.release),
self.normalize_entity(self.release2)],
[actual_releases[0], actual_releases[1]])
unknown_releases = \
list(self.storage.release_get([self.release3['id']]))
self.assertIsNone(unknown_releases[0])
def test_origin_add_one(self):
origin0 = self.storage.origin_get(self.origin)
self.assertIsNone(origin0)
id = self.storage.origin_add_one(self.origin)
actual_origin = self.storage.origin_get({'url': self.origin['url'],
'type': self.origin['type']})
self.assertEqual(actual_origin['id'], id)
id2 = self.storage.origin_add_one(self.origin)
self.assertEqual(id, id2)
def test_origin_add(self):
origin0 = self.storage.origin_get(self.origin)
self.assertIsNone(origin0)
origin1, origin2 = self.storage.origin_add([self.origin, self.origin2])
actual_origin = self.storage.origin_get({
'url': self.origin['url'],
'type': self.origin['type'],
})
self.assertEqual(actual_origin['id'], origin1['id'])
actual_origin2 = self.storage.origin_get({
'url': self.origin2['url'],
'type': self.origin2['type'],
})
self.assertEqual(actual_origin2['id'], origin2['id'])
def test_origin_add_twice(self):
add1 = self.storage.origin_add([self.origin, self.origin2])
add2 = self.storage.origin_add([self.origin, self.origin2])
self.assertEqual(add1, add2)
def test_origin_get(self):
self.assertIsNone(self.storage.origin_get(self.origin))
id = self.storage.origin_add_one(self.origin)
# lookup per type and url (returns id)
actual_origin0 = self.storage.origin_get({'url': self.origin['url'],
'type': self.origin['type']})
self.assertEqual(actual_origin0['id'], id)
# lookup per id (returns dict)
actual_origin1 = self.storage.origin_get({'id': id})
self.assertEqual(actual_origin1, {'id': id,
'type': self.origin['type'],
'url': self.origin['url']})
def test_origin_search(self):
found_origins = list(self.storage.origin_search(self.origin['url']))
self.assertEqual(len(found_origins), 0)
found_origins = list(self.storage.origin_search(self.origin['url'],
regexp=True))
self.assertEqual(len(found_origins), 0)
id = self.storage.origin_add_one(self.origin)
origin_data = {'id': id,
'type': self.origin['type'],
'url': self.origin['url']}
found_origins = list(self.storage.origin_search(self.origin['url']))
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin_data)
found_origins = list(self.storage.origin_search(
'.' + self.origin['url'][1:-1] + '.', regexp=True))
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin_data)
id2 = self.storage.origin_add_one(self.origin2)
origin2_data = {'id': id2,
'type': self.origin2['type'],
'url': self.origin2['url']}
found_origins = list(self.storage.origin_search(self.origin2['url']))
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin2_data)
found_origins = list(self.storage.origin_search(
'.' + self.origin2['url'][1:-1] + '.', regexp=True))
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin2_data)
found_origins = list(self.storage.origin_search('/'))
self.assertEqual(len(found_origins), 2)
found_origins = list(self.storage.origin_search('.*/.*', regexp=True))
self.assertEqual(len(found_origins), 2)
found_origins = list(self.storage.origin_search('/', offset=0, limit=1)) # noqa
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin_data)
found_origins = list(self.storage.origin_search('.*/.*', offset=0, limit=1, regexp=True)) # noqa
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin_data)
found_origins = list(self.storage.origin_search('/', offset=1, limit=1)) # noqa
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin2_data)
found_origins = list(self.storage.origin_search('.*/.*', offset=1, limit=1, regexp=True)) # noqa
self.assertEqual(len(found_origins), 1)
self.assertEqual(found_origins[0], origin2_data)
def test_origin_visit_add(self):
# given
self.assertIsNone(self.storage.origin_get(self.origin2))
origin_id = self.storage.origin_add_one(self.origin2)
self.assertIsNotNone(origin_id)
# when
origin_visit1 = self.storage.origin_visit_add(
origin_id,
date=self.date_visit2)
# then
self.assertEqual(origin_visit1['origin'], origin_id)
self.assertIsNotNone(origin_visit1['visit'])
actual_origin_visits = list(self.storage.origin_visit_get(origin_id))
self.assertEqual(actual_origin_visits,
[{
'origin': origin_id,
'date': self.date_visit2,
'visit': origin_visit1['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}])
def test_origin_visit_update(self):
# given
origin_id = self.storage.origin_add_one(self.origin2)
origin_id2 = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(
origin_id,
date=self.date_visit2)
origin_visit2 = self.storage.origin_visit_add(
origin_id,
date=self.date_visit3)
origin_visit3 = self.storage.origin_visit_add(
origin_id2,
date=self.date_visit3)
# when
visit1_metadata = {
'contents': 42,
'directories': 22,
}
self.storage.origin_visit_update(
origin_id, origin_visit1['visit'], status='full',
metadata=visit1_metadata)
self.storage.origin_visit_update(origin_id2, origin_visit3['visit'],
status='partial')
# then
actual_origin_visits = list(self.storage.origin_visit_get(origin_id))
self.assertEqual(actual_origin_visits, [{
'origin': origin_visit2['origin'],
'date': self.date_visit2,
'visit': origin_visit1['visit'],
'status': 'full',
'metadata': visit1_metadata,
'snapshot': None,
}, {
'origin': origin_visit2['origin'],
'date': self.date_visit3,
'visit': origin_visit2['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}])
actual_origin_visits_bis = list(self.storage.origin_visit_get(
origin_id, limit=1))
self.assertEqual(actual_origin_visits_bis,
[{
'origin': origin_visit2['origin'],
'date': self.date_visit2,
'visit': origin_visit1['visit'],
'status': 'full',
'metadata': visit1_metadata,
'snapshot': None,
}])
actual_origin_visits_ter = list(self.storage.origin_visit_get(
origin_id, last_visit=origin_visit1['visit']))
self.assertEqual(actual_origin_visits_ter,
[{
'origin': origin_visit2['origin'],
'date': self.date_visit3,
'visit': origin_visit2['visit'],
'status': 'ongoing',
'metadata': None,
'snapshot': None,
}])
actual_origin_visits2 = list(self.storage.origin_visit_get(origin_id2))
self.assertEqual(actual_origin_visits2,
[{
'origin': origin_visit3['origin'],
'date': self.date_visit3,
'visit': origin_visit3['visit'],
'status': 'partial',
'metadata': None,
'snapshot': None,
}])
def test_origin_visit_get_by(self):
origin_id = self.storage.origin_add_one(self.origin2)
origin_id2 = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(
origin_id,
date=self.date_visit2)
self.storage.snapshot_add(origin_id, origin_visit1['visit'],
self.snapshot)
# Add some other {origin, visit} entries
self.storage.origin_visit_add(origin_id, date=self.date_visit3)
self.storage.origin_visit_add(origin_id2, date=self.date_visit3)
# when
visit1_metadata = {
'contents': 42,
'directories': 22,
}
self.storage.origin_visit_update(
origin_id, origin_visit1['visit'], status='full',
metadata=visit1_metadata)
expected_origin_visit = origin_visit1.copy()
expected_origin_visit.update({
'origin': origin_id,
'visit': origin_visit1['visit'],
'date': self.date_visit2,
'metadata': visit1_metadata,
'status': 'full',
'snapshot': self.snapshot['id'],
})
# when
actual_origin_visit1 = self.storage.origin_visit_get_by(
origin_visit1['origin'], origin_visit1['visit'])
# then
self.assertEqual(actual_origin_visit1, expected_origin_visit)
def test_origin_visit_get_by_no_result(self):
# No result
actual_origin_visit = self.storage.origin_visit_get_by(
10, 999)
self.assertIsNone(actual_origin_visit)
def test_snapshot_add_get_empty(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.empty_snapshot)
by_id = self.storage.snapshot_get(self.empty_snapshot['id'])
self.assertEqual(by_id, self.empty_snapshot)
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id)
self.assertEqual(by_ov, self.empty_snapshot)
def test_snapshot_add_get_complete(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
by_id = self.storage.snapshot_get(self.complete_snapshot['id'])
self.assertEqual(by_id, self.complete_snapshot)
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id)
self.assertEqual(by_ov, self.complete_snapshot)
def test_snapshot_add_count_branches(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
snp_id = self.complete_snapshot['id']
snp_size = self.storage.snapshot_count_branches(snp_id)
expected_snp_size = {
'alias': 1,
'content': 1,
'directory': 1,
'release': 1,
'revision': 1,
'snapshot': 1,
None: 1
}
self.assertEqual(snp_size, expected_snp_size)
def test_snapshot_add_get_paginated(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
snp_id = self.complete_snapshot['id']
branches = self.complete_snapshot['branches']
branch_names = list(sorted(branches))
snapshot = self.storage.snapshot_get_branches(snp_id,
branches_from=b'release')
rel_idx = branch_names.index(b'release')
expected_snapshot = {
'id': snp_id,
'branches': {
name: branches[name]
for name in branch_names[rel_idx:]
},
'next_branch': None,
}
self.assertEqual(snapshot, expected_snapshot)
snapshot = self.storage.snapshot_get_branches(snp_id,
branches_count=1)
expected_snapshot = {
'id': snp_id,
'branches': {
branch_names[0]: branches[branch_names[0]],
},
'next_branch': b'content',
}
self.assertEqual(snapshot, expected_snapshot)
snapshot = self.storage.snapshot_get_branches(
snp_id, branches_from=b'directory', branches_count=3)
dir_idx = branch_names.index(b'directory')
expected_snapshot = {
'id': snp_id,
'branches': {
name: branches[name]
for name in branch_names[dir_idx:dir_idx + 3]
},
'next_branch': branch_names[dir_idx + 3],
}
self.assertEqual(snapshot, expected_snapshot)
def test_snapshot_add_get_filtered(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot)
snp_id = self.complete_snapshot['id']
branches = self.complete_snapshot['branches']
snapshot = self.storage.snapshot_get_branches(
snp_id, target_types=['release', 'revision'])
expected_snapshot = {
'id': snp_id,
'branches': {
name: tgt
for name, tgt in branches.items()
if tgt and tgt['target_type'] in ['release', 'revision']
},
'next_branch': None,
}
self.assertEqual(snapshot, expected_snapshot)
snapshot = self.storage.snapshot_get_branches(snp_id,
target_types=['alias'])
expected_snapshot = {
'id': snp_id,
'branches': {
name: tgt
for name, tgt in branches.items()
if tgt and tgt['target_type'] == 'alias'
},
'next_branch': None,
}
self.assertEqual(snapshot, expected_snapshot)
def test_snapshot_add_get(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit_id, self.snapshot)
by_id = self.storage.snapshot_get(self.snapshot['id'])
self.assertEqual(by_id, self.snapshot)
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id)
self.assertEqual(by_ov, self.snapshot)
origin_visit_info = self.storage.origin_visit_get_by(origin_id,
visit_id)
self.assertEqual(origin_visit_info['snapshot'], self.snapshot['id'])
def test_snapshot_add_twice(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit1_id = origin_visit1['visit']
self.storage.snapshot_add(origin_id, visit1_id, self.snapshot)
by_ov1 = self.storage.snapshot_get_by_origin_visit(origin_id,
visit1_id)
self.assertEqual(by_ov1, self.snapshot)
origin_visit2 = self.storage.origin_visit_add(origin_id,
self.date_visit2)
visit2_id = origin_visit2['visit']
self.storage.snapshot_add(origin_id, visit2_id, self.snapshot)
by_ov2 = self.storage.snapshot_get_by_origin_visit(origin_id,
visit2_id)
self.assertEqual(by_ov2, self.snapshot)
def test_snapshot_get_nonexistent(self):
bogus_snapshot_id = b'bogus snapshot id 00'
bogus_origin_id = 1
bogus_visit_id = 1
by_id = self.storage.snapshot_get(bogus_snapshot_id)
self.assertIsNone(by_id)
by_ov = self.storage.snapshot_get_by_origin_visit(bogus_origin_id,
bogus_visit_id)
self.assertIsNone(by_ov)
def test_snapshot_get_latest(self):
origin_id = self.storage.origin_add_one(self.origin)
origin_visit1 = self.storage.origin_visit_add(origin_id,
self.date_visit1)
visit1_id = origin_visit1['visit']
origin_visit2 = self.storage.origin_visit_add(origin_id,
self.date_visit2)
visit2_id = origin_visit2['visit']
# Two visits, both with no snapshot: latest snapshot is None
self.assertIsNone(self.storage.snapshot_get_latest(origin_id))
# Add snapshot to visit1, latest snapshot = visit 1 snapshot
self.storage.snapshot_add(origin_id, visit1_id, self.complete_snapshot)
self.assertEqual(self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id))
# Status filter: both visits are status=ongoing, so no snapshot
# returned
self.assertIsNone(
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full'])
)
# Mark the first visit as completed and check status filter again
self.storage.origin_visit_update(origin_id, visit1_id, status='full')
self.assertEqual(
self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full']),
)
# Add snapshot to visit2 and check that the new snapshot is returned
self.storage.snapshot_add(origin_id, visit2_id, self.empty_snapshot)
self.assertEqual(self.empty_snapshot,
self.storage.snapshot_get_latest(origin_id))
# Check that the status filter is still working
self.assertEqual(
self.complete_snapshot,
self.storage.snapshot_get_latest(origin_id,
allowed_statuses=['full']),
)
def test_stat_counters(self):
expected_keys = ['content', 'directory',
'origin', 'person', 'revision']
# Initially, all counters are 0
self.storage.refresh_stat_counters()
counters = self.storage.stat_counters()
self.assertTrue(set(expected_keys) <= set(counters))
for key in expected_keys:
self.assertEqual(counters[key], 0)
# Add a content. Only the content counter should increase.
self.storage.content_add([self.cont])
self.storage.refresh_stat_counters()
counters = self.storage.stat_counters()
self.assertTrue(set(expected_keys) <= set(counters))
for key in expected_keys:
if key != 'content':
self.assertEqual(counters[key], 0)
self.assertEqual(counters['content'], 1)
# Add other objects. Check their counter increased as well.
origin_id = self.storage.origin_add_one(self.origin2)
origin_visit1 = self.storage.origin_visit_add(
origin_id,
date=self.date_visit2)
self.storage.snapshot_add(origin_id, origin_visit1['visit'],
self.snapshot)
self.storage.directory_add([self.dir])
self.storage.revision_add([self.revision])
self.storage.refresh_stat_counters()
counters = self.storage.stat_counters()
self.assertEqual(counters['content'], 1)
self.assertEqual(counters['directory'], 1)
self.assertEqual(counters['snapshot'], 1)
self.assertEqual(counters['origin'], 1)
self.assertEqual(counters['revision'], 1)
def test_content_find_with_present_content(self):
# 1. with something to find
cont = self.cont
self.storage.content_add([cont])
actually_present = self.storage.content_find({'sha1': cont['sha1']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'blake2s256': cont['blake2s256'],
'length': cont['length'],
'status': 'visible'
})
# 2. with something to find
actually_present = self.storage.content_find(
{'sha1_git': cont['sha1_git']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'blake2s256': cont['blake2s256'],
'length': cont['length'],
'status': 'visible'
})
# 3. with something to find
actually_present = self.storage.content_find(
{'sha256': cont['sha256']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'blake2s256': cont['blake2s256'],
'length': cont['length'],
'status': 'visible'
})
# 4. with something to find
actually_present = self.storage.content_find({
'sha1': cont['sha1'],
'sha1_git': cont['sha1_git'],
'sha256': cont['sha256'],
'blake2s256': cont['blake2s256'],
})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'blake2s256': cont['blake2s256'],
'length': cont['length'],
'status': 'visible'
})
def test_content_find_with_non_present_content(self):
# 1. with something that does not exist
missing_cont = self.missing_cont
actually_present = self.storage.content_find(
{'sha1': missing_cont['sha1']})
self.assertIsNone(actually_present)
# 2. with something that does not exist
actually_present = self.storage.content_find(
{'sha1_git': missing_cont['sha1_git']})
self.assertIsNone(actually_present)
# 3. with something that does not exist
actually_present = self.storage.content_find(
{'sha256': missing_cont['sha256']})
self.assertIsNone(actually_present)
def test_content_find_bad_input(self):
# 1. with bad input
with self.assertRaises(ValueError):
self.storage.content_find({}) # empty is bad
# 2. with bad input
with self.assertRaises(ValueError):
self.storage.content_find(
{'unknown-sha1': 'something'}) # not the right key
def test_object_find_by_sha1_git(self):
sha1_gits = [b'00000000000000000000']
expected = {
b'00000000000000000000': [],
}
self.storage.content_add([self.cont])
sha1_gits.append(self.cont['sha1_git'])
expected[self.cont['sha1_git']] = [{
'sha1_git': self.cont['sha1_git'],
'type': 'content',
'id': self.cont['sha1'],
}]
self.storage.directory_add([self.dir])
sha1_gits.append(self.dir['id'])
expected[self.dir['id']] = [{
'sha1_git': self.dir['id'],
'type': 'directory',
'id': self.dir['id'],
}]
self.storage.revision_add([self.revision])
sha1_gits.append(self.revision['id'])
expected[self.revision['id']] = [{
'sha1_git': self.revision['id'],
'type': 'revision',
'id': self.revision['id'],
}]
self.storage.release_add([self.release])
sha1_gits.append(self.release['id'])
expected[self.release['id']] = [{
'sha1_git': self.release['id'],
'type': 'release',
'id': self.release['id'],
}]
ret = self.storage.object_find_by_sha1_git(sha1_gits)
for val in ret.values():
for obj in val:
del obj['object_id']
self.assertEqual(expected, ret)
def test_tool_add(self):
tool = {
'name': 'some-unknown-tool',
'version': 'some-version',
'configuration': {"debian-package": "some-package"},
}
actual_tool = self.storage.tool_get(tool)
self.assertIsNone(actual_tool) # does not exist
# add it
actual_tools = list(self.storage.tool_add([tool]))
self.assertEqual(len(actual_tools), 1)
actual_tool = actual_tools[0]
self.assertIsNotNone(actual_tool) # now it exists
new_id = actual_tool.pop('id')
self.assertEqual(actual_tool, tool)
actual_tools2 = list(self.storage.tool_add([tool]))
actual_tool2 = actual_tools2[0]
self.assertIsNotNone(actual_tool2) # now it exists
new_id2 = actual_tool2.pop('id')
self.assertEqual(new_id, new_id2)
self.assertEqual(actual_tool, actual_tool2)
def test_tool_add_multiple(self):
tool = {
'name': 'some-unknown-tool',
'version': 'some-version',
'configuration': {"debian-package": "some-package"},
}
actual_tools = list(self.storage.tool_add([tool]))
self.assertEqual(len(actual_tools), 1)
new_tools = [tool, {
'name': 'yet-another-tool',
'version': 'version',
'configuration': {},
}]
actual_tools = list(self.storage.tool_add(new_tools))
self.assertEqual(len(actual_tools), 2)
# order not guaranteed, so we iterate over results to check
for tool in actual_tools:
_id = tool.pop('id')
self.assertIsNotNone(_id)
self.assertIn(tool, new_tools)
def test_tool_get_missing(self):
tool = {
'name': 'unknown-tool',
'version': '3.1.0rc2-31-ga2cbb8c',
'configuration': {"command_line": "nomossa <filepath>"},
}
actual_tool = self.storage.tool_get(tool)
self.assertIsNone(actual_tool)
def test_tool_metadata_get_missing_context(self):
tool = {
'name': 'swh-metadata-translator',
'version': '0.0.1',
'configuration': {"context": "unknown-context"},
}
actual_tool = self.storage.tool_get(tool)
self.assertIsNone(actual_tool)
def test_tool_metadata_get(self):
tool = {
'name': 'swh-metadata-translator',
'version': '0.0.1',
'configuration': {"type": "local", "context": "npm"},
}
tools = list(self.storage.tool_add([tool]))
expected_tool = tools[0]
# when
actual_tool = self.storage.tool_get(tool)
# then
self.assertEqual(expected_tool, actual_tool)
+ def test_metadata_provider_get(self):
+ # given
+ no_provider = self.storage.metadata_provider_get(6459456445615)
+ self.assertIsNone(no_provider)
+ # when
+ provider_id = self.storage.metadata_provider_add(
+ self.provider['name'],
+ self.provider['type'],
+ self.provider['url'],
+ self.provider['metadata'])
+
+ actual_provider = self.storage.metadata_provider_get(provider_id)
+ expected_provider = {
+ 'provider_name': self.provider['name'],
+ 'provider_url': self.provider['url']
+ }
+ # then
+ del actual_provider['id']
+ self.assertTrue(actual_provider, expected_provider)
+
def test_metadata_provider_get_by(self):
# given
no_provider = self.storage.metadata_provider_get_by({
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
})
self.assertIsNone(no_provider)
# when
provider_id = self.storage.metadata_provider_add(
self.provider['name'],
self.provider['type'],
self.provider['url'],
self.provider['metadata'])
actual_provider = self.storage.metadata_provider_get_by({
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
})
# then
self.assertTrue(provider_id, actual_provider['id'])
def test_origin_metadata_add(self):
# given
origin_id = self.storage.origin_add([self.origin])[0]['id']
origin_metadata0 = list(self.storage.origin_metadata_get_by(origin_id))
self.assertTrue(len(origin_metadata0) == 0)
tools = list(self.storage.tool_add([self.metadata_tool]))
tool = tools[0]
self.storage.metadata_provider_add(
self.provider['name'],
self.provider['type'],
self.provider['url'],
self.provider['metadata'])
provider = self.storage.metadata_provider_get_by({
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
})
# when adding for the same origin 2 metadatas
self.storage.origin_metadata_add(
origin_id,
self.origin_metadata['discovery_date'],
provider['id'],
tool['id'],
self.origin_metadata['metadata'])
actual_om1 = list(self.storage.origin_metadata_get_by(origin_id))
# then
self.assertEqual(len(actual_om1), 1)
self.assertEqual(actual_om1[0]['origin_id'], origin_id)
def test_origin_metadata_get(self):
# given
origin_id = self.storage.origin_add([self.origin])[0]['id']
origin_id2 = self.storage.origin_add([self.origin2])[0]['id']
self.storage.metadata_provider_add(self.provider['name'],
self.provider['type'],
self.provider['url'],
self.provider['metadata'])
provider = self.storage.metadata_provider_get_by({
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
})
tool = list(self.storage.tool_add([self.metadata_tool]))[0]
# when adding for the same origin 2 metadatas
self.storage.origin_metadata_add(
origin_id,
self.origin_metadata['discovery_date'],
provider['id'],
tool['id'],
self.origin_metadata['metadata'])
self.storage.origin_metadata_add(
origin_id2,
self.origin_metadata2['discovery_date'],
provider['id'],
tool['id'],
self.origin_metadata2['metadata'])
self.storage.origin_metadata_add(
origin_id,
self.origin_metadata2['discovery_date'],
provider['id'],
tool['id'],
self.origin_metadata2['metadata'])
all_metadatas = list(self.storage.origin_metadata_get_by(origin_id))
metadatas_for_origin2 = list(self.storage.origin_metadata_get_by(
origin_id2))
expected_results = [{
'origin_id': origin_id,
'discovery_date': datetime.datetime(
2017, 1, 1, 23, 0,
tzinfo=datetime.timezone.utc),
'metadata': {
'name': 'test_origin_metadata',
'version': '0.0.1'
},
'provider_id': provider['id'],
'provider_name': 'hal',
'provider_type': 'deposit-client',
'provider_url': 'http:///hal/inria',
'tool_id': tool['id']
}, {
'origin_id': origin_id,
'discovery_date': datetime.datetime(
2015, 1, 1, 23, 0,
tzinfo=datetime.timezone.utc),
'metadata': {
'name': 'test_origin_metadata',
'version': '0.0.1'
},
'provider_id': provider['id'],
'provider_name': 'hal',
'provider_type': 'deposit-client',
'provider_url': 'http:///hal/inria',
'tool_id': tool['id']
}]
# then
self.assertEqual(len(all_metadatas), 2)
self.assertEqual(len(metadatas_for_origin2), 1)
self.assertCountEqual(all_metadatas, expected_results)
def test_origin_metadata_get_by_provider_type(self):
# given
origin_id = self.storage.origin_add([self.origin])[0]['id']
origin_id2 = self.storage.origin_add([self.origin2])[0]['id']
provider1_id = self.storage.metadata_provider_add(
self.provider['name'],
self.provider['type'],
self.provider['url'],
self.provider['metadata'])
provider1 = self.storage.metadata_provider_get_by({
'provider_name': self.provider['name'],
'provider_url': self.provider['url']
})
self.assertEqual(provider1,
self.storage.metadata_provider_get(provider1_id))
provider2_id = self.storage.metadata_provider_add(
'swMATH',
'registry',
'http://www.swmath.org/',
{'email': 'contact@swmath.org',
'license': 'All rights reserved'})
provider2 = self.storage.metadata_provider_get_by({
'provider_name': 'swMATH',
'provider_url': 'http://www.swmath.org/'
})
self.assertEqual(provider2,
self.storage.metadata_provider_get(provider2_id))
# using the only tool now inserted in the data.sql, but for this
# provider should be a crawler tool (not yet implemented)
tool = list(self.storage.tool_add([self.metadata_tool]))[0]
# when adding for the same origin 2 metadatas
self.storage.origin_metadata_add(
origin_id,
self.origin_metadata['discovery_date'],
provider1['id'],
tool['id'],
self.origin_metadata['metadata'])
self.storage.origin_metadata_add(
origin_id2,
self.origin_metadata2['discovery_date'],
provider2['id'],
tool['id'],
self.origin_metadata2['metadata'])
provider_type = 'registry'
m_by_provider = list(self.storage.
origin_metadata_get_by(
origin_id2,
provider_type))
for item in m_by_provider:
if 'id' in item:
del item['id']
expected_results = [{
'origin_id': origin_id2,
'discovery_date': datetime.datetime(
2017, 1, 1, 23, 0,
tzinfo=datetime.timezone.utc),
'metadata': {
'name': 'test_origin_metadata',
'version': '0.0.1'
},
'provider_id': provider2['id'],
'provider_name': 'swMATH',
'provider_type': provider_type,
'provider_url': 'http://www.swmath.org/',
'tool_id': tool['id']
}]
# then
self.assertEqual(len(m_by_provider), 1)
self.assertEqual(m_by_provider, expected_results)
class CommonPropTestStorage:
def assert_contents_ok(self, expected_contents, actual_contents,
keys_to_check={'sha1', 'data'}):
"""Assert that a given list of contents matches on a given set of keys.
"""
for k in keys_to_check:
expected_list = sorted([c[k] for c in expected_contents])
actual_list = sorted([c[k] for c in actual_contents])
self.assertEqual(actual_list, expected_list)
@given(gen_contents(min_size=1, max_size=4))
def test_generate_content_get(self, contents):
# add contents to storage
self.storage.content_add(contents)
# input the list of sha1s we want from storage
get_sha1s = [c['sha1'] for c in contents]
# retrieve contents
actual_contents = list(self.storage.content_get(get_sha1s))
self.assert_contents_ok(contents, actual_contents)
@given(gen_contents(min_size=1, max_size=4))
def test_generate_content_get_metadata(self, contents):
# add contents to storage
self.storage.content_add(contents)
# input the list of sha1s we want from storage
get_sha1s = [c['sha1'] for c in contents]
# retrieve contents
actual_contents = list(self.storage.content_get_metadata(get_sha1s))
self.assertEqual(len(actual_contents), len(contents))
# will check that all contents are retrieved correctly
one_content = contents[0]
# content_get_metadata does not return data
keys_to_check = set(one_content.keys()) - {'data'}
self.assert_contents_ok(contents, actual_contents,
keys_to_check=keys_to_check)
def test_generate_content_get_range_limit_none(self):
"""content_get_range call with wrong limit input should fail"""
with self.assertRaises(ValueError) as e:
self.storage.content_get_range(start=None, end=None, limit=None)
self.assertEqual(e.exception.args, (
'Development error: limit should not be None',))
@given(gen_contents(min_size=1, max_size=4))
def test_generate_content_get_range_no_limit(self, contents):
"""content_get_range returns contents within range provided"""
self.reset_storage_tables()
# add contents to storage
self.storage.content_add(contents)
# input the list of sha1s we want from storage
get_sha1s = sorted([c['sha1'] for c in contents])
start = get_sha1s[0]
end = get_sha1s[-1]
# retrieve contents
actual_result = self.storage.content_get_range(start, end)
actual_contents = actual_result['contents']
actual_next = actual_result['next']
self.assertEqual(len(contents), len(actual_contents))
self.assertIsNone(actual_next)
one_content = contents[0]
keys_to_check = set(one_content.keys()) - {'data'}
self.assert_contents_ok(contents, actual_contents, keys_to_check)
@given(gen_contents(min_size=4, max_size=4))
def test_generate_content_get_range_limit(self, contents):
"""content_get_range paginates results if limit exceeded"""
self.reset_storage_tables()
contents_map = {c['sha1']: c for c in contents}
# add contents to storage
self.storage.content_add(contents)
# input the list of sha1s we want from storage
get_sha1s = sorted([c['sha1'] for c in contents])
start = get_sha1s[0]
end = get_sha1s[-1]
# retrieve contents limited to 3 results
limited_results = len(contents) - 1
actual_result = self.storage.content_get_range(start, end,
limit=limited_results)
actual_contents = actual_result['contents']
actual_next = actual_result['next']
self.assertEqual(limited_results, len(actual_contents))
self.assertIsNotNone(actual_next)
self.assertEqual(actual_next, get_sha1s[-1])
expected_contents = [contents_map[sha1] for sha1 in get_sha1s[:-1]]
keys_to_check = set(contents[0].keys()) - {'data'}
self.assert_contents_ok(expected_contents, actual_contents,
keys_to_check)
# retrieve next part
actual_results2 = self.storage.content_get_range(start=end, end=end)
actual_contents2 = actual_results2['contents']
actual_next2 = actual_results2['next']
self.assertEqual(1, len(actual_contents2))
self.assertIsNone(actual_next2)
self.assert_contents_ok([contents_map[actual_next]], actual_contents2,
keys_to_check)
@pytest.mark.db
class TestLocalStorage(CommonTestStorage, StorageTestDbFixture,
unittest.TestCase):
"""Test the local storage"""
# Can only be tested with local storage as you can't mock
# datetimes for the remote server
def test_fetch_history(self):
origin = self.storage.origin_add_one(self.origin)
with patch('datetime.datetime'):
datetime.datetime.now.return_value = self.fetch_history_date
fetch_history_id = self.storage.fetch_history_start(origin)
datetime.datetime.now.assert_called_with(tz=datetime.timezone.utc)
with patch('datetime.datetime'):
datetime.datetime.now.return_value = self.fetch_history_end
self.storage.fetch_history_end(fetch_history_id,
self.fetch_history_data)
fetch_history = self.storage.fetch_history_get(fetch_history_id)
expected_fetch_history = self.fetch_history_data.copy()
expected_fetch_history['id'] = fetch_history_id
expected_fetch_history['origin'] = origin
expected_fetch_history['date'] = self.fetch_history_date
expected_fetch_history['duration'] = self.fetch_history_duration
self.assertEqual(expected_fetch_history, fetch_history)
# The remote API doesn't expose _person_add
def test_person_get(self):
# given
person0 = {
'fullname': b'bob <alice@bob>',
'name': b'bob',
'email': b'alice@bob',
}
id0 = self.storage._person_add(person0)
person1 = {
'fullname': b'tony <tony@bob>',
'name': b'tony',
'email': b'tony@bob',
}
id1 = self.storage._person_add(person1)
# when
actual_persons = self.storage.person_get([id0, id1])
# given (person injection through release for example)
self.assertEqual(
list(actual_persons), [
{
'id': id0,
'fullname': person0['fullname'],
'name': person0['name'],
'email': person0['email'],
},
{
'id': id1,
'fullname': person1['fullname'],
'name': person1['name'],
'email': person1['email'],
},
])
# This test is only relevant on the local storage, with an actual
# objstorage raising an exception
def test_content_add_objstorage_exception(self):
self.storage.objstorage.add = Mock(
side_effect=Exception('mocked broken objstorage')
)
with self.assertRaises(Exception) as e:
self.storage.content_add([self.cont])
self.assertEqual(e.exception.args, ('mocked broken objstorage',))
missing = list(self.storage.content_missing([self.cont]))
self.assertEqual(missing, [self.cont['sha1']])
@pytest.mark.db
@pytest.mark.property_based
class PropTestLocalStorage(CommonPropTestStorage, StorageTestDbFixture,
unittest.TestCase):
pass
class AlteringSchemaTest(TestStorageData, StorageTestDbFixture,
unittest.TestCase):
"""This class is dedicated for the rare case where the schema needs to
be altered dynamically.
Otherwise, the tests could be blocking when ran altogether.
"""
def test_content_update(self):
cont = copy.deepcopy(self.cont)
self.storage.content_add([cont])
# alter the sha1_git for example
cont['sha1_git'] = hash_to_bytes(
'3a60a5275d0333bf13468e8b3dcab90f4046e654')
self.storage.content_update([cont], keys=['sha1_git'])
with self.storage.get_db().transaction() as cur:
cur.execute('SELECT sha1, sha1_git, sha256, length, status'
' FROM content WHERE sha1 = %s',
(cont['sha1'],))
datum = cur.fetchone()
self.assertEqual(
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(),
datum[3], datum[4]),
(cont['sha1'], cont['sha1_git'], cont['sha256'],
cont['length'], 'visible'))
def test_content_update_with_new_cols(self):
with self.storage.get_db().transaction() as cur:
cur.execute("""alter table content
add column test text default null,
add column test2 text default null""")
cont = copy.deepcopy(self.cont2)
self.storage.content_add([cont])
cont['test'] = 'value-1'
cont['test2'] = 'value-2'
self.storage.content_update([cont], keys=['test', 'test2'])
with self.storage.get_db().transaction() as cur:
cur.execute(
'SELECT sha1, sha1_git, sha256, length, status, test, test2'
' FROM content WHERE sha1 = %s',
(cont['sha1'],))
datum = cur.fetchone()
self.assertEqual(
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(),
datum[3], datum[4], datum[5], datum[6]),
(cont['sha1'], cont['sha1_git'], cont['sha256'],
cont['length'], 'visible', cont['test'], cont['test2']))
with self.storage.get_db().transaction() as cur:
cur.execute("""alter table content drop column test,
drop column test2""")

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 3:41 PM (1 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3253927

Event Timeline