diff --git a/conftest.py b/conftest.py
--- a/conftest.py
+++ b/conftest.py
@@ -19,7 +19,6 @@
         'storage': {
             'cls': 'pipeline',
             'steps': [
-                {'cls': 'validate'},
                 {'cls': 'retry'},
                 {'cls': 'filter'},
                 {'cls': 'buffer'},
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
 swh.core >= 0.0.75
-swh.model >= 0.0.54
+swh.model >= 0.0.57
 swh.scheduler
 swh.storage >= 0.0.163
diff --git a/swh/loader/core/tests/test_converters.py b/swh/loader/core/tests/test_converters.py
--- a/swh/loader/core/tests/test_converters.py
+++ b/swh/loader/core/tests/test_converters.py
@@ -31,8 +31,7 @@
     data = b'temp file for testing content storage conversion'
     tmpfile = tmpfile_with_content(tmpdir, data)

-    obj = from_disk.Content.from_file(path=os.fsdecode(tmpfile),
-                                      save_path=True).get_data()
+    obj = from_disk.Content.from_file(path=os.fsdecode(tmpfile)).get_data()

     expected_content = obj.copy()
     expected_content['data'] = data
diff --git a/swh/loader/package/archive/loader.py b/swh/loader/package/archive/loader.py
--- a/swh/loader/package/archive/loader.py
+++ b/swh/loader/package/archive/loader.py
@@ -101,8 +101,8 @@
                 return rev_id
         return None

-    def build_revision(self, a_metadata: Mapping[str, Any],
-                       uncompressed_path: str) -> Dict:
+    def build_revision_data(self, a_metadata: Mapping[str, Any],
+                            uncompressed_path: str) -> Dict:
         time = a_metadata['time']  # assume it's a timestamp
         if isinstance(time, str):  # otherwise, assume it's a parsable date
             time = iso8601.parse_date(time)
diff --git a/swh/loader/package/cran/loader.py b/swh/loader/package/cran/loader.py
--- a/swh/loader/package/cran/loader.py
+++ b/swh/loader/package/cran/loader.py
@@ -83,7 +83,7 @@
             return rev_id
         return None

-    def build_revision(
+    def build_revision_data(
             self, a_metadata: Mapping[str, Any],
             uncompressed_path: str) -> Dict[str, Any]:
         # a_metadata is empty
diff --git a/swh/loader/package/debian/loader.py b/swh/loader/package/debian/loader.py
--- a/swh/loader/package/debian/loader.py
+++ b/swh/loader/package/debian/loader.py
@@ -119,8 +119,8 @@
         logger.debug('dl_artifacts: %s', dl_artifacts)
         return extract_package(dl_artifacts, dest=dest)

-    def build_revision(self, a_metadata: Mapping[str, Any],
-                       uncompressed_path: str) -> Dict:
+    def build_revision_data(self, a_metadata: Mapping[str, Any],
+                            uncompressed_path: str) -> Dict:
         dsc_url, dsc_name = dsc_information(a_metadata)
         if not dsc_name:
             raise ValueError(
diff --git a/swh/loader/package/deposit/loader.py b/swh/loader/package/deposit/loader.py
--- a/swh/loader/package/deposit/loader.py
+++ b/swh/loader/package/deposit/loader.py
@@ -68,7 +68,7 @@
         return [self.client.archive_get(
             self.deposit_id, tmpdir, p_info['filename'])]

-    def build_revision(
+    def build_revision_data(
             self, a_metadata: Dict, uncompressed_path: str) -> Dict:
         revision = a_metadata.pop('revision')
         metadata = {
diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -14,16 +14,21 @@

 from swh.core.tarball import uncompress
 from swh.core.config import SWHConfig
-from swh.model.from_disk import Directory
+from swh.model import from_disk
 from swh.model.hashutil import hash_to_hex
 from swh.model.identifiers import (
     revision_identifier, snapshot_identifier, identifier_to_bytes
 )
-from swh.model.model import Sha1Git
+from swh.model.model import (
+    BaseModel, Sha1Git,
+    Content, SkippedContent, Directory,
+    Revision,
+    TargetType, Snapshot,
+    Origin
+)
 from swh.storage import get_storage
 from swh.storage.algos.snapshot import snapshot_get_all_branches

-from swh.loader.core.converters import prepare_contents
 from swh.loader.package.utils import download
@@ -95,7 +100,7 @@
         """
         yield from {}

-    def build_revision(
+    def build_revision_data(
             self, a_metadata: Dict, uncompressed_path: str) -> Dict:
         """Build the revision dict from the archive metadata (extrinsic
         artifact metadata) and the intrinsic metadata.
@@ -119,19 +124,20 @@
         """
         return ''

-    def last_snapshot(self) -> Optional[Dict]:
+    def last_snapshot(self) -> Optional[Snapshot]:
         """Retrieve the last snapshot

         """
         snapshot = None
         visit = self.storage.origin_visit_get_latest(
             self.url, require_snapshot=True)
-        if visit:
-            snapshot = snapshot_get_all_branches(
-                self.storage, visit['snapshot'])
+        if visit and visit.get('snapshot'):
+            snapshot = Snapshot.from_dict(snapshot_get_all_branches(
+                self.storage, visit['snapshot']))
         return snapshot

-    def known_artifacts(self, snapshot: Optional[Dict]) -> Dict:
+    def known_artifacts(
+            self, snapshot: Optional[Snapshot]) -> Dict[Sha1Git, BaseModel]:
         """Retrieve the known releases/artifact for the origin.

         Args
@@ -141,13 +147,13 @@
             Dict of keys revision id (bytes), values a metadata Dict.

         """
-        if not snapshot or 'branches' not in snapshot:
+        if not snapshot:
             return {}

         # retrieve only revisions (e.g the alias we do not want here)
-        revs = [rev['target']
-                for rev in snapshot['branches'].values()
-                if rev and rev['target_type'] == 'revision']
+        revs = [rev.target
+                for rev in snapshot.branches.values()
+                if rev and rev.target_type == TargetType.REVISION]
         known_revisions = self.storage.revision_get(revs)

         ret = {}
@@ -263,16 +269,15 @@
         snapshot = None

         # Prepare origin and origin_visit
-        origin = {'url': self.url}
+        origin = Origin(url=self.url)
         try:
             self.storage.origin_add_one(origin)
             visit_id = self.storage.origin_visit_add(
                 origin=self.url,
                 date=self.visit_date,
                 type=self.visit_type)['visit']
-        except Exception as e:
-            logger.error(
-                'Failed to create origin/origin_visit. Reason: %s', e)
+        except Exception:
+            logger.exception('Failed to create origin/origin_visit:')
             return {'status': 'failed'}

         try:
@@ -327,13 +332,15 @@
                 'target': target,
             }

-        snapshot = {
+        snapshot_data = {
             'branches': branches
         }
-        logger.debug('snapshot: %s', snapshot)
+        logger.debug('snapshot: %s', snapshot_data)

-        snapshot['id'] = identifier_to_bytes(
-            snapshot_identifier(snapshot))
+        snapshot_data['id'] = identifier_to_bytes(
+            snapshot_identifier(snapshot_data))
+
+        snapshot = Snapshot.from_dict(snapshot_data)
         logger.debug('snapshot: %s', snapshot)
         self.storage.snapshot_add([snapshot])
@@ -346,12 +353,12 @@
         finally:
             self.storage.origin_visit_update(
                 origin=self.url, visit_id=visit_id, status=status_visit,
-                snapshot=snapshot and snapshot['id'])
+                snapshot=snapshot and snapshot.id)
             result = {
                 'status': status_load,
             }  # type: Dict[str, Any]
             if snapshot:
-                result['snapshot_id'] = hash_to_hex(snapshot['id'])
+                result['snapshot_id'] = hash_to_hex(snapshot.id)
             return result

     def _load_revision(self, p_info, origin) -> Tuple[Optional[Sha1Git], bool]:
@@ -373,51 +380,64 @@
             uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir)
             logger.debug('uncompressed_path: %s', uncompressed_path)

-            directory = Directory.from_disk(
+            directory = from_disk.Directory.from_disk(
                 path=uncompressed_path.encode('utf-8'),
-                data=True)  # noqa
-            # FIXME: Try not to load the full raw content in
-            # memory
-            objects = directory.collect()
-
-            contents, skipped_contents = prepare_contents(
-                objects.get('content', {}).values(),
-                max_content_size=self.max_content_size,
-                origin_url=origin['url'])
-            self.storage.skipped_content_add(skipped_contents)
+                max_content_length=self.max_content_size)
+
+            contents: List[Content] = []
+            skipped_contents: List[SkippedContent] = []
+            directories: List[Directory] = []
+
+            for obj in directory.iter_tree():
+                obj = obj.to_model()
+                if isinstance(obj, Content):
+                    # FIXME: read the data from disk later (when the
+                    # storage buffer is flushed).
+                    obj = obj.with_data()
+                    contents.append(obj)
+                elif isinstance(obj, SkippedContent):
+                    skipped_contents.append(obj)
+                elif isinstance(obj, Directory):
+                    directories.append(obj)
+                else:
+                    raise TypeError(
+                        f'Unexpected content type from disk: {obj}')
+
             logger.debug('Number of skipped contents: %s',
                          len(skipped_contents))
-            self.storage.content_add(contents)
+            self.storage.skipped_content_add(skipped_contents)
             logger.debug('Number of contents: %s', len(contents))
+            self.storage.content_add(contents)

-            directories = list(
-                objects.get('directory', {}).values())
             logger.debug('Number of directories: %s', len(directories))
             self.storage.directory_add(directories)

         # FIXME: This should be release. cf. D409
-        revision = self.build_revision(p_info['raw'], uncompressed_path)
-        if not revision:
+        revision_data = self.build_revision_data(
+            p_info['raw'], uncompressed_path)
+        if not revision_data:
             # Some artifacts are missing intrinsic metadata
             # skipping those
             return (None, True)

-        revision.update({
+        revision_data.update({
             'synthetic': True,
             'directory': directory.hash,
         })

-        revision['metadata'].update({
+        revision_data['metadata'].update({
             'original_artifact': [
                 hashes for _, hashes in dl_artifacts
             ],
         })

-        revision['id'] = identifier_to_bytes(
-            revision_identifier(revision))
+        revision_data['id'] = identifier_to_bytes(
+            revision_identifier(revision_data))
+
+        revision = Revision.from_dict(revision_data)
         logger.debug('Revision: %s', revision)
         self.storage.revision_add([revision])
-        return (revision['id'], True)
+        return (revision.id, True)
diff --git a/swh/loader/package/npm/loader.py b/swh/loader/package/npm/loader.py
--- a/swh/loader/package/npm/loader.py
+++ b/swh/loader/package/npm/loader.py
@@ -74,7 +74,7 @@
             -> Optional[bytes]:
         return artifact_to_revision_id(known_artifacts, artifact_metadata)

-    def build_revision(
+    def build_revision_data(
             self, a_metadata: Dict, uncompressed_path: str) -> Dict:
         i_metadata = extract_intrinsic_metadata(uncompressed_path)
         if not i_metadata:
diff --git a/swh/loader/package/pypi/loader.py b/swh/loader/package/pypi/loader.py
--- a/swh/loader/package/pypi/loader.py
+++ b/swh/loader/package/pypi/loader.py
@@ -71,7 +71,7 @@
             -> Optional[bytes]:
         return artifact_to_revision_id(known_artifacts, artifact_metadata)

-    def build_revision(
+    def build_revision_data(
             self, a_metadata: Dict, uncompressed_path: str) -> Dict:
         i_metadata = extract_intrinsic_metadata(uncompressed_path)
         if not i_metadata:
diff --git a/swh/loader/package/tests/test_common.py b/swh/loader/package/tests/test_common.py
--- a/swh/loader/package/tests/test_common.py
+++ b/swh/loader/package/tests/test_common.py
@@ -6,6 +6,7 @@
 import pytest

 from swh.model.hashutil import hash_to_bytes
+from swh.model.model import Snapshot, SnapshotBranch, TargetType
 from swh.loader.package.tests.common import (
     decode_target, check_snapshot, check_metadata, check_metadata_paths
 )
@@ -17,9 +18,6 @@
 storage_config = {
     'cls': 'pipeline',
     'steps': [
-        {
-            'cls': 'validate',
-        },
         {
             'cls': 'memory',
         }
@@ -57,15 +55,15 @@
     storage = get_storage(**storage_config)

     snap_id = '2498dbf535f882bc7f9a18fb16c9ad27fda7bab7'
-    snapshot = {
-        'id': hash_to_bytes(snap_id),
-        'branches': {
-            b'master': {
-                'target': hash_to_bytes(hash_hex),
-                'target_type': 'revision',
-            },
+    snapshot = Snapshot(
+        id=hash_to_bytes(snap_id),
+        branches={
+            b'master': SnapshotBranch(
+                target=hash_to_bytes(hash_hex),
+                target_type=TargetType.REVISION,
+            ),
         },
-    }
+    )

     s = storage.snapshot_add([snapshot])
     assert s == {
@@ -87,15 +85,15 @@
 def test_check_snapshot_failure():
     storage = get_storage(**storage_config)

-    snapshot = {
-        'id': hash_to_bytes('2498dbf535f882bc7f9a18fb16c9ad27fda7bab7'),
-        'branches': {
-            b'master': {
-                'target': hash_to_bytes(hash_hex),
-                'target_type': 'revision',
-            },
+    snapshot = Snapshot(
+        id=hash_to_bytes('2498dbf535f882bc7f9a18fb16c9ad27fda7bab7'),
+        branches={
+            b'master': SnapshotBranch(
+                target=hash_to_bytes(hash_hex),
+                target_type=TargetType.REVISION,
+            ),
         },
-    }
+    )

     s = storage.snapshot_add([snapshot])
     assert s == {
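
Note on the _load_revision hunk: the old prepare_contents conversion is replaced
by a single walk over the unpacked tree that turns each swh.model.from_disk node
into its swh.model.model counterpart. A minimal standalone sketch of that
pattern, reusing only the calls that appear in the diff; the path and size
limit below are placeholder values, not taken from this change:

    from swh.model import from_disk
    from swh.model.model import Content, Directory, SkippedContent

    # Scan an unpacked artifact; files larger than max_content_length
    # come back as SkippedContent objects instead of Content objects.
    tree = from_disk.Directory.from_disk(
        path=b'/tmp/unpacked-artifact',        # placeholder path
        max_content_length=10 * 1024 * 1024)   # placeholder limit

    contents, skipped_contents, directories = [], [], []
    for node in tree.iter_tree():
        obj = node.to_model()
        if isinstance(obj, Content):
            # Eagerly attach the file bytes, as the loader now does.
            contents.append(obj.with_data())
        elif isinstance(obj, SkippedContent):
            skipped_contents.append(obj)
        elif isinstance(obj, Directory):
            directories.append(obj)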
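
For loader implementors: build_revision is renamed to build_revision_data in
every package loader. The hook still returns a plain dict; the base loader
completes it ('synthetic', 'directory', the 'original_artifact' metadata, and
the computed 'id') before wrapping it in a model object via
Revision.from_dict(). A minimal sketch of a subclass implementing the renamed
hook; MyLoader, its visit type, and the metadata fields it reads are
hypothetical, not part of this change:

    from typing import Any, Dict, Mapping

    from swh.loader.package.loader import PackageLoader

    class MyLoader(PackageLoader):      # hypothetical subclass
        visit_type = 'my-artifact'      # hypothetical visit type

        def build_revision_data(self, a_metadata: Mapping[str, Any],
                                uncompressed_path: str) -> Dict:
            # Return only the artifact-specific fields; the base loader
            # fills in 'synthetic', 'directory' and the final 'id', then
            # builds the Revision model object from this dict.
            author = {'name': b'Loader', 'email': b'',
                      'fullname': b'Loader'}           # hypothetical person
            date = {'timestamp': a_metadata['time'],   # hypothetical field
                    'offset': 0}
            return {
                'type': 'tar',
                'message': b'synthetic revision message',
                'author': author,
                'committer': author,
                'date': date,
                'committer_date': date,
                'parents': [],
                'metadata': {'extrinsic': dict(a_metadata)},
            }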