swh/loader/git/loader.py
[... 17 lines elided ...]
 from swh.model import hashutil
 from swh.loader.core.loader import UnbufferedLoader
 from swh.storage.algos.snapshot import snapshot_get_all_branches

 from . import converters


 class RepoRepresentation:
     """Repository representation for a Software Heritage origin."""

-    def __init__(self, storage, origin_id, base_snapshot=None,
+    def __init__(self, storage, base_snapshot=None,
                  ignore_history=False):
         self.storage = storage

         self._parents_cache = {}
         self._type_cache = {}

         self.ignore_history = ignore_history

-        if origin_id and not ignore_history:
-            self.heads = set(self._cache_heads(origin_id, base_snapshot))
+        if base_snapshot and not ignore_history:
+            self.heads = set(self._cache_heads(base_snapshot))
         else:
             self.heads = set()

     def _fill_parents_cache(self, commits):
         """When querying for a commit's parents, we fill the cache to a depth of 1000
         commits."""
         root_revs = self._encode_for_storage(commits)
         for rev, parents in self.storage.revision_shortlog(root_revs, 1000):
             rev_id = hashutil.hash_to_bytehex(rev)
             if rev_id not in self._parents_cache:
                 self._parents_cache[rev_id] = [
                     hashutil.hash_to_bytehex(parent) for parent in parents
                 ]

         for rev in commits:
             if rev not in self._parents_cache:
                 self._parents_cache[rev] = []
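Note: the cache primed here is just a dict of hex revision ids to lists of hex parent ids, filled from a single revision_shortlog round-trip so the graph walker never has to look parents up one by one. A toy, self-contained version of the same pattern (the storage call and the revision graph below are stubs, not real swh API usage):

    import binascii

    def to_bytehex(raw):
        # stand-in for hashutil.hash_to_bytehex
        return binascii.hexlify(raw)

    a, b, c = b'\x0a' * 20, b'\x0b' * 20, b'\x0c' * 20

    def fake_shortlog(roots, limit):
        # yields (revision, parents) pairs, like storage.revision_shortlog
        yield a, [b]
        yield b, [c]
        yield c, []

    parents_cache = {}
    for rev, parents in fake_shortlog([a], 1000):
        rev_id = to_bytehex(rev)
        if rev_id not in parents_cache:
            parents_cache[rev_id] = [to_bytehex(p) for p in parents]

    assert parents_cache[to_bytehex(a)] == [to_bytehex(b)]
    assert parents_cache[to_bytehex(c)] == []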
-    def _cache_heads(self, origin_id, base_snapshot):
-        """Return all the known head commits for `origin_id`"""
+    def _cache_heads(self, base_snapshot):
+        """Return all the known head commits for the given snapshot"""
         _git_types = ['content', 'directory', 'revision', 'release']

         if not base_snapshot:
             return []

         snapshot_targets = set()
         for target in base_snapshot['branches'].values():
             if target and target['target_type'] in _git_types:
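Note: _cache_heads continues past the fold; the visible part shows the new snapshot-only signature. A self-contained sketch of the whole head-collection step under the snapshot layout used in this diff (branch names and hashes are made up):

    _git_types = ['content', 'directory', 'revision', 'release']

    def snapshot_heads(base_snapshot):
        # collect branch targets that point at git objects, skipping
        # dangling branches, as _cache_heads does above
        if not base_snapshot:
            return set()
        return {
            target['target']
            for target in base_snapshot['branches'].values()
            if target and target['target_type'] in _git_types
        }

    snapshot = {'branches': {
        b'refs/heads/master': {'target': bytes.fromhex('15' * 20),
                               'target_type': 'revision'},
        b'refs/heads/broken': None,  # dangling: filtered out
    }}
    assert snapshot_heads(snapshot) == {bytes.fromhex('15' * 20)}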
[... 136 lines elided ...]

@@ ... @@ def __init__(self, repo_representation=RepoRepresentation, config=None):
             which is in charge of filtering between known and remote
             data.

         """
         super().__init__(logging_class='swh.loader.git.BulkLoader',
                          config=config)
         self.repo_representation = repo_representation

-    def fetch_pack_from_origin(self, origin_url, base_origin_id,
+    def fetch_pack_from_origin(self, origin_url,
                                base_snapshot, do_activity):
         """Fetch a pack from the origin"""
         pack_buffer = BytesIO()

         base_repo = self.repo_representation(
             storage=self.storage,
-            origin_id=base_origin_id,
             base_snapshot=base_snapshot,
             ignore_history=self.ignore_history,
         )

         client, path = dulwich.client.get_transport_and_path(origin_url,
                                                              thin_packs=False)

         size_limit = self.config['pack_size_bytes']

-        def do_pack(data,
-                    pack_buffer=pack_buffer,
-                    limit=size_limit,
-                    origin_url=origin_url):
+        def do_pack(data):
             cur_size = pack_buffer.tell()
             would_write = len(data)
-            if cur_size + would_write > limit:
+            if cur_size + would_write > size_limit:
                 raise IOError('Pack file too big for repository %s, '
                               'limit is %d bytes, current size is %d, '
                               'would write %d' %
-                              (origin_url, limit, cur_size, would_write))
+                              (origin_url, size_limit, cur_size, would_write))

             pack_buffer.write(data)

         remote_refs = client.fetch_pack(path,
                                         base_repo.determine_wants,
                                         base_repo.graph_walker(),
                                         do_pack,
                                         progress=do_activity).refs
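Note: with origin_id gone, the pack negotiation is driven purely by what determine_wants and the graph walker report. For context, a self-contained sketch of the same dulwich flow (in-memory pack buffer with a size cap, then fetch_pack); the helper name and the default limit here are illustrative, not part of this change:

    from io import BytesIO

    import dulwich.client
    from dulwich.object_store import ObjectStoreGraphWalker

    def fetch_pack_to_buffer(origin_url, size_limit=4 * 1024 * 1024):
        pack_buffer = BytesIO()

        def do_pack(data):
            # refuse to buffer more than size_limit bytes, as above
            if pack_buffer.tell() + len(data) > size_limit:
                raise IOError('pack too big for %s' % origin_url)
            pack_buffer.write(data)

        client, path = dulwich.client.get_transport_and_path(
            origin_url, thin_packs=False)
        # empty local history: want every remote ref, report no haves
        walker = ObjectStoreGraphWalker([], lambda sha: [])
        result = client.fetch_pack(path,
                                   lambda refs: list(set(refs.values())),
                                   walker,
                                   do_pack)
        return pack_buffer, result.refs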
[... 26 lines elided ...]

@@ ... @@ def list_pack(self, pack_data, pack_size):
                 type_to_ids[type].add(id)

         return id_to_type, type_to_ids

     def prepare_origin_visit(self, origin_url, **kwargs):
         self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
         self.origin = converters.origin_url_to_origin(origin_url)

-    def get_full_snapshot(self, origin_id):
-        prev_snapshot = self.storage.snapshot_get_latest(origin_id)
+    def get_full_snapshot(self, origin_url):
+        prev_snapshot = self.storage.snapshot_get_latest(origin_url)
         if prev_snapshot and prev_snapshot.pop('next_branch', None):
             return snapshot_get_all_branches(self.storage, prev_snapshot['id'])

         return prev_snapshot
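Note: snapshot_get_latest may return a truncated snapshot; a non-None next_branch is the pagination cursor saying so, and popping it decides whether snapshot_get_all_branches has to run. The decision in isolation, with fabricated snapshots:

    def needs_full_fetch(snapshot):
        # mirrors the check in get_full_snapshot above
        return bool(snapshot and snapshot.pop('next_branch', None))

    truncated = {'id': b'\x01' * 20, 'branches': {},
                 'next_branch': b'refs/tags/v1.0'}  # more branches exist
    complete = {'id': b'\x02' * 20, 'branches': {},
                'next_branch': None}

    assert needs_full_fetch(truncated) is True
    assert needs_full_fetch(complete) is False
    assert needs_full_fetch(None) is False  # no previous visit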
     def prepare(self, origin_url, base_url=None, ignore_history=False):
-        base_origin_id = origin_id = self.origin_id
+        base_origin_url = origin_url = self.origin['url']
         prev_snapshot = None

         if not ignore_history:
-            prev_snapshot = self.get_full_snapshot(origin_id)
+            prev_snapshot = self.get_full_snapshot(origin_url)

         if base_url and not prev_snapshot:
             base_origin = converters.origin_url_to_origin(base_url)
             base_origin = self.storage.origin_get(base_origin)
             if base_origin:
-                base_origin_id = base_origin['id']
-                prev_snapshot = self.get_full_snapshot(base_origin_id)
+                base_origin_url = base_origin['url']
+                prev_snapshot = self.get_full_snapshot(base_origin_url)

         self.base_snapshot = prev_snapshot
-        self.base_origin_id = base_origin_id
+        self.base_origin_url = base_origin_url
         self.ignore_history = ignore_history
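Note: prepare now tracks the base origin by URL rather than by id, but the fallback order is unchanged: prefer the origin's own previous snapshot, else the base origin's (typically a fork's upstream passed as base_url). Distilled below, with storage faked as a dict and made-up URLs:

    def pick_base(origin_url, base_url, snapshots, ignore_history=False):
        prev = None if ignore_history else snapshots.get(origin_url)
        if prev:
            return origin_url, prev
        if base_url and snapshots.get(base_url):
            return base_url, snapshots[base_url]
        return origin_url, prev

    snapshots = {'https://example.org/upstream.git': {'id': b'\x03' * 20}}
    # first visit of a fork: no snapshot of its own, upstream's is used
    url, snap = pick_base('https://example.org/fork.git',
                          'https://example.org/upstream.git', snapshots)
    assert url == 'https://example.org/upstream.git' and snap is not None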
     def fetch_data(self):
         def do_progress(msg):
             sys.stderr.buffer.write(msg)
             sys.stderr.flush()

         fetch_info = self.fetch_pack_from_origin(
-            self.origin['url'], self.base_origin_id, self.base_snapshot,
+            self.origin['url'], self.base_snapshot,
             do_progress)

         self.pack_buffer = fetch_info['pack_buffer']
         self.pack_size = fetch_info['pack_size']

         self.remote_refs = fetch_info['remote_refs']
         self.local_refs = fetch_info['local_refs']
[... 63 lines elided ...]

@@ ... @@ def get_contents(self):
             if raw_obj.type_name != b'blob':
                 continue

             if raw_obj.sha().digest() not in missing_contents:
                 continue

             yield converters.dulwich_blob_to_content(
                 raw_obj, log=self.log, max_content_size=max_content_size,
-                origin_id=self.origin_id)
+                origin_url=self.origin['url'])

     def has_directories(self):
         return bool(self.type_to_ids[b'tree'])

     def get_directory_ids(self):
         """Get the directory identifiers from the git repository"""
         return (hashutil.hash_to_bytes(id.decode())
                 for id in self.type_to_ids[b'tree'])
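Note: dulwich reports object ids as 40-character hex bytestrings, while the storage side works on raw 20-byte digests, hence the decode-then-convert dance in get_directory_ids. The equivalent without swh.model, assuming hash_to_bytes is a plain hex-to-bytes conversion as the surrounding code implies:

    hex_id = b'95d09f2b10159347eece71399a7e2e907ea3df4f'  # dulwich-style id
    raw_id = bytes.fromhex(hex_id.decode())               # storage-style digest
    assert len(raw_id) == 20
    assert raw_id.hex().encode() == hex_id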
[... 117 lines elided ...]