swh/loader/mercurial/loader.py
# (first 22 lines elided)
from queue import Empty
import random
import re
import time

from dateutil import parser
from shutil import rmtree
from tempfile import mkdtemp
from typing import Dict, Iterable, Optional

import billiard

from swh.model import identifiers
from swh.model.model import (
    BaseContent, Content, Directory, ObjectType, Origin, Person,
    Release, Revision, RevisionType, SkippedContent, Snapshot, SnapshotBranch,
    TargetType, TimestampWithTimezone
)
from swh.model.hashutil import (
    MultiHash, hash_to_hex, hash_to_bytehex, hash_to_bytes,
    DEFAULT_ALGORITHMS
)
from swh.loader.core.loader import DVCSLoader
from swh.loader.core.utils import clean_dangling_folders

from . import converters
# (41 lines elided; the excerpt resumes inside
#  HgBundle20Loader.__init__(self, url, visit_date=None, directory=None, ...))
        self.reduce_effort_flag = self.config['reduce_effort']
        self.empty_repository = None
        self.temp_directory = self.config['temp_directory']
        self.cache1_size = self.config['cache1_size']
        self.cache2_size = self.config['cache2_size']
        self.clone_timeout = self.config['clone_timeout_seconds']
        self.working_directory = None
        self.bundle_path = None
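        # The two mappings below are filled later: heads by get_heads() /
        # get_revisions(), releases by get_releases(). Initializing them in
        # the constructor presumably keeps get_snapshot() usable even when
        # the repository turns out to be empty.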
        self.heads = {}
        self.releases = {}
    def pre_cleanup(self):
        """Clean up potential dangling files left over from prior runs
        (e.g. OOM-killed tasks).

        """
        clean_dangling_folders(self.temp_directory,
                               pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
        # (28 lines elided; the clean_dangling_folders() call continues and
        #  the excerpt resumes inside get_heads(self, repo))
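        # Each entry maps a branch or bookmark name to a
        # (pointer_nature, target) pair; pointer_nature appears to flag which
        # head the synthetic b'HEAD' alias should point to (bookmarks use
        # None), as used later in get_snapshot().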
        if bookmarks and bookmarks[0]:
            for bookmark_name, _, target_short in bookmarks[0]:
                target = repo[target_short].node()
                b[bookmark_name] = (None, hash_to_bytes(target.decode()))
        return b
    def prepare_origin_visit(self, *args, **kwargs):
        self.origin = Origin(url=self.origin_url)
        visit_date = self.visit_date
        if isinstance(visit_date, str):  # visit_date can be string or datetime
            visit_date = parser.parse(visit_date)
        self.visit_date = visit_date
        self.last_visit = self.storage.origin_visit_get_latest(
            self.origin_url, require_snapshot=True)

    @staticmethod
    # (48 lines elided; the excerpt resumes inside the docstring of
    #  prepare(self, *args, **kwargs))
            visit_date (str/datetime): Date of the visit
            directory (str/None): The local directory to load

        """
        self.branches = {}
        self.tags = []
        self.releases = {}
        self.node_2_rev = {}
        self.heads = {}
        directory = self.directory

        if not directory:  # remote repository
            self.working_directory = mkdtemp(
                prefix=TEMPORARY_DIR_PREFIX_PATTERN,
                suffix='-%s' % os.getpid(),
                dir=self.temp_directory)
    # (59 lines elided; still inside class HgBundle20Loader(DVCSLoader))
    def has_releases(self):
        return not self.empty_repository

    def fetch_data(self):
        """Fetch the data from the data source."""
        pass
    def get_contents(self) -> Iterable[BaseContent]:
        """Get the contents that need to be loaded."""
        # NOTE: This method generates blobs twice to reduce memory usage
        # without generating disk writes.
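        # First pass below: hash every blob so we know which contents the
        # archive is still missing; second pass: re-read only those blobs,
        # grouped by their offset in the bundle, and yield them with data.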
        self.file_node_to_hash = {}
        hash_to_info = {}
        self.num_contents = 0
        contents: Dict[bytes, BaseContent] = {}
        missing_contents = set()
        for blob, node_info in self.br.yield_all_blobs():
            self.num_contents += 1
            file_name = node_info[0]
            header = node_info[2]

            length = len(blob)
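            # Blobs whose linknode is in the reduce_effort set (changesets the
            # loader has decided it can skip, presumably because they were
            # seen in a previous visit) only get the single hash needed for
            # bookkeeping and are not emitted again.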
            if header['linknode'] in self.reduce_effort:
                algorithms = set([ALGO])
            else:
                algorithms = DEFAULT_ALGORITHMS
            h = MultiHash.from_data(blob, hash_names=algorithms)
            content = h.digest()
            content['length'] = length
            blob_hash = content[ALGO]
            self.file_node_to_hash[header['node']] = blob_hash

            if header['linknode'] in self.reduce_effort:
                continue

            hash_to_info[blob_hash] = node_info
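            # Contents larger than the configured max_content_size are
            # registered as SkippedContent (status 'absent') rather than
            # loaded with their data.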
            if (self.max_content_size is not None
                    and length >= self.max_content_size):
                contents[blob_hash] = SkippedContent(
                    status='absent', reason='Content too large', **content)
            else:
                contents[blob_hash] = Content(
                    data=blob, status='visible', **content)

            if file_name == b'.hgtags':
                # https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model
                # later .hgtags blobs overwrite earlier ones; only the last
                # one is used
                self.tags = (t for t in blob.split(b'\n') if t != b'')
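                # self.tags is a lazy generator over the .hgtags lines; it is
                # only consumed later, in get_releases().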
        if contents:
            missing_contents = set(
                self.storage.content_missing(
                    map(lambda c: c.to_dict(), contents.values()),
                    key_hash=ALGO
                )
            )
        # Cluster needed blobs by file offset, then fetch only the groups at
        # those offsets.
        focs: Dict[int, Dict[bytes, bytes]] = {}  # "file/offset/contents"
        for blob_hash in missing_contents:
            _, file_offset, header = hash_to_info[blob_hash]
            focs.setdefault(file_offset, {})
            focs[file_offset][header['node']] = blob_hash

        hash_to_info = None
        for offset, node_hashes in sorted(focs.items()):
            for header, data, *_ in self.br.yield_group_objects(
                group_offset=offset
            ):
                node = header['node']
                if node in node_hashes:
                    blob, meta = self.br.extract_meta_from_blob(data)
                    content = contents.pop(node_hashes[node], None)
                    if content:
                        if (self.max_content_size is not None
                                and len(blob) >= self.max_content_size):
                            yield SkippedContent.from_data(
                                blob, reason='Content too large')
                        else:
                            yield Content.from_data(blob)
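        # In this second pass the model objects are rebuilt with from_data(),
        # which recomputes hashes and length directly from the blob; the
        # digests from the first pass only served as lookup keys.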
    def load_directories(self):
        """This is where the work is done to convert manifest deltas from the
        repository bundle into SWH directories.
        """
        self.mnode_to_tree_id = {}
        cache_hints = self.br.build_manifest_hints()
        # (28 lines elided; still inside load_directories(), within the loop
        #  over manifest deltas)
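            # Changesets in reduce_effort only get their tree cached; for the
            # rest, tree.hash_changed(new_dirs) collects the directories whose
            # identifiers changed so get_directories() can yield them.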
            if header['linknode'] in self.reduce_effort:
                self.trees.store(node, tree)
            else:
                new_dirs = []
                self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs)
                self.trees.store(node, tree)
                yield header, tree, new_dirs
    def get_directories(self) -> Iterable[Directory]:
        """Compute directories to load
        """
        dirs = {}
        self.num_directories = 0
        for _, _, new_dirs in self.load_directories():
            for d in new_dirs:
                self.num_directories += 1
                dirs[d['id']] = Directory.from_dict(d)

        missing_dirs = list(dirs.keys())
        if missing_dirs:
            missing_dirs = self.storage.directory_missing(missing_dirs)

        for _id in missing_dirs:
            yield dirs[_id]

        dirs = {}
    def get_revisions(self) -> Iterable[Revision]:
        """Compute revisions to load
        """
        revisions = {}
        self.num_revisions = 0
        for header, commit in self.br.yield_all_changesets():
            if header['node'] in self.reduce_effort:
                continue
            # (15 lines elided; author_dict, date_dict, directory_id and
            #  extra_meta are prepared here, and a loop over the changeset's
            #  extra fields begins)
                k, v = e.split(b':', 1)
                k = k.decode('utf-8')
                # transplant_source stores a binary reference to a changeset;
                # prefer to dump the hexadecimal form in the revision metadata
                if k == 'transplant_source':
                    v = hash_to_bytehex(v)
                extra_meta.append([k, v])
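            # Map the changeset's hg parents (p1/p2) to the SWH revision ids
            # recorded earlier; parents unknown to node_2_rev (e.g.,
            # presumably, the null revision of a root changeset) are skipped.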
            parents = []
            p1 = self.node_2_rev.get(header['p1'])
            p2 = self.node_2_rev.get(header['p2'])
            if p1:
                parents.append(p1)
            if p2:
                parents.append(p2)

            revision = Revision(
                author=Person.from_dict(author_dict),
                date=TimestampWithTimezone.from_dict(date_dict),
                committer=Person.from_dict(author_dict),
                committer_date=TimestampWithTimezone.from_dict(date_dict),
                type=RevisionType.MERCURIAL,
                directory=directory_id,
                message=commit['message'],
                metadata={
                    'node': hash_to_hex(header['node']),
                    'extra_headers': [
                        ['time_offset_seconds',
                         str(commit['time_offset_seconds']).encode('utf-8')],
                    ] + extra_meta
                },
                synthetic=False,
                parents=parents,
            )

            self.node_2_rev[header['node']] = revision.id
            revisions[revision.id] = revision
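            # The Revision model object computes its own id; that id is what
            # child changesets look up through node_2_rev and what tags
            # resolve to in get_releases().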
        # Converts heads to use swh ids
        self.heads = {
            branch_name: (pointer_nature, self.node_2_rev[node_id])
            for branch_name, (pointer_nature, node_id) in self.heads.items()
        }

        missing_revs = set(revisions.keys())
        if missing_revs:
            missing_revs = set(
                self.storage.revision_missing(missing_revs)
            )

        for rev in missing_revs:
            yield revisions[rev]

        self.mnode_to_tree_id = None
    def _read_tag(self, tag, split_byte=b' '):
        node, *name = tag.split(split_byte)
        name = split_byte.join(name)
        return node, name
    def get_releases(self) -> Iterable[Release]:
        """Get the releases that need to be loaded."""
        self.num_releases = 0
        releases = {}
        missing_releases = set()
        for t in self.tags:
            self.num_releases += 1
            node, name = self._read_tag(t)
            node = node.decode()
            node_bytes = hash_to_bytes(node)

            if not TAG_PATTERN.match(node):
                self.log.warn('Wrong pattern (%s) found in tags. Skipping' % (
                    node, ))
                continue

            if node_bytes not in self.node_2_rev:
                self.log.warn('No matching revision for tag %s '
                              '(hg changeset: %s). Skipping' %
                              (name.decode(), node))
                continue

            tgt_rev = self.node_2_rev[node_bytes]
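            # As with revisions, the Release id is derived by the model class
            # itself; it doubles as the key for the missing-release lookup
            # below.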
            release = Release(
                name=name,
                target=tgt_rev,
                target_type=ObjectType.REVISION,
                message=None,
                metadata=None,
                synthetic=False,
                author=Person(name=None, email=None, fullname=b''),
                date=None,
            )
            missing_releases.add(release.id)
            releases[release.id] = release
            self.releases[name] = release.id

        if missing_releases:
            missing_releases = set(
                self.storage.release_missing(missing_releases))

        for _id in missing_releases:
            yield releases[_id]
    def get_snapshot(self) -> Snapshot:
        """Get the snapshot that needs to be loaded."""
        branches: Dict[bytes, Optional[SnapshotBranch]] = {}
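        # Every mercurial head becomes a branch pointing at a revision; the
        # head flagged with HEAD_POINTER_NAME also gets a b'HEAD' alias
        # branch, and each tag becomes a release-typed branch.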
        for name, (pointer_nature, target) in self.heads.items():
            branches[name] = SnapshotBranch(
                target=target, target_type=TargetType.REVISION)
            if pointer_nature == HEAD_POINTER_NAME:
                branches[b'HEAD'] = SnapshotBranch(
                    target=name, target_type=TargetType.ALIAS)
        for name, target in self.releases.items():
            branches[name] = SnapshotBranch(
                target=target, target_type=TargetType.RELEASE)

        self.snapshot = Snapshot(branches=branches)
        return self.snapshot
    def get_fetch_history_result(self):
        """Return the data to store in fetch_history."""
        return {
            'contents': self.num_contents,
            'directories': self.num_directories,
            'revisions': self.num_revisions,
            'releases': self.num_releases,
        }
    def load_status(self):
        snapshot = self.get_snapshot()
        load_status = 'eventful'
        if (self.last_visit is not None and
                self.last_visit['snapshot'] == snapshot.id):
            load_status = 'uneventful'
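        # 'uneventful' means the snapshot computed by this visit is identical
        # to the one recorded for the previous visit, i.e. nothing new was
        # loaded.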
        return {
            'status': load_status,
        }
class HgArchiveBundle20Loader(HgBundle20Loader):
    """Mercurial loader for repository wrapped within archives.

    (25 more lines elided)