diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -74,11 +74,6 @@ return True def _content_add(self, contents, with_data): - if self.journal_writer: - for content in contents: - content = attr.evolve(content, data=None) - self.journal_writer.write_addition('content', content) - content_with_data = [] content_without_data = [] for content in contents: @@ -87,9 +82,18 @@ if content.length is None: content.length = -1 if content.status == 'visible': - content_with_data.append(content) + if self._content_key(content) not in self._contents: + content_with_data.append(content) elif content.status == 'absent': - content_without_data.append(content) + if self._content_key(content) not in self._skipped_contents: + content_without_data.append(content) + + if self.journal_writer: + for content in content_with_data: + content = attr.evolve(content, data=None) + self.journal_writer.write_addition('content', content) + for content in content_without_data: + self.journal_writer.write_addition('content', content) count_content_added, count_content_bytes_added = \ self._content_add_present(content_with_data, with_data) @@ -440,7 +444,10 @@ """ if self.journal_writer: - self.journal_writer.write_additions('directory', directories) + self.journal_writer.write_additions( + 'directory', + (dir_ for dir_ in directories + if dir_['id'] not in self._directories)) directories = [Directory.from_dict(d) for d in directories] @@ -592,7 +599,10 @@ """ if self.journal_writer: - self.journal_writer.write_additions('revision', revisions) + self.journal_writer.write_additions( + 'revision', + (rev for rev in revisions + if rev['id'] not in self._revisions)) revisions = [Revision.from_dict(rev) for rev in revisions] @@ -695,7 +705,10 @@ """ if self.journal_writer: - self.journal_writer.write_additions('release', releases) + self.journal_writer.write_additions( + 'release', + (rel for rel in releases + if rel['id'] not 
in self._releases)) releases = [Release.from_dict(rel) for rel in releases] diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -258,13 +258,6 @@ for item in content: item['ctime'] = now - if self.journal_writer: - for item in content: - if 'data' in item: - item = item.copy() - del item['data'] - self.journal_writer.write_addition('content', item) - content = [self._normalize_content(c) for c in content] for c in content: self._validate_content(c) @@ -272,6 +265,15 @@ (content_with_data, content_without_data, summary) = \ self._filter_new_content(content, db, cur) + if self.journal_writer: + for item in content_with_data: + if 'data' in item: + item = item.copy() + del item['data'] + self.journal_writer.write_addition('content', item) + for item in content_without_data: + self.journal_writer.write_addition('content', item) + def add_to_objstorage(): """Add to objstorage the new missing_content @@ -366,10 +368,6 @@ content:add: New contents added skipped_content:add: New skipped contents (no data) added """ - if self.journal_writer: - for item in content: - assert 'data' not in content - self.journal_writer.write_addition('content', item) content = [self._normalize_content(c) for c in content] for c in content: @@ -378,6 +376,12 @@ (content_with_data, content_without_data, summary) = \ self._filter_new_content(content, db, cur) + if self.journal_writer: + for item in itertools.chain(content_with_data, + content_without_data): + assert 'data' not in item + self.journal_writer.write_addition('content', item) + self._content_add_metadata( db, cur, content_with_data, content_without_data) @@ -596,8 +600,6 @@ """ summary = {'directory:add': 0} - if self.journal_writer: - self.journal_writer.write_additions('directory', directories) dirs = set() dir_entries = { @@ -622,6 +624,12 @@ if not dirs_missing: return summary + if self.journal_writer: + self.journal_writer.write_additions( + 'directory', 
(dir_ for dir_ in directories + if dir_['id'] in dirs_missing)) + # Copy directory ids dirs_missing_dict = ({'id': dir} for dir in dirs_missing) db.mktemp('directory', cur) @@ -744,9 +752,6 @@ """ summary = {'revision:add': 0} - if self.journal_writer: - self.journal_writer.write_additions('revision', revisions) - revisions_missing = set(self.revision_missing( set(revision['id'] for revision in revisions), db=db, cur=cur)) @@ -756,9 +761,14 @@ db.mktemp_revision(cur) - revisions_filtered = ( - converters.revision_to_db(revision) for revision in revisions - if revision['id'] in revisions_missing) + revisions_filtered = [ + revision for revision in revisions + if revision['id'] in revisions_missing] + + if self.journal_writer: + self.journal_writer.write_additions('revision', revisions_filtered) + + revisions_filtered = map(converters.revision_to_db, revisions_filtered) parents_filtered = [] @@ -877,9 +887,6 @@ """ summary = {'release:add': 0} - if self.journal_writer: - self.journal_writer.write_additions('release', releases) - release_ids = set(release['id'] for release in releases) releases_missing = set(self.release_missing(release_ids, db=db, cur=cur)) @@ -891,10 +898,15 @@ releases_missing = list(releases_missing) - releases_filtered = ( - converters.release_to_db(release) for release in releases + releases_filtered = [ + release for release in releases if release['id'] in releases_missing - ) + ] + + if self.journal_writer: + self.journal_writer.write_additions('release', releases_filtered) + + releases_filtered = map(converters.release_to_db, releases_filtered) db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols, cur) diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -662,13 +662,14 @@ 'skipped_content:add': 0 }) - def test_content_add_again(self): + def test_content_add_twice(self): actual_result = 
self.storage.content_add([self.cont]) self.assertEqual(actual_result, { 'content:add': 1, 'content:add:bytes': self.cont['length'], 'skipped_content:add': 0 }) + self.assertEqual(len(self.journal_writer.objects), 1) actual_result = self.storage.content_add([self.cont, self.cont2]) self.assertEqual(actual_result, { @@ -676,6 +677,7 @@ 'content:add:bytes': self.cont2['length'], 'skipped_content:add': 0 }) + self.assertEqual(len(self.journal_writer.objects), 2) self.assertEqual(len(self.storage.content_find(self.cont)), 1) self.assertEqual(len(self.storage.content_find(self.cont2)), 1) @@ -1029,6 +1031,19 @@ self.assertEqual(cm.exception.pgcode, psycopg2.errorcodes.NOT_NULL_VIOLATION) + def test_directory_add_twice(self): + actual_result = self.storage.directory_add([self.dir]) + self.assertEqual(actual_result, {'directory:add': 1}) + + self.assertEqual(list(self.journal_writer.objects), + [('directory', self.dir)]) + + actual_result = self.storage.directory_add([self.dir]) + self.assertEqual(actual_result, {'directory:add': 0}) + + self.assertEqual(list(self.journal_writer.objects), + [('directory', self.dir)]) + def test_directory_get_recursive(self): init_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([self.dir['id']], init_missing) @@ -1177,10 +1192,6 @@ self.assertEqual(list(self.journal_writer.objects), [('revision', self.revision)]) - # already there so nothing added - actual_result = self.storage.revision_add([self.revision]) - self.assertEqual(actual_result, {'revision:add': 0}) - def test_revision_add_validation(self): rev = copy.deepcopy(self.revision) rev['date']['offset'] = 2**16 @@ -1215,6 +1226,21 @@ self.assertEqual(cm.exception.pgcode, psycopg2.errorcodes.INVALID_TEXT_REPRESENTATION) + def test_revision_add_twice(self): + actual_result = self.storage.revision_add([self.revision]) + self.assertEqual(actual_result, {'revision:add': 1}) + + self.assertEqual(list(self.journal_writer.objects), + [('revision', 
self.revision)]) + + actual_result = self.storage.revision_add( + [self.revision, self.revision2]) + self.assertEqual(actual_result, {'revision:add': 1}) + + self.assertEqual(list(self.journal_writer.objects), + [('revision', self.revision), + ('revision', self.revision2)]) + def test_revision_add_name_clash(self): revision1 = self.revision.copy() revision2 = self.revision2.copy() @@ -1354,10 +1380,6 @@ [('release', self.release), ('release', self.release2)]) - # already present so nothing added - actual_result = self.storage.release_add([self.release, self.release2]) - self.assertEqual(actual_result, {'release:add': 0}) - def test_release_add_no_author_date(self): release = self.release.copy() release['author'] = None @@ -1395,6 +1417,20 @@ self.assertEqual(cm.exception.pgcode, psycopg2.errorcodes.CHECK_VIOLATION) + def test_release_add_twice(self): + actual_result = self.storage.release_add([self.release]) + self.assertEqual(actual_result, {'release:add': 1}) + + self.assertEqual(list(self.journal_writer.objects), + [('release', self.release)]) + + actual_result = self.storage.release_add([self.release, self.release2]) + self.assertEqual(actual_result, {'release:add': 1}) + + self.assertEqual(list(self.journal_writer.objects), + [('release', self.release), + ('release', self.release2)]) + def test_release_add_name_clash(self): release1 = self.release.copy() release2 = self.release2.copy() @@ -1478,8 +1514,17 @@ def test_origin_add_twice(self): add1 = self.storage.origin_add([self.origin, self.origin2]) + + self.assertEqual(list(self.journal_writer.objects), + [('origin', self.origin), + ('origin', self.origin2)]) + add2 = self.storage.origin_add([self.origin, self.origin2]) + self.assertEqual(list(self.journal_writer.objects), + [('origin', self.origin), + ('origin', self.origin2)]) + self.assertEqual(add1, add2) def test_origin_add_validation(self): @@ -2505,6 +2550,20 @@ {**self.snapshot, 'next_branch': None}, self.storage.snapshot_get(self.snapshot['id'])) + 
def test_snapshot_add_twice(self): + actual_result = self.storage.snapshot_add([self.empty_snapshot]) + self.assertEqual(actual_result, {'snapshot:add': 1}) + + self.assertEqual(list(self.journal_writer.objects), + [('snapshot', self.empty_snapshot)]) + + actual_result = self.storage.snapshot_add([self.snapshot]) + self.assertEqual(actual_result, {'snapshot:add': 1}) + + self.assertEqual(list(self.journal_writer.objects), + [('snapshot', self.empty_snapshot), + ('snapshot', self.snapshot)]) + def test_snapshot_add_validation(self): snap = copy.deepcopy(self.snapshot) snap['branches'][b'foo'] = {'target_type': 'revision'} @@ -2764,7 +2823,7 @@ self.assertEqual(list(self.journal_writer.objects), [ ('snapshot', self.snapshot)]) - def test_snapshot_add_twice(self): + def test_snapshot_add_twice__by_origin_visit(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1)