Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata.py
Show All 26 Lines | |||||
from swh.indexer.origin_head import get_head_swhid | from swh.indexer.origin_head import get_head_swhid | ||||
from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 | from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 | ||||
from swh.indexer.storage.model import ( | from swh.indexer.storage.model import ( | ||||
ContentMetadataRow, | ContentMetadataRow, | ||||
DirectoryIntrinsicMetadataRow, | DirectoryIntrinsicMetadataRow, | ||||
OriginIntrinsicMetadataRow, | OriginIntrinsicMetadataRow, | ||||
) | ) | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import Directory, Origin, Sha1Git | from swh.model.model import Directory | ||||
from swh.model.swhids import ObjectType | from swh.model.model import ObjectType as ModelObjectType | ||||
from swh.model.model import Origin, Sha1Git | |||||
from swh.model.swhids import CoreSWHID, ObjectType | |||||
# Batch sizes for bulk lookups against the storage backend — presumably
# chosen to bound the size of each RPC/storage request; TODO confirm
# against swh.storage call-site expectations.
REVISION_GET_BATCH_SIZE = 10
RELEASE_GET_BATCH_SIZE = 10
ORIGIN_GET_BATCH_SIZE = 10


# Generic type variables used by call_with_batches() below.
T1 = TypeVar("T1")
T2 = TypeVar("T2")
def call_with_batches( | def call_with_batches( | ||||
▲ Show 20 Lines • Show All 277 Lines • ▼ Show 20 Lines | ): | ||||
    def __init__(self, config=None, **kwargs) -> None:
        """Initialize this indexer and the nested DirectoryMetadataIndexer
        that directory-level metadata extraction is delegated to
        (see the ``self.directory_metadata_indexer.index(...)`` call
        in ``index_list``).

        Args:
            config: configuration dict, forwarded both to the parent
                constructor and to the nested DirectoryMetadataIndexer.
            **kwargs: extra keyword arguments forwarded unchanged to the
                parent constructor.
        """
        super().__init__(config=config, **kwargs)
        self.directory_metadata_indexer = DirectoryMetadataIndexer(config=config)
def index_list( | def index_list( | ||||
self, origins: List[Origin], check_origin_known: bool = True, **kwargs | self, origins: List[Origin], check_origin_known: bool = True, **kwargs | ||||
) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]: | ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]: | ||||
head_rev_ids = [] | head_rev_ids = [] | ||||
origins_with_head = [] | head_rel_ids = [] | ||||
origin_heads: Dict[Origin, CoreSWHID] = {} | |||||
# Filter out origins not in the storage | # Filter out origins not in the storage | ||||
if check_origin_known: | if check_origin_known: | ||||
known_origins = list( | known_origins = list( | ||||
call_with_batches( | call_with_batches( | ||||
self.storage.origin_get, | self.storage.origin_get, | ||||
[origin.url for origin in origins], | [origin.url for origin in origins], | ||||
ORIGIN_GET_BATCH_SIZE, | ORIGIN_GET_BATCH_SIZE, | ||||
) | ) | ||||
) | ) | ||||
else: | else: | ||||
known_origins = list(origins) | known_origins = list(origins) | ||||
for origin in known_origins: | for origin in known_origins: | ||||
if origin is None: | if origin is None: | ||||
continue | continue | ||||
head_swhid = get_head_swhid(self.storage, origin.url) | head_swhid = get_head_swhid(self.storage, origin.url) | ||||
if head_swhid: | if head_swhid: | ||||
# TODO: add support for releases | origin_heads[origin] = head_swhid | ||||
assert head_swhid.object_type == ObjectType.REVISION, head_swhid | if head_swhid.object_type == ObjectType.REVISION: | ||||
origins_with_head.append(origin) | |||||
head_rev_ids.append(head_swhid.object_id) | head_rev_ids.append(head_swhid.object_id) | ||||
elif head_swhid.object_type == ObjectType.RELEASE: | |||||
head_rel_ids.append(head_swhid.object_id) | |||||
else: | |||||
ardumont: Why not raise something more explicit? | |||||
assert False, head_swhid | |||||
head_revs = list( | head_revs = dict( | ||||
zip( | |||||
head_rev_ids, | |||||
call_with_batches( | call_with_batches( | ||||
self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE | self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE | ||||
), | |||||
) | |||||
) | |||||
head_rels = dict( | |||||
zip( | |||||
head_rel_ids, | |||||
call_with_batches( | |||||
self.storage.release_get, head_rel_ids, RELEASE_GET_BATCH_SIZE | |||||
), | |||||
) | ) | ||||
) | ) | ||||
assert len(head_revs) == len(head_rev_ids) | |||||
results = [] | results = [] | ||||
for (origin, rev) in zip(origins_with_head, head_revs): | for (origin, head_swhid) in origin_heads.items(): | ||||
if head_swhid.object_type == ObjectType.REVISION: | |||||
rev = head_revs[head_swhid.object_id] | |||||
if not rev: | if not rev: | ||||
self.log.warning("Missing head revision of origin %r", origin.url) | self.log.warning( | ||||
"Missing head object %s of origin %r", head_swhid, origin.url | |||||
) | |||||
continue | continue | ||||
directory_id = rev.directory | |||||
elif head_swhid.object_type == ObjectType.RELEASE: | |||||
rel = head_rels[head_swhid.object_id] | |||||
if not rel: | |||||
self.log.warning( | |||||
"Missing head object %s of origin %r", head_swhid, origin.url | |||||
) | |||||
continue | |||||
if rel.target_type != ModelObjectType.DIRECTORY: | |||||
# TODO | |||||
self.log.warning( | |||||
"Head release %s of %r has unexpected target type %s", | |||||
head_swhid, | |||||
origin.url, | |||||
rel.target_type, | |||||
) | |||||
continue | |||||
assert rel.target, rel | |||||
directory_id = rel.target | |||||
else: | |||||
assert False, head_swhid | |||||
for dir_metadata in self.directory_metadata_indexer.index(rev.directory): | for dir_metadata in self.directory_metadata_indexer.index(directory_id): | ||||
# There is at most one dir_metadata | # There is at most one dir_metadata | ||||
orig_metadata = OriginIntrinsicMetadataRow( | orig_metadata = OriginIntrinsicMetadataRow( | ||||
from_directory=dir_metadata.id, | from_directory=dir_metadata.id, | ||||
id=origin.url, | id=origin.url, | ||||
metadata=dir_metadata.metadata, | metadata=dir_metadata.metadata, | ||||
mappings=dir_metadata.mappings, | mappings=dir_metadata.mappings, | ||||
indexer_configuration_id=dir_metadata.indexer_configuration_id, | indexer_configuration_id=dir_metadata.indexer_configuration_id, | ||||
) | ) | ||||
Show All 31 Lines |
Why not raise something more explicit?