Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/git/loader.py
# Copyright (C) 2016-2020 The Software Heritage developers | # Copyright (C) 2016-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from dataclasses import dataclass | from dataclasses import dataclass | ||||
import datetime | import datetime | ||||
from io import BytesIO | from io import BytesIO | ||||
import logging | import logging | ||||
import os | import os | ||||
import pickle | import pickle | ||||
import sys | import sys | ||||
from typing import Any, Dict, Iterable, List, Optional, Set | from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Type | ||||
import dulwich.client | import dulwich.client | ||||
from dulwich.object_store import ObjectStoreGraphWalker | from dulwich.object_store import ObjectStoreGraphWalker | ||||
from dulwich.pack import PackData, PackInflater | from dulwich.pack import PackData, PackInflater | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | BaseContent, | ||||
▲ Show 20 Lines • Show All 99 Lines • ▼ Show 20 Lines | class GitLoader(DVCSLoader): | ||||
ADDITIONAL_CONFIG = { | ADDITIONAL_CONFIG = { | ||||
"pack_size_bytes": ("int", 4 * 1024 * 1024 * 1024), | "pack_size_bytes": ("int", 4 * 1024 * 1024 * 1024), | ||||
} | } | ||||
visit_type = "git" | visit_type = "git" | ||||
    def __init__(
        self,
        url: str,
        base_url: Optional[str] = None,
        ignore_history: bool = False,
        repo_representation: Type[RepoRepresentation] = RepoRepresentation,
        config: Optional[Dict[str, Any]] = None,
    ):
        """Initialize the bulk updater.

        Args:
            url: URL of the git repository to load
            base_url: optional URL of a "base" origin whose last full
                snapshot is reused when this origin has no previous visit
            ignore_history: if True, do not look up any previous snapshot
                before fetching
            repo_representation: swh's repository representation
                which is in charge of filtering between known and remote
                data.
            config: loader configuration dict, forwarded to the base
                loader

        """
        # NOTE(review): config=None is forwarded as-is to the base loader;
        # presumably it defaults it itself — confirm against DVCSLoader.
        super().__init__(logging_class="swh.loader.git.BulkLoader", config=config)
        self.origin_url = url
        self.base_url = base_url
        self.ignore_history = ignore_history
        self.repo_representation = repo_representation
        # state initialized in fetch_data
        self.remote_refs: Dict[bytes, bytes] = {}
        self.symbolic_refs: Dict[bytes, bytes] = {}
