class DownloadFromS3(luigi.Task):
    """Downloads a local dataset export from S3.

    This performs the inverse operation of :class:`UploadToS3`

    Example invocation::

        luigi --local-scheduler --module swh.dataset.luigi DownloadFromS3 \
                --config=graph.prod.yml \
                --local-export-path=export/ \
                --formats=edges \
                --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
    """

    # Local directory the export is downloaded into.
    local_export_path = PathParameter(is_dir=True, create=True)
    # Export formats to download; each one is a sub-directory of the export.
    formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists)
    # Object types to download, for each format; defaults to all of them.
    object_types = luigi.EnumListParameter(
        enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
    )
    # S3 URL of the export to download from.
    s3_export_path = S3PathParameter(significant=False)

    def requires(self) -> List[luigi.Task]:
        """Returns a :class:`UploadToS3` task for the same export, so the
        download only runs once that task is complete."""
        return [
            UploadToS3(
                local_export_path=self.local_export_path,
                formats=self.formats,
                object_types=self.object_types,
                s3_export_path=self.s3_export_path,
            )
        ]

    def output(self) -> List[luigi.Target]:
        """Returns stamp and meta paths on the local filesystem."""
        return self._stamps() + [self._meta()]

    def _stamps(self) -> List[luigi.Target]:
        """Returns the local targets of the stamp files of the requested
        formats and object types."""
        return [
            luigi.LocalTarget(self.local_export_path / path)
            for path in stamps_paths(self.formats, self.object_types)
        ]

    def _meta(self) -> luigi.LocalTarget:
        """Returns the local target of :file:`meta/export.json`."""
        return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")

    def run(self) -> None:
        """Copies all files from S3 to the local export directory: first the
        export itself, then stamps, then :file:`meta/export.json`.

        Stamps and metadata are this task's outputs, so they are downloaded
        after the data files.
        """
        import luigi.contrib.s3
        import tqdm

        client = luigi.contrib.s3.S3Client()

        # recursively copy files from S3 to the local export directory, and end
        # with stamps and export metadata
        for format_ in self.formats:
            for object_type in self.object_types:
                local_dir = self.local_export_path / format_.name / object_type.name
                s3_dir = f"{self.s3_export_path}/{format_.name}/{object_type.name}"
                files = list(client.list(s3_dir))
                if not files:
                    # intermediary object types (eg. origin_visit, origin_visit_status)
                    # do not have their own directory
                    continue
                local_dir.mkdir(parents=True, exist_ok=True)
                for file_ in tqdm.tqdm(
                    files,
                    desc=f"Downloading {format_.name}/{object_type.name}/",
                ):
                    local_path = local_dir / file_
                    # listed keys are relative to s3_dir and may contain
                    # sub-directories, which do not exist locally yet
                    local_path.parent.mkdir(parents=True, exist_ok=True)
                    client.get(
                        f"{s3_dir}/{file_}",
                        str(local_path),
                    )

        for stamp in stamps_paths(self.formats, self.object_types):
            stamp_path = self.local_export_path / stamp
            stamp_path.parent.mkdir(parents=True, exist_ok=True)
            client.get(
                f"{self.s3_export_path}/{stamp}",
                str(stamp_path),
            )

        # same location as the target returned by _meta()
        export_json_path = self.local_export_path / "meta" / "export.json"
        export_json_path.parent.mkdir(parents=True, exist_ok=True)
        client.get(
            f"{self.s3_export_path}/meta/export.json",
            str(export_json_path),
        )