Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 27 Lines | from typing import ( | ||||
Union, | Union, | ||||
) | ) | ||||
import attr | import attr | ||||
from deprecated import deprecated | from deprecated import deprecated | ||||
from swh.core.api.serializers import msgpack_loads, msgpack_dumps | from swh.core.api.serializers import msgpack_loads, msgpack_dumps | ||||
from swh.model.identifiers import SWHID | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | BaseContent, | ||||
Content, | Content, | ||||
SkippedContent, | SkippedContent, | ||||
Directory, | Directory, | ||||
Revision, | Revision, | ||||
Release, | Release, | ||||
Snapshot, | Snapshot, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
Origin, | Origin, | ||||
SHA1_SIZE, | SHA1_SIZE, | ||||
MetadataAuthority, | |||||
MetadataAuthorityType, | |||||
MetadataFetcher, | |||||
MetadataTargetType, | |||||
RawExtrinsicMetadata, | |||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
from .converters import origin_url_to_sha1 | from .converters import origin_url_to_sha1 | ||||
from .exc import StorageArgumentException, HashCollision | from .exc import StorageArgumentException, HashCollision | ||||
from .extrinsic_metadata import check_extrinsic_metadata_context, CONTEXT_KEYS | |||||
from .utils import get_partition_bounds_bytes | from .utils import get_partition_bounds_bytes | ||||
from .writer import JournalWriter | from .writer import JournalWriter | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
SortedListItem = TypeVar("SortedListItem") | SortedListItem = TypeVar("SortedListItem") | ||||
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | def reset(self): | ||||
self._snapshots = {} | self._snapshots = {} | ||||
self._origins = {} | self._origins = {} | ||||
self._origins_by_id = [] | self._origins_by_id = [] | ||||
self._origins_by_sha1 = {} | self._origins_by_sha1 = {} | ||||
self._origin_visits = {} | self._origin_visits = {} | ||||
self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | ||||
self._persons = {} | self._persons = {} | ||||
# {origin_url: {authority: [metadata]}} | # {object_type: {id: {authority: [metadata]}}} | ||||
self._object_metadata: Dict[ | self._object_metadata: Dict[ | ||||
str, | MetadataTargetType, | ||||
Dict[ | |||||
Union[str, SWHID], | |||||
Dict[ | Dict[ | ||||
Hashable, | Hashable, | ||||
SortedList[Tuple[datetime.datetime, FetcherKey], Dict[str, Any]], | SortedList[ | ||||
Tuple[datetime.datetime, FetcherKey], RawExtrinsicMetadata | |||||
], | |||||
], | |||||
], | ], | ||||
] = defaultdict( | ] = defaultdict( | ||||
lambda: defaultdict( | lambda: defaultdict( | ||||
lambda: SortedList(key=lambda x: (x["discovery_date"], x["fetcher"])) | lambda: defaultdict( | ||||
lambda: SortedList( | |||||
key=lambda x: ( | |||||
x.discovery_date, | |||||
self._metadata_fetcher_key(x.fetcher), | |||||
) | |||||
) | |||||
) | |||||
) | ) | ||||
) # noqa | ) # noqa | ||||
self._metadata_fetchers: Dict[FetcherKey, Dict[str, Any]] = {} | self._metadata_fetchers: Dict[FetcherKey, MetadataFetcher] = {} | ||||
self._metadata_authorities: Dict[Hashable, Dict[str, Any]] = {} | self._metadata_authorities: Dict[Hashable, MetadataAuthority] = {} | ||||
self._objects = defaultdict(list) | self._objects = defaultdict(list) | ||||
self._sorted_sha1s = SortedList[bytes, bytes]() | self._sorted_sha1s = SortedList[bytes, bytes]() | ||||
self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
return True | return True | ||||
▲ Show 20 Lines • Show All 851 Lines • ▼ Show 20 Lines | def stat_counters(self): | ||||
for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | ||||
) | ) | ||||
) | ) | ||||
return stats | return stats | ||||
def refresh_stat_counters(self): | def refresh_stat_counters(self): | ||||
pass | pass | ||||
def content_metadata_add( | def object_metadata_add(self, metadata: Iterable[RawExtrinsicMetadata],) -> None: | ||||
self, | for metadata_entry in metadata: | ||||
id: str, | authority_key = self._metadata_authority_key(metadata_entry.authority) | ||||
context: Dict[str, Union[str, bytes, int]], | if authority_key not in self._metadata_authorities: | ||||
discovery_date: datetime.datetime, | |||||
authority: Dict[str, Any], | |||||
fetcher: Dict[str, Any], | |||||
format: str, | |||||
metadata: bytes, | |||||
) -> None: | |||||
self._object_metadata_add( | |||||
"content", | |||||
id, | |||||
discovery_date, | |||||
authority, | |||||
fetcher, | |||||
format, | |||||
metadata, | |||||
context, | |||||
) | |||||
def content_metadata_get( | |||||
self, | |||||
id: str, | |||||
authority: Dict[str, str], | |||||
after: Optional[datetime.datetime] = None, | |||||
page_token: Optional[bytes] = None, | |||||
limit: int = 1000, | |||||
) -> Dict[str, Any]: | |||||
return self._object_metadata_get( | |||||
"content", id, authority, after, page_token, limit | |||||
) | |||||
def origin_metadata_add( | |||||
self, | |||||
origin_url: str, | |||||
discovery_date: datetime.datetime, | |||||
authority: Dict[str, Any], | |||||
fetcher: Dict[str, Any], | |||||
format: str, | |||||
metadata: bytes, | |||||
) -> None: | |||||
if not isinstance(origin_url, str): | |||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
ardumont: should be covered by the scenario which you should have already added by now, so I'll stop mentioning it :) | |||||
"origin_url must be str, not %r" % (origin_url,) | f"Unknown authority {metadata_entry.authority}" | ||||
) | |||||
context: Dict[str, Union[str, bytes, int]] = {} # origins have no context | |||||
self._object_metadata_add( | |||||
"origin", | |||||
origin_url, | |||||
discovery_date, | |||||
authority, | |||||
fetcher, | |||||
format, | |||||
metadata, | |||||
context, | |||||
) | |||||
def origin_metadata_get( | |||||
self, | |||||
origin_url: str, | |||||
authority: Dict[str, str], | |||||
after: Optional[datetime.datetime] = None, | |||||
page_token: Optional[bytes] = None, | |||||
limit: int = 1000, | |||||
) -> Dict[str, Any]: | |||||
if not isinstance(origin_url, str): | |||||
raise TypeError("origin_url must be str, not %r" % (origin_url,)) | |||||
res = self._object_metadata_get( | |||||
"origin", origin_url, authority, after, page_token, limit | |||||
) | ) | ||||
res["results"] = copy.deepcopy(res["results"]) | fetcher_key = self._metadata_fetcher_key(metadata_entry.fetcher) | ||||
for result in res["results"]: | if fetcher_key not in self._metadata_fetchers: | ||||
result["origin_url"] = result.pop("id") | |||||
return res | |||||
def _object_metadata_add( | |||||
self, | |||||
object_type: str, | |||||
id: str, | |||||
discovery_date: datetime.datetime, | |||||
authority: Dict[str, Any], | |||||
fetcher: Dict[str, Any], | |||||
format: str, | |||||
metadata: bytes, | |||||
context: Dict[str, Union[str, bytes, int]], | |||||
) -> None: | |||||
check_extrinsic_metadata_context(object_type, context) | |||||
if not isinstance(metadata, bytes): | |||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"metadata must be bytes, not %r" % (metadata,) | f"Unknown fetcher {metadata_entry.fetcher}" | ||||
) | ) | ||||
authority_key = self._metadata_authority_key(authority) | |||||
if authority_key not in self._metadata_authorities: | |||||
raise StorageArgumentException(f"Unknown authority {authority}") | |||||
fetcher_key = self._metadata_fetcher_key(fetcher) | |||||
if fetcher_key not in self._metadata_fetchers: | |||||
raise StorageArgumentException(f"Unknown fetcher {fetcher}") | |||||
object_metadata_list = self._object_metadata[id][authority_key] | |||||
object_metadata: Dict[str, Any] = { | |||||
"id": id, | |||||
"discovery_date": discovery_date, | |||||
"authority": authority_key, | |||||
"fetcher": fetcher_key, | |||||
"format": format, | |||||
"metadata": metadata, | |||||
} | |||||
if CONTEXT_KEYS[object_type]: | object_metadata_list = self._object_metadata[metadata_entry.type][ | ||||
object_metadata["context"] = context | metadata_entry.id | ||||
][authority_key] | |||||
for existing_object_metadata in object_metadata_list: | for existing_object_metadata in object_metadata_list: | ||||
if ( | if ( | ||||
existing_object_metadata["fetcher"] == fetcher_key | self._metadata_fetcher_key(existing_object_metadata.fetcher) | ||||
and existing_object_metadata["discovery_date"] == discovery_date | == fetcher_key | ||||
and existing_object_metadata.discovery_date | |||||
== metadata_entry.discovery_date | |||||
): | ): | ||||
# Duplicate of an existing one; replace it. | # Duplicate of an existing one; ignore it. | ||||
existing_object_metadata.update(object_metadata) | |||||
break | break | ||||
else: | else: | ||||
object_metadata_list.add(object_metadata) | object_metadata_list.add(metadata_entry) | ||||
def _object_metadata_get( | def object_metadata_get( | ||||
self, | self, | ||||
object_type: str, | object_type: MetadataTargetType, | ||||
id: str, | id: Union[str, SWHID], | ||||
authority: Dict[str, str], | authority: MetadataAuthority, | ||||
after: Optional[datetime.datetime] = None, | after: Optional[datetime.datetime] = None, | ||||
page_token: Optional[bytes] = None, | page_token: Optional[bytes] = None, | ||||
limit: int = 1000, | limit: int = 1000, | ||||
) -> Dict[str, Any]: | ) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]: | ||||
authority_key = self._metadata_authority_key(authority) | authority_key = self._metadata_authority_key(authority) | ||||
if object_type == MetadataTargetType.ORIGIN: | |||||
if isinstance(id, SWHID): | |||||
raise StorageArgumentException( | |||||
f"object_metadata_get called with object_type='origin', but " | |||||
f"provided id is an SWHID: {id!r}" | |||||
) | |||||
else: | |||||
if not isinstance(id, SWHID): | |||||
raise StorageArgumentException( | |||||
f"object_metadata_get called with object_type!='origin', but " | |||||
f"provided id is not an SWHID: {id!r}" | |||||
Done Inline Actions — ardumont: Why aren't those checked in the Cassandra implementation? (I did not reach pg-storage yet.) Hmm, after reading it again, it seems they are, but differently, correct? If so, why the different approach? (curious me again) | |||||
Done Inline Actions — vlorentz: Because I wrote them about a week apart :p | |||||
) | |||||
Done Inline Actions — ardumont: why not
```
elif not isinstance...
```
? (picture me curious) | |||||
Done Inline Actions — vlorentz: Natural evolution of the code | |||||
Done Inline Actions — vlorentz: Actually, no, my code is correct. I did:
```
if A:
    if B:
        # error
else:
    if not B:
        # error
```
the code you are suggesting is:
```
if A:
    if B:
        # error
elif not B:
    # error
```
which is completely different. | |||||
Not Done Inline Actions — ardumont: Indeed, I missed it. | |||||
Not Done Inline Actions — douardda: I am very slow this morning, so bear with me, but I don't see how these two snippets of "if" statements differ. | |||||
Done Inline Actions — vlorentz: @douardda ugh, indeed, they don't. But I find the lack of symmetry confusing, so I'd rather keep it as is. | |||||
if page_token is not None: | if page_token is not None: | ||||
(after_time, after_fetcher) = msgpack_loads(page_token) | (after_time, after_fetcher) = msgpack_loads(page_token) | ||||
after_fetcher = tuple(after_fetcher) | after_fetcher = tuple(after_fetcher) | ||||
if after is not None and after > after_time: | if after is not None and after > after_time: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"page_token is inconsistent with the value of 'after'." | "page_token is inconsistent with the value of 'after'." | ||||
) | ) | ||||
entries = self._object_metadata[id][authority_key].iter_after( | entries = self._object_metadata[object_type][id][authority_key].iter_after( | ||||
(after_time, after_fetcher) | (after_time, after_fetcher) | ||||
) | ) | ||||
elif after is not None: | elif after is not None: | ||||
entries = self._object_metadata[id][authority_key].iter_from((after,)) | entries = self._object_metadata[object_type][id][authority_key].iter_from( | ||||
entries = (entry for entry in entries if entry["discovery_date"] > after) | (after,) | ||||
) | |||||
entries = (entry for entry in entries if entry.discovery_date > after) | |||||
else: | else: | ||||
entries = iter(self._object_metadata[id][authority_key]) | entries = iter(self._object_metadata[object_type][id][authority_key]) | ||||
if limit: | if limit: | ||||
entries = itertools.islice(entries, 0, limit + 1) | entries = itertools.islice(entries, 0, limit + 1) | ||||
results = [] | results = [] | ||||
for entry in entries: | for entry in entries: | ||||
authority = self._metadata_authorities[entry["authority"]] | entry_authority = self._metadata_authorities[ | ||||
fetcher = self._metadata_fetchers[entry["fetcher"]] | self._metadata_authority_key(entry.authority) | ||||
] | |||||
entry_fetcher = self._metadata_fetchers[ | |||||
self._metadata_fetcher_key(entry.fetcher) | |||||
] | |||||
if after: | if after: | ||||
assert entry["discovery_date"] > after | assert entry.discovery_date > after | ||||
results.append( | results.append( | ||||
{ | attr.evolve( | ||||
**entry, | entry, | ||||
"authority": {"type": authority["type"], "url": authority["url"],}, | authority=attr.evolve(entry_authority, metadata=None), | ||||
"fetcher": { | fetcher=attr.evolve(entry_fetcher, metadata=None), | ||||
"name": fetcher["name"], | ) | ||||
"version": fetcher["version"], | |||||
}, | |||||
} | |||||
) | ) | ||||
if len(results) > limit: | if len(results) > limit: | ||||
results.pop() | results.pop() | ||||
assert len(results) == limit | assert len(results) == limit | ||||
last_result = results[-1] | last_result = results[-1] | ||||
next_page_token: Optional[bytes] = msgpack_dumps( | next_page_token: Optional[bytes] = msgpack_dumps( | ||||
( | ( | ||||
last_result["discovery_date"], | last_result.discovery_date, | ||||
self._metadata_fetcher_key(last_result["fetcher"]), | self._metadata_fetcher_key(last_result.fetcher), | ||||
) | ) | ||||
) | ) | ||||
else: | else: | ||||
next_page_token = None | next_page_token = None | ||||
return { | return { | ||||
"next_page_token": next_page_token, | "next_page_token": next_page_token, | ||||
"results": results, | "results": results, | ||||
} | } | ||||
def metadata_fetcher_add( | def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher]) -> None: | ||||
self, name: str, version: str, metadata: Dict[str, Any] | for fetcher in fetchers: | ||||
) -> None: | if fetcher.metadata is None: | ||||
fetcher = { | raise StorageArgumentException( | ||||
"name": name, | "MetadataFetcher.metadata may not be None in metadata_fetcher_add." | ||||
"version": version, | ) | ||||
"metadata": metadata, | |||||
} | |||||
key = self._metadata_fetcher_key(fetcher) | key = self._metadata_fetcher_key(fetcher) | ||||
if key not in self._metadata_fetchers: | if key not in self._metadata_fetchers: | ||||
self._metadata_fetchers[key] = fetcher | self._metadata_fetchers[key] = fetcher | ||||
def metadata_fetcher_get(self, name: str, version: str) -> Optional[Dict[str, Any]]: | def metadata_fetcher_get( | ||||
self, name: str, version: str | |||||
) -> Optional[MetadataFetcher]: | |||||
return self._metadata_fetchers.get( | return self._metadata_fetchers.get( | ||||
self._metadata_fetcher_key({"name": name, "version": version}) | self._metadata_fetcher_key(MetadataFetcher(name=name, version=version)) | ||||
) | ) | ||||
def metadata_authority_add( | def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: | ||||
self, type: str, url: str, metadata: Dict[str, Any] | for authority in authorities: | ||||
) -> None: | if authority.metadata is None: | ||||
authority = { | raise StorageArgumentException( | ||||
"type": type, | "MetadataAuthority.metadata may not be None in " | ||||
"url": url, | "metadata_authority_add." | ||||
"metadata": metadata, | ) | ||||
} | |||||
key = self._metadata_authority_key(authority) | key = self._metadata_authority_key(authority) | ||||
self._metadata_authorities[key] = authority | self._metadata_authorities[key] = authority | ||||
def metadata_authority_get(self, type: str, url: str) -> Optional[Dict[str, Any]]: | def metadata_authority_get( | ||||
self, type: MetadataAuthorityType, url: str | |||||
) -> Optional[MetadataAuthority]: | |||||
return self._metadata_authorities.get( | return self._metadata_authorities.get( | ||||
self._metadata_authority_key({"type": type, "url": url}) | self._metadata_authority_key(MetadataAuthority(type=type, url=url)) | ||||
) | ) | ||||
def _get_origin_url(self, origin): | def _get_origin_url(self, origin): | ||||
if isinstance(origin, str): | if isinstance(origin, str): | ||||
return origin | return origin | ||||
else: | else: | ||||
raise TypeError("origin must be a string.") | raise TypeError("origin must be a string.") | ||||
def _person_add(self, person): | def _person_add(self, person): | ||||
key = ("person", person.fullname) | key = ("person", person.fullname) | ||||
if key not in self._objects: | if key not in self._objects: | ||||
self._persons[person.fullname] = person | self._persons[person.fullname] = person | ||||
self._objects[key].append(key) | self._objects[key].append(key) | ||||
return self._persons[person.fullname] | return self._persons[person.fullname] | ||||
@staticmethod | @staticmethod | ||||
def _content_key(content): | def _content_key(content): | ||||
""" A stable key and the algorithm for a content""" | """ A stable key and the algorithm for a content""" | ||||
if isinstance(content, BaseContent): | if isinstance(content, BaseContent): | ||||
content = content.to_dict() | content = content.to_dict() | ||||
return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS)) | return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS)) | ||||
@staticmethod | @staticmethod | ||||
def _metadata_fetcher_key(fetcher: Dict) -> FetcherKey: | def _metadata_fetcher_key(fetcher: MetadataFetcher) -> FetcherKey: | ||||
return (fetcher["name"], fetcher["version"]) | return (fetcher.name, fetcher.version) | ||||
@staticmethod | @staticmethod | ||||
def _metadata_authority_key(authority: Dict) -> Hashable: | def _metadata_authority_key(authority: MetadataAuthority) -> Hashable: | ||||
return (authority["type"], authority["url"]) | return (authority.type, authority.url) | ||||
def diff_directories(self, from_dir, to_dir, track_renaming=False): | def diff_directories(self, from_dir, to_dir, track_renaming=False): | ||||
raise NotImplementedError("InMemoryStorage.diff_directories") | raise NotImplementedError("InMemoryStorage.diff_directories") | ||||
def diff_revisions(self, from_rev, to_rev, track_renaming=False): | def diff_revisions(self, from_rev, to_rev, track_renaming=False): | ||||
raise NotImplementedError("InMemoryStorage.diff_revisions") | raise NotImplementedError("InMemoryStorage.diff_revisions") | ||||
def diff_revision(self, revision, track_renaming=False): | def diff_revision(self, revision, track_renaming=False): | ||||
Show All 10 Lines |
ardumont: should be covered by the scenario which you should have already added by now, so I'll stop mentioning it :)