def fetch_pack_from_origin(self, origin_url, base_snapshot, do_activity): | def fetch_pack_from_origin( | ||||
self, | |||||
origin_url: str, | |||||
base_snapshot: Optional[Snapshot], | |||||
do_activity: Callable[[bytes], None], | |||||
) -> FetchPackReturn: | |||||
"""Fetch a pack from the origin""" | """Fetch a pack from the origin""" | ||||
pack_buffer = BytesIO() | pack_buffer = BytesIO() | ||||
base_repo = self.repo_representation( | base_repo = self.repo_representation( | ||||
storage=self.storage, | storage=self.storage, | ||||
base_snapshot=base_snapshot, | base_snapshot=base_snapshot, | ||||
ignore_history=self.ignore_history, | ignore_history=self.ignore_history, | ||||
) | ) | ||||
client, path = dulwich.client.get_transport_and_path( | client, path = dulwich.client.get_transport_and_path( | ||||
origin_url, thin_packs=False | origin_url, thin_packs=False | ||||
) | ) | ||||
size_limit = self.config["pack_size_bytes"] | size_limit = self.config["pack_size_bytes"] | ||||
def do_pack(data): | def do_pack(data: bytes) -> None: | ||||
cur_size = pack_buffer.tell() | cur_size = pack_buffer.tell() | ||||
would_write = len(data) | would_write = len(data) | ||||
if cur_size + would_write > size_limit: | if cur_size + would_write > size_limit: | ||||
raise IOError( | raise IOError( | ||||
"Pack file too big for repository %s, " | "Pack file too big for repository %s, " | ||||
"limit is %d bytes, current size is %d, " | "limit is %d bytes, current size is %d, " | ||||
"would write %d" % (origin_url, size_limit, cur_size, would_write) | "would write %d" % (origin_url, size_limit, cur_size, would_write) | ||||
) | ) | ||||
Show All 17 Lines | ) -> FetchPackReturn: | ||||
return FetchPackReturn( | return FetchPackReturn( | ||||
remote_refs=filter_refs(remote_refs), | remote_refs=filter_refs(remote_refs), | ||||
symbolic_refs=filter_refs(symbolic_refs), | symbolic_refs=filter_refs(symbolic_refs), | ||||
pack_buffer=pack_buffer, | pack_buffer=pack_buffer, | ||||
pack_size=pack_size, | pack_size=pack_size, | ||||
) | ) | ||||
def list_pack(self, pack_data, pack_size): | def list_pack( | ||||
self, pack_data, pack_size | |||||
) -> Tuple[Dict[bytes, bytes], Dict[bytes, Set[bytes]]]: | |||||
id_to_type = {} | id_to_type = {} | ||||
type_to_ids = defaultdict(set) | type_to_ids = defaultdict(set) | ||||
inflater = self.get_inflater() | inflater = self.get_inflater() | ||||
for obj in inflater: | for obj in inflater: | ||||
type, id = obj.type_name, obj.id | type, id = obj.type_name, obj.id | ||||
id_to_type[id] = type | id_to_type[id] = type | ||||
type_to_ids[type].add(id) | type_to_ids[type].add(id) | ||||
return id_to_type, type_to_ids | return id_to_type, type_to_ids | ||||
def prepare_origin_visit(self, *args, **kwargs): | def prepare_origin_visit(self, *args, **kwargs) -> None: | ||||
self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
self.origin = Origin(url=self.origin_url) | self.origin = Origin(url=self.origin_url) | ||||
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]: | def get_full_snapshot(self, origin_url) -> Optional[Snapshot]: | ||||
visit = self.storage.origin_visit_get_latest(origin_url, require_snapshot=True) | visit = self.storage.origin_visit_get_latest(origin_url, require_snapshot=True) | ||||
if visit and visit["snapshot"]: | if visit and visit["snapshot"]: | ||||
snapshot = snapshot_get_all_branches(self.storage, visit["snapshot"]) | snapshot = snapshot_get_all_branches(self.storage, visit["snapshot"]) | ||||
else: | else: | ||||
snapshot = None | snapshot = None | ||||
if snapshot is None: | if snapshot is None: | ||||
return None | return None | ||||
return Snapshot.from_dict(snapshot) | return Snapshot.from_dict(snapshot) | ||||
def prepare(self, *args, **kwargs): | def prepare(self, *args, **kwargs) -> None: | ||||
assert self.origin is not None | |||||
base_origin_url = origin_url = self.origin.url | base_origin_url = origin_url = self.origin.url | ||||
prev_snapshot = None | prev_snapshot: Optional[Snapshot] = None | ||||
if not self.ignore_history: | if not self.ignore_history: | ||||
prev_snapshot = self.get_full_snapshot(origin_url) | prev_snapshot = self.get_full_snapshot(origin_url) | ||||
if self.base_url and prev_snapshot is None: | if self.base_url and prev_snapshot is None: | ||||
base_origin = Origin(url=self.base_url) | base_origin = Origin(url=self.base_url) | ||||
base_origin = self.storage.origin_get(base_origin) | base_origin = self.storage.origin_get(base_origin) | ||||
if base_origin: | if base_origin: | ||||
base_origin_url = base_origin.url | base_origin_url = base_origin.url | ||||
prev_snapshot = self.get_full_snapshot(base_origin_url) | prev_snapshot = self.get_full_snapshot(base_origin_url) | ||||
if prev_snapshot is not None: | if prev_snapshot is not None: | ||||
self.base_snapshot = prev_snapshot | self.base_snapshot = prev_snapshot | ||||
else: | else: | ||||
self.base_snapshot = Snapshot(branches={}) | self.base_snapshot = Snapshot(branches={}) | ||||
def fetch_data(self): | def fetch_data(self) -> bool: | ||||
def do_progress(msg): | assert self.origin is not None | ||||
def do_progress(msg: bytes) -> None: | |||||
sys.stderr.buffer.write(msg) | sys.stderr.buffer.write(msg) | ||||
sys.stderr.flush() | sys.stderr.flush() | ||||
fetch_info = self.fetch_pack_from_origin( | fetch_info = self.fetch_pack_from_origin( | ||||
self.origin.url, self.base_snapshot, do_progress | self.origin.url, self.base_snapshot, do_progress | ||||
) | ) | ||||
self.pack_buffer = fetch_info.pack_buffer | self.pack_buffer = fetch_info.pack_buffer | ||||
self.pack_size = fetch_info.pack_size | self.pack_size = fetch_info.pack_size | ||||
self.remote_refs = fetch_info.remote_refs | self.remote_refs = fetch_info.remote_refs | ||||
self.symbolic_refs = fetch_info.symbolic_refs | self.symbolic_refs = fetch_info.symbolic_refs | ||||
origin_url = self.origin.url | |||||
self.log.info( | self.log.info( | ||||
"Listed %d refs for repo %s" % (len(self.remote_refs), origin_url), | "Listed %d refs for repo %s" % (len(self.remote_refs), self.origin.url), | ||||
extra={ | extra={ | ||||
"swh_type": "git_repo_list_refs", | "swh_type": "git_repo_list_refs", | ||||
"swh_repo": origin_url, | "swh_repo": self.origin.url, | ||||
"swh_num_refs": len(self.remote_refs), | "swh_num_refs": len(self.remote_refs), | ||||
}, | }, | ||||
) | ) | ||||
# We want to load the repository, walk all the objects | # We want to load the repository, walk all the objects | ||||
id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size) | id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size) | ||||
self.id_to_type = id_to_type | self.id_to_type = id_to_type | ||||
self.type_to_ids = type_to_ids | self.type_to_ids = type_to_ids | ||||
# No more data to fetch | # No more data to fetch | ||||
return False | return False | ||||
def save_data(self): | def save_data(self) -> None: | ||||
"""Store a pack for archival""" | """Store a pack for archival""" | ||||
assert isinstance(self.visit_date, datetime.datetime) | |||||
write_size = 8192 | write_size = 8192 | ||||
pack_dir = self.get_save_data_path() | pack_dir = self.get_save_data_path() | ||||
pack_name = "%s.pack" % self.visit_date.isoformat() | pack_name = "%s.pack" % self.visit_date.isoformat() | ||||
refs_name = "%s.refs" % self.visit_date.isoformat() | refs_name = "%s.refs" % self.visit_date.isoformat() | ||||
with open(os.path.join(pack_dir, pack_name), "xb") as f: | with open(os.path.join(pack_dir, pack_name), "xb") as f: | ||||
self.pack_buffer.seek(0) | self.pack_buffer.seek(0) | ||||
while True: | while True: | ||||
r = self.pack_buffer.read(write_size) | r = self.pack_buffer.read(write_size) | ||||
if not r: | if not r: | ||||
break | break | ||||
f.write(r) | f.write(r) | ||||
self.pack_buffer.seek(0) | self.pack_buffer.seek(0) | ||||
with open(os.path.join(pack_dir, refs_name), "xb") as f: | with open(os.path.join(pack_dir, refs_name), "xb") as f: | ||||
pickle.dump(self.remote_refs, f) | pickle.dump(self.remote_refs, f) | ||||
    def get_inflater(self) -> PackInflater:
        """Reset the pack buffer and get an object inflater from it"""
        # Rewind first: the buffer may already have been consumed by a
        # previous full read of the pack (e.g. list_pack or save_data).
        self.pack_buffer.seek(0)
        return PackInflater.for_pack_data(
            PackData.from_file(self.pack_buffer, self.pack_size)
        )
