Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/db.py
Show First 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | |||||
def jsonize(value): | def jsonize(value): | ||||
"""Convert a value to a psycopg2 JSON object if necessary""" | """Convert a value to a psycopg2 JSON object if necessary""" | ||||
if isinstance(value, dict): | if isinstance(value, dict): | ||||
return psycopg2.extras.Json(value) | return psycopg2.extras.Json(value) | ||||
return value | return value | ||||
def entry_to_bytes(entry): | def typecast_bytea(value, cur): | ||||
"""Convert an entry coming from the database to bytes""" | if value is not None: | ||||
if isinstance(entry, memoryview): | data = psycopg2.BINARY(value, cur) | ||||
return entry.tobytes() | return data.tobytes() | ||||
if isinstance(entry, list): | |||||
return [entry_to_bytes(value) for value in entry] | |||||
return entry | |||||
def line_to_bytes(line): | |||||
"""Convert a line coming from the database to bytes""" | |||||
if not line: | |||||
return line | |||||
if isinstance(line, dict): | |||||
return {k: entry_to_bytes(v) for k, v in line.items()} | |||||
return line.__class__(entry_to_bytes(entry) for entry in line) | |||||
def cursor_to_bytes(cursor): | |||||
"""Yield all the data from a cursor as bytes""" | |||||
yield from (line_to_bytes(line) for line in cursor) | |||||
def execute_values_to_bytes(*args, **kwargs): | |||||
for line in execute_values_generator(*args, **kwargs): | |||||
yield line_to_bytes(line) | |||||
class BaseDb: | class BaseDb: | ||||
"""Base class for swh.storage.*Db. | """Base class for swh.storage.*Db. | ||||
cf. swh.storage.db.Db, swh.archiver.db.ArchiverDb | cf. swh.storage.db.Db, swh.archiver.db.ArchiverDb | ||||
""" | """ | ||||
@classmethod | @classmethod | ||||
def adapt_conn(cls, conn): | |||||
"""Makes psycopg2 use 'bytes' to decode bytea instead of | |||||
'memoryview', for this connection.""" | |||||
cur = conn.cursor() | |||||
cur.execute("SELECT null::bytea, null::bytea[]") | |||||
bytea_oid = cur.description[0][1] | |||||
bytea_array_oid = cur.description[1][1] | |||||
t_bytes = psycopg2.extensions.new_type( | |||||
(bytea_oid,), "bytea", typecast_bytea) | |||||
psycopg2.extensions.register_type(t_bytes, conn) | |||||
t_bytes_array = psycopg2.extensions.new_array_type( | |||||
(bytea_array_oid,), "bytea[]", t_bytes) | |||||
psycopg2.extensions.register_type(t_bytes_array, conn) | |||||
@classmethod | |||||
def connect(cls, *args, **kwargs): | def connect(cls, *args, **kwargs): | ||||
"""factory method to create a DB proxy | """factory method to create a DB proxy | ||||
Accepts all arguments of psycopg2.connect; only some specific | Accepts all arguments of psycopg2.connect; only some specific | ||||
possibilities are reported below. | possibilities are reported below. | ||||
Args: | Args: | ||||
connstring: libpq2 connection string | connstring: libpq2 connection string | ||||
""" | """ | ||||
conn = psycopg2.connect(*args, **kwargs) | conn = psycopg2.connect(*args, **kwargs) | ||||
cls.adapt_conn(conn) | |||||
return cls(conn) | return cls(conn) | ||||
@classmethod | @classmethod | ||||
def from_pool(cls, pool): | def from_pool(cls, pool): | ||||
return cls(pool.getconn(), pool=pool) | conn = pool.getconn() | ||||
cls.adapt_conn(conn) | |||||
return cls(conn, pool=pool) | |||||
def _cursor(self, cur_arg): | def _cursor(self, cur_arg): | ||||
"""get a cursor: from cur_arg if given, or a fresh one otherwise | """get a cursor: from cur_arg if given, or a fresh one otherwise | ||||
meant to avoid boilerplate if/then/else in methods that proxy stored | meant to avoid boilerplate if/then/else in methods that proxy stored | ||||
procedures | procedures | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 163 Lines • ▼ Show 20 Lines | class Db(BaseDb): | ||||
skipped_content_keys = [ | skipped_content_keys = [ | ||||
'sha1', 'sha1_git', 'sha256', 'blake2s256', | 'sha1', 'sha1_git', 'sha256', 'blake2s256', | ||||
'length', 'reason', 'status', 'origin'] | 'length', 'reason', 'status', 'origin'] | ||||
def content_get_metadata_from_sha1s(self, sha1s, cur=None): | def content_get_metadata_from_sha1s(self, sha1s, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
yield from execute_values_to_bytes( | yield from execute_values_generator( | ||||
cur, """ | cur, """ | ||||
select t.sha1, %s from (values %%s) as t (sha1) | select t.sha1, %s from (values %%s) as t (sha1) | ||||
left join content using (sha1) | left join content using (sha1) | ||||
""" % ', '.join(self.content_get_metadata_keys[1:]), | """ % ', '.join(self.content_get_metadata_keys[1:]), | ||||
((sha1,) for sha1 in sha1s), | ((sha1,) for sha1 in sha1s), | ||||
) | ) | ||||
def content_get_range(self, start, end, limit=None, cur=None): | def content_get_range(self, start, end, limit=None, cur=None): | ||||
"""Retrieve contents within range [start, end]. | """Retrieve contents within range [start, end]. | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """select %s from content | query = """select %s from content | ||||
where %%s <= sha1 and sha1 <= %%s | where %%s <= sha1 and sha1 <= %%s | ||||
order by sha1 | order by sha1 | ||||
limit %%s""" % ', '.join(self.content_get_metadata_keys) | limit %%s""" % ', '.join(self.content_get_metadata_keys) | ||||
cur.execute(query, (start, end, limit)) | cur.execute(query, (start, end, limit)) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
content_hash_keys = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | content_hash_keys = ['sha1', 'sha1_git', 'sha256', 'blake2s256'] | ||||
def content_missing_from_list(self, contents, cur=None): | def content_missing_from_list(self, contents, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
keys = ', '.join(self.content_hash_keys) | keys = ', '.join(self.content_hash_keys) | ||||
equality = ' AND '.join( | equality = ' AND '.join( | ||||
('t.%s = c.%s' % (key, key)) | ('t.%s = c.%s' % (key, key)) | ||||
for key in self.content_hash_keys | for key in self.content_hash_keys | ||||
) | ) | ||||
yield from execute_values_to_bytes( | yield from execute_values_generator( | ||||
cur, """ | cur, """ | ||||
SELECT %s | SELECT %s | ||||
FROM (VALUES %%s) as t(%s) | FROM (VALUES %%s) as t(%s) | ||||
WHERE NOT EXISTS ( | WHERE NOT EXISTS ( | ||||
SELECT 1 FROM content c | SELECT 1 FROM content c | ||||
WHERE %s | WHERE %s | ||||
) | ) | ||||
""" % (keys, keys, equality), | """ % (keys, keys, equality), | ||||
(tuple(c[key] for key in self.content_hash_keys) for c in contents) | (tuple(c[key] for key in self.content_hash_keys) for c in contents) | ||||
) | ) | ||||
def content_missing_per_sha1(self, sha1s, cur=None): | def content_missing_per_sha1(self, sha1s, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
yield from execute_values_to_bytes(cur, """ | yield from execute_values_generator(cur, """ | ||||
SELECT t.sha1 FROM (VALUES %s) AS t(sha1) | SELECT t.sha1 FROM (VALUES %s) AS t(sha1) | ||||
WHERE NOT EXISTS ( | WHERE NOT EXISTS ( | ||||
SELECT 1 FROM content c WHERE c.sha1 = t.sha1 | SELECT 1 FROM content c WHERE c.sha1 = t.sha1 | ||||
)""", ((sha1,) for sha1 in sha1s)) | )""", ((sha1,) for sha1 in sha1s)) | ||||
def skipped_content_missing_from_temp(self, cur=None): | def skipped_content_missing_from_temp(self, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute("""SELECT sha1, sha1_git, sha256, blake2s256 | cur.execute("""SELECT sha1, sha1_git, sha256, blake2s256 | ||||
FROM swh_skipped_content_missing()""") | FROM swh_skipped_content_missing()""") | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
def snapshot_exists(self, snapshot_id, cur=None): | def snapshot_exists(self, snapshot_id, cur=None): | ||||
"""Check whether a snapshot with the given id exists""" | """Check whether a snapshot with the given id exists""" | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute("""SELECT 1 FROM snapshot where id=%s""", (snapshot_id,)) | cur.execute("""SELECT 1 FROM snapshot where id=%s""", (snapshot_id,)) | ||||
return bool(cur.fetchone()) | return bool(cur.fetchone()) | ||||
Show All 10 Lines | class Db(BaseDb): | ||||
def snapshot_count_branches(self, snapshot_id, cur=None): | def snapshot_count_branches(self, snapshot_id, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """\ | query = """\ | ||||
SELECT %s FROM swh_snapshot_count_branches(%%s) | SELECT %s FROM swh_snapshot_count_branches(%%s) | ||||
""" % ', '.join(self.snapshot_count_cols) | """ % ', '.join(self.snapshot_count_cols) | ||||
cur.execute(query, (snapshot_id,)) | cur.execute(query, (snapshot_id,)) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
snapshot_get_cols = ['snapshot_id', 'name', 'target', 'target_type'] | snapshot_get_cols = ['snapshot_id', 'name', 'target', 'target_type'] | ||||
def snapshot_get_by_id(self, snapshot_id, branches_from=b'', | def snapshot_get_by_id(self, snapshot_id, branches_from=b'', | ||||
branches_count=None, target_types=None, | branches_count=None, target_types=None, | ||||
cur=None): | cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """\ | query = """\ | ||||
SELECT %s | SELECT %s | ||||
FROM swh_snapshot_get_by_id(%%s, %%s, %%s, %%s :: snapshot_target[]) | FROM swh_snapshot_get_by_id(%%s, %%s, %%s, %%s :: snapshot_target[]) | ||||
""" % ', '.join(self.snapshot_get_cols) | """ % ', '.join(self.snapshot_get_cols) | ||||
cur.execute(query, (snapshot_id, branches_from, branches_count, | cur.execute(query, (snapshot_id, branches_from, branches_count, | ||||
target_types)) | target_types)) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
def snapshot_get_by_origin_visit(self, origin_id, visit_id, cur=None): | def snapshot_get_by_origin_visit(self, origin_id, visit_id, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """\ | query = """\ | ||||
SELECT swh_snapshot_get_by_origin_visit(%s, %s) | SELECT swh_snapshot_get_by_origin_visit(%s, %s) | ||||
""" | """ | ||||
cur.execute(query, (origin_id, visit_id)) | cur.execute(query, (origin_id, visit_id)) | ||||
ret = cur.fetchone() | ret = cur.fetchone() | ||||
if ret: | if ret: | ||||
return line_to_bytes(ret)[0] | return ret[0] | ||||
content_find_cols = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | content_find_cols = ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', | ||||
'ctime', 'status'] | 'ctime', 'status'] | ||||
def content_find(self, sha1=None, sha1_git=None, sha256=None, | def content_find(self, sha1=None, sha1_git=None, sha256=None, | ||||
blake2s256=None, cur=None): | blake2s256=None, cur=None): | ||||
"""Find the content optionally on a combination of the following | """Find the content optionally on a combination of the following | ||||
checksums sha1, sha1_git, sha256 or blake2s256. | checksums sha1, sha1_git, sha256 or blake2s256. | ||||
Show All 10 Lines | def content_find(self, sha1=None, sha1_git=None, sha256=None, | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute("""SELECT %s | cur.execute("""SELECT %s | ||||
FROM swh_content_find(%%s, %%s, %%s, %%s) | FROM swh_content_find(%%s, %%s, %%s, %%s) | ||||
LIMIT 1""" % ','.join(self.content_find_cols), | LIMIT 1""" % ','.join(self.content_find_cols), | ||||
(sha1, sha1_git, sha256, blake2s256)) | (sha1, sha1_git, sha256, blake2s256)) | ||||
content = line_to_bytes(cur.fetchone()) | content = cur.fetchone() | ||||
if set(content) == {None}: | if set(content) == {None}: | ||||
return None | return None | ||||
else: | else: | ||||
return content | return content | ||||
def directory_missing_from_list(self, directories, cur=None): | def directory_missing_from_list(self, directories, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
yield from execute_values_to_bytes( | yield from execute_values_generator( | ||||
cur, """ | cur, """ | ||||
SELECT id FROM (VALUES %s) as t(id) | SELECT id FROM (VALUES %s) as t(id) | ||||
WHERE NOT EXISTS ( | WHERE NOT EXISTS ( | ||||
SELECT 1 FROM directory d WHERE d.id = t.id | SELECT 1 FROM directory d WHERE d.id = t.id | ||||
) | ) | ||||
""", ((id,) for id in directories)) | """, ((id,) for id in directories)) | ||||
directory_ls_cols = ['dir_id', 'type', 'target', 'name', 'perms', | directory_ls_cols = ['dir_id', 'type', 'target', 'name', 'perms', | ||||
'status', 'sha1', 'sha1_git', 'sha256', 'length'] | 'status', 'sha1', 'sha1_git', 'sha256', 'length'] | ||||
def directory_walk_one(self, directory, cur=None): | def directory_walk_one(self, directory, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cols = ', '.join(self.directory_ls_cols) | cols = ', '.join(self.directory_ls_cols) | ||||
query = 'SELECT %s FROM swh_directory_walk_one(%%s)' % cols | query = 'SELECT %s FROM swh_directory_walk_one(%%s)' % cols | ||||
cur.execute(query, (directory,)) | cur.execute(query, (directory,)) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
def directory_walk(self, directory, cur=None): | def directory_walk(self, directory, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cols = ', '.join(self.directory_ls_cols) | cols = ', '.join(self.directory_ls_cols) | ||||
query = 'SELECT %s FROM swh_directory_walk(%%s)' % cols | query = 'SELECT %s FROM swh_directory_walk(%%s)' % cols | ||||
cur.execute(query, (directory,)) | cur.execute(query, (directory,)) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
def directory_entry_get_by_path(self, directory, paths, cur=None): | def directory_entry_get_by_path(self, directory, paths, cur=None): | ||||
"""Retrieve a directory entry by path. | """Retrieve a directory entry by path. | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cols = ', '.join(self.directory_ls_cols) | cols = ', '.join(self.directory_ls_cols) | ||||
query = ( | query = ( | ||||
'SELECT %s FROM swh_find_directory_entry_by_path(%%s, %%s)' % cols) | 'SELECT %s FROM swh_find_directory_entry_by_path(%%s, %%s)' % cols) | ||||
cur.execute(query, (directory, paths)) | cur.execute(query, (directory, paths)) | ||||
data = cur.fetchone() | data = cur.fetchone() | ||||
if set(data) == {None}: | if set(data) == {None}: | ||||
return None | return None | ||||
return line_to_bytes(data) | return data | ||||
def revision_missing_from_list(self, revisions, cur=None): | def revision_missing_from_list(self, revisions, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
yield from execute_values_to_bytes( | yield from execute_values_generator( | ||||
cur, """ | cur, """ | ||||
SELECT id FROM (VALUES %s) as t(id) | SELECT id FROM (VALUES %s) as t(id) | ||||
WHERE NOT EXISTS ( | WHERE NOT EXISTS ( | ||||
SELECT 1 FROM revision r WHERE r.id = t.id | SELECT 1 FROM revision r WHERE r.id = t.id | ||||
) | ) | ||||
""", ((id,) for id in revisions)) | """, ((id,) for id in revisions)) | ||||
revision_add_cols = [ | revision_add_cols = [ | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | def origin_visit_get_all(self, origin_id, | ||||
WHERE origin=%%s %s | WHERE origin=%%s %s | ||||
order by visit asc | order by visit asc | ||||
limit %%s""" % ( | limit %%s""" % ( | ||||
', '.join(self.origin_visit_get_cols[:-1]), extra_condition | ', '.join(self.origin_visit_get_cols[:-1]), extra_condition | ||||
) | ) | ||||
cur.execute(query, args) | cur.execute(query, args) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
def origin_visit_get(self, origin_id, visit_id, cur=None): | def origin_visit_get(self, origin_id, visit_id, cur=None): | ||||
"""Retrieve information on visit visit_id of origin origin_id. | """Retrieve information on visit visit_id of origin origin_id. | ||||
Args: | Args: | ||||
origin_id: the origin concerned | origin_id: the origin concerned | ||||
visit_id: The visit step for that origin | visit_id: The visit step for that origin | ||||
Show All 10 Lines | def origin_visit_get(self, origin_id, visit_id, cur=None): | ||||
FROM origin_visit | FROM origin_visit | ||||
WHERE origin = %%s AND visit = %%s | WHERE origin = %%s AND visit = %%s | ||||
""" % (', '.join(self.origin_visit_get_cols[:-1])) | """ % (', '.join(self.origin_visit_get_cols[:-1])) | ||||
cur.execute(query, (origin_id, visit_id)) | cur.execute(query, (origin_id, visit_id)) | ||||
r = cur.fetchall() | r = cur.fetchall() | ||||
if not r: | if not r: | ||||
return None | return None | ||||
return line_to_bytes(r[0]) | return r[0] | ||||
def origin_visit_exists(self, origin_id, visit_id, cur=None): | def origin_visit_exists(self, origin_id, visit_id, cur=None): | ||||
"""Check whether an origin visit with the given ids exists""" | """Check whether an origin visit with the given ids exists""" | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" | query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" | ||||
cur.execute(query, (origin_id, visit_id)) | cur.execute(query, (origin_id, visit_id)) | ||||
Show All 29 Lines | def origin_visit_get_latest_snapshot(self, origin_id, | ||||
ORDER BY date DESC, visit DESC | ORDER BY date DESC, visit DESC | ||||
LIMIT 1 | LIMIT 1 | ||||
""" % (', '.join(self.origin_visit_get_cols[:-1]), extra_clause) | """ % (', '.join(self.origin_visit_get_cols[:-1]), extra_clause) | ||||
cur.execute(query, (origin_id,)) | cur.execute(query, (origin_id,)) | ||||
r = cur.fetchone() | r = cur.fetchone() | ||||
if not r: | if not r: | ||||
return None | return None | ||||
return line_to_bytes(r) | return r | ||||
@staticmethod | @staticmethod | ||||
def mangle_query_key(key, main_table): | def mangle_query_key(key, main_table): | ||||
if key == 'id': | if key == 'id': | ||||
return 't.id' | return 't.id' | ||||
if key == 'parents': | if key == 'parents': | ||||
return ''' | return ''' | ||||
ARRAY( | ARRAY( | ||||
Show All 15 Lines | class Db(BaseDb): | ||||
def revision_get_from_list(self, revisions, cur=None): | def revision_get_from_list(self, revisions, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query_keys = ', '.join( | query_keys = ', '.join( | ||||
self.mangle_query_key(k, 'revision') | self.mangle_query_key(k, 'revision') | ||||
for k in self.revision_get_cols | for k in self.revision_get_cols | ||||
) | ) | ||||
yield from execute_values_to_bytes( | yield from execute_values_generator( | ||||
cur, """ | cur, """ | ||||
SELECT %s FROM (VALUES %%s) as t(id) | SELECT %s FROM (VALUES %%s) as t(id) | ||||
LEFT JOIN revision ON t.id = revision.id | LEFT JOIN revision ON t.id = revision.id | ||||
LEFT JOIN person author ON revision.author = author.id | LEFT JOIN person author ON revision.author = author.id | ||||
LEFT JOIN person committer ON revision.committer = committer.id | LEFT JOIN person committer ON revision.committer = committer.id | ||||
""" % query_keys, | """ % query_keys, | ||||
((id,) for id in revisions)) | ((id,) for id in revisions)) | ||||
def revision_log(self, root_revisions, limit=None, cur=None): | def revision_log(self, root_revisions, limit=None, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """SELECT %s | query = """SELECT %s | ||||
FROM swh_revision_log(%%s, %%s) | FROM swh_revision_log(%%s, %%s) | ||||
""" % ', '.join(self.revision_get_cols) | """ % ', '.join(self.revision_get_cols) | ||||
cur.execute(query, (root_revisions, limit)) | cur.execute(query, (root_revisions, limit)) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
revision_shortlog_cols = ['id', 'parents'] | revision_shortlog_cols = ['id', 'parents'] | ||||
def revision_shortlog(self, root_revisions, limit=None, cur=None): | def revision_shortlog(self, root_revisions, limit=None, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """SELECT %s | query = """SELECT %s | ||||
FROM swh_revision_list(%%s, %%s) | FROM swh_revision_list(%%s, %%s) | ||||
""" % ', '.join(self.revision_shortlog_cols) | """ % ', '.join(self.revision_shortlog_cols) | ||||
cur.execute(query, (root_revisions, limit)) | cur.execute(query, (root_revisions, limit)) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
def release_missing_from_list(self, releases, cur=None): | def release_missing_from_list(self, releases, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
yield from execute_values_to_bytes( | yield from execute_values_generator( | ||||
cur, """ | cur, """ | ||||
SELECT id FROM (VALUES %s) as t(id) | SELECT id FROM (VALUES %s) as t(id) | ||||
WHERE NOT EXISTS ( | WHERE NOT EXISTS ( | ||||
SELECT 1 FROM release r WHERE r.id = t.id | SELECT 1 FROM release r WHERE r.id = t.id | ||||
) | ) | ||||
""", ((id,) for id in releases)) | """, ((id,) for id in releases)) | ||||
object_find_by_sha1_git_cols = ['sha1_git', 'type', 'id', 'object_id'] | object_find_by_sha1_git_cols = ['sha1_git', 'type', 'id', 'object_id'] | ||||
def object_find_by_sha1_git(self, ids, cur=None): | def object_find_by_sha1_git(self, ids, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
yield from execute_values_to_bytes( | yield from execute_values_generator( | ||||
cur, """ | cur, """ | ||||
WITH t (id) AS (VALUES %s), | WITH t (id) AS (VALUES %s), | ||||
known_objects as (( | known_objects as (( | ||||
select | select | ||||
id as sha1_git, | id as sha1_git, | ||||
'release'::object_type as type, | 'release'::object_type as type, | ||||
id, | id, | ||||
object_id | object_id | ||||
▲ Show 20 Lines • Show All 99 Lines • ▼ Show 20 Lines | def origin_get_with(self, type, url, cur=None): | ||||
query = """SELECT %s | query = """SELECT %s | ||||
FROM origin | FROM origin | ||||
WHERE type=%%s AND url=%%s | WHERE type=%%s AND url=%%s | ||||
""" % ','.join(self.origin_cols) | """ % ','.join(self.origin_cols) | ||||
cur.execute(query, (type, url)) | cur.execute(query, (type, url)) | ||||
data = cur.fetchone() | data = cur.fetchone() | ||||
if data: | if data: | ||||
return line_to_bytes(data) | return data | ||||
return None | return None | ||||
def origin_get(self, id, cur=None): | def origin_get(self, id, cur=None): | ||||
"""Retrieve the origin per its identifier. | """Retrieve the origin per its identifier. | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """SELECT %s | query = """SELECT %s | ||||
FROM origin WHERE id=%%s | FROM origin WHERE id=%%s | ||||
""" % ','.join(self.origin_cols) | """ % ','.join(self.origin_cols) | ||||
cur.execute(query, (id,)) | cur.execute(query, (id,)) | ||||
data = cur.fetchone() | data = cur.fetchone() | ||||
if data: | if data: | ||||
return line_to_bytes(data) | return data | ||||
return None | return None | ||||
ardumont: can't we just return the `fetchone()`'s result now?
```
return cur.fetchone()
``` | |||||
def origin_search(self, url_pattern, offset=0, limit=50, | def origin_search(self, url_pattern, offset=0, limit=50, | ||||
regexp=False, with_visit=False, cur=None): | regexp=False, with_visit=False, cur=None): | ||||
"""Search for origins whose urls contain a provided string pattern | """Search for origins whose urls contain a provided string pattern | ||||
or match a provided regular expression. | or match a provided regular expression. | ||||
The search is performed in a case insensitive way. | The search is performed in a case insensitive way. | ||||
Args: | Args: | ||||
Show All 23 Lines | def origin_search(self, url_pattern, offset=0, limit=50, | ||||
if not regexp: | if not regexp: | ||||
query = query % (origin_cols, 'ILIKE') | query = query % (origin_cols, 'ILIKE') | ||||
query_params = ('%'+url_pattern+'%', offset, limit) | query_params = ('%'+url_pattern+'%', offset, limit) | ||||
else: | else: | ||||
query = query % (origin_cols, '~*') | query = query % (origin_cols, '~*') | ||||
query_params = (url_pattern, offset, limit) | query_params = (url_pattern, offset, limit) | ||||
cur.execute(query, query_params) | cur.execute(query, query_params) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
person_cols = ['fullname', 'name', 'email'] | person_cols = ['fullname', 'name', 'email'] | ||||
person_get_cols = person_cols + ['id'] | person_get_cols = person_cols + ['id'] | ||||
def person_get(self, ids, cur=None): | def person_get(self, ids, cur=None): | ||||
"""Retrieve the persons identified by the list of ids. | """Retrieve the persons identified by the list of ids. | ||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """SELECT %s | query = """SELECT %s | ||||
FROM person | FROM person | ||||
WHERE id IN %%s""" % ', '.join(self.person_get_cols) | WHERE id IN %%s""" % ', '.join(self.person_get_cols) | ||||
cur.execute(query, (tuple(ids),)) | cur.execute(query, (tuple(ids),)) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
release_add_cols = [ | release_add_cols = [ | ||||
'id', 'target', 'target_type', 'date', 'date_offset', | 'id', 'target', 'target_type', 'date', 'date_offset', | ||||
'date_neg_utc_offset', 'name', 'comment', 'synthetic', | 'date_neg_utc_offset', 'name', 'comment', 'synthetic', | ||||
'author_fullname', 'author_name', 'author_email', | 'author_fullname', 'author_name', 'author_email', | ||||
] | ] | ||||
release_get_cols = release_add_cols + ['author_id'] | release_get_cols = release_add_cols + ['author_id'] | ||||
def release_get_from_list(self, releases, cur=None): | def release_get_from_list(self, releases, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query_keys = ', '.join( | query_keys = ', '.join( | ||||
self.mangle_query_key(k, 'release') | self.mangle_query_key(k, 'release') | ||||
for k in self.release_get_cols | for k in self.release_get_cols | ||||
) | ) | ||||
yield from execute_values_to_bytes( | yield from execute_values_generator( | ||||
cur, """ | cur, """ | ||||
SELECT %s FROM (VALUES %%s) as t(id) | SELECT %s FROM (VALUES %%s) as t(id) | ||||
LEFT JOIN release ON t.id = release.id | LEFT JOIN release ON t.id = release.id | ||||
LEFT JOIN person author ON release.author = author.id | LEFT JOIN person author ON release.author = author.id | ||||
""" % query_keys, | """ % query_keys, | ||||
((id,) for id in releases)) | ((id,) for id in releases)) | ||||
def origin_metadata_add(self, origin, ts, provider, tool, | def origin_metadata_add(self, origin, ts, provider, tool, | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def origin_metadata_get_by(self, origin_id, provider_type=None, cur=None): | ||||
else: | else: | ||||
query = '''SELECT %s | query = '''SELECT %s | ||||
FROM swh_origin_metadata_get_by_provider_type( | FROM swh_origin_metadata_get_by_provider_type( | ||||
%%s, %%s)''' % (','.join( | %%s, %%s)''' % (','.join( | ||||
self.origin_metadata_get_cols)) | self.origin_metadata_get_cols)) | ||||
cur.execute(query, (origin_id, provider_type)) | cur.execute(query, (origin_id, provider_type)) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
tool_cols = ['id', 'name', 'version', 'configuration'] | tool_cols = ['id', 'name', 'version', 'configuration'] | ||||
@stored_procedure('swh_mktemp_tool') | @stored_procedure('swh_mktemp_tool') | ||||
def mktemp_tool(self, cur=None): | def mktemp_tool(self, cur=None): | ||||
pass | pass | ||||
def tool_add_from_temp(self, cur=None): | def tool_add_from_temp(self, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute("SELECT %s from swh_tool_add()" % ( | cur.execute("SELECT %s from swh_tool_add()" % ( | ||||
','.join(self.tool_cols), )) | ','.join(self.tool_cols), )) | ||||
yield from cursor_to_bytes(cur) | yield from cur | ||||
def tool_get(self, name, version, configuration, cur=None): | def tool_get(self, name, version, configuration, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute('''select %s | cur.execute('''select %s | ||||
from tool | from tool | ||||
where name=%%s and | where name=%%s and | ||||
version=%%s and | version=%%s and | ||||
configuration=%%s''' % ( | configuration=%%s''' % ( | ||||
','.join(self.tool_cols)), | ','.join(self.tool_cols)), | ||||
(name, version, configuration)) | (name, version, configuration)) | ||||
data = cur.fetchone() | data = cur.fetchone() | ||||
if not data: | if not data: | ||||
return None | return None | ||||
return line_to_bytes(data) | return data | ||||
metadata_provider_cols = ['id', 'provider_name', 'provider_type', | metadata_provider_cols = ['id', 'provider_name', 'provider_type', | ||||
'provider_url', 'metadata'] | 'provider_url', 'metadata'] | ||||
def metadata_provider_add(self, provider_name, provider_type, | def metadata_provider_add(self, provider_name, provider_type, | ||||
provider_url, metadata, cur=None): | provider_url, metadata, cur=None): | ||||
"""Insert a new provider and return the new identifier.""" | """Insert a new provider and return the new identifier.""" | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
Show All 11 Lines | def metadata_provider_get(self, provider_id, cur=None): | ||||
from metadata_provider | from metadata_provider | ||||
where id=%%s ''' % ( | where id=%%s ''' % ( | ||||
','.join(self.metadata_provider_cols)), | ','.join(self.metadata_provider_cols)), | ||||
(provider_id, )) | (provider_id, )) | ||||
data = cur.fetchone() | data = cur.fetchone() | ||||
if not data: | if not data: | ||||
return None | return None | ||||
return line_to_bytes(data) | return data | ||||
def metadata_provider_get_by(self, provider_name, provider_url, | def metadata_provider_get_by(self, provider_name, provider_url, | ||||
cur=None): | cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute('''select %s | cur.execute('''select %s | ||||
from metadata_provider | from metadata_provider | ||||
where provider_name=%%s and | where provider_name=%%s and | ||||
provider_url=%%s''' % ( | provider_url=%%s''' % ( | ||||
','.join(self.metadata_provider_cols)), | ','.join(self.metadata_provider_cols)), | ||||
(provider_name, provider_url)) | (provider_name, provider_url)) | ||||
data = cur.fetchone() | data = cur.fetchone() | ||||
if not data: | if not data: | ||||
return None | return None | ||||
return line_to_bytes(data) | return data |
can't we just return the fetchone()'s result now?