Changeset View
Standalone View
swh/provenance/postgresql/archive.py
from typing import Any, Dict, Iterable, List, Set | from typing import Any, Dict, Iterable, List | ||||||||||||||||||||||||
from methodtools import lru_cache | from methodtools import lru_cache | ||||||||||||||||||||||||
import psycopg2 | import psycopg2 | ||||||||||||||||||||||||
from swh.model.model import ObjectType, Revision, Sha1Git, TargetType | from swh.model.model import Sha1Git | ||||||||||||||||||||||||
from swh.storage.postgresql.storage import Storage | from swh.storage.postgresql.storage import Storage | ||||||||||||||||||||||||
class ArchivePostgreSQL:
    """Archive interface implemented with direct SQL queries against the
    PostgreSQL database of a Software Heritage archive."""

    def __init__(self, conn: psycopg2.extensions.connection):
        # Keep the raw connection for the direct SQL queries below; a
        # Storage wrapper (with an in-memory objstorage) is retained for
        # callers that still rely on the high-level storage API.
        self.conn = conn
        self.storage = Storage(conn, objstorage={"cls": "memory"})
def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: | def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: | ||||||||||||||||||||||||
# TODO: only call directory_ls_internal if the id is not being queried by | |||||||||||||||||||||||||
# someone else. Otherwise wait until results get properly cached. | |||||||||||||||||||||||||
entries = self.directory_ls_internal(id) | entries = self.directory_ls_internal(id) | ||||||||||||||||||||||||
yield from entries | yield from entries | ||||||||||||||||||||||||
@lru_cache(maxsize=100000) | @lru_cache(maxsize=100000) | ||||||||||||||||||||||||
def directory_ls_internal(self, id: Sha1Git) -> List[Dict[str, Any]]: | def directory_ls_internal(self, id: Sha1Git) -> List[Dict[str, Any]]: | ||||||||||||||||||||||||
# TODO: add file size filtering | # TODO: add file size filtering | ||||||||||||||||||||||||
with self.conn.cursor() as cursor: | with self.conn.cursor() as cursor: | ||||||||||||||||||||||||
cursor.execute( | cursor.execute( | ||||||||||||||||||||||||
"""WITH | """ | ||||||||||||||||||||||||
WITH | |||||||||||||||||||||||||
dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries | dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries | ||||||||||||||||||||||||
FROM directory WHERE id=%s), | FROM directory WHERE id=%s), | ||||||||||||||||||||||||
ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), | ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), | ||||||||||||||||||||||||
ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), | ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), | ||||||||||||||||||||||||
ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) | ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) | ||||||||||||||||||||||||
(SELECT 'dir'::directory_entry_type AS type, e.target, e.name, | (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, | ||||||||||||||||||||||||
NULL::sha1_git | NULL::sha1_git | ||||||||||||||||||||||||
FROM ls_d | FROM ls_d | ||||||||||||||||||||||||
LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) | LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) | ||||||||||||||||||||||||
UNION | UNION | ||||||||||||||||||||||||
(WITH known_contents AS | (WITH known_contents AS | ||||||||||||||||||||||||
(SELECT 'file'::directory_entry_type AS type, e.target, e.name, | (SELECT 'file'::directory_entry_type AS type, e.target, e.name, | ||||||||||||||||||||||||
c.sha1_git | c.sha1_git | ||||||||||||||||||||||||
FROM ls_f | FROM ls_f | ||||||||||||||||||||||||
LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id | LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id | ||||||||||||||||||||||||
INNER JOIN content c ON e.target=c.sha1_git) | INNER JOIN content c ON e.target=c.sha1_git) | ||||||||||||||||||||||||
SELECT * FROM known_contents | SELECT * FROM known_contents | ||||||||||||||||||||||||
UNION | UNION | ||||||||||||||||||||||||
(SELECT 'file'::directory_entry_type AS type, e.target, e.name, | (SELECT 'file'::directory_entry_type AS type, e.target, e.name, | ||||||||||||||||||||||||
c.sha1_git | c.sha1_git | ||||||||||||||||||||||||
FROM ls_f | FROM ls_f | ||||||||||||||||||||||||
LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id | LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id | ||||||||||||||||||||||||
LEFT JOIN skipped_content c ON e.target=c.sha1_git | LEFT JOIN skipped_content c ON e.target=c.sha1_git | ||||||||||||||||||||||||
WHERE NOT EXISTS ( | WHERE NOT EXISTS ( | ||||||||||||||||||||||||
SELECT 1 FROM known_contents | SELECT 1 FROM known_contents | ||||||||||||||||||||||||
WHERE known_contents.sha1_git=e.target | WHERE known_contents.sha1_git=e.target | ||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||
ORDER BY name | |||||||||||||||||||||||||
douardda: why is this removed? | |||||||||||||||||||||||||
Done Inline Actionsthere's no need to guaranty any order when iterating the contents of a directory aeviso: there's no need to guaranty any order when iterating the contents of a directory | |||||||||||||||||||||||||
""", | """, | ||||||||||||||||||||||||
(id,), | (id,), | ||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||
return [ | return [ | ||||||||||||||||||||||||
{"type": row[0], "target": row[1], "name": row[2]} | {"type": row[0], "target": row[1], "name": row[2]} | ||||||||||||||||||||||||
for row in cursor.fetchall() | for row in cursor.fetchall() | ||||||||||||||||||||||||
] | ] | ||||||||||||||||||||||||
def revision_get(self, ids: Iterable[Sha1Git]) -> Iterable[Revision]: | def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: | ||||||||||||||||||||||||
with self.conn.cursor() as cursor: | with self.conn.cursor() as cursor: | ||||||||||||||||||||||||
psycopg2.extras.execute_values( | cursor.execute( | ||||||||||||||||||||||||
cursor, | |||||||||||||||||||||||||
""" | """ | ||||||||||||||||||||||||
SELECT t.id, revision.date, revision.directory, | SELECT RH.parent_id::bytea | ||||||||||||||||||||||||
ARRAY( | FROM revision_history AS RH | ||||||||||||||||||||||||
SELECT rh.parent_id::bytea | WHERE RH.id=%s | ||||||||||||||||||||||||
FROM revision_history rh | ORDER BY RH.parent_rank | ||||||||||||||||||||||||
Done Inline Actionssince you do only return elements from this array for one revision only, I don't think you need to recreate the ARRAY structure. douardda: since you do only return elements from this array for one revision only, I don't think you need… | |||||||||||||||||||||||||
Done Inline Actions
since you do only return elements from this array for one revision only, I don't think you need to recreate the ARRAY structure. If using suggested SQL code, then just yield from returned rows. douardda: since you do only return elements from this array for one revision only, I don't think you need… | |||||||||||||||||||||||||
Done Inline ActionsI agree the query can be further simplified, but wouldn't row be a tuple with a single Sha1Git element? I guess I still need to yield row[0] aeviso: I agree the query can be further simplified, but wouldn't `row` be a tuple with a single… | |||||||||||||||||||||||||
Done Inline Actionsyes but the idea is to prevent sending unused data on the wire (pg connection). douardda: yes but the idea is to prevent sending unused data on the wire (pg connection). | |||||||||||||||||||||||||
WHERE rh.id = t.id | |||||||||||||||||||||||||
ORDER BY rh.parent_rank | |||||||||||||||||||||||||
) | |||||||||||||||||||||||||
FROM (VALUES %s) as t(sortkey, id) | |||||||||||||||||||||||||
LEFT JOIN revision ON t.id = revision.id | |||||||||||||||||||||||||
LEFT JOIN person author ON revision.author = author.id | |||||||||||||||||||||||||
LEFT JOIN person committer ON revision.committer = committer.id | |||||||||||||||||||||||||
ORDER BY sortkey | |||||||||||||||||||||||||
""", | """, | ||||||||||||||||||||||||
((sortkey, id) for sortkey, id in enumerate(ids)), | (id,), | ||||||||||||||||||||||||
) | |||||||||||||||||||||||||
for row in cursor.fetchall(): | |||||||||||||||||||||||||
parents = [] | |||||||||||||||||||||||||
for parent in row[3]: | |||||||||||||||||||||||||
if parent: | |||||||||||||||||||||||||
parents.append(parent) | |||||||||||||||||||||||||
yield Revision.from_dict( | |||||||||||||||||||||||||
{ | |||||||||||||||||||||||||
"id": row[0], | |||||||||||||||||||||||||
"date": row[1], | |||||||||||||||||||||||||
"directory": row[2], | |||||||||||||||||||||||||
"parents": tuple(parents), | |||||||||||||||||||||||||
} | |||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||
# There should be at most one row anyway | |||||||||||||||||||||||||
yield from (row[0] for row in cursor.fetchall()) | |||||||||||||||||||||||||
def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: | def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: | ||||||||||||||||||||||||
# TODO: this code is duplicated here (same as in swh.provenance.storage.archive) | with self.conn.cursor() as cursor: | ||||||||||||||||||||||||
# but it's just temporary. This method should actually perform a direct query to | cursor.execute( | ||||||||||||||||||||||||
# the SQL db of the archive. | """ | ||||||||||||||||||||||||
from swh.core.utils import grouper | WITH S AS (SELECT object_id FROM snapshot WHERE snapshot.id=%s) | ||||||||||||||||||||||||
from swh.storage.algos.snapshot import snapshot_get_all_branches | (SELECT B.target AS head | ||||||||||||||||||||||||
FROM S | |||||||||||||||||||||||||
snapshot = snapshot_get_all_branches(self.storage, id) | JOIN snapshot_branches AS BS ON (S.object_id=BS.snapshot_id) | ||||||||||||||||||||||||
assert snapshot is not None | JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) | ||||||||||||||||||||||||
WHERE B.target_type='revision'::snapshot_target) | |||||||||||||||||||||||||
targets_set = set() | UNION | ||||||||||||||||||||||||
releases_set = set() | (SELECT R.target AS head | ||||||||||||||||||||||||
if snapshot is not None: | FROM S | ||||||||||||||||||||||||
for branch in snapshot.branches: | JOIN snapshot_branches AS BS ON (S.object_id=BS.snapshot_id) | ||||||||||||||||||||||||
if snapshot.branches[branch].target_type == TargetType.REVISION: | JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) | ||||||||||||||||||||||||
targets_set.add(snapshot.branches[branch].target) | JOIN release AS R ON (B.target=R.id) | ||||||||||||||||||||||||
elif snapshot.branches[branch].target_type == TargetType.RELEASE: | WHERE B.target_type='release'::snapshot_target | ||||||||||||||||||||||||
Done Inline Actions
An OR should do the trick here douardda: An `OR` should do the trick here | |||||||||||||||||||||||||
Done Inline ActionsOK, I'll try it aeviso: OK, I'll try it | |||||||||||||||||||||||||
Done Inline ActionsSorry, I don't see how these queries are equivalent. There is a missing JOIN in the proposed one aeviso: Sorry, I don't see how these queries are equivalent. There is a missing `JOIN` in the proposed… | |||||||||||||||||||||||||
Done Inline Actionsyou're right, I missed that. You can probably just pack the missing join in there, I don't think it would hurt the query performances (to be checked), so something like: WITH S AS (SELECT object_id FROM snapshot WHERE snapshot.id=%s ) SELECT B.target AS head FROM S JOIN snapshot_branches AS BS ON (S.object_id=BS.snapshot_id) JOIN snapshot_branch AS B ON (BS.branch_id=B.object_id) LEFT JOIN release AS R ON (B.target=R.id) WHERE B.target_type='release'::snapshot_target AND R.target_type='revision'::object_type; douardda: you're right, I missed that. You can probably just pack the missing join in there, I don't… | |||||||||||||||||||||||||
Done Inline ActionsI rather keep the current query for now aeviso: I rather keep the current query for now | |||||||||||||||||||||||||
releases_set.add(snapshot.branches[branch].target) | AND R.target_type='revision'::object_type) | ||||||||||||||||||||||||
""", | |||||||||||||||||||||||||
batchsize = 100 | (id,), | ||||||||||||||||||||||||
for releases in grouper(releases_set, batchsize): | |||||||||||||||||||||||||
targets_set.update( | |||||||||||||||||||||||||
release.target | |||||||||||||||||||||||||
for release in self.storage.release_get(releases) | |||||||||||||||||||||||||
if release is not None and release.target_type == ObjectType.REVISION | |||||||||||||||||||||||||
) | |||||||||||||||||||||||||
revisions: Set[Sha1Git] = set() | |||||||||||||||||||||||||
for targets in grouper(targets_set, batchsize): | |||||||||||||||||||||||||
revisions.update( | |||||||||||||||||||||||||
revision.id | |||||||||||||||||||||||||
for revision in self.storage.revision_get(targets) | |||||||||||||||||||||||||
if revision is not None | |||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||
heads = [row[0] for row in cursor.fetchall()] | |||||||||||||||||||||||||
Done Inline Actionsprint statement douardda: print statement | |||||||||||||||||||||||||
yield from revisions | yield from heads |
why is this removed?