Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show First 20 Lines • Show All 1,193 Lines • ▼ Show 20 Lines | ) -> None: | ||||
metadata = list(metadata) | metadata = list(metadata) | ||||
self.journal_writer.raw_extrinsic_metadata_add(metadata) | self.journal_writer.raw_extrinsic_metadata_add(metadata) | ||||
counter = Counter[MetadataTargetType]() | counter = Counter[MetadataTargetType]() | ||||
for metadata_entry in metadata: | for metadata_entry in metadata: | ||||
authority_id = self._get_authority_id(metadata_entry.authority, db, cur) | authority_id = self._get_authority_id(metadata_entry.authority, db, cur) | ||||
fetcher_id = self._get_fetcher_id(metadata_entry.fetcher, db, cur) | fetcher_id = self._get_fetcher_id(metadata_entry.fetcher, db, cur) | ||||
db.raw_extrinsic_metadata_add( | db.raw_extrinsic_metadata_add( | ||||
object_type=metadata_entry.type.value, | type=metadata_entry.type.value, | ||||
id=str(metadata_entry.id), | id=str(metadata_entry.id), | ||||
discovery_date=metadata_entry.discovery_date, | discovery_date=metadata_entry.discovery_date, | ||||
authority_id=authority_id, | authority_id=authority_id, | ||||
fetcher_id=fetcher_id, | fetcher_id=fetcher_id, | ||||
format=metadata_entry.format, | format=metadata_entry.format, | ||||
metadata=metadata_entry.metadata, | metadata=metadata_entry.metadata, | ||||
origin=metadata_entry.origin, | origin=metadata_entry.origin, | ||||
visit=metadata_entry.visit, | visit=metadata_entry.visit, | ||||
snapshot=map_optional(str, metadata_entry.snapshot), | snapshot=map_optional(str, metadata_entry.snapshot), | ||||
release=map_optional(str, metadata_entry.release), | release=map_optional(str, metadata_entry.release), | ||||
revision=map_optional(str, metadata_entry.revision), | revision=map_optional(str, metadata_entry.revision), | ||||
path=metadata_entry.path, | path=metadata_entry.path, | ||||
directory=map_optional(str, metadata_entry.directory), | directory=map_optional(str, metadata_entry.directory), | ||||
cur=cur, | cur=cur, | ||||
) | ) | ||||
counter[metadata_entry.type] += 1 | counter[metadata_entry.type] += 1 | ||||
for (object_type, count) in counter.items(): | for (type, count) in counter.items(): | ||||
send_metric( | send_metric( | ||||
f"{object_type.value}_metadata:add", | f"{type.value}_metadata:add", | ||||
count=count, | count=count, | ||||
method_name=f"{object_type.value}_metadata_add", | method_name=f"{type.value}_metadata_add", | ||||
) | ) | ||||
@db_transaction() | @db_transaction() | ||||
def raw_extrinsic_metadata_get( | def raw_extrinsic_metadata_get( | ||||
self, | self, | ||||
object_type: MetadataTargetType, | type: MetadataTargetType, | ||||
id: Union[str, SWHID], | id: Union[str, SWHID], | ||||
authority: MetadataAuthority, | authority: MetadataAuthority, | ||||
after: Optional[datetime.datetime] = None, | after: Optional[datetime.datetime] = None, | ||||
page_token: Optional[bytes] = None, | page_token: Optional[bytes] = None, | ||||
limit: int = 1000, | limit: int = 1000, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> PagedResult[RawExtrinsicMetadata]: | ) -> PagedResult[RawExtrinsicMetadata]: | ||||
if object_type == MetadataTargetType.ORIGIN: | if type == MetadataTargetType.ORIGIN: | ||||
if isinstance(id, SWHID): | if isinstance(id, SWHID): | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
f"raw_extrinsic_metadata_get called with object_type='origin', " | f"raw_extrinsic_metadata_get called with type='origin', " | ||||
f"but provided id is an SWHID: {id!r}" | f"but provided id is an SWHID: {id!r}" | ||||
) | ) | ||||
else: | else: | ||||
if not isinstance(id, SWHID): | if not isinstance(id, SWHID): | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
f"raw_extrinsic_metadata_get called with object_type!='origin', " | f"raw_extrinsic_metadata_get called with type!='origin', " | ||||
f"but provided id is not an SWHID: {id!r}" | f"but provided id is not an SWHID: {id!r}" | ||||
) | ) | ||||
if page_token: | if page_token: | ||||
(after_time, after_fetcher) = msgpack_loads(base64.b64decode(page_token)) | (after_time, after_fetcher) = msgpack_loads(base64.b64decode(page_token)) | ||||
if after and after_time < after: | if after and after_time < after: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"page_token is inconsistent with the value of 'after'." | "page_token is inconsistent with the value of 'after'." | ||||
) | ) | ||||
else: | else: | ||||
after_time = after | after_time = after | ||||
after_fetcher = None | after_fetcher = None | ||||
authority_id = self._get_authority_id(authority, db, cur) | authority_id = self._get_authority_id(authority, db, cur) | ||||
if not authority_id: | if not authority_id: | ||||
return PagedResult(next_page_token=None, results=[],) | return PagedResult(next_page_token=None, results=[],) | ||||
rows = db.raw_extrinsic_metadata_get( | rows = db.raw_extrinsic_metadata_get( | ||||
object_type, | type, str(id), authority_id, after_time, after_fetcher, limit + 1, cur, | ||||
str(id), | |||||
authority_id, | |||||
after_time, | |||||
after_fetcher, | |||||
limit + 1, | |||||
cur, | |||||
) | ) | ||||
rows = [dict(zip(db.raw_extrinsic_metadata_get_cols, row)) for row in rows] | rows = [dict(zip(db.raw_extrinsic_metadata_get_cols, row)) for row in rows] | ||||
results = [] | results = [] | ||||
for row in rows: | for row in rows: | ||||
assert str(id) == row["raw_extrinsic_metadata.id"] | assert str(id) == row["raw_extrinsic_metadata.id"] | ||||
results.append(converters.db_to_raw_extrinsic_metadata(row)) | results.append(converters.db_to_raw_extrinsic_metadata(row)) | ||||
if len(results) > limit: | if len(results) > limit: | ||||
▲ Show 20 Lines • Show All 111 Lines • Show Last 20 Lines |