diff --git a/swh/dataset/luigi.py b/swh/dataset/luigi.py --- a/swh/dataset/luigi.py +++ b/swh/dataset/luigi.py @@ -401,14 +401,16 @@ # recursively copy local files to S3, and end with stamps and export metadata for format_ in self.formats: - for object_type in self.object_types: - local_dir = self.local_export_path / format_.name / object_type.name - s3_dir = f"{self.s3_export_path}/{format_.name}/{object_type.name}" - if not local_dir.exists(): - # intermediary object types (eg. origin_visit, origin_visit_status) - # do not have their own directory + for dirname in os.listdir(self.local_export_path / format_.name): + if dirname == "stamps": + # used as stamps while exporting, pointless to copy them continue - status_message = f"Uploading {format_.name}/{object_type.name}/" + if dirname == "meta": + # used as final stamp; copy it at the end + continue + local_dir = self.local_export_path / format_.name / dirname + s3_dir = f"{self.s3_export_path}/{format_.name}/{dirname}" + status_message = f"Uploading {format_.name}/{dirname}/" self.set_status_message(status_message) for file_ in tqdm.tqdm( list(os.listdir(local_dir)), @@ -471,6 +473,8 @@ def run(self) -> None: """Copies all files: first the export itself, then :file:`meta.json`.""" + import collections + import luigi.contrib.s3 import tqdm @@ -478,16 +482,18 @@ # recursively copy local files to S3, and end with export metadata for format_ in self.formats: - for object_type in self.object_types: - local_dir = self.local_export_path / format_.name / object_type.name - s3_dir = f"{self.s3_export_path}/{format_.name}/{object_type.name}" - files = list(client.list(s3_dir)) - if not files: - # intermediary object types (eg. origin_visit, origin_visit_status) - # do not have their own directory - continue - local_dir.mkdir(parents=True, exist_ok=True) - status_message = f"Downloading {format_.name}/{object_type.name}/" + local_dir = self.local_export_path / format_.name + s3_dir = f"{self.s3_export_path}/{format_.name}" + files = list(client.list(s3_dir)) + assert files, "No files found" + + files_by_type = collections.defaultdict(list) + for file in files: + files_by_type[file.split("/")[0]].append(file) + + for (object_type, files) in files_by_type.items(): + (local_dir / object_type).mkdir(parents=True, exist_ok=True) + status_message = f"Downloading {format_.name}/{object_type}/" self.set_status_message(status_message) for file_ in tqdm.tqdm( files,