Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/postgresql/archive.py
Show All 16 Lines | |||||
class ArchivePostgreSQL: | class ArchivePostgreSQL: | ||||
def __init__(self, conn: psycopg2.extensions.connection) -> None: | def __init__(self, conn: psycopg2.extensions.connection) -> None: | ||||
self.storage = get_storage( | self.storage = get_storage( | ||||
"postgresql", db=conn.dsn, objstorage={"cls": "memory"} | "postgresql", db=conn.dsn, objstorage={"cls": "memory"} | ||||
) | ) | ||||
self.conn = conn | self.conn = conn | ||||
def directory_ls(self, id: Sha1Git) -> Iterable[Dict[str, Any]]: | def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: | ||||
entries = self._directory_ls(id) | entries = self._directory_ls(id, minsize=minsize) | ||||
yield from entries | yield from entries | ||||
@lru_cache(maxsize=100000) | @lru_cache(maxsize=100000) | ||||
@statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) | @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) | ||||
def _directory_ls(self, id: Sha1Git) -> List[Dict[str, Any]]: | def _directory_ls(self, id: Sha1Git, minsize: int = 0) -> List[Dict[str, Any]]: | ||||
# TODO: add file size filtering | |||||
with self.conn.cursor() as cursor: | with self.conn.cursor() as cursor: | ||||
cursor.execute( | cursor.execute( | ||||
""" | """ | ||||
WITH | WITH | ||||
dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries | dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries | ||||
FROM directory WHERE id=%s), | FROM directory WHERE id=%s), | ||||
ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), | ls_d AS (SELECT dir_id, UNNEST(dir_entries) AS entry_id FROM dir), | ||||
ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), | ls_f AS (SELECT dir_id, UNNEST(file_entries) AS entry_id FROM dir), | ||||
ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) | ls_r AS (SELECT dir_id, UNNEST(rev_entries) AS entry_id FROM dir) | ||||
(SELECT 'dir'::directory_entry_type AS type, e.target, e.name, | (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, | ||||
NULL::sha1_git | NULL::sha1_git | ||||
FROM ls_d | FROM ls_d | ||||
LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) | LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) | ||||
UNION | UNION | ||||
(WITH known_contents AS | (WITH known_contents AS | ||||
(SELECT 'file'::directory_entry_type AS type, e.target, e.name, | (SELECT 'file'::directory_entry_type AS type, e.target, e.name, | ||||
c.sha1_git | c.sha1_git | ||||
FROM ls_f | FROM ls_f | ||||
LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id | LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id | ||||
INNER JOIN content c ON e.target=c.sha1_git) | INNER JOIN content c ON e.target=c.sha1_git | ||||
WHERE c.length >= %s | |||||
) | |||||
SELECT * FROM known_contents | SELECT * FROM known_contents | ||||
UNION | UNION | ||||
(SELECT 'file'::directory_entry_type AS type, e.target, e.name, | (SELECT 'file'::directory_entry_type AS type, e.target, e.name, | ||||
c.sha1_git | c.sha1_git | ||||
FROM ls_f | FROM ls_f | ||||
LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id | LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id | ||||
LEFT JOIN skipped_content c ON e.target=c.sha1_git | LEFT JOIN skipped_content c ON e.target=c.sha1_git | ||||
WHERE NOT EXISTS ( | WHERE NOT EXISTS ( | ||||
SELECT 1 FROM known_contents | SELECT 1 FROM known_contents | ||||
WHERE known_contents.sha1_git=e.target | WHERE known_contents.sha1_git=e.target | ||||
) | ) | ||||
AND c.length >= %s | |||||
) | ) | ||||
) | ) | ||||
""", | """, | ||||
(id,), | (id, minsize, minsize), | ||||
) | ) | ||||
return [ | return [ | ||||
{"type": row[0], "target": row[1], "name": row[2]} for row in cursor | {"type": row[0], "target": row[1], "name": row[2]} for row in cursor | ||||
] | ] | ||||
@statsd.timed( | @statsd.timed( | ||||
metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} | metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} | ||||
) | ) | ||||
def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: | def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: | ||||
with self.conn.cursor() as cursor: | with self.conn.cursor() as cursor: | ||||
cursor.execute( | cursor.execute( | ||||
""" | """ | ||||
SELECT RH.parent_id::bytea | SELECT RH.parent_id::bytea | ||||
FROM revision_history AS RH | FROM revision_history AS RH | ||||
WHERE RH.id=%s | WHERE RH.id=%s | ||||
ORDER BY RH.parent_rank | ORDER BY RH.parent_rank | ||||
""", | """, | ||||
(id,), | (id,), | ||||
) | ) | ||||
# There should be at most one row anyway | |||||
yield from (row[0] for row in cursor) | yield from (row[0] for row in cursor) | ||||
@statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) | @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) | ||||
def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: | def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: | ||||
with self.conn.cursor() as cursor: | with self.conn.cursor() as cursor: | ||||
cursor.execute( | cursor.execute( | ||||
""" | """ | ||||
WITH | WITH | ||||
Show All 29 Lines |