Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 13 Lines | |||||
from collections import defaultdict | from collections import defaultdict | ||||
from datetime import timedelta | from datetime import timedelta | ||||
from typing import ( | from typing import ( | ||||
Any, | Any, | ||||
Callable, | Callable, | ||||
Dict, | Dict, | ||||
Generic, | Generic, | ||||
Hashable, | |||||
Iterable, | Iterable, | ||||
Iterator, | Iterator, | ||||
List, | List, | ||||
Optional, | Optional, | ||||
Tuple, | Tuple, | ||||
TypeVar, | TypeVar, | ||||
Union, | Union, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | class SortedList(collections.UserList, Generic[SortedListKey, SortedListItem]): | ||||
def iter_from(self, start_key: SortedListKey) -> Iterator[SortedListItem]: | def iter_from(self, start_key: SortedListKey) -> Iterator[SortedListItem]: | ||||
"""Returns an iterator over all the elements whose key is greater | """Returns an iterator over all the elements whose key is greater | ||||
or equal to `start_key`. | or equal to `start_key`. | ||||
(This is an efficient equivalent to: | (This is an efficient equivalent to: | ||||
`(x for x in L if key(x) >= start_key)`) | `(x for x in L if key(x) >= start_key)`) | ||||
""" | """ | ||||
from_index = bisect.bisect_left(self.data, (start_key,)) | from_index = bisect.bisect_left(self.data, (start_key,)) | ||||
for (k, item) in itertools.islice(self.data, from_index, None): | for (k, item) in itertools.islice(self.data, from_index, None): | ||||
ardumont: Missing a rebase on latest master
That's D2987, ain't it? | |||||
yield item | yield item | ||||
class InMemoryStorage: | class InMemoryStorage: | ||||
def __init__(self, journal_writer=None): | def __init__(self, journal_writer=None): | ||||
self.reset() | self.reset() | ||||
self.journal_writer = JournalWriter(journal_writer) | self.journal_writer = JournalWriter(journal_writer) | ||||
def reset(self): | def reset(self): | ||||
self._contents = {} | self._contents = {} | ||||
self._content_indexes = defaultdict(lambda: defaultdict(set)) | self._content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self._skipped_contents = {} | self._skipped_contents = {} | ||||
self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self._directories = {} | self._directories = {} | ||||
self._revisions = {} | self._revisions = {} | ||||
self._releases = {} | self._releases = {} | ||||
self._snapshots = {} | self._snapshots = {} | ||||
self._origins = {} | self._origins = {} | ||||
self._origins_by_id = [] | self._origins_by_id = [] | ||||
self._origins_by_sha1 = {} | self._origins_by_sha1 = {} | ||||
self._origin_visits = {} | self._origin_visits = {} | ||||
self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} | ||||
self._persons = [] | self._persons = [] | ||||
self._origin_metadata = defaultdict(list) | |||||
self._tools = {} | |||||
self._metadata_providers = {} | |||||
self._objects = defaultdict(list) | |||||
# {origin_url: {authority: [metadata]}} | |||||
self._origin_metadata: Dict[ | |||||
str, Dict[Hashable, SortedList[datetime.datetime, Dict[str, Any]]] | |||||
] = defaultdict( | |||||
lambda: defaultdict(lambda: SortedList(key=lambda x: x["discovery_date"])) | |||||
) # noqa | |||||
self._metadata_fetchers: Dict[Hashable, Dict[str, Any]] = {} | |||||
self._metadata_authorities: Dict[Hashable, Dict[str, Any]] = {} | |||||
self._objects = defaultdict(list) | |||||
self._sorted_sha1s = SortedList[bytes, bytes]() | self._sorted_sha1s = SortedList[bytes, bytes]() | ||||
self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
return True | return True | ||||
def _content_add(self, contents: Iterable[Content], with_data: bool) -> Dict: | def _content_add(self, contents: Iterable[Content], with_data: bool) -> Dict: | ||||
self.journal_writer.content_add(contents) | self.journal_writer.content_add(contents) | ||||
content_add = 0 | content_add = 0 | ||||
content_add_bytes = 0 | |||||
if with_data: | if with_data: | ||||
summary = self.objstorage.content_add( | summary = self.objstorage.content_add( | ||||
c for c in contents if c.status != "absent" | c for c in contents if c.status != "absent" | ||||
) | ) | ||||
content_add_bytes = summary["content:add:bytes"] | content_add_bytes = summary["content:add:bytes"] | ||||
for content in contents: | for content in contents: | ||||
key = self._content_key(content) | key = self._content_key(content) | ||||
▲ Show 20 Lines • Show All 884 Lines • ▼ Show 20 Lines | def stat_counters(self): | ||||
for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | ||||
) | ) | ||||
) | ) | ||||
return stats | return stats | ||||
def refresh_stat_counters(self): | def refresh_stat_counters(self): | ||||
pass | pass | ||||
def origin_metadata_add(self, origin_url, ts, provider, tool, metadata): | def origin_metadata_add( | ||||
self, | |||||
origin_url: str, | |||||
discovery_date: datetime.datetime, | |||||
authority: Dict[str, Any], | |||||
fetcher: Dict[str, Any], | |||||
format: str, | |||||
metadata: bytes, | |||||
) -> None: | |||||
if not isinstance(origin_url, str): | if not isinstance(origin_url, str): | ||||
raise TypeError("origin_id must be str, not %r" % (origin_url,)) | raise StorageArgumentException( | ||||
"origin_id must be str, not %r" % (origin_url,) | |||||
if isinstance(ts, str): | ) | ||||
ts = dateutil.parser.parse(ts) | authority_key = self._metadata_authority_key(authority) | ||||
if authority_key not in self._metadata_authorities: | |||||
raise StorageArgumentException(f"Unknown authority {authority}") | |||||
fetcher_key = self._metadata_fetcher_key(fetcher) | |||||
if fetcher_key not in self._metadata_fetchers: | |||||
raise StorageArgumentException(f"Unknown fetcher {fetcher}") | |||||
origin_metadata = { | origin_metadata = { | ||||
"origin_url": origin_url, | "origin_url": origin_url, | ||||
"discovery_date": ts, | "discovery_date": discovery_date, | ||||
"tool_id": tool, | "authority": authority_key, | ||||
"fetcher": fetcher_key, | |||||
"format": format, | |||||
"metadata": metadata, | "metadata": metadata, | ||||
"provider_id": provider, | |||||
} | } | ||||
self._origin_metadata[origin_url].append(origin_metadata) | self._origin_metadata[origin_url][authority_key].add(origin_metadata) | ||||
return None | return None | ||||
def origin_metadata_get_by(self, origin_url, provider_type=None): | def origin_metadata_get( | ||||
self, | |||||
origin_url: str, | |||||
authority: Dict[str, str], | |||||
after: Optional[datetime.datetime] = None, | |||||
limit: Optional[int] = None, | |||||
) -> List[Dict[str, Any]]: | |||||
if not isinstance(origin_url, str): | if not isinstance(origin_url, str): | ||||
raise TypeError("origin_url must be str, not %r" % (origin_url,)) | raise TypeError("origin_url must be str, not %r" % (origin_url,)) | ||||
metadata = [] | |||||
for item in self._origin_metadata[origin_url]: | |||||
item = copy.deepcopy(item) | |||||
provider = self.metadata_provider_get(item["provider_id"]) | |||||
for attr_name in ("name", "type", "url"): | |||||
item["provider_" + attr_name] = provider["provider_" + attr_name] | |||||
metadata.append(item) | |||||
return metadata | |||||
def tool_add(self, tools): | |||||
inserted = [] | |||||
for tool in tools: | |||||
key = self._tool_key(tool) | |||||
assert "id" not in tool | |||||
record = copy.deepcopy(tool) | |||||
record["id"] = key # TODO: remove this | |||||
if key not in self._tools: | |||||
self._tools[key] = record | |||||
inserted.append(copy.deepcopy(self._tools[key])) | |||||
return inserted | authority_key = self._metadata_authority_key(authority) | ||||
def tool_get(self, tool): | if after is None: | ||||
return self._tools.get(self._tool_key(tool)) | entries = iter(self._origin_metadata[origin_url][authority_key]) | ||||
else: | |||||
entries = self._origin_metadata[origin_url][authority_key].iter_from(after) | |||||
if limit: | |||||
entries = itertools.islice(entries, 0, limit) | |||||
def metadata_provider_add( | results = [] | ||||
self, provider_name, provider_type, provider_url, metadata | for entry in entries: | ||||
): | authority = self._metadata_authorities[entry["authority"]] | ||||
provider = { | fetcher = self._metadata_fetchers[entry["fetcher"]] | ||||
"provider_name": provider_name, | results.append( | ||||
"provider_type": provider_type, | { | ||||
"provider_url": provider_url, | **entry, | ||||
"authority": {"type": authority["type"], "url": authority["url"],}, | |||||
"fetcher": { | |||||
"name": fetcher["name"], | |||||
"version": fetcher["version"], | |||||
}, | |||||
} | |||||
) | |||||
return results | |||||
def metadata_fetcher_add( | |||||
self, name: str, version: str, metadata: Dict[str, Any] | |||||
) -> None: | |||||
fetcher = { | |||||
"name": name, | |||||
"version": version, | |||||
"metadata": metadata, | |||||
} | |||||
key = self._metadata_fetcher_key(fetcher) | |||||
if key not in self._metadata_fetchers: | |||||
self._metadata_fetchers[key] = fetcher | |||||
def metadata_fetcher_get(self, name: str, version: str) -> Optional[Dict[str, Any]]: | |||||
return self._metadata_fetchers.get( | |||||
self._metadata_fetcher_key({"name": name, "version": version}) | |||||
) | |||||
def metadata_authority_add( | |||||
self, type: str, url: str, metadata: Dict[str, Any] | |||||
) -> None: | |||||
authority = { | |||||
"type": type, | |||||
"url": url, | |||||
"metadata": metadata, | "metadata": metadata, | ||||
} | } | ||||
key = self._metadata_provider_key(provider) | key = self._metadata_authority_key(authority) | ||||
provider["id"] = key | self._metadata_authorities[key] = authority | ||||
self._metadata_providers[key] = provider | |||||
return key | def metadata_authority_get(self, type: str, url: str) -> Optional[Dict[str, Any]]: | ||||
return self._metadata_authorities.get( | |||||
def metadata_provider_get(self, provider_id): | self._metadata_authority_key({"type": type, "url": url}) | ||||
return self._metadata_providers.get(provider_id) | ) | ||||
def metadata_provider_get_by(self, provider): | |||||
key = self._metadata_provider_key(provider) | |||||
return self._metadata_providers.get(key) | |||||
def _get_origin_url(self, origin): | def _get_origin_url(self, origin): | ||||
if isinstance(origin, str): | if isinstance(origin, str): | ||||
return origin | return origin | ||||
else: | else: | ||||
raise TypeError("origin must be a string.") | raise TypeError("origin must be a string.") | ||||
def _person_add(self, person): | def _person_add(self, person): | ||||
Show All 10 Lines | |||||
@staticmethod | @staticmethod | ||||
def _content_key(content): | def _content_key(content): | ||||
""" A stable key and the algorithm for a content""" | """ A stable key and the algorithm for a content""" | ||||
if isinstance(content, BaseContent): | if isinstance(content, BaseContent): | ||||
content = content.to_dict() | content = content.to_dict() | ||||
return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS)) | return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS)) | ||||
@staticmethod | @staticmethod | ||||
def _tool_key(tool): | def _metadata_fetcher_key(fetcher: Dict) -> Hashable: | ||||
return "%r %r %r" % ( | return (fetcher["name"], fetcher["version"]) | ||||
tool["name"], | |||||
tool["version"], | |||||
tuple(sorted(tool["configuration"].items())), | |||||
) | |||||
@staticmethod | @staticmethod | ||||
def _metadata_provider_key(provider): | def _metadata_authority_key(authority: Dict) -> Hashable: | ||||
return "%r %r" % (provider["provider_name"], provider["provider_url"]) | return (authority["type"], authority["url"]) | ||||
def diff_directories(self, from_dir, to_dir, track_renaming=False): | def diff_directories(self, from_dir, to_dir, track_renaming=False): | ||||
raise NotImplementedError("InMemoryStorage.diff_directories") | raise NotImplementedError("InMemoryStorage.diff_directories") | ||||
def diff_revisions(self, from_rev, to_rev, track_renaming=False): | def diff_revisions(self, from_rev, to_rev, track_renaming=False): | ||||
raise NotImplementedError("InMemoryStorage.diff_revisions") | raise NotImplementedError("InMemoryStorage.diff_revisions") | ||||
def diff_revision(self, revision, track_renaming=False): | def diff_revision(self, revision, track_renaming=False): | ||||
Show All 10 Lines |
Missing a rebase on latest master
That's D2987, ain't it?