Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show First 20 Lines • Show All 198 Lines • ▼ Show 20 Lines | def _content_add_metadata(self, db, cur, content): | ||||
raise HashCollision( | raise HashCollision( | ||||
hash_name, hash_id, collision_contents_hashes | hash_name, hash_id, collision_contents_hashes | ||||
) from None | ) from None | ||||
else: | else: | ||||
raise | raise | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
def content_add(self, content: Iterable[Content]) -> Dict: | def content_add(self, content: List[Content]) -> Dict: | ||||
ctime = now() | ctime = now() | ||||
contents = [attr.evolve(c, ctime=ctime) for c in content] | contents = [attr.evolve(c, ctime=ctime) for c in content] | ||||
objstorage_summary = self.objstorage.content_add(contents) | objstorage_summary = self.objstorage.content_add(contents) | ||||
with self.db() as db: | with self.db() as db: | ||||
with db.transaction() as cur: | with db.transaction() as cur: | ||||
Show All 26 Lines | def content_update(self, content, keys=[], db=None, cur=None): | ||||
select_keys = list(set(db.content_get_metadata_keys).union(set(keys))) | select_keys = list(set(db.content_get_metadata_keys).union(set(keys))) | ||||
with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
db.copy_to(content, "tmp_content", select_keys, cur) | db.copy_to(content, "tmp_content", select_keys, cur) | ||||
db.content_update_from_temp(keys_to_update=keys, cur=cur) | db.content_update_from_temp(keys_to_update=keys, cur=cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def content_add_metadata( | def content_add_metadata(self, content: List[Content], db=None, cur=None) -> Dict: | ||||
self, content: Iterable[Content], db=None, cur=None | |||||
) -> Dict: | |||||
contents = list(content) | |||||
missing = self.content_missing( | missing = self.content_missing( | ||||
(c.to_dict() for c in contents), key_hash="sha1_git", db=db, cur=cur, | (c.to_dict() for c in content), key_hash="sha1_git", db=db, cur=cur, | ||||
) | ) | ||||
contents = [c for c in contents if c.sha1_git in missing] | contents = [c for c in content if c.sha1_git in missing] | ||||
self.journal_writer.content_add_metadata(contents) | self.journal_writer.content_add_metadata(contents) | ||||
self._content_add_metadata(db, cur, contents) | self._content_add_metadata(db, cur, contents) | ||||
return { | return { | ||||
"content:add": len(contents), | "content:add": len(contents), | ||||
} | } | ||||
▲ Show 20 Lines • Show All 122 Lines • ▼ Show 20 Lines | def _skipped_content_normalize(d): | ||||
if d.get("status") is None: | if d.get("status") is None: | ||||
d["status"] = "absent" | d["status"] = "absent" | ||||
if d.get("length") is None: | if d.get("length") is None: | ||||
d["length"] = -1 | d["length"] = -1 | ||||
return d | return d | ||||
def _skipped_content_add_metadata(self, db, cur, content: Iterable[SkippedContent]): | def _skipped_content_add_metadata(self, db, cur, content: List[SkippedContent]): | ||||
origin_ids = db.origin_id_get_by_url([cont.origin for cont in content], cur=cur) | origin_ids = db.origin_id_get_by_url([cont.origin for cont in content], cur=cur) | ||||
content = [ | content = [ | ||||
attr.evolve(c, origin=origin_id) | attr.evolve(c, origin=origin_id) | ||||
for (c, origin_id) in zip(content, origin_ids) | for (c, origin_id) in zip(content, origin_ids) | ||||
] | ] | ||||
db.mktemp("skipped_content", cur) | db.mktemp("skipped_content", cur) | ||||
db.copy_to( | db.copy_to( | ||||
[c.to_dict() for c in content], | [c.to_dict() for c in content], | ||||
"tmp_skipped_content", | "tmp_skipped_content", | ||||
db.skipped_content_keys, | db.skipped_content_keys, | ||||
cur, | cur, | ||||
) | ) | ||||
# move metadata in place | # move metadata in place | ||||
db.skipped_content_add_from_temp(cur) | db.skipped_content_add_from_temp(cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def skipped_content_add( | def skipped_content_add( | ||||
self, content: Iterable[SkippedContent], db=None, cur=None | self, content: List[SkippedContent], db=None, cur=None | ||||
) -> Dict: | ) -> Dict: | ||||
ctime = now() | ctime = now() | ||||
content = [attr.evolve(c, ctime=ctime) for c in content] | content = [attr.evolve(c, ctime=ctime) for c in content] | ||||
missing_contents = self.skipped_content_missing( | missing_contents = self.skipped_content_missing( | ||||
(c.to_dict() for c in content), db=db, cur=cur, | (c.to_dict() for c in content), db=db, cur=cur, | ||||
) | ) | ||||
content = [ | content = [ | ||||
Show All 20 Lines | class Storage: | ||||
def skipped_content_missing(self, contents, db=None, cur=None): | def skipped_content_missing(self, contents, db=None, cur=None): | ||||
contents = list(contents) | contents = list(contents) | ||||
for content in db.skipped_content_missing(contents, cur): | for content in db.skipped_content_missing(contents, cur): | ||||
yield dict(zip(db.content_hash_keys, content)) | yield dict(zip(db.content_hash_keys, content)) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def directory_add( | def directory_add(self, directories: List[Directory], db=None, cur=None) -> Dict: | ||||
self, directories: Iterable[Directory], db=None, cur=None | |||||
) -> Dict: | |||||
directories = list(directories) | |||||
summary = {"directory:add": 0} | summary = {"directory:add": 0} | ||||
dirs = set() | dirs = set() | ||||
dir_entries: Dict[str, defaultdict] = { | dir_entries: Dict[str, defaultdict] = { | ||||
"file": defaultdict(list), | "file": defaultdict(list), | ||||
"dir": defaultdict(list), | "dir": defaultdict(list), | ||||
"rev": defaultdict(list), | "rev": defaultdict(list), | ||||
} | } | ||||
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | class Storage: | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def directory_get_random(self, db=None, cur=None): | def directory_get_random(self, db=None, cur=None): | ||||
return db.directory_get_random(cur) | return db.directory_get_random(cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def revision_add(self, revisions: Iterable[Revision], db=None, cur=None) -> Dict: | def revision_add(self, revisions: List[Revision], db=None, cur=None) -> Dict: | ||||
revisions = list(revisions) | |||||
summary = {"revision:add": 0} | summary = {"revision:add": 0} | ||||
revisions_missing = set( | revisions_missing = set( | ||||
self.revision_missing( | self.revision_missing( | ||||
set(revision.id for revision in revisions), db=db, cur=cur | set(revision.id for revision in revisions), db=db, cur=cur | ||||
) | ) | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | class Storage: | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def revision_get_random(self, db=None, cur=None): | def revision_get_random(self, db=None, cur=None): | ||||
return db.revision_get_random(cur) | return db.revision_get_random(cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def release_add(self, releases: Iterable[Release], db=None, cur=None) -> Dict: | def release_add(self, releases: List[Release], db=None, cur=None) -> Dict: | ||||
releases = list(releases) | |||||
summary = {"release:add": 0} | summary = {"release:add": 0} | ||||
release_ids = set(release.id for release in releases) | release_ids = set(release.id for release in releases) | ||||
releases_missing = set(self.release_missing(release_ids, db=db, cur=cur)) | releases_missing = set(self.release_missing(release_ids, db=db, cur=cur)) | ||||
if not releases_missing: | if not releases_missing: | ||||
return summary | return summary | ||||
Show All 33 Lines | class Storage: | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def release_get_random(self, db=None, cur=None): | def release_get_random(self, db=None, cur=None): | ||||
return db.release_get_random(cur) | return db.release_get_random(cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def snapshot_add(self, snapshots: Iterable[Snapshot], db=None, cur=None) -> Dict: | def snapshot_add(self, snapshots: List[Snapshot], db=None, cur=None) -> Dict: | ||||
created_temp_table = False | created_temp_table = False | ||||
count = 0 | count = 0 | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
if not db.snapshot_exists(snapshot.id, cur): | if not db.snapshot_exists(snapshot.id, cur): | ||||
if not created_temp_table: | if not created_temp_table: | ||||
db.mktemp_snapshot_branch(cur) | db.mktemp_snapshot_branch(cur) | ||||
created_temp_table = True | created_temp_table = True | ||||
▲ Show 20 Lines • Show All 103 Lines • ▼ Show 20 Lines | class Storage: | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def snapshot_get_random(self, db=None, cur=None): | def snapshot_get_random(self, db=None, cur=None): | ||||
return db.snapshot_get_random(cur) | return db.snapshot_get_random(cur) | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def origin_visit_add( | def origin_visit_add( | ||||
self, visits: Iterable[OriginVisit], db=None, cur=None | self, visits: List[OriginVisit], db=None, cur=None | ||||
) -> Iterable[OriginVisit]: | ) -> Iterable[OriginVisit]: | ||||
for visit in visits: | for visit in visits: | ||||
origin = self.origin_get([visit.origin], db=db, cur=cur)[0] | origin = self.origin_get([visit.origin], db=db, cur=cur)[0] | ||||
if not origin: # Cannot add a visit without an origin | if not origin: # Cannot add a visit without an origin | ||||
raise StorageArgumentException("Unknown origin %s", visit.origin) | raise StorageArgumentException("Unknown origin %s", visit.origin) | ||||
all_visits = [] | all_visits = [] | ||||
nb_visits = 0 | nb_visits = 0 | ||||
Show All 31 Lines | ) -> None: | ||||
db.origin_visit_status_add(visit_status, cur=cur) | db.origin_visit_status_add(visit_status, cur=cur) | ||||
send_metric( | send_metric( | ||||
"origin_visit_status:add", count=1, method_name="origin_visit_status" | "origin_visit_status:add", count=1, method_name="origin_visit_status" | ||||
) | ) | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def origin_visit_status_add( | def origin_visit_status_add( | ||||
self, visit_statuses: Iterable[OriginVisitStatus], db=None, cur=None, | self, visit_statuses: List[OriginVisitStatus], db=None, cur=None, | ||||
) -> None: | ) -> None: | ||||
# First round to check existence (fail early if any is ko) | # First round to check existence (fail early if any is ko) | ||||
for visit_status in visit_statuses: | for visit_status in visit_statuses: | ||||
origin_url = self.origin_get([visit_status.origin], db=db, cur=cur)[0] | origin_url = self.origin_get([visit_status.origin], db=db, cur=cur)[0] | ||||
if not origin_url: | if not origin_url: | ||||
raise StorageArgumentException(f"Unknown origin {visit_status.origin}") | raise StorageArgumentException(f"Unknown origin {visit_status.origin}") | ||||
for visit_status in visit_statuses: | for visit_status in visit_statuses: | ||||
▲ Show 20 Lines • Show All 131 Lines • ▼ Show 20 Lines | def object_find_by_sha1_git(self, ids, db=None, cur=None): | ||||
dict(zip(db.object_find_by_sha1_git_cols, retval)) | dict(zip(db.object_find_by_sha1_git_cols, retval)) | ||||
) | ) | ||||
return ret | return ret | ||||
@timed | @timed | ||||
@db_transaction(statement_timeout=500) | @db_transaction(statement_timeout=500) | ||||
def origin_get( | def origin_get( | ||||
self, origins: Iterable[str], db=None, cur=None | self, origins: List[str], db=None, cur=None | ||||
) -> Iterable[Optional[Origin]]: | ) -> Iterable[Optional[Origin]]: | ||||
origin_urls = list(origins) | rows = db.origin_get_by_url(origins, cur) | ||||
rows = db.origin_get_by_url(origin_urls, cur) | |||||
result: List[Optional[Origin]] = [] | result: List[Optional[Origin]] = [] | ||||
for row in rows: | for row in rows: | ||||
origin_d = dict(zip(db.origin_cols, row)) | origin_d = dict(zip(db.origin_cols, row)) | ||||
url = origin_d["url"] | url = origin_d["url"] | ||||
result.append(None if url is None else Origin(url=url)) | result.append(None if url is None else Origin(url=url)) | ||||
return result | return result | ||||
@timed | @timed | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | class Storage: | ||||
def origin_count( | def origin_count( | ||||
self, url_pattern, regexp=False, with_visit=False, db=None, cur=None | self, url_pattern, regexp=False, with_visit=False, db=None, cur=None | ||||
): | ): | ||||
return db.origin_count(url_pattern, regexp, with_visit, cur) | return db.origin_count(url_pattern, regexp, with_visit, cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def origin_add( | def origin_add(self, origins: List[Origin], db=None, cur=None) -> Dict[str, int]: | ||||
self, origins: Iterable[Origin], db=None, cur=None | |||||
) -> Dict[str, int]: | |||||
urls = [o.url for o in origins] | urls = [o.url for o in origins] | ||||
known_origins = set(url for (url,) in db.origin_get_by_url(urls, cur)) | known_origins = set(url for (url,) in db.origin_get_by_url(urls, cur)) | ||||
# use lists here to keep origins sorted; some tests depend on this | # use lists here to keep origins sorted; some tests depend on this | ||||
to_add = [url for url in urls if url not in known_origins] | to_add = [url for url in urls if url not in known_origins] | ||||
self.journal_writer.origin_add([Origin(url=url) for url in to_add]) | self.journal_writer.origin_add([Origin(url=url) for url in to_add]) | ||||
added = 0 | added = 0 | ||||
for url in to_add: | for url in to_add: | ||||
Show All 23 Lines | def refresh_stat_counters(self, db=None, cur=None): | ||||
"snapshot", | "snapshot", | ||||
] | ] | ||||
for key in keys: | for key in keys: | ||||
cur.execute("select * from swh_update_counter(%s)", (key,)) | cur.execute("select * from swh_update_counter(%s)", (key,)) | ||||
@db_transaction() | @db_transaction() | ||||
def raw_extrinsic_metadata_add( | def raw_extrinsic_metadata_add( | ||||
self, metadata: Iterable[RawExtrinsicMetadata], db, cur, | self, metadata: List[RawExtrinsicMetadata], db, cur, | ||||
) -> None: | ) -> None: | ||||
metadata = list(metadata) | metadata = list(metadata) | ||||
self.journal_writer.raw_extrinsic_metadata_add(metadata) | self.journal_writer.raw_extrinsic_metadata_add(metadata) | ||||
counter = Counter[MetadataTargetType]() | counter = Counter[MetadataTargetType]() | ||||
for metadata_entry in metadata: | for metadata_entry in metadata: | ||||
authority_id = self._get_authority_id(metadata_entry.authority, db, cur) | authority_id = self._get_authority_id(metadata_entry.authority, db, cur) | ||||
fetcher_id = self._get_fetcher_id(metadata_entry.fetcher, db, cur) | fetcher_id = self._get_fetcher_id(metadata_entry.fetcher, db, cur) | ||||
▲ Show 20 Lines • Show All 123 Lines • ▼ Show 20 Lines | ) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]: | ||||
return { | return { | ||||
"next_page_token": next_page_token, | "next_page_token": next_page_token, | ||||
"results": results, | "results": results, | ||||
} | } | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def metadata_fetcher_add( | def metadata_fetcher_add( | ||||
self, fetchers: Iterable[MetadataFetcher], db=None, cur=None | self, fetchers: List[MetadataFetcher], db=None, cur=None | ||||
) -> None: | ) -> None: | ||||
fetchers = list(fetchers) | fetchers = list(fetchers) | ||||
self.journal_writer.metadata_fetcher_add(fetchers) | self.journal_writer.metadata_fetcher_add(fetchers) | ||||
count = 0 | count = 0 | ||||
for fetcher in fetchers: | for fetcher in fetchers: | ||||
if fetcher.metadata is None: | if fetcher.metadata is None: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"MetadataFetcher.metadata may not be None in metadata_fetcher_add." | "MetadataFetcher.metadata may not be None in metadata_fetcher_add." | ||||
Show All 12 Lines | ) -> Optional[MetadataFetcher]: | ||||
row = db.metadata_fetcher_get(name, version, cur=cur) | row = db.metadata_fetcher_get(name, version, cur=cur) | ||||
if not row: | if not row: | ||||
return None | return None | ||||
return MetadataFetcher.from_dict(dict(zip(db.metadata_fetcher_cols, row))) | return MetadataFetcher.from_dict(dict(zip(db.metadata_fetcher_cols, row))) | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def metadata_authority_add( | def metadata_authority_add( | ||||
self, authorities: Iterable[MetadataAuthority], db=None, cur=None | self, authorities: List[MetadataAuthority], db=None, cur=None | ||||
) -> None: | ) -> None: | ||||
authorities = list(authorities) | authorities = list(authorities) | ||||
self.journal_writer.metadata_authority_add(authorities) | self.journal_writer.metadata_authority_add(authorities) | ||||
count = 0 | count = 0 | ||||
for authority in authorities: | for authority in authorities: | ||||
if authority.metadata is None: | if authority.metadata is None: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"MetadataAuthority.metadata may not be None in " | "MetadataAuthority.metadata may not be None in " | ||||
Show All 24 Lines | |||||
@timed | @timed | ||||
def diff_revisions(self, from_rev, to_rev, track_renaming=False): | def diff_revisions(self, from_rev, to_rev, track_renaming=False): | ||||
return diff.diff_revisions(self, from_rev, to_rev, track_renaming) | return diff.diff_revisions(self, from_rev, to_rev, track_renaming) | ||||
@timed | @timed | ||||
def diff_revision(self, revision, track_renaming=False): | def diff_revision(self, revision, track_renaming=False): | ||||
return diff.diff_revision(self, revision, track_renaming) | return diff.diff_revision(self, revision, track_renaming) | ||||
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: | def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: | ||||
"""Do nothing | """Do nothing | ||||
""" | """ | ||||
return None | return None | ||||
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | def flush(self, object_types: Optional[List[str]] = None) -> Dict: | ||||
return {} | return {} | ||||
def _get_authority_id(self, authority: MetadataAuthority, db, cur): | def _get_authority_id(self, authority: MetadataAuthority, db, cur): | ||||
authority_id = db.metadata_authority_get_id( | authority_id = db.metadata_authority_get_id( | ||||
authority.type.value, authority.url, cur | authority.type.value, authority.url, cur | ||||
) | ) | ||||
if not authority_id: | if not authority_id: | ||||
raise StorageArgumentException(f"Unknown authority {authority}") | raise StorageArgumentException(f"Unknown authority {authority}") | ||||
return authority_id | return authority_id | ||||
def _get_fetcher_id(self, fetcher: MetadataFetcher, db, cur): | def _get_fetcher_id(self, fetcher: MetadataFetcher, db, cur): | ||||
fetcher_id = db.metadata_fetcher_get_id(fetcher.name, fetcher.version, cur) | fetcher_id = db.metadata_fetcher_get_id(fetcher.name, fetcher.version, cur) | ||||
if not fetcher_id: | if not fetcher_id: | ||||
raise StorageArgumentException(f"Unknown fetcher {fetcher}") | raise StorageArgumentException(f"Unknown fetcher {fetcher}") | ||||
return fetcher_id | return fetcher_id |