Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
| Show First 20 Lines • Show All 185 Lines • ▼ Show 20 Lines | def _content_add_metadata(self, db, cur, | ||||
| """ | """ | ||||
| if content_with_data: | if content_with_data: | ||||
| # create temporary table for metadata injection | # create temporary table for metadata injection | ||||
| db.mktemp('content', cur) | db.mktemp('content', cur) | ||||
| db.copy_to(content_with_data, 'tmp_content', | db.copy_to(content_with_data, 'tmp_content', | ||||
| db.content_add_keys, cur) | db.content_add_keys, cur) | ||||
| # Create a read/write dependency between transactions that would | |||||
| # write the same content, so that we get a SerializationFailure | |||||
| # (read/write conflict) instead of an IntegrityError (write/write | |||||
| # conflict) | |||||
| cur.execute('SELECT 1 FROM content WHERE sha1 IN %s', | |||||
| (tuple(cont['sha1'] for cont in content_with_data),)) | |||||
| list(cur) | |||||
| # move metadata in place | # move metadata in place | ||||
| try: | try: | ||||
| db.content_add_from_temp(cur) | db.content_add_from_temp(cur) | ||||
| except psycopg2.IntegrityError as e: | except psycopg2.IntegrityError as e: | ||||
| from . import HashCollision | from . import HashCollision | ||||
| if e.diag.sqlstate == '23505' and \ | if e.diag.sqlstate == '23505' and \ | ||||
| e.diag.table_name == 'content': | e.diag.table_name == 'content': | ||||
| constraint_to_hash_name = { | constraint_to_hash_name = { | ||||
| ▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | def content_add(self, content, db, cur): | ||||
| Returns: | Returns: | ||||
| Summary dict with the following key and associated values: | Summary dict with the following key and associated values: | ||||
| content:add: New contents added | content:add: New contents added | ||||
| content:add:bytes: Sum of the contents' length data | content:add:bytes: Sum of the contents' length data | ||||
| skipped_content:add: New skipped contents (no data) added | skipped_content:add: New skipped contents (no data) added | ||||
| """ | """ | ||||
| cur.execute('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE') | |||||
| content = [dict(c.items()) for c in content] # semi-shallow copy | content = [dict(c.items()) for c in content] # semi-shallow copy | ||||
| now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
| for item in content: | for item in content: | ||||
| item['ctime'] = now | item['ctime'] = now | ||||
| content = [self._normalize_content(c) for c in content] | content = [self._normalize_content(c) for c in content] | ||||
| for c in content: | for c in content: | ||||
| self._validate_content(c) | self._validate_content(c) | ||||
| ▲ Show 20 Lines • Show All 99 Lines • ▼ Show 20 Lines | def content_add_metadata(self, content, db, cur): | ||||
| - ctime (datetime): time of insertion in the archive | - ctime (datetime): time of insertion in the archive | ||||
| Returns: | Returns: | ||||
| Summary dict with the following key and associated values: | Summary dict with the following key and associated values: | ||||
| content:add: New contents added | content:add: New contents added | ||||
| skipped_content:add: New skipped contents (no data) added | skipped_content:add: New skipped contents (no data) added | ||||
| """ | """ | ||||
| cur.execute('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE') | |||||
| content = [self._normalize_content(c) for c in content] | content = [self._normalize_content(c) for c in content] | ||||
| for c in content: | for c in content: | ||||
| self._validate_content(c) | self._validate_content(c) | ||||
| (content_with_data, content_without_data, summary) = \ | (content_with_data, content_without_data, summary) = \ | ||||
| self._filter_new_content(content, db, cur) | self._filter_new_content(content, db, cur) | ||||
| ▲ Show 20 Lines • Show All 1,437 Lines • Show Last 20 Lines | |||||