diff --git a/swh/loader/mercurial/loader.py b/swh/loader/mercurial/loader.py --- a/swh/loader/mercurial/loader.py +++ b/swh/loader/mercurial/loader.py @@ -28,10 +28,16 @@ from dateutil import parser from shutil import rmtree from tempfile import mkdtemp +from typing import Dict, Iterable, Optional import billiard from swh.model import identifiers +from swh.model.model import ( + BaseContent, Content, Directory, ObjectType, Origin, Person, + Release, Revision, RevisionType, SkippedContent, Snapshot, SnapshotBranch, + TargetType, TimestampWithTimezone +) from swh.model.hashutil import ( MultiHash, hash_to_hex, hash_to_bytehex, hash_to_bytes, DEFAULT_ALGORITHMS @@ -89,6 +95,8 @@ self.clone_timeout = self.config['clone_timeout_seconds'] self.working_directory = None self.bundle_path = None + self.heads = {} + self.releases = {} def pre_cleanup(self): """Cleanup potential dangling files from prior runs (e.g. OOM killed @@ -133,7 +141,7 @@ return b def prepare_origin_visit(self, *args, **kwargs): - self.origin = {'url': self.origin_url} + self.origin = Origin(url=self.origin_url) visit_date = self.visit_date if isinstance(visit_date, str): # visit_date can be string or datetime visit_date = parser.parse(visit_date) @@ -198,6 +206,7 @@ self.tags = [] self.releases = {} self.node_2_rev = {} + self.heads = {} directory = self.directory @@ -273,7 +282,7 @@ """Fetch the data from the data source.""" pass - def get_contents(self): + def get_contents(self) -> Iterable[BaseContent]: """Get the contents that need to be loaded.""" # NOTE: This method generates blobs twice to reduce memory usage @@ -281,7 +290,7 @@ self.file_node_to_hash = {} hash_to_info = {} self.num_contents = 0 - contents = {} + contents: Dict[bytes, BaseContent] = {} missing_contents = set() for blob, node_info in self.br.yield_all_blobs(): @@ -291,7 +300,7 @@ length = len(blob) if header['linknode'] in self.reduce_effort: - algorithms = [ALGO] + algorithms = set([ALGO]) else: algorithms = DEFAULT_ALGORITHMS h = MultiHash.from_data(blob, hash_names=algorithms) @@ -304,8 +313,13 @@ continue hash_to_info[blob_hash] = node_info - contents[blob_hash] = content - missing_contents.add(blob_hash) + if (self.max_content_size is not None + and length >= self.max_content_size): + contents[blob_hash] = SkippedContent( + status='absent', reason='Content too large', **content) + else: + contents[blob_hash] = Content( + data=blob, status='visible', **content) if file_name == b'.hgtags': # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model @@ -315,20 +329,19 @@ if contents: missing_contents = set( self.storage.content_missing( - list(contents.values()), + map(lambda c: c.to_dict(), contents.values()), key_hash=ALGO ) ) # Clusters needed blobs by file offset and then only fetches the # groups at the needed offsets. 
-        focs = {}       # "file/offset/contents"
+        focs: Dict[int, Dict[bytes, bytes]] = {}  # "file/offset/contents"
         for blob_hash in missing_contents:
             _, file_offset, header = hash_to_info[blob_hash]
             focs.setdefault(file_offset, {})
             focs[file_offset][header['node']] = blob_hash
-        hash_to_info = None

         for offset, node_hashes in sorted(focs.items()):
             for header, data, *_ in self.br.yield_group_objects(
                 group_offset=offset
@@ -338,8 +351,12 @@
                 blob, meta = self.br.extract_meta_from_blob(data)
                 content = contents.pop(node_hashes[node], None)
                 if content:
-                    content['data'] = blob
-                    yield content
+                    if (self.max_content_size is not None
+                            and len(blob) >= self.max_content_size):
+                        yield SkippedContent.from_data(
+                            blob, reason='Content too large')
+                    else:
+                        yield Content.from_data(blob)

     def load_directories(self):
         """This is where the work is done to convert manifest deltas from the
@@ -384,7 +401,7 @@
             self.trees.store(node, tree)
             yield header, tree, new_dirs

-    def get_directories(self):
+    def get_directories(self) -> Iterable[Directory]:
         """Compute directories to load

         """
@@ -393,7 +410,7 @@
         for _, _, new_dirs in self.load_directories():
             for d in new_dirs:
                 self.num_directories += 1
-                dirs[d['id']] = d
+                dirs[d['id']] = Directory.from_dict(d)

         missing_dirs = list(dirs.keys())
         if missing_dirs:
@@ -403,7 +420,7 @@
             yield dirs[_id]
         dirs = {}

-    def get_revisions(self):
+    def get_revisions(self) -> Iterable[Revision]:
         """Compute revisions to load

         """
@@ -435,37 +452,35 @@
                 v = hash_to_bytehex(v)
             extra_meta.append([k, v])

-            revision = {
-                'author': author_dict,
-                'date': date_dict,
-                'committer': author_dict,
-                'committer_date': date_dict,
-                'type': 'hg',
-                'directory': directory_id,
-                'message': commit['message'],
-                'metadata': {
+            parents = []
+            p1 = self.node_2_rev.get(header['p1'])
+            p2 = self.node_2_rev.get(header['p2'])
+            if p1:
+                parents.append(p1)
+            if p2:
+                parents.append(p2)
+
+            revision = Revision(
+                author=Person.from_dict(author_dict),
+                date=TimestampWithTimezone.from_dict(date_dict),
+                committer=Person.from_dict(author_dict),
+                committer_date=TimestampWithTimezone.from_dict(date_dict),
+                type=RevisionType.MERCURIAL,
+                directory=directory_id,
+                message=commit['message'],
+                metadata={
                     'node': hash_to_hex(header['node']),
                     'extra_headers': [
                         ['time_offset_seconds',
                          str(commit['time_offset_seconds']).encode('utf-8')],
                     ] + extra_meta
                 },
-                'synthetic': False,
-                'parents': []
-            }
-
-            p1 = self.node_2_rev.get(header['p1'])
-            p2 = self.node_2_rev.get(header['p2'])
-            if p1:
-                revision['parents'].append(p1)
-            if p2:
-                revision['parents'].append(p2)
-
-            revision['id'] = hash_to_bytes(
-                identifiers.revision_identifier(revision)
+                synthetic=False,
+                parents=parents,
             )
-            self.node_2_rev[header['node']] = revision['id']
-            revisions[revision['id']] = revision
+
+            self.node_2_rev[header['node']] = revision.id
+            revisions[revision.id] = revision

         # Converts heads to use swh ids
         self.heads = {
@@ -473,14 +488,14 @@
             for branch_name, (pointer_nature, node_id) in self.heads.items()
         }

-        missing_revs = revisions.keys()
+        missing_revs = set(revisions.keys())
         if missing_revs:
             missing_revs = set(
-                self.storage.revision_missing(list(missing_revs))
+                self.storage.revision_missing(missing_revs)
             )

-        for r in missing_revs:
-            yield revisions[r]
+        for rev in missing_revs:
+            yield revisions[rev]
         self.mnode_to_tree_id = None

     def _read_tag(self, tag, split_byte=b' '):
@@ -488,11 +503,11 @@
         name = split_byte.join(name)
         return node, name

-    def get_releases(self):
+    def get_releases(self) -> Iterable[Release]:
         """Get the releases that need to be loaded."""
         self.num_releases = 0
         releases = {}
-        missing_releases = []
+        missing_releases = set()
         for t in self.tags:
             self.num_releases += 1
             node, name = self._read_tag(t)
@@ -508,22 +523,19 @@
                     (name.decode(), node))
                 continue
             tgt_rev = self.node_2_rev[node_bytes]
-            release = {
-                'name': name,
-                'target': tgt_rev,
-                'target_type': 'revision',
-                'message': None,
-                'metadata': None,
-                'synthetic': False,
-                'author': {'name': None, 'email': None, 'fullname': b''},
-                'date': None
-            }
-            id_hash = hash_to_bytes(
-                identifiers.release_identifier(release))
-            release['id'] = id_hash
-            missing_releases.append(id_hash)
-            releases[id_hash] = release
-            self.releases[name] = id_hash
+            release = Release(
+                name=name,
+                target=tgt_rev,
+                target_type=ObjectType.REVISION,
+                message=None,
+                metadata=None,
+                synthetic=False,
+                author=Person(name=None, email=None, fullname=b''),
+                date=None,
+            )
+            missing_releases.add(release.id)
+            releases[release.id] = release
+            self.releases[name] = release.id

         if missing_releases:
             missing_releases = set(
@@ -532,23 +544,21 @@
         for _id in missing_releases:
             yield releases[_id]

-    def get_snapshot(self):
+    def get_snapshot(self) -> Snapshot:
         """Get the snapshot that need to be loaded."""
-        branches = {}
+        branches: Dict[bytes, Optional[SnapshotBranch]] = {}
         for name, (pointer_nature, target) in self.heads.items():
-            branches[name] = {'target': target, 'target_type': 'revision'}
+            branches[name] = SnapshotBranch(
+                target=target, target_type=TargetType.REVISION)
             if pointer_nature == HEAD_POINTER_NAME:
-                branches[b'HEAD'] = {'target': name, 'target_type': 'alias'}
+                branches[b'HEAD'] = SnapshotBranch(
+                    target=name, target_type=TargetType.ALIAS)
         for name, target in self.releases.items():
-            branches[name] = {'target': target, 'target_type': 'release'}
+            branches[name] = SnapshotBranch(
+                target=target, target_type=TargetType.RELEASE)

-        snap = {
-            'id': None,
-            'branches': branches,
-        }
-        snap['id'] = identifiers.identifier_to_bytes(
-            identifiers.snapshot_identifier(snap))
-        return snap
+        self.snapshot = Snapshot(branches=branches)
+        return self.snapshot

     def get_fetch_history_result(self):
         """Return the data to store in fetch_history."""
@@ -563,7 +573,7 @@
         snapshot = self.get_snapshot()
         load_status = 'eventful'
         if (self.last_visit is not None and
-                self.last_visit['snapshot'] == snapshot['id']):
+                self.last_visit['snapshot'] == snapshot.id):
             load_status = 'uneventful'
         return {
             'status': load_status,
diff --git a/swh/loader/mercurial/tests/common.py b/swh/loader/mercurial/tests/common.py
--- a/swh/loader/mercurial/tests/common.py
+++ b/swh/loader/mercurial/tests/common.py
@@ -19,12 +19,7 @@
     'save_data_path': '',
     'max_content_size': 104857600,
     'storage': {
-        'cls': 'pipeline',
-        'steps': [
-            {'cls': 'validate'},
-            {'cls': 'filter'},
-            {'cls': 'memory'},
-        ]
+        'cls': 'memory',
     },
     'temp_directory': '/tmp/swh.loader.mercurial'
 }
@@ -41,6 +36,7 @@
     objects.

     """
+
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.origin_id = 1
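
A note for reviewers on the `Content`/`SkippedContent` split introduced in `get_contents`: oversized blobs keep their hashes and length but drop their bytes. Below is a minimal sketch of that gate, assuming only that `swh.model` is installed; `MAX_SIZE` and `model_content` are illustrative names, not loader API:

```python
from typing import Optional, Union

from swh.model.model import Content, SkippedContent

MAX_SIZE = 104857600  # illustrative; the loader reads 'max_content_size' from its config


def model_content(blob: bytes,
                  max_size: Optional[int] = MAX_SIZE
                  ) -> Union[Content, SkippedContent]:
    if max_size is not None and len(blob) >= max_size:
        # Hashes are still computed from the data; only the bytes are
        # dropped, and a reason is recorded for the skip.
        return SkippedContent.from_data(blob, reason='Content too large')
    # Small enough: keep the bytes; status defaults to 'visible'.
    return Content.from_data(blob)


print(type(model_content(b'small blob')).__name__)           # Content
print(type(model_content(b'x' * 10, max_size=10)).__name__)  # SkippedContent
```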
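One subtlety worth flagging: `storage.content_missing` still takes dicts, hence the `to_dict()` mapping added in `get_contents`. The round trip looks roughly like the sketch below, assuming `get_storage(cls='memory')` accepts the same keys as the test config above and that `ALGO` is a hash name such as `'sha1_git'`:

```python
from swh.model.model import Content
from swh.storage import get_storage

storage = get_storage(cls='memory')
content = Content.from_data(b'hello world')
# content_missing() yields the chosen hash of each content not yet stored.
missing = set(storage.content_missing([content.to_dict()], key_hash='sha1_git'))
print(content.sha1_git in missing)  # True until the content is added
```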
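The biggest simplification in `get_revisions`/`get_releases` is that the model classes compute their intrinsic identifier at construction time, so the `identifiers.revision_identifier()` plus `hash_to_bytes()` round-trip disappears and `revision.id` can be read back directly. A sketch with made-up field values, assuming the dict layouts accepted by `Person.from_dict` and `TimestampWithTimezone.from_dict` in the `swh.model` version this diff targets:

```python
from swh.model.model import Person, Revision, RevisionType, TimestampWithTimezone

author = Person.from_dict({
    'name': b'Jane Doe',
    'email': b'jdoe@example.com',
    'fullname': b'Jane Doe <jdoe@example.com>',
})
date = TimestampWithTimezone.from_dict({
    'timestamp': {'seconds': 1582000000, 'microseconds': 0},
    'offset': 0,             # minutes east of UTC
    'negative_utc': False,
})
revision = Revision(
    author=author,
    date=date,
    committer=author,
    committer_date=date,
    type=RevisionType.MERCURIAL,
    directory=b'\x00' * 20,  # fake sha1_git of the root directory
    message=b'Example commit',
    metadata=None,
    synthetic=False,
    parents=[],
)
# The id is already computed; the loader stores it in self.node_2_rev.
print(revision.id.hex())
```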
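`get_snapshot` follows the same pattern: `Snapshot(branches=...)` derives its id from the branch mapping, replacing the manual `snapshot_identifier`/`identifier_to_bytes` step. A sketch of the three branch shapes the method builds, with made-up targets; note that the `HEAD` alias targets a branch name rather than an object id:

```python
from swh.model.model import Snapshot, SnapshotBranch, TargetType

branches = {
    # A Mercurial head, pointing at a revision id (sha1_git bytes).
    b'default': SnapshotBranch(
        target=b'\x01' * 20, target_type=TargetType.REVISION),
    # The HEAD alias points at the branch named b'default'.
    b'HEAD': SnapshotBranch(
        target=b'default', target_type=TargetType.ALIAS),
    # A tag resolved by get_releases(), pointing at a release id.
    b'v1.0': SnapshotBranch(
        target=b'\x02' * 20, target_type=TargetType.RELEASE),
}
snapshot = Snapshot(branches=branches)
print(snapshot.id.hex())  # computed on construction, like Revision and Release
```

This also explains the test config change: with attrs-based model objects validating themselves at construction, the 'validate' (and 'filter') storage proxies are presumably redundant, so the tests can use a plain in-memory storage.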