Changeset View
Standalone View
swh/loader/core/loader.py
Show First 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | class BaseLoader: | |||||||||||||||||||||||||||||||||||||||||||||||||
Args: | Args: | |||||||||||||||||||||||||||||||||||||||||||||||||
lister_name: Name of the lister which triggered this load. | lister_name: Name of the lister which triggered this load. | |||||||||||||||||||||||||||||||||||||||||||||||||
If provided, the loader will try to use the forge's API to retrieve extrinsic | If provided, the loader will try to use the forge's API to retrieve extrinsic | |||||||||||||||||||||||||||||||||||||||||||||||||
metadata | metadata | |||||||||||||||||||||||||||||||||||||||||||||||||
lister_instance_name: Name of the lister instance which triggered this load. | lister_instance_name: Name of the lister instance which triggered this load. | |||||||||||||||||||||||||||||||||||||||||||||||||
Must be None iff lister_name is, but it may be the empty string for listers | Must be None iff lister_name is, but it may be the empty string for listers | |||||||||||||||||||||||||||||||||||||||||||||||||
with a single instance. | with a single instance. | |||||||||||||||||||||||||||||||||||||||||||||||||
""" | """ | |||||||||||||||||||||||||||||||||||||||||||||||||
visit_type: str | visit_type: str | |||||||||||||||||||||||||||||||||||||||||||||||||
origin: Origin | origin: Origin | |||||||||||||||||||||||||||||||||||||||||||||||||
loaded_snapshot_id: Optional[Sha1Git] | loaded_snapshot_id: Optional[Sha1Git] | |||||||||||||||||||||||||||||||||||||||||||||||||
parent_origins: Optional[List[Origin]] | parent_origins: Optional[List[Origin]] | |||||||||||||||||||||||||||||||||||||||||||||||||
"""If the given origin is a "forge fork" (ie. created with the "Fork" button | """If the given origin is a "forge fork" (ie. created with the "Fork" button | |||||||||||||||||||||||||||||||||||||||||||||||||
of GitHub-like forges), :meth:`build_extrinsic_origin_metadata` sets this to | of GitHub-like forges), :meth:`build_extrinsic_origin_metadata` sets this to | |||||||||||||||||||||||||||||||||||||||||||||||||
a list of origins it was forked from; closest parent first.""" | a list of origins it was forked from; closest parent first.""" | |||||||||||||||||||||||||||||||||||||||||||||||||
def __init__( | def __init__( | |||||||||||||||||||||||||||||||||||||||||||||||||
self, | self, | |||||||||||||||||||||||||||||||||||||||||||||||||
storage: StorageInterface, | storage: StorageInterface, | |||||||||||||||||||||||||||||||||||||||||||||||||
origin_url: str, | origin_url: str, | |||||||||||||||||||||||||||||||||||||||||||||||||
logging_class: Optional[str] = None, | logging_class: Optional[str] = None, | |||||||||||||||||||||||||||||||||||||||||||||||||
save_data_path: Optional[str] = None, | save_data_path: Optional[str] = None, | |||||||||||||||||||||||||||||||||||||||||||||||||
max_content_size: Optional[int] = None, | max_content_size: Optional[int] = None, | |||||||||||||||||||||||||||||||||||||||||||||||||
lister_name: Optional[str] = None, | lister_name: Optional[str] = None, | |||||||||||||||||||||||||||||||||||||||||||||||||
lister_instance_name: Optional[str] = None, | lister_instance_name: Optional[str] = None, | |||||||||||||||||||||||||||||||||||||||||||||||||
metadata_fetcher_credentials: CredentialsType = None, | metadata_fetcher_credentials: CredentialsType = None, | |||||||||||||||||||||||||||||||||||||||||||||||||
create_partial_snapshot: bool = False, | ||||||||||||||||||||||||||||||||||||||||||||||||||
): | ): | |||||||||||||||||||||||||||||||||||||||||||||||||
if lister_name == "": | if lister_name == "": | |||||||||||||||||||||||||||||||||||||||||||||||||
raise ValueError("lister_name must not be the empty string") | raise ValueError("lister_name must not be the empty string") | |||||||||||||||||||||||||||||||||||||||||||||||||
if lister_name is None and lister_instance_name is not None: | if lister_name is None and lister_instance_name is not None: | |||||||||||||||||||||||||||||||||||||||||||||||||
raise ValueError( | raise ValueError( | |||||||||||||||||||||||||||||||||||||||||||||||||
f"lister_name is None but lister_instance_name is {lister_instance_name!r}" | f"lister_name is None but lister_instance_name is {lister_instance_name!r}" | |||||||||||||||||||||||||||||||||||||||||||||||||
) | ) | |||||||||||||||||||||||||||||||||||||||||||||||||
if lister_name is not None and lister_instance_name is None: | if lister_name is not None and lister_instance_name is None: | |||||||||||||||||||||||||||||||||||||||||||||||||
raise ValueError( | raise ValueError( | |||||||||||||||||||||||||||||||||||||||||||||||||
f"lister_instance_name is None but lister_name is {lister_name!r}" | f"lister_instance_name is None but lister_name is {lister_name!r}" | |||||||||||||||||||||||||||||||||||||||||||||||||
) | ) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.storage = storage | self.storage = storage | |||||||||||||||||||||||||||||||||||||||||||||||||
self.origin = Origin(url=origin_url) | self.origin = Origin(url=origin_url) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.max_content_size = int(max_content_size) if max_content_size else None | self.max_content_size = int(max_content_size) if max_content_size else None | |||||||||||||||||||||||||||||||||||||||||||||||||
self.lister_name = lister_name | self.lister_name = lister_name | |||||||||||||||||||||||||||||||||||||||||||||||||
self.lister_instance_name = lister_instance_name | self.lister_instance_name = lister_instance_name | |||||||||||||||||||||||||||||||||||||||||||||||||
self.metadata_fetcher_credentials = metadata_fetcher_credentials or {} | self.metadata_fetcher_credentials = metadata_fetcher_credentials or {} | |||||||||||||||||||||||||||||||||||||||||||||||||
self.create_partial_snapshot = create_partial_snapshot | ||||||||||||||||||||||||||||||||||||||||||||||||||
if logging_class is None: | if logging_class is None: | |||||||||||||||||||||||||||||||||||||||||||||||||
logging_class = "%s.%s" % ( | logging_class = "%s.%s" % ( | |||||||||||||||||||||||||||||||||||||||||||||||||
self.__class__.__module__, | self.__class__.__module__, | |||||||||||||||||||||||||||||||||||||||||||||||||
self.__class__.__name__, | self.__class__.__name__, | |||||||||||||||||||||||||||||||||||||||||||||||||
) | ) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.log = logging.getLogger(logging_class) | self.log = logging.getLogger(logging_class) | |||||||||||||||||||||||||||||||||||||||||||||||||
▲ Show 20 Lines • Show All 143 Lines • ▼ Show 20 Lines | def fetch_data(self) -> bool: | |||||||||||||||||||||||||||||||||||||||||||||||||
""" | """ | |||||||||||||||||||||||||||||||||||||||||||||||||
raise NotImplementedError | raise NotImplementedError | |||||||||||||||||||||||||||||||||||||||||||||||||
def process_data(self) -> bool: | def process_data(self) -> bool: | |||||||||||||||||||||||||||||||||||||||||||||||||
"""Run any additional processing between fetching and storing the data | """Run any additional processing between fetching and storing the data | |||||||||||||||||||||||||||||||||||||||||||||||||
Returns: | Returns: | |||||||||||||||||||||||||||||||||||||||||||||||||
a value that is interpreted as a boolean. If True, fetch_data needs | a value that is interpreted as a boolean. If True, :meth:`fetch_data` needs | |||||||||||||||||||||||||||||||||||||||||||||||||
to be called again to complete loading. | to be called again to complete loading. Ignored if :meth:`fetch_data` | |||||||||||||||||||||||||||||||||||||||||||||||||
Ignored if ``fetch_data`` already returned :const:`False`. | already returned :const:`False`. | |||||||||||||||||||||||||||||||||||||||||||||||||
""" | """ | |||||||||||||||||||||||||||||||||||||||||||||||||
return True | return True | |||||||||||||||||||||||||||||||||||||||||||||||||
def store_data(self) -> None: | def store_data(self) -> None: | |||||||||||||||||||||||||||||||||||||||||||||||||
"""Store fetched data in the database. | """Store fetched and processed data in the storage. | |||||||||||||||||||||||||||||||||||||||||||||||||
This should call the `storage.<object>_add` methods, which handle the objects to | ||||||||||||||||||||||||||||||||||||||||||||||||||
store in the storage. | ||||||||||||||||||||||||||||||||||||||||||||||||||
Should call the :func:`maybe_load_xyz` methods, which handle the | ||||||||||||||||||||||||||||||||||||||||||||||||||
bundles sent to storage, rather than send directly. | ||||||||||||||||||||||||||||||||||||||||||||||||||
""" | """ | |||||||||||||||||||||||||||||||||||||||||||||||||
raise NotImplementedError | raise NotImplementedError | |||||||||||||||||||||||||||||||||||||||||||||||||
def load_status(self) -> Dict[str, str]: | def load_status(self) -> Dict[str, str]: | |||||||||||||||||||||||||||||||||||||||||||||||||
"""Detailed loading status. | """Detailed loading status. | |||||||||||||||||||||||||||||||||||||||||||||||||
Defaults to logging an eventful load. | Defaults to logging an eventful load. | |||||||||||||||||||||||||||||||||||||||||||||||||
Show All 30 Lines | class BaseLoader: | |||||||||||||||||||||||||||||||||||||||||||||||||
def pre_cleanup(self) -> None: | def pre_cleanup(self) -> None: | |||||||||||||||||||||||||||||||||||||||||||||||||
"""As a first step, will try and check for dangling data to cleanup. | """As a first step, will try and check for dangling data to cleanup. | |||||||||||||||||||||||||||||||||||||||||||||||||
This should do its best to avoid raising issues. | This should do its best to avoid raising issues. | |||||||||||||||||||||||||||||||||||||||||||||||||
""" | """ | |||||||||||||||||||||||||||||||||||||||||||||||||
pass | pass | |||||||||||||||||||||||||||||||||||||||||||||||||
def build_partial_snapshot(self) -> Optional[Snapshot]: | ||||||||||||||||||||||||||||||||||||||||||||||||||
"""When the loader is configured to serialize partial snapshot, this allows the | ||||||||||||||||||||||||||||||||||||||||||||||||||
loader to give an implementation that builds a partial snapshot. This is used | ||||||||||||||||||||||||||||||||||||||||||||||||||
when the ingestion is taking multiple calls to :meth:`fetch_data` and | ||||||||||||||||||||||||||||||||||||||||||||||||||
:meth:`store_data`. Ignored when the loader is not configured to serialize | ||||||||||||||||||||||||||||||||||||||||||||||||||
partial snapshot. | ||||||||||||||||||||||||||||||||||||||||||||||||||
""" | ||||||||||||||||||||||||||||||||||||||||||||||||||
return None | ||||||||||||||||||||||||||||||||||||||||||||||||||
def load(self) -> Dict[str, str]: | def load(self) -> Dict[str, str]: | |||||||||||||||||||||||||||||||||||||||||||||||||
r"""Loading logic for the loader to follow: | r"""Loading logic for the loader to follow: | |||||||||||||||||||||||||||||||||||||||||||||||||
- Store the actual ``origin_visit`` to storage | - Store the actual ``origin_visit`` to storage | |||||||||||||||||||||||||||||||||||||||||||||||||
- Call :meth:`prepare` to prepare any eventual state | - Call :meth:`prepare` to prepare any eventual state | |||||||||||||||||||||||||||||||||||||||||||||||||
- Call :meth:`get_origin` to get the origin we work with and store | - Call :meth:`get_origin` to get the origin we work with and store | |||||||||||||||||||||||||||||||||||||||||||||||||
- while True: | - while True: | |||||||||||||||||||||||||||||||||||||||||||||||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | def load(self) -> Dict[str, str]: | |||||||||||||||||||||||||||||||||||||||||||||||||
more_data_to_fetch = self.fetch_data() | more_data_to_fetch = self.fetch_data() | |||||||||||||||||||||||||||||||||||||||||||||||||
t2 = time.monotonic() | t2 = time.monotonic() | |||||||||||||||||||||||||||||||||||||||||||||||||
total_time_fetch_data += t2 - t1 | total_time_fetch_data += t2 - t1 | |||||||||||||||||||||||||||||||||||||||||||||||||
more_data_to_fetch = self.process_data() and more_data_to_fetch | more_data_to_fetch = self.process_data() and more_data_to_fetch | |||||||||||||||||||||||||||||||||||||||||||||||||
t3 = time.monotonic() | t3 = time.monotonic() | |||||||||||||||||||||||||||||||||||||||||||||||||
total_time_process_data += t3 - t2 | total_time_process_data += t3 - t2 | |||||||||||||||||||||||||||||||||||||||||||||||||
self.store_data() | self.store_data() | |||||||||||||||||||||||||||||||||||||||||||||||||
vlorentz: This should use a keyword argument, for readability (make it clear what the argument means) | ||||||||||||||||||||||||||||||||||||||||||||||||||
t4 = time.monotonic() | t4 = time.monotonic() | |||||||||||||||||||||||||||||||||||||||||||||||||
total_time_store_data += t4 - t3 | total_time_store_data += t4 - t3 | |||||||||||||||||||||||||||||||||||||||||||||||||
# At the end of each ingestion loop, if the loader is configured for | ||||||||||||||||||||||||||||||||||||||||||||||||||
# partial snapshot (see self.create_partial_snapshot) and there are more | ||||||||||||||||||||||||||||||||||||||||||||||||||
# data to fetch, allows the loader to record an intermediary snapshot of | ||||||||||||||||||||||||||||||||||||||||||||||||||
# the ingestion. This could help when failing to load large repositories | ||||||||||||||||||||||||||||||||||||||||||||||||||
# for technical reasons (running out of disk, memory, etc...). | ||||||||||||||||||||||||||||||||||||||||||||||||||
if more_data_to_fetch and self.create_partial_snapshot: | ||||||||||||||||||||||||||||||||||||||||||||||||||
partial_snapshot = self.build_partial_snapshot() | ||||||||||||||||||||||||||||||||||||||||||||||||||
if partial_snapshot is not None: | ||||||||||||||||||||||||||||||||||||||||||||||||||
self.storage.snapshot_add([partial_snapshot]) | ||||||||||||||||||||||||||||||||||||||||||||||||||
visit_status = OriginVisitStatus( | ||||||||||||||||||||||||||||||||||||||||||||||||||
origin=self.origin.url, | ||||||||||||||||||||||||||||||||||||||||||||||||||
visit=self.visit.visit, | ||||||||||||||||||||||||||||||||||||||||||||||||||
type=self.visit_type, | ||||||||||||||||||||||||||||||||||||||||||||||||||
date=now(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
status="partial", | ||||||||||||||||||||||||||||||||||||||||||||||||||
snapshot=partial_snapshot.id, | ||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||
self.storage.origin_visit_status_add([visit_status]) | ||||||||||||||||||||||||||||||||||||||||||||||||||
if not more_data_to_fetch: | if not more_data_to_fetch: | |||||||||||||||||||||||||||||||||||||||||||||||||
break | break | |||||||||||||||||||||||||||||||||||||||||||||||||
self.statsd_timing("fetch_data", total_time_fetch_data * 1000.0) | self.statsd_timing("fetch_data", total_time_fetch_data * 1000.0) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.statsd_timing("process_data", total_time_process_data * 1000.0) | self.statsd_timing("process_data", total_time_process_data * 1000.0) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.statsd_timing("store_data", total_time_store_data * 1000.0) | self.statsd_timing("store_data", total_time_store_data * 1000.0) | |||||||||||||||||||||||||||||||||||||||||||||||||
status = self.visit_status() | status = self.visit_status() | |||||||||||||||||||||||||||||||||||||||||||||||||
visit_status = OriginVisitStatus( | visit_status = OriginVisitStatus( | |||||||||||||||||||||||||||||||||||||||||||||||||
origin=self.origin.url, | origin=self.origin.url, | |||||||||||||||||||||||||||||||||||||||||||||||||
visit=self.visit.visit, | visit=self.visit.visit, | |||||||||||||||||||||||||||||||||||||||||||||||||
type=self.visit_type, | type=self.visit_type, | |||||||||||||||||||||||||||||||||||||||||||||||||
date=now(), | date=now(), | |||||||||||||||||||||||||||||||||||||||||||||||||
status=status, | status=status, | |||||||||||||||||||||||||||||||||||||||||||||||||
snapshot=self.loaded_snapshot_id, | snapshot=self.loaded_snapshot_id, | |||||||||||||||||||||||||||||||||||||||||||||||||
) | ) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.storage.origin_visit_status_add([visit_status]) | self.storage.origin_visit_status_add([visit_status]) | |||||||||||||||||||||||||||||||||||||||||||||||||
Done Inline Actions
The current code uses slightly ambiguous "create_partial_snapshot" parameter to store_data, which makes it create an OVS in some cases; and create the OVS here in other cases. Instead, you can create the OVS here, simply by moving it inside the loop. This keeps store_data as it was before the diff (doesn't need to concern itself with creating objects) and avoid duplicating OVS creation. Just because Phabricator mangles the diff, this is what the new code should look like: while True: t1 = time.monotonic() more_data_to_fetch = self.fetch_data() t2 = time.monotonic() total_time_fetch_data += t2 - t1 more_data_to_fetch = self.process_data() and more_data_to_fetch t3 = time.monotonic() total_time_process_data += t3 - t2 status = "partial" if more_data_to_fetch else self.visit_status() self.store_data() visit_status = OriginVisitStatus( origin=self.origin.url, visit=self.visit.visit, type=self.visit_type, date=now(), status=status, snapshot=self.loaded_snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) t4 = time.monotonic() total_time_store_data += t4 - t3 if not more_data_to_fetch: break self.statsd_timing("fetch_data", total_time_fetch_data * 1000.0) self.statsd_timing("process_data", total_time_process_data * 1000.0) self.statsd_timing("store_data", total_time_store_data * 1000.0) vlorentz: The current code uses slightly ambiguous "create_partial_snapshot" parameter to store_data… | ||||||||||||||||||||||||||||||||||||||||||||||||||
Done Inline Actionsoops, you should move the status = "partial" if more_data_to_fetch else self.visit_status() line after the call to store_data in both my snippets vlorentz: oops, you should move the `status = "partial" if more_data_to_fetch else self.visit_status()`… | ||||||||||||||||||||||||||||||||||||||||||||||||||
success = True | success = True | |||||||||||||||||||||||||||||||||||||||||||||||||
with self.statsd_timed( | with self.statsd_timed( | |||||||||||||||||||||||||||||||||||||||||||||||||
"post_load", tags={"success": success, "status": status} | "post_load", tags={"success": success, "status": status} | |||||||||||||||||||||||||||||||||||||||||||||||||
): | ): | |||||||||||||||||||||||||||||||||||||||||||||||||
self.post_load() | self.post_load() | |||||||||||||||||||||||||||||||||||||||||||||||||
except BaseException as e: | except BaseException as e: | |||||||||||||||||||||||||||||||||||||||||||||||||
success = False | success = False | |||||||||||||||||||||||||||||||||||||||||||||||||
if isinstance(e, NotFound): | if isinstance(e, NotFound): | |||||||||||||||||||||||||||||||||||||||||||||||||
▲ Show 20 Lines • Show All 432 Lines • ▼ Show 20 Lines | def store_data(self) -> None: | |||||||||||||||||||||||||||||||||||||||||||||||||
self.storage.skipped_content_add(self.skipped_cnts) | self.storage.skipped_content_add(self.skipped_cnts) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.log.debug("Number of contents: %s", len(self.cnts)) | self.log.debug("Number of contents: %s", len(self.cnts)) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.storage.content_add(self.cnts) | self.storage.content_add(self.cnts) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.log.debug("Number of directories: %s", len(self.dirs)) | self.log.debug("Number of directories: %s", len(self.dirs)) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.storage.directory_add(self.dirs) | self.storage.directory_add(self.dirs) | |||||||||||||||||||||||||||||||||||||||||||||||||
assert self.snapshot is not None | assert self.snapshot is not None | |||||||||||||||||||||||||||||||||||||||||||||||||
self.storage.snapshot_add([self.snapshot]) | self.storage.snapshot_add([self.snapshot]) | |||||||||||||||||||||||||||||||||||||||||||||||||
self.loaded_snapshot_id = self.snapshot.id | self.loaded_snapshot_id = self.snapshot.id | |||||||||||||||||||||||||||||||||||||||||||||||||
def visit_status(self): | def visit_status(self): | |||||||||||||||||||||||||||||||||||||||||||||||||
return "full" if self.directory and self.snapshot is not None else "partial" | return "full" if self.directory and self.snapshot is not None else "partial" | |||||||||||||||||||||||||||||||||||||||||||||||||
Done Inline Actionsshould be in the if. We don't really want the flush to happen each time the loop occurs. ardumont: should be in the if. We don't really want the flush to happen each time the loop occurs.
Plus… | ||||||||||||||||||||||||||||||||||||||||||||||||||
Done Inline Actionsthat comment was for a previous implem, it's now irrelevant. ardumont: that comment was for a previous implem, it's now irrelevant. | ||||||||||||||||||||||||||||||||||||||||||||||||||
Done Inline Actions
maybe? vlorentz: maybe? | ||||||||||||||||||||||||||||||||||||||||||||||||||
Done Inline ActionsI recall that's what i initially did and mypy was not happy about it. ardumont: I recall that's what i initially did and mypy was not happy about it. |
This should use a keyword argument, for readability (make it clear what the argument means)