Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show First 20 Lines • Show All 252 Lines • ▼ Show 20 Lines | def content_add(self, content, db, cur): | ||||
content:add:bytes: Sum of the contents' length data | content:add:bytes: Sum of the contents' length data | ||||
skipped_content:add: New skipped contents (no data) added | skipped_content:add: New skipped contents (no data) added | ||||
""" | """ | ||||
content = [dict(c.items()) for c in content] # semi-shallow copy | content = [dict(c.items()) for c in content] # semi-shallow copy | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
for item in content: | for item in content: | ||||
item['ctime'] = now | item['ctime'] = now | ||||
if self.journal_writer: | |||||
for item in content: | |||||
if 'data' in item: | |||||
item = item.copy() | |||||
del item['data'] | |||||
self.journal_writer.write_addition('content', item) | |||||
content = [self._normalize_content(c) for c in content] | content = [self._normalize_content(c) for c in content] | ||||
for c in content: | for c in content: | ||||
self._validate_content(c) | self._validate_content(c) | ||||
(content_with_data, content_without_data, summary) = \ | (content_with_data, content_without_data, summary) = \ | ||||
self._filter_new_content(content, db, cur) | self._filter_new_content(content, db, cur) | ||||
if self.journal_writer: | |||||
for item in content_with_data: | |||||
if 'data' in item: | |||||
item = item.copy() | |||||
del item['data'] | |||||
self.journal_writer.write_addition('content', item) | |||||
for item in content_without_data: | |||||
self.journal_writer.write_addition('content', item) | |||||
def add_to_objstorage(): | def add_to_objstorage(): | ||||
"""Add to objstorage the new missing_content | """Add to objstorage the new missing_content | ||||
Returns: | Returns: | ||||
Sum of all the content's data length pushed to the | Sum of all the content's data length pushed to the | ||||
objstorage. Content present twice is only sent once. | objstorage. Content present twice is only sent once. | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 78 Lines • ▼ Show 20 Lines | def content_add_metadata(self, content, db, cur): | ||||
- ctime (datetime): time of insertion in the archive | - ctime (datetime): time of insertion in the archive | ||||
Returns: | Returns: | ||||
Summary dict with the following key and associated values: | Summary dict with the following key and associated values: | ||||
content:add: New contents added | content:add: New contents added | ||||
skipped_content:add: New skipped contents (no data) added | skipped_content:add: New skipped contents (no data) added | ||||
""" | """ | ||||
if self.journal_writer: | |||||
for item in content: | |||||
assert 'data' not in content | |||||
self.journal_writer.write_addition('content', item) | |||||
content = [self._normalize_content(c) for c in content] | content = [self._normalize_content(c) for c in content] | ||||
for c in content: | for c in content: | ||||
self._validate_content(c) | self._validate_content(c) | ||||
(content_with_data, content_without_data, summary) = \ | (content_with_data, content_without_data, summary) = \ | ||||
self._filter_new_content(content, db, cur) | self._filter_new_content(content, db, cur) | ||||
if self.journal_writer: | |||||
for item in itertools.chain(content_with_data, | |||||
content_without_data): | |||||
assert 'data' not in content | |||||
self.journal_writer.write_addition('content', item) | |||||
self._content_add_metadata( | self._content_add_metadata( | ||||
db, cur, content_with_data, content_without_data) | db, cur, content_with_data, content_without_data) | ||||
return summary | return summary | ||||
def content_get(self, content): | def content_get(self, content): | ||||
"""Retrieve in bulk contents and their data. | """Retrieve in bulk contents and their data. | ||||
▲ Show 20 Lines • Show All 202 Lines • ▼ Show 20 Lines | def directory_add(self, directories, db, cur): | ||||
Returns: | Returns: | ||||
Summary dict of keys with associated count as values: | Summary dict of keys with associated count as values: | ||||
directory:add: Number of directories actually added | directory:add: Number of directories actually added | ||||
""" | """ | ||||
summary = {'directory:add': 0} | summary = {'directory:add': 0} | ||||
if self.journal_writer: | |||||
self.journal_writer.write_additions('directory', directories) | |||||
dirs = set() | dirs = set() | ||||
dir_entries = { | dir_entries = { | ||||
'file': defaultdict(list), | 'file': defaultdict(list), | ||||
'dir': defaultdict(list), | 'dir': defaultdict(list), | ||||
'rev': defaultdict(list), | 'rev': defaultdict(list), | ||||
} | } | ||||
for cur_dir in directories: | for cur_dir in directories: | ||||
dir_id = cur_dir['id'] | dir_id = cur_dir['id'] | ||||
dirs.add(dir_id) | dirs.add(dir_id) | ||||
for src_entry in cur_dir['entries']: | for src_entry in cur_dir['entries']: | ||||
entry = src_entry.copy() | entry = src_entry.copy() | ||||
entry['dir_id'] = dir_id | entry['dir_id'] = dir_id | ||||
if entry['type'] not in ('file', 'dir', 'rev'): | if entry['type'] not in ('file', 'dir', 'rev'): | ||||
raise ValueError( | raise ValueError( | ||||
'Entry type must be file, dir, or rev; not %s' | 'Entry type must be file, dir, or rev; not %s' | ||||
% entry['type']) | % entry['type']) | ||||
dir_entries[entry['type']][dir_id].append(entry) | dir_entries[entry['type']][dir_id].append(entry) | ||||
dirs_missing = set(self.directory_missing(dirs, db=db, cur=cur)) | dirs_missing = set(self.directory_missing(dirs, db=db, cur=cur)) | ||||
if not dirs_missing: | if not dirs_missing: | ||||
return summary | return summary | ||||
if self.journal_writer: | |||||
self.journal_writer.write_additions( | |||||
'directory', | |||||
(dir_ for dir_ in directories | |||||
if dir_['id'] in dirs_missing)) | |||||
# Copy directory ids | # Copy directory ids | ||||
dirs_missing_dict = ({'id': dir} for dir in dirs_missing) | dirs_missing_dict = ({'id': dir} for dir in dirs_missing) | ||||
db.mktemp('directory', cur) | db.mktemp('directory', cur) | ||||
db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur) | db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur) | ||||
# Copy entries | # Copy entries | ||||
for entry_type, entry_list in dir_entries.items(): | for entry_type, entry_list in dir_entries.items(): | ||||
entries = itertools.chain.from_iterable( | entries = itertools.chain.from_iterable( | ||||
▲ Show 20 Lines • Show All 106 Lines • ▼ Show 20 Lines | def revision_add(self, revisions, db, cur): | ||||
Returns: | Returns: | ||||
Summary dict of keys with associated count as values | Summary dict of keys with associated count as values | ||||
revision:add: New objects actually stored in db | revision:add: New objects actually stored in db | ||||
""" | """ | ||||
summary = {'revision:add': 0} | summary = {'revision:add': 0} | ||||
if self.journal_writer: | |||||
self.journal_writer.write_additions('revision', revisions) | |||||
revisions_missing = set(self.revision_missing( | revisions_missing = set(self.revision_missing( | ||||
set(revision['id'] for revision in revisions), | set(revision['id'] for revision in revisions), | ||||
db=db, cur=cur)) | db=db, cur=cur)) | ||||
if not revisions_missing: | if not revisions_missing: | ||||
return summary | return summary | ||||
db.mktemp_revision(cur) | db.mktemp_revision(cur) | ||||
revisions_filtered = ( | revisions_filtered = [ | ||||
converters.revision_to_db(revision) for revision in revisions | revision for revision in revisions | ||||
if revision['id'] in revisions_missing) | if revision['id'] in revisions_missing] | ||||
if self.journal_writer: | |||||
self.journal_writer.write_additions('revision', revisions_filtered) | |||||
revisions_filtered = map(converters.revision_to_db, revisions_filtered) | |||||
parents_filtered = [] | parents_filtered = [] | ||||
db.copy_to( | db.copy_to( | ||||
revisions_filtered, 'tmp_revision', db.revision_add_cols, | revisions_filtered, 'tmp_revision', db.revision_add_cols, | ||||
cur, | cur, | ||||
lambda rev: parents_filtered.extend(rev['parents'])) | lambda rev: parents_filtered.extend(rev['parents'])) | ||||
▲ Show 20 Lines • Show All 102 Lines • ▼ Show 20 Lines | def release_add(self, releases, db, cur): | ||||
Returns: | Returns: | ||||
Summary dict of keys with associated count as values | Summary dict of keys with associated count as values | ||||
release:add: New objects contents actually stored in db | release:add: New objects contents actually stored in db | ||||
""" | """ | ||||
summary = {'release:add': 0} | summary = {'release:add': 0} | ||||
if self.journal_writer: | |||||
self.journal_writer.write_additions('release', releases) | |||||
release_ids = set(release['id'] for release in releases) | release_ids = set(release['id'] for release in releases) | ||||
releases_missing = set(self.release_missing(release_ids, | releases_missing = set(self.release_missing(release_ids, | ||||
db=db, cur=cur)) | db=db, cur=cur)) | ||||
if not releases_missing: | if not releases_missing: | ||||
return summary | return summary | ||||
db.mktemp_release(cur) | db.mktemp_release(cur) | ||||
releases_missing = list(releases_missing) | releases_missing = list(releases_missing) | ||||
releases_filtered = ( | releases_filtered = [ | ||||
converters.release_to_db(release) for release in releases | release for release in releases | ||||
if release['id'] in releases_missing | if release['id'] in releases_missing | ||||
) | ] | ||||
if self.journal_writer: | |||||
self.journal_writer.write_additions('release', releases_filtered) | |||||
releases_filtered = map(converters.release_to_db, releases_filtered) | |||||
db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols, | db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols, | ||||
cur) | cur) | ||||
db.release_add_from_temp(cur) | db.release_add_from_temp(cur) | ||||
return {'release:add': len(releases_missing)} | return {'release:add': len(releases_missing)} | ||||
▲ Show 20 Lines • Show All 1,019 Lines • Show Last 20 Lines |