Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/postgresql/provenance.py
Show First 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | def flavor(self) -> str: | ||||
assert self._flavor is not None | assert self._flavor is not None | ||||
return self._flavor | return self._flavor | ||||
@property | @property | ||||
def denormalized(self) -> bool: | def denormalized(self) -> bool: | ||||
return "denormalized" in self.flavor | return "denormalized" in self.flavor | ||||
def content_add( | def content_add( | ||||
self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]] | self, cnts: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] | ||||
) -> bool: | ) -> bool: | ||||
return self._entity_set_date("content", cnts) | return self._entity_set_date("content", cnts) | ||||
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]: | ||||
sql = "SELECT * FROM swh_provenance_content_find_first(%s)" | sql = "SELECT * FROM swh_provenance_content_find_first(%s)" | ||||
with self.transaction(readonly=True) as cursor: | with self.transaction(readonly=True) as cursor: | ||||
cursor.execute(query=sql, vars=(id,)) | cursor.execute(query=sql, vars=(id,)) | ||||
row = cursor.fetchone() | row = cursor.fetchone() | ||||
return ProvenanceResult(**row) if row is not None else None | return ProvenanceResult(**row) if row is not None else None | ||||
def content_find_all( | def content_find_all( | ||||
self, id: Sha1Git, limit: Optional[int] = None | self, id: Sha1Git, limit: Optional[int] = None | ||||
) -> Generator[ProvenanceResult, None, None]: | ) -> Generator[ProvenanceResult, None, None]: | ||||
sql = "SELECT * FROM swh_provenance_content_find_all(%s, %s)" | sql = "SELECT * FROM swh_provenance_content_find_all(%s, %s)" | ||||
with self.transaction(readonly=True) as cursor: | with self.transaction(readonly=True) as cursor: | ||||
cursor.execute(query=sql, vars=(id, limit)) | cursor.execute(query=sql, vars=(id, limit)) | ||||
yield from (ProvenanceResult(**row) for row in cursor) | yield from (ProvenanceResult(**row) for row in cursor) | ||||
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | ||||
return self._entity_get_date("content", ids) | return self._entity_get_date("content", ids) | ||||
def directory_add( | def directory_add( | ||||
self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]] | self, dirs: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]] | ||||
) -> bool: | ) -> bool: | ||||
return self._entity_set_date("directory", dirs) | return self._entity_set_date("directory", dirs) | ||||
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]: | ||||
return self._entity_get_date("directory", ids) | return self._entity_get_date("directory", ids) | ||||
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]: | ||||
with self.transaction(readonly=True) as cursor: | with self.transaction(readonly=True) as cursor: | ||||
▲ Show 20 Lines • Show All 108 Lines • ▼ Show 20 Lines | def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]: | ||||
(row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) | (row["sha1"], RevisionData(date=row["date"], origin=row["origin"])) | ||||
for row in cursor | for row in cursor | ||||
) | ) | ||||
return result | return result | ||||
def relation_add( | def relation_add( | ||||
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] | self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]] | ||||
) -> bool: | ) -> bool: | ||||
rows = [ | rows = [(src, rel.dst, rel.path) for src, dsts in data.items() for rel in dsts] | ||||
(src, rel.dst, rel.path) for src, dsts in data.items() for rel in dsts | |||||
] | |||||
try: | try: | ||||
if rows: | if rows: | ||||
rel_table = relation.value | rel_table = relation.value | ||||
src_table, *_, dst_table = rel_table.split("_") | src_table, *_, dst_table = rel_table.split("_") | ||||
# Put the next three queries in a manual single transaction: | # Put the next three queries in a manual single transaction: | ||||
# they use the same temp table | # they use the same temp table | ||||
with self.transaction() as cursor: | with self.transaction() as cursor: | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | ) -> Dict[Sha1Git, datetime]: | ||||
with self.transaction(readonly=True) as cursor: | with self.transaction(readonly=True) as cursor: | ||||
cursor.execute(query=sql, vars=sha1s) | cursor.execute(query=sql, vars=sha1s) | ||||
dates.update((row["sha1"], row["date"]) for row in cursor) | dates.update((row["sha1"], row["date"]) for row in cursor) | ||||
return dates | return dates | ||||
def _entity_set_date( | def _entity_set_date( | ||||
self, | self, | ||||
entity: Literal["content", "directory"], | entity: Literal["content", "directory"], | ||||
dates: Union[Iterable[Sha1Git], Dict[Sha1Git, datetime]], | dates: Union[Iterable[Sha1Git], Dict[Sha1Git, Optional[datetime]]], | ||||
) -> bool: | ) -> bool: | ||||
data = dates if isinstance(dates, dict) else dict.fromkeys(dates) | data = dates if isinstance(dates, dict) else dict.fromkeys(dates) | ||||
try: | try: | ||||
if data: | if data: | ||||
sql = f""" | sql = f""" | ||||
INSERT INTO {entity}(sha1, date) VALUES %s | INSERT INTO {entity}(sha1, date) VALUES %s | ||||
ON CONFLICT (sha1) DO | ON CONFLICT (sha1) DO | ||||
UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) | UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) | ||||
▲ Show 20 Lines • Show All 43 Lines • Show Last 20 Lines |