Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/loader.py
Show All 32 Lines | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class PackageLoader: | class PackageLoader: | ||||
# Origin visit type (str) set by the loader | # Origin visit type (str) set by the loader | ||||
visit_type = '' | visit_type = '' | ||||
def __init__(self, url): | def __init__(self, url, origin: Optional[str] = None): | ||||
"""Loader's constructor. This raises exception if the minimal required | """Loader's constructor. This raises exception if the minimal required | ||||
configuration is missing (cf. fn:`check` method). | configuration is missing (cf. fn:`check` method). | ||||
Args: | Args: | ||||
url (str): Origin url to load data from | url (str): Origin url to load data from | ||||
ardumont: Please move the type in the constructor. | |||||
origin (str): The name of the origin. If not specified, url is used | |||||
""" | """ | ||||
# This expects to use the environment variable SWH_CONFIG_FILENAME | # This expects to use the environment variable SWH_CONFIG_FILENAME | ||||
self.config = SWHConfig.parse_config_file() | self.config = SWHConfig.parse_config_file() | ||||
self._check_configuration() | self._check_configuration() | ||||
self.storage = get_storage(**self.config['storage']) | self.storage = get_storage(**self.config['storage']) | ||||
self.url = url | self.url = url | ||||
if origin is None: | |||||
self.origin = url | |||||
else: | |||||
self.origin = origin | |||||
self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
self.max_content_size = self.config['max_content_size'] | self.max_content_size = self.config['max_content_size'] | ||||
def _check_configuration(self): | def _check_configuration(self): | ||||
"""Checks the minimal configuration required is set for the loader. | """Checks the minimal configuration required is set for the loader. | ||||
If some required configuration is missing, exception detailing the | If some required configuration is missing, exception detailing the | ||||
issue is raised. | issue is raised. | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | def get_default_version(self) -> str: | ||||
return '' | return '' | ||||
def last_snapshot(self) -> Optional[Snapshot]: | def last_snapshot(self) -> Optional[Snapshot]: | ||||
"""Retrieve the last snapshot | """Retrieve the last snapshot | ||||
""" | """ | ||||
snapshot = None | snapshot = None | ||||
visit = self.storage.origin_visit_get_latest( | visit = self.storage.origin_visit_get_latest( | ||||
self.url, require_snapshot=True) | self.origin, require_snapshot=True) | ||||
if visit and visit.get('snapshot'): | if visit and visit.get('snapshot'): | ||||
snapshot = Snapshot.from_dict(snapshot_get_all_branches( | snapshot = Snapshot.from_dict(snapshot_get_all_branches( | ||||
self.storage, visit['snapshot'])) | self.storage, visit['snapshot'])) | ||||
return snapshot | return snapshot | ||||
def known_artifacts( | def known_artifacts( | ||||
self, snapshot: Optional[Snapshot]) -> Dict[Sha1Git, BaseModel]: | self, snapshot: Optional[Snapshot]) -> Dict[Sha1Git, BaseModel]: | ||||
"""Retrieve the known releases/artifact for the origin. | """Retrieve the known releases/artifact for the origin. | ||||
▲ Show 20 Lines • Show All 122 Lines • ▼ Show 20 Lines | def load(self) -> Dict: | ||||
""" | """ | ||||
status_load = 'uneventful' # either: eventful, uneventful, failed | status_load = 'uneventful' # either: eventful, uneventful, failed | ||||
status_visit = 'full' # either: partial, full | status_visit = 'full' # either: partial, full | ||||
tmp_revisions = {} # type: Dict[str, List] | tmp_revisions = {} # type: Dict[str, List] | ||||
snapshot = None | snapshot = None | ||||
# Prepare origin and origin_visit | # Prepare origin and origin_visit | ||||
origin = Origin(url=self.url) | origin = Origin(url=self.origin) | ||||
try: | try: | ||||
self.storage.origin_add_one(origin) | self.storage.origin_add_one(origin) | ||||
visit = self.storage.origin_visit_add( | visit = self.storage.origin_visit_add( | ||||
self.url, date=self.visit_date, type=self.visit_type) | self.origin, date=self.visit_date, type=self.visit_type) | ||||
except Exception: | except Exception: | ||||
logger.exception('Failed to create origin/origin_visit:') | logger.exception('Failed to create origin/origin_visit:') | ||||
return {'status': 'failed'} | return {'status': 'failed'} | ||||
try: | try: | ||||
last_snapshot = self.last_snapshot() | last_snapshot = self.last_snapshot() | ||||
logger.debug('last snapshot: %s', last_snapshot) | logger.debug('last snapshot: %s', last_snapshot) | ||||
known_artifacts = self.known_artifacts(last_snapshot) | known_artifacts = self.known_artifacts(last_snapshot) | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | def load(self) -> Dict: | ||||
if hasattr(self.storage, 'flush'): | if hasattr(self.storage, 'flush'): | ||||
self.storage.flush() | self.storage.flush() | ||||
except Exception: | except Exception: | ||||
logger.exception('Fail to load %s' % self.url) | logger.exception('Fail to load %s' % self.url) | ||||
status_visit = 'partial' | status_visit = 'partial' | ||||
status_load = 'failed' | status_load = 'failed' | ||||
finally: | finally: | ||||
self.storage.origin_visit_update( | self.storage.origin_visit_update( | ||||
origin=self.url, visit_id=visit.visit, status=status_visit, | origin=self.origin, visit_id=visit.visit, status=status_visit, | ||||
snapshot=snapshot and snapshot.id) | snapshot=snapshot and snapshot.id) | ||||
result = { | result = { | ||||
'status': status_load, | 'status': status_load, | ||||
} # type: Dict[str, Any] | } # type: Dict[str, Any] | ||||
if snapshot: | if snapshot: | ||||
result['snapshot_id'] = hash_to_hex(snapshot.id) | result['snapshot_id'] = hash_to_hex(snapshot.id) | ||||
return result | return result | ||||
def _load_revision(self, p_info, origin) -> Tuple[Optional[Sha1Git], bool]: | def _load_revision(self, p_info, origin) -> Tuple[Optional[Sha1Git], bool]: | ||||
"""Does all the loading of a revision itself: | """Does all the loading of a revision itself: | ||||
* downloads a package and uncompresses it | * downloads a package and uncompresses it | ||||
* loads it from disk | * loads it from disk | ||||
* adds contents, directories, and revision to self.storage | * adds contents, directories, and revision to self.storage | ||||
* returns (revision_id, loaded) | * returns (revision_id, loaded) | ||||
""" | """ | ||||
with tempfile.TemporaryDirectory() as tmpdir: | with tempfile.TemporaryDirectory() as tmpdir: | ||||
try: | try: | ||||
dl_artifacts = self.download_package(p_info, tmpdir) | dl_artifacts = self.download_package(p_info, tmpdir) | ||||
except Exception: | except Exception: | ||||
logger.exception('Unable to retrieve %s', | logger.exception('Unable to retrieve %s', | ||||
p_info) | p_info) | ||||
return (None, False) | return (None, False) | ||||
try: | |||||
uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) | uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) | ||||
logger.debug('uncompressed_path: %s', uncompressed_path) | logger.debug('uncompressed_path: %s', uncompressed_path) | ||||
except ValueError: | |||||
logger.exception('Fail to uncompress %s', | |||||
p_info['url']) | |||||
return (None, False) | |||||
directory = from_disk.Directory.from_disk( | directory = from_disk.Directory.from_disk( | ||||
path=uncompressed_path.encode('utf-8'), | path=uncompressed_path.encode('utf-8'), | ||||
max_content_length=self.max_content_size) | max_content_length=self.max_content_size) | ||||
contents: List[Content] = [] | contents: List[Content] = [] | ||||
skipped_contents: List[SkippedContent] = [] | skipped_contents: List[SkippedContent] = [] | ||||
directories: List[Directory] = [] | directories: List[Directory] = [] | ||||
▲ Show 20 Lines • Show All 46 Lines • Show Last 20 Lines |
Please move the type in the constructor.