swh/storage/in_memory.py
@@ … @@ def _content_add(self, contents, with_data):
             'skipped_content:add': count_contents - count_content_added,
         }
         if with_data:
             summary['content:add:bytes'] = count_content_bytes_added
         return summary

-    def content_add(self, contents):
+    def content_add(self, content):
         """Add content blobs to the storage

         Args:
             content (iterable): iterable of dictionaries representing
                 individual pieces of content to add. Each dictionary has the
                 following keys:

                 - data (bytes): the actual content
@@ … @@ def content_add(self, content):
         Returns:
             Summary dict with the following key and associated values:

                 content:add: New contents added
                 content:add:bytes: Sum of the contents' length data
                 skipped_content:add: New skipped contents (no data) added

         """
-        contents = [dict(c.items()) for c in contents]  # semi-shallow copy
+        content = [dict(c.items()) for c in content]  # semi-shallow copy
         now = datetime.datetime.now(tz=datetime.timezone.utc)
-        for item in contents:
+        for item in content:
             item['ctime'] = now
-        return self._content_add(content, with_data=True)
+        return self._content_add(content, with_data=True)

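For reviewers unfamiliar with the call shape, a minimal usage sketch against the in-memory backend (using `swh.model.hashutil.MultiHash` to build real digests; the literal data is illustrative):

    from swh.model.hashutil import MultiHash
    from swh.storage.in_memory import Storage

    storage = Storage()
    data = b'hello world'
    # digest() yields the DEFAULT_ALGORITHMS hashes: sha1, sha1_git, ...
    content = MultiHash.from_data(data).digest()
    content.update({'data': data, 'length': len(data), 'status': 'visible'})
    summary = storage.content_add([content])
    # e.g. {'content:add': 1, 'content:add:bytes': 11, 'skipped_content:add': 0}

`content_add_metadata` below accepts the same dictionaries without the 'data' key and skips the objstorage write.
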
-    def content_add_metadata(self, contents):
+    def content_add_metadata(self, content):
         """Add content metadata to the storage (like `content_add`, but
         without inserting to the objstorage).

         Args:
             content (iterable): iterable of dictionaries representing
                 individual pieces of content to add. Each dictionary has the
                 following keys:
@@ … @@ def content_add_metadata(self, content):
         Returns:
             Summary dict with the following key and associated values:

                 content:add: New contents added
                 skipped_content:add: New skipped contents (no data) added

         """
-        return self._content_add(contents, with_data=False)
+        return self._content_add(content, with_data=False)

-    def content_get(self, ids):
+    def content_get(self, content):
         """Retrieve in bulk contents and their data.

         This function may yield more blobs than provided sha1 identifiers,
         in case they collide.

         Args:
             content: iterable of sha1s

         Yields:
             Dict[str, bytes]: Generates streams of contents as dict with their
                 raw data:

                 - sha1 (bytes): content id
                 - data (bytes): content's raw data

         Raises:
             ValueError: in case too many contents are requested.
                 cf. BULK_BLOCK_CONTENT_LEN_MAX

         """
         # FIXME: Make this method support slicing the `data`.
-        if len(ids) > BULK_BLOCK_CONTENT_LEN_MAX:
+        if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
             raise ValueError(
                 "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
-        for obj_id in ids:
+        for obj_id in content:
             try:
                 data = self.objstorage.get(obj_id)
             except ObjNotFoundError:
                 yield None
                 continue
             yield {'sha1': obj_id, 'data': data}

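A sketch of the read path, continuing the example above; note that unknown identifiers yield None rather than raising:

    for blob in storage.content_get([content['sha1'], b'\xff' * 20]):
        if blob is None:
            print('missing')  # the second, unknown sha1
        else:
            print(blob['sha1'].hex(), len(blob['data']))
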
@@ … @@ def content_get_range(self, start, end, limit=1000, db=None, cur=None):
             matched.append({
                 **self._contents[key],
             })
         return {
             'contents': matched,
             'next': next_content,
         }

-    def content_get_metadata(self, sha1s):
+    def content_get_metadata(self, content):
         """Retrieve content metadata in bulk

         Args:
             content: iterable of content identifiers (sha1)

         Returns:
             an iterable with content metadata corresponding to the given ids
         """
         # FIXME: the return value should be a mapping from search key to found
         # content*s*
-        for sha1 in sha1s:
+        for sha1 in content:
             if sha1 in self._content_indexes['sha1']:
                 objs = self._content_indexes['sha1'][sha1]
                 # FIXME: rather than selecting one of the objects with that
                 # hash, we should return all of them. See:
                 # https://forge.softwareheritage.org/D645?id=1994#inline-3389
                 key = random.sample(objs, 1)[0]
                 data = copy.deepcopy(self._contents[key])
                 data.pop('ctime')
@@ … @@ def content_find(self, content):
                 found.append(self._content_indexes[algo][hash])
         if not found:
             return []
         keys = list(set.intersection(*found))
         return copy.deepcopy([self._contents[key] for key in keys])

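`content_find` takes a mapping from algorithm name to hash and intersects the per-algorithm indexes, so any known subset of hashes can be used. A sketch, still reusing the content added above:

    # The result is a (possibly empty) list of matching content dicts.
    matches = storage.content_find({'sha1': content['sha1']})
    for match in matches:
        print(match['status'], match['length'])
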
-    def content_missing(self, contents, key_hash='sha1'):
+    def content_missing(self, content, key_hash='sha1'):
         """List content missing from storage

         Args:
             content ([dict]): iterable of dictionaries whose keys are
                 either 'length' or an item of
                 :data:`swh.model.hashutil.ALGORITHMS`;
                 mapped to the corresponding checksum
                 (or length).
             key_hash (str): name of the column to use as hash id
                 result (default: 'sha1')

         Returns:
             iterable ([bytes]): missing content ids (as per the
                 key_hash column)

         """
-        for content in contents:
-            for (algo, hash_) in content.items():
+        for cont in content:
+            for (algo, hash_) in cont.items():
                 if algo not in DEFAULT_ALGORITHMS:
                     continue
                 if hash_ not in self._content_indexes.get(algo, []):
-                    yield content[key_hash]
+                    yield cont[key_hash]
                     break
             else:
-                for result in self.content_find(content):
+                for result in self.content_find(cont):
                     if result['status'] == 'missing':
-                        yield content[key_hash]
+                        yield cont[key_hash]

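A sketch of how the known/missing split behaves; the `unknown` digests are placeholders, not real hashes:

    known = {k: content[k] for k in ('sha1', 'sha1_git', 'sha256',
                                     'blake2s256', 'length')}
    unknown = {'sha1': b'\xaa' * 20, 'sha1_git': b'\xbb' * 20,
               'sha256': b'\xcc' * 32, 'blake2s256': b'\xdd' * 32}
    # Only the unknown entry's sha1 comes back (key_hash defaults to 'sha1').
    assert list(storage.content_missing([known, unknown])) == \
        [unknown['sha1']]
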
     def content_missing_per_sha1(self, contents):
         """List content missing from storage based only on sha1.

         Args:
             contents: Iterable of sha1 to check for absence.

         Returns:
@@ … @@ def directory_add(self, directories):
             if directory['id'] not in self._directories:
                 count += 1
                 self._directories[directory['id']] = copy.deepcopy(directory)
                 self._objects[directory['id']].append(
                     ('directory', directory['id']))

         return {'directory:add': count}

-    def directory_missing(self, directory_ids):
+    def directory_missing(self, directories):
         """List directories missing from storage

         Args:
             directories (iterable): an iterable of directory ids

         Yields:
             missing directory ids
         """
-        for id in directory_ids:
+        for id in directories:
             if id not in self._directories:
                 yield id

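A one-line sketch, with a placeholder directory id:

    # ids absent from the archive are yielded back unchanged
    missing = list(storage.directory_missing([b'\x0d' * 20]))
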
     def _join_dentry_to_content(self, dentry):
         keys = (
             'status',
             'sha1',
             'sha1_git',
@@ … @@ def _directory_ls(self, directory_id, recursive, prefix=b''):
             ret = self._join_dentry_to_content(entry)
             ret['name'] = prefix + ret['name']
             ret['dir_id'] = directory_id
             yield ret
             if recursive and ret['type'] == 'dir':
                 yield from self._directory_ls(
                     ret['target'], True, prefix + ret['name'] + b'/')

-    def directory_ls(self, directory_id, recursive=False):
+    def directory_ls(self, directory, recursive=False):
         """Get entries for one directory.

         Args:
             - directory: the directory to list entries from.
             - recursive: if set, list entries recursively from this directory.

         Returns:
             List of entries for the given directory.

             If `recursive=True`, names in the path of a dir/file not at the
             root are concatenated with a slash (`/`).

         """
-        yield from self._directory_ls(directory_id, recursive)
+        yield from self._directory_ls(directory, recursive)

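A sketch of the recursive listing; `dir_id` is a placeholder for a directory previously inserted with `directory_add`:

    dir_id = b'\x0d' * 20  # placeholder id of a stored directory
    for entry in storage.directory_ls(dir_id, recursive=True):
        # nested entries carry slash-joined names, e.g. b'subdir/file'
        print(entry['type'], entry['name'])
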
     def directory_entry_get_by_path(self, directory, paths):
         """Get the directory entry (either file or dir) from directory with path.

         Args:
             - directory: sha1 of the top level directory
             - paths: path to lookup from the top level directory. From left
                 (top) to right (bottom).
@@ … @@ def revision_add(self, revisions):
                 rev['committer_date'] = normalize_timestamp(
                     rev.get('committer_date'))

                 self._objects[revision['id']].append(
                     ('revision', revision['id']))
                 count += 1

         return {'revision:add': count}

-    def revision_missing(self, revision_ids):
+    def revision_missing(self, revisions):
         """List revisions missing from storage

         Args:
             revisions (iterable): revision ids

         Yields:
             missing revision ids
         """
-        for id in revision_ids:
+        for id in revisions:
             if id not in self._revisions:
                 yield id

-    def revision_get(self, revision_ids):
-        for id in revision_ids:
+    def revision_get(self, revisions):
+        for id in revisions:
             yield copy.deepcopy(self._revisions.get(id))

     def _get_parent_revs(self, rev_id, seen, limit):
         if limit and len(seen) >= limit:
             return
         if rev_id in seen or rev_id not in self._revisions:
             return
         seen.add(rev_id)
         yield self._revisions[rev_id]
         for parent in self._revisions[rev_id]['parents']:
             yield from self._get_parent_revs(parent, seen, limit)

-    def revision_log(self, revision_ids, limit=None):
+    def revision_log(self, revisions, limit=None):
         """Fetch the revision log entries reachable from the given root revisions.

         Args:
             revisions: array of root revisions to look up
             limit: limit on the number of results. Defaults to None.

         Yields:
             revision log entries, walking back from the given roots.
         """
         seen = set()
-        for rev_id in revision_ids:
+        for rev_id in revisions:
             yield from self._get_parent_revs(rev_id, seen, limit)

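A sketch of walking history from one root; the id is again a placeholder for a revision previously inserted with `revision_add`:

    root = b'\x0e' * 20  # placeholder id of a stored revision
    for rev in storage.revision_log([root], limit=10):
        print(rev['id'].hex(), len(rev['parents']))
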
     def revision_shortlog(self, revisions, limit=None):
         """Fetch the shortlog for the given revisions

         Args:
             revisions: list of root revisions to lookup
             limit: depth limitation for the output
@@ … @@ def release_get(self, releases):
         Yields:
             dicts with the same keys as those given to `release_add`
             (or ``None`` if a release does not exist)

         """
         for rel_id in releases:
             yield copy.deepcopy(self._releases.get(rel_id))

-    def snapshot_add(self, snapshots, legacy_arg1=None, legacy_arg2=None):
+    def snapshot_add(self, snapshots, origin=None, visit=None):
         """Add a snapshot to the storage

         Args:
             snapshots ([dict]): the snapshots to add, containing the
                 following keys:

                 - **id** (:class:`bytes`): id of the snapshot
                 - **branches** (:class:`dict`): branches the snapshot contains,
@@ … @@ def snapshot_add(self, snapshots, origin=None, visit=None):
             ValueError: if the origin's or visit's identifier does not exist.

         Returns:
             Summary dict of keys with associated count as values

                 snapshot:add: Count of snapshots actually stored in db

         """
-        if legacy_arg1:
-            assert legacy_arg2
-            (origin, visit, snapshots) = \
-                (snapshots, legacy_arg1, [legacy_arg2])
-        else:
-            origin = visit = None
+        if origin:
+            if not visit:
+                raise TypeError(
+                    'snapshot_add expects one argument (or, as a legacy '
+                    'behavior, three arguments), not two')
+            if isinstance(snapshots, (int, bytes)):
+                # Called by legacy code that uses the new api/client.py
+                (origin_id, visit_id, snapshots) = \
+                    (snapshots, origin, [visit])
+            else:
+                # Called by legacy code that uses the old api/client.py
+                origin_id = origin
+                visit_id = visit
+                snapshots = [snapshots]
+        else:
+            # origin_id/visit_id must also be bound on the non-legacy path,
+            # otherwise the `if visit_id:` check below raises NameError
+            origin_id = visit_id = None

         count = 0
         for snapshot in snapshots:
             snapshot_id = snapshot['id']
             if snapshot_id not in self._snapshots:
                 if self.journal_writer:
                     self.journal_writer.write_addition('snapshot', snapshot)
                 self._snapshots[snapshot_id] = {
                     'id': snapshot_id,
                     'branches': copy.deepcopy(snapshot['branches']),
                     '_sorted_branch_names': sorted(snapshot['branches'])
                 }
                 self._objects[snapshot_id].append(('snapshot', snapshot_id))
                 count += 1

-        if origin:
+        if visit_id:
             # Legacy API, there can be only one snapshot
             self.origin_visit_update(
-                origin, visit, snapshot=snapshots[0]['id'])
+                origin_id, visit_id, snapshot=snapshots[0]['id'])

         return {'snapshot:add': count}

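To make the compatibility dance above concrete, a sketch of the call shapes the new signature accepts; `origin_id` and `visit_id` are placeholders for an existing origin visit:

    snapshot = {
        'id': b'\x05' * 20,
        'branches': {
            b'master': {'target': b'\x0e' * 20,
                        'target_type': 'revision'},
        },
    }

    # New-style call: just the list of snapshots.
    storage.snapshot_add([snapshot])

    # Legacy three-argument call, detected because the first argument is
    # an origin id (int) rather than a list of dicts; it also updates the
    # visit, so it needs a real origin_id/visit_id to run:
    #   storage.snapshot_add(origin_id, visit_id, snapshot)
    # Legacy call passing a single snapshot dict with origin/visit keywords:
    #   storage.snapshot_add(snapshot, origin=origin_id, visit=visit_id)
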
     def snapshot_get(self, snapshot_id):
         """Get the content, possibly partial, of a snapshot with the given id

         The branches of the snapshot are iterated in the lexicographical
         order of their names.
@@ … @@ def origin_search(self, url_pattern, offset=0, limit=50,
         Returns:
             An iterable of dict containing origin information as returned
             by :meth:`swh.storage.storage.Storage.origin_get`.
         """
         origins = self._origins
         if regexp:
             pat = re.compile(url_pattern)
-            origins = [orig for orig in origins if pat.match(orig['url'])]
+            origins = [orig for orig in origins if pat.search(orig['url'])]
         else:
             origins = [orig for orig in origins if url_pattern in orig['url']]
         if with_visit:
             origins = [orig for orig in origins
                        if len(self._origin_visits[orig['id']-1]) > 0]
         origins = copy.deepcopy(origins[offset:offset+limit])
         return origins

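The `match` to `search` fix changes the semantics: `re.match` anchors at the start of the URL, while `re.search` matches anywhere in it, which is what a regexp-based origin search needs. A quick illustration:

    import re

    pat = re.compile('github')
    pat.match('https://github.com/x/y')   # None: anchored at position 0
    pat.search('https://github.com/x/y')  # matches at offset 8
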
@@ … @@
     @staticmethod
     def _content_key(content):
         """A stable key for a content"""
         return tuple(content.get(key) for key in sorted(DEFAULT_ALGORITHMS))

     @staticmethod
     def _tool_key(tool):
-        return (tool['name'], tool['version'],
-                tuple(sorted(tool['configuration'].items())))
+        return '%r %r %r' % (tool['name'], tool['version'],
+                             tuple(sorted(tool['configuration'].items())))

     @staticmethod
     def _metadata_provider_key(provider):
-        return (provider['name'], provider['url'])
+        return '%r %r' % (provider['name'], provider['url'])
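These keys switch from tuples to `%r`-formatted strings. A plausible motivation (an assumption; the diff does not say) is that string keys stay stable across serialization boundaries, e.g. in the RPC layer or test fixtures, where a tuple would come back as a list and stop hashing the same. The resulting key looks like:

    tool = {'name': 'nomos', 'version': '3.1.0', 'configuration': {}}
    key = '%r %r %r' % (tool['name'], tool['version'],
                        tuple(sorted(tool['configuration'].items())))
    # key == "'nomos' '3.1.0' ()"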