Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show First 20 Lines • Show All 185 Lines • ▼ Show 20 Lines | def _content_add_metadata(self, db, cur, | ||||
""" | """ | ||||
if content_with_data: | if content_with_data: | ||||
# create temporary table for metadata injection | # create temporary table for metadata injection | ||||
db.mktemp('content', cur) | db.mktemp('content', cur) | ||||
db.copy_to(content_with_data, 'tmp_content', | db.copy_to(content_with_data, 'tmp_content', | ||||
db.content_add_keys, cur) | db.content_add_keys, cur) | ||||
# Create a read/write dependency between transactions that would | |||||
# write the same content, so that we get a SerializationFailure | |||||
# (read/write conflict) instead of an IntegrityError (write/write | |||||
# conflict) | |||||
cur.execute('SELECT 1 FROM content WHERE sha1 IN %s', | |||||
(tuple(cont['sha1'] for cont in content_with_data),)) | |||||
list(cur) | |||||
# move metadata in place | # move metadata in place | ||||
try: | try: | ||||
db.content_add_from_temp(cur) | db.content_add_from_temp(cur) | ||||
except psycopg2.IntegrityError as e: | except psycopg2.IntegrityError as e: | ||||
from . import HashCollision | from . import HashCollision | ||||
if e.diag.sqlstate == '23505' and \ | if e.diag.sqlstate == '23505' and \ | ||||
e.diag.table_name == 'content': | e.diag.table_name == 'content': | ||||
constraint_to_hash_name = { | constraint_to_hash_name = { | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | def content_add(self, content, db, cur): | ||||
Returns: | Returns: | ||||
Summary dict with the following key and associated values: | Summary dict with the following key and associated values: | ||||
content:add: New contents added | content:add: New contents added | ||||
content:add:bytes: Sum of the contents' length data | content:add:bytes: Sum of the contents' length data | ||||
skipped_content:add: New skipped contents (no data) added | skipped_content:add: New skipped contents (no data) added | ||||
""" | """ | ||||
cur.execute('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE') | |||||
content = [dict(c.items()) for c in content] # semi-shallow copy | content = [dict(c.items()) for c in content] # semi-shallow copy | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
for item in content: | for item in content: | ||||
item['ctime'] = now | item['ctime'] = now | ||||
content = [self._normalize_content(c) for c in content] | content = [self._normalize_content(c) for c in content] | ||||
for c in content: | for c in content: | ||||
self._validate_content(c) | self._validate_content(c) | ||||
▲ Show 20 Lines • Show All 99 Lines • ▼ Show 20 Lines | def content_add_metadata(self, content, db, cur): | ||||
- ctime (datetime): time of insertion in the archive | - ctime (datetime): time of insertion in the archive | ||||
Returns: | Returns: | ||||
Summary dict with the following key and associated values: | Summary dict with the following key and associated values: | ||||
content:add: New contents added | content:add: New contents added | ||||
skipped_content:add: New skipped contents (no data) added | skipped_content:add: New skipped contents (no data) added | ||||
""" | """ | ||||
cur.execute('SET TRANSACTION ISOLATION LEVEL SERIALIZABLE') | |||||
content = [self._normalize_content(c) for c in content] | content = [self._normalize_content(c) for c in content] | ||||
for c in content: | for c in content: | ||||
self._validate_content(c) | self._validate_content(c) | ||||
(content_with_data, content_without_data, summary) = \ | (content_with_data, content_without_data, summary) = \ | ||||
self._filter_new_content(content, db, cur) | self._filter_new_content(content, db, cur) | ||||
▲ Show 20 Lines • Show All 1,437 Lines • Show Last 20 Lines |