Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/git/loader.py
Show All 12 Lines | |||||
import sys | import sys | ||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Type | from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Type | ||||
import dulwich.client | import dulwich.client | ||||
from dulwich.errors import GitProtocolError, NotGitRepository | from dulwich.errors import GitProtocolError, NotGitRepository | ||||
from dulwich.object_store import ObjectStoreGraphWalker | from dulwich.object_store import ObjectStoreGraphWalker | ||||
from dulwich.pack import PackData, PackInflater | from dulwich.pack import PackData, PackInflater | ||||
from swh.core.config import merge_configs | |||||
from swh.loader.core.loader import DVCSLoader | from swh.loader.core.loader import DVCSLoader | ||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | BaseContent, | ||||
Directory, | Directory, | ||||
Origin, | Origin, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
Sha1Git, | Sha1Git, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||
from swh.storage.interface import StorageInterface | |||||
from . import converters, utils | from . import converters, utils | ||||
class RepoRepresentation: | class RepoRepresentation: | ||||
"""Repository representation for a Software Heritage origin.""" | """Repository representation for a Software Heritage origin.""" | ||||
def __init__( | def __init__( | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | |||||
@dataclass | @dataclass | ||||
class FetchPackReturn: | class FetchPackReturn: | ||||
remote_refs: Dict[bytes, bytes] | remote_refs: Dict[bytes, bytes] | ||||
symbolic_refs: Dict[bytes, bytes] | symbolic_refs: Dict[bytes, bytes] | ||||
pack_buffer: BytesIO | pack_buffer: BytesIO | ||||
pack_size: int | pack_size: int | ||||
DEFAULT_CONFIG: Dict[str, Any] = { | |||||
"pack_size_bytes": 4 * 1024 * 1024 * 1024, | |||||
} | |||||
class GitLoader(DVCSLoader): | class GitLoader(DVCSLoader): | ||||
"""A bulk loader for a git repository""" | """A bulk loader for a git repository""" | ||||
visit_type = "git" | visit_type = "git" | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
storage: StorageInterface, | |||||
url: str, | url: str, | ||||
base_url: Optional[str] = None, | base_url: Optional[str] = None, | ||||
ignore_history: bool = False, | ignore_history: bool = False, | ||||
repo_representation: Type[RepoRepresentation] = RepoRepresentation, | repo_representation: Type[RepoRepresentation] = RepoRepresentation, | ||||
config: Optional[Dict[str, Any]] = None, | pack_size_bytes: int = 4 * 1024 * 1024 * 1024, | ||||
save_data_path: Optional[str] = None, | |||||
max_content_size: Optional[int] = None, | |||||
): | ): | ||||
"""Initialize the bulk updater. | """Initialize the bulk updater. | ||||
Args: | Args: | ||||
repo_representation: swh's repository representation | repo_representation: swh's repository representation | ||||
which is in charge of filtering between known and remote | which is in charge of filtering between known and remote | ||||
data. | data. | ||||
""" | """ | ||||
super().__init__(logging_class="swh.loader.git.BulkLoader", config=config) | super().__init__( | ||||
self.config = merge_configs(DEFAULT_CONFIG, self.config) | storage=storage, | ||||
anlambert: same here | |||||
save_data_path=save_data_path, | |||||
max_content_size=max_content_size, | |||||
) | |||||
self.origin_url = url | self.origin_url = url | ||||
self.base_url = base_url | self.base_url = base_url | ||||
self.ignore_history = ignore_history | self.ignore_history = ignore_history | ||||
self.repo_representation = repo_representation | self.repo_representation = repo_representation | ||||
self.pack_size_bytes = pack_size_bytes | |||||
# state initialized in fetch_data | # state initialized in fetch_data | ||||
self.remote_refs: Dict[bytes, bytes] = {} | self.remote_refs: Dict[bytes, bytes] = {} | ||||
self.symbolic_refs: Dict[bytes, bytes] = {} | self.symbolic_refs: Dict[bytes, bytes] = {} | ||||
def fetch_pack_from_origin( | def fetch_pack_from_origin( | ||||
self, | self, | ||||
origin_url: str, | origin_url: str, | ||||
base_snapshot: Optional[Snapshot], | base_snapshot: Optional[Snapshot], | ||||
do_activity: Callable[[bytes], None], | do_activity: Callable[[bytes], None], | ||||
) -> FetchPackReturn: | ) -> FetchPackReturn: | ||||
"""Fetch a pack from the origin""" | """Fetch a pack from the origin""" | ||||
pack_buffer = BytesIO() | pack_buffer = BytesIO() | ||||
base_repo = self.repo_representation( | base_repo = self.repo_representation( | ||||
storage=self.storage, | storage=self.storage, | ||||
base_snapshot=base_snapshot, | base_snapshot=base_snapshot, | ||||
ignore_history=self.ignore_history, | ignore_history=self.ignore_history, | ||||
) | ) | ||||
client, path = dulwich.client.get_transport_and_path( | client, path = dulwich.client.get_transport_and_path( | ||||
origin_url, thin_packs=False | origin_url, thin_packs=False | ||||
) | ) | ||||
size_limit = self.config["pack_size_bytes"] | size_limit = self.pack_size_bytes | ||||
def do_pack(data: bytes) -> None: | def do_pack(data: bytes) -> None: | ||||
cur_size = pack_buffer.tell() | cur_size = pack_buffer.tell() | ||||
would_write = len(data) | would_write = len(data) | ||||
if cur_size + would_write > size_limit: | if cur_size + would_write > size_limit: | ||||
[Done] anlambert: why the type ignore here? | |||||
[Done] ardumont: (because it complained ;) I don't recall the details but it was not happy, probably because of the optional nature on stuff. | |||||
raise IOError( | raise IOError( | ||||
"Pack file too big for repository %s, " | f"Pack file too big for repository {origin_url}, " | ||||
"limit is %d bytes, current size is %d, " | f"limit is {size_limit} bytes, current size is {cur_size}, " | ||||
"would write %d" % (origin_url, size_limit, cur_size, would_write) | f"would write {would_write}" | ||||
) | ) | ||||
pack_buffer.write(data) | pack_buffer.write(data) | ||||
pack_result = client.fetch_pack( | pack_result = client.fetch_pack( | ||||
path, | path, | ||||
base_repo.determine_wants, | base_repo.determine_wants, | ||||
base_repo.graph_walker(), | base_repo.graph_walker(), | ||||
Show All 25 Lines | ) -> Tuple[Dict[bytes, bytes], Dict[bytes, Set[bytes]]]: | ||||
for obj in inflater: | for obj in inflater: | ||||
type, id = obj.type_name, obj.id | type, id = obj.type_name, obj.id | ||||
id_to_type[id] = type | id_to_type[id] = type | ||||
type_to_ids[type].add(id) | type_to_ids[type].add(id) | ||||
return id_to_type, type_to_ids | return id_to_type, type_to_ids | ||||
def prepare_origin_visit(self, *args, **kwargs) -> None: | def prepare_origin_visit(self) -> None: | ||||
self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
self.origin = Origin(url=self.origin_url) | self.origin = Origin(url=self.origin_url) | ||||
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]: | def get_full_snapshot(self, origin_url) -> Optional[Snapshot]: | ||||
return snapshot_get_latest(self.storage, origin_url) | return snapshot_get_latest(self.storage, origin_url) | ||||
def prepare(self, *args, **kwargs) -> None: | def prepare(self) -> None: | ||||
assert self.origin is not None | assert self.origin is not None | ||||
prev_snapshot: Optional[Snapshot] = None | prev_snapshot: Optional[Snapshot] = None | ||||
if not self.ignore_history: | if not self.ignore_history: | ||||
prev_snapshot = self.get_full_snapshot(self.origin.url) | prev_snapshot = self.get_full_snapshot(self.origin.url) | ||||
if self.base_url and prev_snapshot is None: | if self.base_url and prev_snapshot is None: | ||||
▲ Show 20 Lines • Show All 299 Lines • ▼ Show 20 Lines | if __name__ == "__main__": | ||||
@click.option("--origin-url", help="Origin url", required=True) | @click.option("--origin-url", help="Origin url", required=True) | ||||
@click.option("--base-url", default=None, help="Optional Base url") | @click.option("--base-url", default=None, help="Optional Base url") | ||||
@click.option( | @click.option( | ||||
"--ignore-history/--no-ignore-history", | "--ignore-history/--no-ignore-history", | ||||
help="Ignore the repository history", | help="Ignore the repository history", | ||||
default=False, | default=False, | ||||
) | ) | ||||
def main(origin_url: str, base_url: str, ignore_history: bool) -> Dict[str, Any]: | def main(origin_url: str, base_url: str, ignore_history: bool) -> Dict[str, Any]: | ||||
from swh.storage import get_storage | |||||
storage = get_storage(cls="memory") | |||||
loader = GitLoader( | loader = GitLoader( | ||||
origin_url, base_url=base_url, ignore_history=ignore_history, | storage, origin_url, base_url=base_url, ignore_history=ignore_history, | ||||
) | ) | ||||
return loader.load() | return loader.load() | ||||
main() | main() |
same here