Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import contextlib | import contextlib | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import json | import json | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from concurrent.futures import ThreadPoolExecutor | |||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from typing import Any, Dict, Iterable, List, Optional, Union | from typing import Any, Dict, Iterable, List, Optional, Union | ||||
import attr | import attr | ||||
import dateutil.parser | import dateutil.parser | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.pool | import psycopg2.pool | ||||
import psycopg2.errors | import psycopg2.errors | ||||
▲ Show 20 Lines • Show All 159 Lines • ▼ Show 20 Lines | def _content_add_metadata(self, db, cur, content): | ||||
raise HashCollision( | raise HashCollision( | ||||
hash_name, hash_id, collision_contents_hashes | hash_name, hash_id, collision_contents_hashes | ||||
) from None | ) from None | ||||
else: | else: | ||||
raise | raise | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | |||||
def content_add( | def content_add( | ||||
self, content: Iterable[Content], db=None, cur=None) -> Dict: | self, content: Iterable[Content]) -> Dict: | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
contents = [attr.evolve(c, ctime=now) for c in content] | contents = [attr.evolve(c, ctime=now) for c in content] | ||||
objstorage_summary = self.objstorage.content_add(contents) | |||||
with self.db() as db: | |||||
with db.transaction() as cur: | |||||
missing = list(self.content_missing( | missing = list(self.content_missing( | ||||
map(Content.to_dict, contents), key_hash='sha1_git', | map(Content.to_dict, contents), key_hash='sha1_git', | ||||
db=db, cur=cur, | db=db, cur=cur, | ||||
)) | )) | ||||
contents = [c for c in contents if c.sha1_git in missing] | contents = [c for c in contents if c.sha1_git in missing] | ||||
self.journal_writer.content_add(contents) | self.journal_writer.content_add(contents) | ||||
def add_to_objstorage(): | |||||
"""Add to objstorage the new missing_content | |||||
Returns: | |||||
Sum of all the content's data length pushed to the | |||||
objstorage. Content present twice is only sent once. | |||||
""" | |||||
summary = self.objstorage.content_add(contents) | |||||
return summary['content:add:bytes'] | |||||
with ThreadPoolExecutor(max_workers=1) as executor: | |||||
added_to_objstorage = executor.submit(add_to_objstorage) | |||||
self._content_add_metadata(db, cur, contents) | self._content_add_metadata(db, cur, contents) | ||||
# Wait for objstorage addition before returning from the | |||||
# transaction, bubbling up any exception | |||||
content_bytes_added = added_to_objstorage.result() | |||||
return { | return { | ||||
'content:add': len(contents), | 'content:add': len(contents), | ||||
'content:add:bytes': content_bytes_added, | 'content:add:bytes': objstorage_summary['content:add:bytes'], | ||||
} | } | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def content_update(self, content, keys=[], db=None, cur=None): | def content_update(self, content, keys=[], db=None, cur=None): | ||||
# TODO: Add a check on input keys. How to properly implement | # TODO: Add a check on input keys. How to properly implement | ||||
# this? We don't know yet the new columns. | # this? We don't know yet the new columns. | ||||
self.journal_writer.content_update(content) | self.journal_writer.content_update(content) | ||||
▲ Show 20 Lines • Show All 927 Lines • Show Last 20 Lines |