Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/origin.py
from typing import Optional | from typing import Optional | ||||
from swh.model.model import ObjectType, Origin, TargetType | from swh.model.model import ObjectType, Origin, TargetType | ||||
from .archive import ArchiveInterface | from .archive import ArchiveInterface | ||||
from .revision import RevisionEntry | from .model import OriginEntry, RevisionEntry | ||||
class OriginEntry: | |||||
def __init__(self, url, revisions, id=None): | |||||
self.id = id | |||||
self.url = url | |||||
self.revisions = revisions | |||||
################################################################################ | ################################################################################ | ||||
################################################################################ | ################################################################################ | ||||
class OriginIterator: | class FileOriginIterator: | ||||
"""Iterator interface.""" | |||||
def __iter__(self): | |||||
pass | |||||
def __next__(self): | |||||
pass | |||||
class FileOriginIterator(OriginIterator): | |||||
"""Iterator over origins present in the given CSV file.""" | """Iterator over origins present in the given CSV file.""" | ||||
def __init__( | def __init__( | ||||
self, filename: str, archive: ArchiveInterface, limit: Optional[int] = None | self, filename: str, archive: ArchiveInterface, limit: Optional[int] = None | ||||
): | ): | ||||
self.file = open(filename) | self.file = open(filename) | ||||
self.limit = limit | self.limit = limit | ||||
# self.mutex = threading.Lock() | |||||
self.archive = archive | self.archive = archive | ||||
def __iter__(self): | def __iter__(self): | ||||
yield from iterate_statuses( | yield from iterate_statuses( | ||||
[Origin(url.strip()) for url in self.file], self.archive, self.limit | [Origin(url.strip()) for url in self.file], self.archive, self.limit | ||||
) | ) | ||||
class ArchiveOriginIterator: | class ArchiveOriginIterator: | ||||
"""Iterator over origins present in the given storage.""" | """Iterator over origins present in the given storage.""" | ||||
def __init__(self, archive: ArchiveInterface, limit: Optional[int] = None): | def __init__(self, archive: ArchiveInterface, limit: Optional[int] = None): | ||||
self.limit = limit | self.limit = limit | ||||
# self.mutex = threading.Lock() | |||||
self.archive = archive | self.archive = archive | ||||
def __iter__(self): | def __iter__(self): | ||||
yield from iterate_statuses( | yield from iterate_statuses( | ||||
self.archive.iter_origins(), self.archive, self.limit | self.archive.iter_origins(), self.archive, self.limit | ||||
) | ) | ||||
Show All 15 Lines | for origin in origins: | ||||
elif ( | elif ( | ||||
snapshot.branches[branch].target_type == TargetType.RELEASE | snapshot.branches[branch].target_type == TargetType.RELEASE | ||||
): | ): | ||||
releases_set.add(snapshot.branches[branch].target) | releases_set.add(snapshot.branches[branch].target) | ||||
# This is done to keep the query in release_get small, hence avoiding | # This is done to keep the query in release_get small, hence avoiding | ||||
# a timeout. | # a timeout. | ||||
batchsize = 100 | batchsize = 100 | ||||
releases = list(releases_set) | while releases_set: | ||||
while releases: | releases = [ | ||||
for release in archive.release_get(releases[:batchsize]): | releases_set.pop() for i in range(batchsize) if releases_set | ||||
] | |||||
for release in archive.release_get(releases): | |||||
if release is not None: | if release is not None: | ||||
if release.target_type == ObjectType.REVISION: | if release.target_type == ObjectType.REVISION: | ||||
targets_set.add(release.target) | targets_set.add(release.target) | ||||
releases[:batchsize] = [] | |||||
# This is done to keep the query in revision_get small, hence avoiding | # This is done to keep the query in revision_get small, hence avoiding | ||||
# a timeout. | # a timeout. | ||||
revisions = set() | revisions = set() | ||||
targets = list(targets_set) | while targets_set: | ||||
while targets: | targets = [ | ||||
for revision in archive.revision_get(targets[:batchsize]): | targets_set.pop() for i in range(batchsize) if targets_set | ||||
] | |||||
for revision in archive.revision_get(targets): | |||||
if revision is not None: | if revision is not None: | ||||
parents = list( | revisions.add(RevisionEntry(revision.id)) | ||||
map( | # target_set |= set(revision.parents) | ||||
lambda id: RevisionEntry(archive, id), | |||||
revision.parents, | |||||
) | |||||
) | |||||
revisions.add( | |||||
RevisionEntry(archive, revision.id, parents=parents) | |||||
) | |||||
targets[:batchsize] = [] | |||||
yield OriginEntry(status.origin, list(revisions)) | yield OriginEntry(status.origin, list(revisions)) | ||||
idx += 1 | idx += 1 | ||||
if idx == limit: | if idx == limit: | ||||
return | return |