def has_contents(self): | def has_contents(self) -> bool: | ||||
return bool(self.type_to_ids[b"blob"]) | return bool(self.type_to_ids[b"blob"]) | ||||
def get_content_ids(self) -> Iterable[Dict[str, Any]]: | def get_content_ids(self) -> Iterable[Dict[str, Any]]: | ||||
"""Get the content identifiers from the git repository""" | """Get the content identifiers from the git repository""" | ||||
for raw_obj in self.get_inflater(): | for raw_obj in self.get_inflater(): | ||||
if raw_obj.type_name != b"blob": | if raw_obj.type_name != b"blob": | ||||
continue | continue | ||||
▲ Show 20 Lines • Show All 166 Lines • ▼ Show 20 Lines | def get_fetch_history_result(self) -> Dict[str, int]: | ||||
"releases": len(self.type_to_ids[b"tag"]), | "releases": len(self.type_to_ids[b"tag"]), | ||||
} | } | ||||
def load_status(self) -> Dict[str, Any]: | def load_status(self) -> Dict[str, Any]: | ||||
"""The load was eventful if the current snapshot is different to | """The load was eventful if the current snapshot is different to | ||||
the one we retrieved at the beginning of the run""" | the one we retrieved at the beginning of the run""" | ||||
eventful = False | eventful = False | ||||
if self.base_snapshot: | if self.base_snapshot and self.snapshot: | ||||
eventful = self.snapshot.id != self.base_snapshot.id | eventful = self.snapshot.id != self.base_snapshot.id | ||||
else: | elif self.snapshot: | ||||
eventful = bool(self.snapshot.branches) | eventful = bool(self.snapshot.branches) | ||||
return {"status": ("eventful" if eventful else "uneventful")} | return {"status": ("eventful" if eventful else "uneventful")} | ||||
if __name__ == "__main__":
    import click

    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s"
    )

    @click.command()
    @click.option("--origin-url", help="Origin url", required=True)
    @click.option("--base-url", default=None, help="Optional Base url")
    @click.option(
        "--ignore-history/--no-ignore-history",
        help="Ignore the repository history",
        default=False,
    )
    def main(origin_url: str, base_url: str, ignore_history: bool) -> Dict[str, Any]:
        """Load a single git origin from the command line and return the
        loader's status dict."""
        loader = GitLoader(
            origin_url, base_url=base_url, ignore_history=ignore_history,
        )
        return loader.load()

    main()