diff --git a/mypy.ini b/mypy.ini
index d91201c..7c7233a 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,24 +1,27 @@
 [mypy]
 namespace_packages = True
 warn_unused_ignores = True
 exclude = (?x)(
     ^swh/graph/grpc
 )
 
 # 3rd party libraries without stubs (yet)
 
 [mypy-luigi.*]
 ignore_missing_imports = True
 
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
 [mypy-psutil.*]
 ignore_missing_imports = True
 
 [mypy-py4j.*]
 ignore_missing_imports = True
 
 [mypy-pytest.*]
 ignore_missing_imports = True
+
+[mypy-tqdm.*]
+ignore_missing_imports = True
diff --git a/requirements-luigi.txt b/requirements-luigi.txt
index dc52dc6..c58d750 100644
--- a/requirements-luigi.txt
+++ b/requirements-luigi.txt
@@ -1 +1,2 @@
 luigi
+tqdm
diff --git a/requirements-swh-luigi.txt b/requirements-swh-luigi.txt
index 3c7e36b..b27c22b 100644
--- a/requirements-swh-luigi.txt
+++ b/requirements-swh-luigi.txt
@@ -1 +1 @@
-swh.dataset[luigi] >= v0.3.2
+swh.dataset[luigi] >= v1.0.1
diff --git a/swh/graph/luigi.py b/swh/graph/luigi.py
index f109657..bf53019 100644
--- a/swh/graph/luigi.py
+++ b/swh/graph/luigi.py
@@ -1,186 +1,436 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """
 Luigi tasks
 ===========
 
 This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks,
 as an alternative to the CLI that can be composed with other tasks,
 such as swh-dataset's.
 
 Unlike the CLI, this requires the graph to be named `graph`.
 
 File layout
 -----------
 
 In addition to files documented in :ref:`graph-compression` (eg.
 :file:`graph.graph`, :file:`graph.mph`, ...), tasks in this module produce this
 directory structure::
 
     swh_<date>[_<flavor>]/
         graph.graph
         graph.mph
         ...
         meta/
             export.json
             compression.json
 
 ``graph.meta/export.json`` is copied from the ORC dataset exported by
 :mod:`swh.dataset.luigi`.
 
 ``graph.meta/compression.json`` contains information about the compression itself,
 for provenance tracking. For example:
 
 .. code-block:: json
 
     [
         {
             "steps": null,
             "compression_start": "2022-11-08T11:00:54.998799+00:00",
             "compression_end": "2022-11-08T11:05:53.105519+00:00",
             "object_type": [
                 "origin",
                 "origin_visit"
             ],
             "hostname": "desktop5",
             "conf": {},
             "tool": {
                 "name": "swh.graph",
                 "version": "2.2.0"
             }
         }
     ]
 
 When the compression pipeline is run in separate steps, each of the steps is
 recorded as an object in the root list.
+
+S3 layout
+---------
+
+As ``.bin`` files are meant to be accessed randomly, they are stored uncompressed
+on disk. However, this is undesirable for at-rest/long-term storage such as S3,
+because some of them are very sparse (eg.
+:file:`graph.property.committer_timestamp.bin` can be quickly compressed from
+300GB to 1GB).
+
+Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed
+when downloading.
+
+The layout is otherwise the same as the file layout.
 """
 
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
 from typing import List
 
 import luigi
 
-from swh.dataset.luigi import Format, LocalExport, ObjectType
+from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter
 
 
 class CompressGraph(luigi.Task):
-    local_export_path = luigi.PathParameter()
+    local_export_path = luigi.PathParameter(significant=False)
     local_graph_path = luigi.PathParameter()
     batch_size = luigi.IntParameter(
         default=0,
         significant=False,
         description="""
         Size of work batches to use while compressing.
         Larger is faster, but consumes more resources.
""", ) object_types = list(ObjectType) # To make this configurable, we could use this: # object_types = luigi.EnumListParameter( # enum=ObjectType, default=list(ObjectType), batch_method=merge_lists # ) # then use swh.dataset.luigi._export_metadata_has_object_types to check in # .meta/export.json that all objects are present before skipping the task def requires(self) -> List[luigi.Task]: """Returns a :class:`LocalExport` task.""" return [ LocalExport( local_export_path=self.local_export_path, formats=[Format.orc], # type: ignore[attr-defined] object_types=self.object_types, ) ] def output(self) -> List[luigi.LocalTarget]: """Returns the ``meta/*.json`` targets""" return [self._export_meta(), self._compression_meta()] def _export_meta(self) -> luigi.Target: """Returns the metadata on the dataset export""" return luigi.LocalTarget(self.local_graph_path / "meta/export.json") def _compression_meta(self) -> luigi.Target: """Returns the metadata on the compression pipeline""" return luigi.LocalTarget(self.local_graph_path / "meta/compression.json") def run(self): """Runs the full compression pipeline, then writes :file:`meta/compression.json` This does not support running individual steps yet.""" import datetime import json import shutil import socket import pkg_resources from swh.graph import webgraph conf = {} # TODO: make this configurable steps = None # TODO: make this configurable if self.batch_size: conf["batch_size"] = self.batch_size # Delete stamps. Otherwise interrupting this compression pipeline may leave # stamps from a previous successful compression if self._export_meta().exists(): self._export_meta().remove() if self._compression_meta().exists(): self._compression_meta().remove() # Make sure we don't accidentally append to existing files if self.local_graph_path.exists(): shutil.rmtree(self.local_graph_path) output_directory = self.local_graph_path graph_name = "graph" def progress_cb(percentage: int, step: webgraph.CompressionStep): self.set_progress_percentage(percentage) self.set_status_message(f"Running {step.name} (step #{step.value})") start_date = datetime.datetime.now(tz=datetime.timezone.utc) webgraph.compress( graph_name, self.local_export_path / "orc", output_directory, steps, conf, ) end_date = datetime.datetime.now(tz=datetime.timezone.utc) # Copy dataset export metadata with self._export_meta().open("w") as write_fd: with (self.local_export_path / "meta" / "export.json").open() as read_fd: write_fd.write(read_fd.read()) # Append metadata about this compression pipeline if self._compression_meta().exists(): with self._compression_meta().open("w") as fd: meta = json.load(fd) else: meta = [] meta.append( { "steps": steps, "compression_start": start_date.isoformat(), "compression_end": end_date.isoformat(), "object_type": [object_type.name for object_type in self.object_types], "hostname": socket.getfqdn(), "conf": conf, "tool": { "name": "swh.graph", "version": pkg_resources.get_distribution("swh.graph").version, }, } ) with self._compression_meta().open("w") as fd: json.dump(meta, fd, indent=4) + + +class UploadGraphToS3(luigi.Task): + """Uploads a local compressed graphto S3; creating automatically if it does + not exist. 
+
+    Example invocation::
+
+        luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 \
+                --local-graph-path=graph/ \
+                --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
+    """
+
+    local_graph_path = luigi.PathParameter(significant=False)
+    s3_graph_path = S3PathParameter()
+
+    def requires(self) -> List[luigi.Task]:
+        """Returns a :class:`CompressGraph` task that writes local files at the
+        expected location."""
+        return [
+            CompressGraph(
+                local_graph_path=self.local_graph_path,
+            )
+        ]
+
+    def output(self) -> List[luigi.Target]:
+        """Returns stamp and meta paths on S3."""
+        return [self._meta()]
+
+    def _meta(self):
+        import luigi.contrib.s3
+
+        return luigi.contrib.s3.S3Target(f"{self.s3_graph_path}/meta/compression.json")
+
+    def run(self) -> None:
+        """Copies all files: first the graph itself, then
+        :file:`meta/compression.json`."""
+        import subprocess
+        import tempfile
+
+        import luigi.contrib.s3
+        import tqdm
+
+        compression_metadata_path = self.local_graph_path / "meta" / "compression.json"
+        seen_compression_metadata = False
+
+        client = luigi.contrib.s3.S3Client()
+
+        # Recursively copy local files to S3, and end with the compression metadata
+        paths = list(self.local_graph_path.glob("**/*"))
+        for (i, path) in tqdm.tqdm(
+            list(enumerate(paths)),
+            desc="Uploading compressed graph",
+        ):
+            if path == compression_metadata_path:
+                # Write it last
+                seen_compression_metadata = True
+                continue
+            if path.is_dir():
+                continue
+            relative_path = path.relative_to(self.local_graph_path)
+            self.set_progress_percentage(int(i * 100 / len(paths)))
+
+            if path.suffix == ".bin":
+                # Large sparse file; store it compressed on S3.
+                with tempfile.NamedTemporaryFile(
+                    prefix=path.stem, suffix=".bin.zst"
+                ) as fd:
+                    self.set_status_message(f"Compressing {relative_path}")
+                    subprocess.run(
+                        ["zstdmt", "--force", "--keep", path, "-o", fd.name],
+                        check=True,
+                    )
+                    self.set_status_message(f"Uploading {relative_path} (compressed)")
+                    client.put_multipart(
+                        fd.name,
+                        f"{self.s3_graph_path}/{relative_path}.zst",
+                        ACL="public-read",
+                    )
+            else:
+                self.set_status_message(f"Uploading {relative_path}")
+                client.put_multipart(
+                    path, f"{self.s3_graph_path}/{relative_path}", ACL="public-read"
+                )
+
+        assert (
+            seen_compression_metadata
+        ), "did not see meta/compression.json in directory listing"
+
+        # Write it last, to act as a stamp
+        client.put(
+            compression_metadata_path,
+            self._meta().path,
+            ACL="public-read",
+        )
+
+
+class DownloadGraphFromS3(luigi.Task):
+    """Downloads a compressed graph from S3 to the local filesystem.
+
+    This performs the inverse operation of :class:`UploadGraphToS3`.
+
+    Example invocation::
+
+        luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 \
+                --local-graph-path=graph/ \
+                --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
+    """
+
+    local_graph_path = luigi.PathParameter()
+    s3_graph_path = S3PathParameter(significant=False)
+
+    def requires(self) -> List[luigi.Task]:
+        """Returns an :class:`UploadGraphToS3` task that writes the files to S3
+        at the expected location."""
+        return [
+            UploadGraphToS3(
+                local_graph_path=self.local_graph_path,
+                s3_graph_path=self.s3_graph_path,
+            )
+        ]
+
+    def output(self) -> List[luigi.Target]:
+        """Returns stamp and meta paths on the local filesystem."""
+        return [self._meta()]
+
+    def _meta(self):
+        return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")
+
+    def run(self) -> None:
+        """Copies all files: first the graph itself, then
+        :file:`meta/compression.json`."""
+        import subprocess
+        import tempfile
+
+        import luigi.contrib.s3
+        import tqdm
+
+        client = luigi.contrib.s3.S3Client()
+
+        compression_metadata_path = f"{self.s3_graph_path}/meta/compression.json"
+        seen_compression_metadata = False
+
+        # Recursively copy files from S3 to the local directory, and end with
+        # the compression metadata. client.list() yields paths relative to
+        # s3_graph_path.
+        files = list(client.list(self.s3_graph_path))
+        for (i, file_) in tqdm.tqdm(
+            list(enumerate(files)),
+            desc="Downloading",
+        ):
+            if file_ == "meta/compression.json":
+                # Will copy it last
+                seen_compression_metadata = True
+                continue
+            self.set_progress_percentage(int(i * 100 / len(files)))
+            local_path = self.local_graph_path / file_
+            local_path.parent.mkdir(parents=True, exist_ok=True)
+            if file_.endswith(".bin.zst"):
+                # The file was compressed before uploading to S3; it needs to
+                # be decompressed locally
+                with tempfile.NamedTemporaryFile(
+                    prefix=local_path.stem, suffix=".bin.zst"
+                ) as fd:
+                    self.set_status_message(f"Downloading {file_} (compressed)")
+                    client.get(
+                        f"{self.s3_graph_path}/{file_}",
+                        fd.name,
+                    )
+                    self.set_status_message(f"Decompressing {file_}")
+                    subprocess.run(
+                        [
+                            "zstdmt",
+                            "--force",
+                            "-d",
+                            fd.name,
+                            "-o",
+                            str(local_path)[0:-4],
+                        ],
+                        check=True,
+                    )
+            else:
+                self.set_status_message(f"Downloading {file_}")
+                client.get(
+                    f"{self.s3_graph_path}/{file_}",
+                    str(local_path),
+                )
+
+        assert (
+            seen_compression_metadata
+        ), "did not see meta/compression.json in directory listing"
+
+        # Write it last, to act as a stamp
+        client.get(
+            compression_metadata_path,
+            self._meta().path,
+        )
+
+
+class LocalGraph(luigi.Task):
+    """Task that depends on a local compressed graph being present -- either
+    produced directly by :class:`CompressGraph` or fetched via
+    :class:`DownloadGraphFromS3`.
+    """
+
+    local_graph_path = luigi.PathParameter()
+    compression_task_type = luigi.TaskParameter(
+        default=DownloadGraphFromS3,
+        significant=False,
+        description="""The task used to get the compressed graph if it is not present.
+        Should be either ``swh.graph.luigi.CompressGraph`` or
+        ``swh.graph.luigi.DownloadGraphFromS3``.""",
+    )
+
+    def requires(self) -> List[luigi.Task]:
+        """Returns an instance of either :class:`CompressGraph` or
+        :class:`DownloadGraphFromS3` depending on the value of
+        :attr:`compression_task_type`."""
+
+        if issubclass(self.compression_task_type, CompressGraph):
+            return [
+                CompressGraph(
+                    local_graph_path=self.local_graph_path,
+                )
+            ]
+        elif issubclass(self.compression_task_type, DownloadGraphFromS3):
+            return [
+                DownloadGraphFromS3(
+                    local_graph_path=self.local_graph_path,
+                )
+            ]
+        else:
+            raise ValueError(
+                f"Unexpected compression_task_type: "
+                f"{self.compression_task_type.__name__}"
+            )
+
+    def output(self) -> List[luigi.Target]:
+        """Returns stamp and meta paths on the local filesystem."""
+        return [self._meta()]
+
+    def _meta(self):
+        return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")
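
For reference, a minimal sketch of how the new tasks compose when driven from Python rather than the ``luigi`` CLI shown in the docstrings above. The local path and bucket are hypothetical examples; as in the diff itself, non-significant parameters without defaults (such as ``CompressGraph.local_export_path``, and ``s3_graph_path`` for the implicit ``DownloadGraphFromS3`` dependency of ``LocalGraph``) are assumed to be supplied through luigi configuration, e.g. ``luigi.cfg``:

.. code-block:: python

    # Sketch only: drive the tasks from Python instead of the CLI.
    # Paths and bucket below are hypothetical; CompressGraph.local_export_path
    # is assumed to be set in luigi.cfg.
    import luigi

    from swh.graph.luigi import LocalGraph, UploadGraphToS3

    if __name__ == "__main__":
        luigi.build(
            [
                # Compresses the graph locally (via the CompressGraph
                # dependency) and mirrors it to S3; ".bin" files are
                # zstd-compressed on upload.
                UploadGraphToS3(
                    local_graph_path="graph/",
                    s3_graph_path="s3://softwareheritage/graph/swh_2022-11-08/compressed/",
                ),
                # Downstream tasks depend on LocalGraph, which by default
                # fetches the compressed graph with DownloadGraphFromS3 and can
                # be pointed at CompressGraph via --compression-task-type.
                LocalGraph(local_graph_path="graph/"),
            ],
            local_scheduler=True,
        )

Writing ``meta/compression.json`` last in both directions is what makes it usable as a completion stamp: if an upload or download is interrupted, the metadata file is absent, so the task is re-run instead of being considered complete.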