Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/git/loader.py
Show All 18 Lines | |||||
from dulwich.pack import PackData, PackInflater | from dulwich.pack import PackData, PackInflater | ||||
from swh.loader.core.loader import DVCSLoader | from swh.loader.core.loader import DVCSLoader | ||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | BaseContent, | ||||
Directory, | Directory, | ||||
Origin, | |||||
Release, | Release, | ||||
Revision, | Revision, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
▲ Show 20 Lines • Show All 80 Lines • ▼ Show 20 Lines | def __init__( | ||||
self, | self, | ||||
storage: StorageInterface, | storage: StorageInterface, | ||||
url: str, | url: str, | ||||
base_url: Optional[str] = None, | base_url: Optional[str] = None, | ||||
incremental: bool = True, | incremental: bool = True, | ||||
repo_representation: Type[RepoRepresentation] = RepoRepresentation, | repo_representation: Type[RepoRepresentation] = RepoRepresentation, | ||||
pack_size_bytes: int = 4 * 1024 * 1024 * 1024, | pack_size_bytes: int = 4 * 1024 * 1024 * 1024, | ||||
temp_file_cutoff: int = 100 * 1024 * 1024, | temp_file_cutoff: int = 100 * 1024 * 1024, | ||||
save_data_path: Optional[str] = None, | **kwargs: Any, | ||||
max_content_size: Optional[int] = None, | |||||
): | ): | ||||
"""Initialize the bulk updater. | """Initialize the bulk updater. | ||||
Args: | Args: | ||||
repo_representation: swh's repository representation | repo_representation: swh's repository representation | ||||
which is in charge of filtering between known and remote | which is in charge of filtering between known and remote | ||||
data. | data. | ||||
... | ... | ||||
incremental: If True, the default, this starts from the last known snapshot | incremental: If True, the default, this starts from the last known snapshot | ||||
(if any) references. Otherwise, this loads the full repository. | (if any) references. Otherwise, this loads the full repository. | ||||
""" | """ | ||||
super().__init__( | super().__init__(storage=storage, origin_url=url, **kwargs) | ||||
storage=storage, | |||||
save_data_path=save_data_path, | |||||
max_content_size=max_content_size, | |||||
) | |||||
self.origin_url = url | |||||
self.base_url = base_url | self.base_url = base_url | ||||
self.incremental = incremental | self.incremental = incremental | ||||
self.repo_representation = repo_representation | self.repo_representation = repo_representation | ||||
self.pack_size_bytes = pack_size_bytes | self.pack_size_bytes = pack_size_bytes | ||||
self.temp_file_cutoff = temp_file_cutoff | self.temp_file_cutoff = temp_file_cutoff | ||||
# state initialized in fetch_data | # state initialized in fetch_data | ||||
self.remote_refs: Dict[bytes, HexBytes] = {} | self.remote_refs: Dict[bytes, HexBytes] = {} | ||||
self.symbolic_refs: Dict[bytes, HexBytes] = {} | self.symbolic_refs: Dict[bytes, HexBytes] = {} | ||||
▲ Show 20 Lines • Show All 76 Lines • ▼ Show 20 Lines | ) -> FetchPackReturn: | ||||
return FetchPackReturn( | return FetchPackReturn( | ||||
remote_refs=utils.filter_refs(remote_refs), | remote_refs=utils.filter_refs(remote_refs), | ||||
symbolic_refs=utils.filter_refs(symbolic_refs), | symbolic_refs=utils.filter_refs(symbolic_refs), | ||||
pack_buffer=pack_buffer, | pack_buffer=pack_buffer, | ||||
pack_size=pack_size, | pack_size=pack_size, | ||||
) | ) | ||||
def prepare_origin_visit(self) -> None: | |||||
self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | |||||
self.origin = Origin(url=self.origin_url) | |||||
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]: | def get_full_snapshot(self, origin_url) -> Optional[Snapshot]: | ||||
return snapshot_get_latest(self.storage, origin_url) | return snapshot_get_latest(self.storage, origin_url) | ||||
def prepare(self) -> None: | def prepare(self) -> None: | ||||
assert self.origin is not None | assert self.origin is not None | ||||
prev_snapshot: Optional[Snapshot] = None | prev_snapshot: Optional[Snapshot] = None | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def fetch_data(self) -> bool: | ||||
raise NotFound(e) | raise NotFound(e) | ||||
# otherwise transmit the error | # otherwise transmit the error | ||||
raise | raise | ||||
except (AttributeError, NotImplementedError, ValueError): | except (AttributeError, NotImplementedError, ValueError): | ||||
# with old dulwich versions, those exceptions types can be raised | # with old dulwich versions, those exceptions types can be raised | ||||
# by the fetch_pack operation when encountering a repository with | # by the fetch_pack operation when encountering a repository with | ||||
# dumb transfer protocol so we check if the repository supports it | # dumb transfer protocol so we check if the repository supports it | ||||
# here to continue the loading if it is the case | # here to continue the loading if it is the case | ||||
self.dumb = dumb.check_protocol(self.origin_url) | self.dumb = dumb.check_protocol(self.origin.url) | ||||
if not self.dumb: | if not self.dumb: | ||||
raise | raise | ||||
logger.debug( | logger.debug( | ||||
"Protocol used for communication: %s", "dumb" if self.dumb else "smart" | "Protocol used for communication: %s", "dumb" if self.dumb else "smart" | ||||
) | ) | ||||
if self.dumb: | if self.dumb: | ||||
self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin_url, base_repo) | self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, base_repo) | ||||
self.dumb_fetcher.fetch_object_ids() | self.dumb_fetcher.fetch_object_ids() | ||||
self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs) # type: ignore | self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs) # type: ignore | ||||
self.symbolic_refs = self.dumb_fetcher.head | self.symbolic_refs = self.dumb_fetcher.head | ||||
else: | else: | ||||
self.pack_buffer = fetch_info.pack_buffer | self.pack_buffer = fetch_info.pack_buffer | ||||
self.pack_size = fetch_info.pack_size | self.pack_size = fetch_info.pack_size | ||||
self.remote_refs = fetch_info.remote_refs | self.remote_refs = fetch_info.remote_refs | ||||
self.symbolic_refs = fetch_info.symbolic_refs | self.symbolic_refs = fetch_info.symbolic_refs | ||||
▲ Show 20 Lines • Show All 163 Lines • ▼ Show 20 Lines | def get_snapshot(self) -> Snapshot: | ||||
", ".join( | ", ".join( | ||||
f"{name.decode()}: {hashutil.hash_to_hex(obj)}" | f"{name.decode()}: {hashutil.hash_to_hex(obj)}" | ||||
for name, obj in unknown_objects.items() | for name, obj in unknown_objects.items() | ||||
) | ) | ||||
) | ) | ||||
) | ) | ||||
utils.warn_dangling_branches( | utils.warn_dangling_branches( | ||||
branches, dangling_branches, logger, self.origin_url | branches, dangling_branches, logger, self.origin.url | ||||
) | ) | ||||
self.snapshot = Snapshot(branches=branches) | self.snapshot = Snapshot(branches=branches) | ||||
return self.snapshot | return self.snapshot | ||||
def load_status(self) -> Dict[str, Any]: | def load_status(self) -> Dict[str, Any]: | ||||
"""The load was eventful if the current snapshot is different to | """The load was eventful if the current snapshot is different to | ||||
the one we retrieved at the beginning of the run""" | the one we retrieved at the beginning of the run""" | ||||
▲ Show 20 Lines • Show All 41 Lines • Show Last 20 Lines |