Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/swhgraph/archive.py
Show All 23 Lines | class ArchiveGraph: | ||||
@statsd.timed( | @statsd.timed( | ||||
metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} | metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} | ||||
) | ) | ||||
def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: | def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: | ||||
src = CoreSWHID(object_type=ObjectType.REVISION, object_id=id) | src = CoreSWHID(object_type=ObjectType.REVISION, object_id=id) | ||||
request = self.graph.neighbors(str(src), edges="rev:rev", return_types="rev") | request = self.graph.neighbors(str(src), edges="rev:rev", return_types="rev") | ||||
yield from (CoreSWHID.from_string(swhid).object_id for swhid in request if swhid) | yield from ( | ||||
CoreSWHID.from_string(swhid).object_id for swhid in request if swhid | |||||
) | |||||
@statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) | @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) | ||||
def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: | def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: | ||||
src = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=id) | src = CoreSWHID(object_type=ObjectType.SNAPSHOT, object_id=id) | ||||
request = self.graph.visit_nodes( | request = self.graph.visit_nodes( | ||||
str(src), edges="snp:rev,snp:rel,rel:rev", return_types="rev" | str(src), edges="snp:rev,snp:rel,rel:rev", return_types="rev" | ||||
) | ) | ||||
yield from (CoreSWHID.from_string(swhid).object_id for swhid in request if swhid) | yield from ( | ||||
CoreSWHID.from_string(swhid).object_id for swhid in request if swhid | |||||
) |