Changeset View
Changeset View
Standalone View
Standalone View
swh/graph/luigi.py
Show First 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | [ | ||||
"name": "swh.graph", | "name": "swh.graph", | ||||
"version": "2.2.0" | "version": "2.2.0" | ||||
} | } | ||||
} | } | ||||
] | ] | ||||
When the compression pipeline is run in separate steps, each of the steps is recorded | When the compression pipeline is run in separate steps, each of the steps is recorded | ||||
as an object in the root list. | as an object in the root list. | ||||
S3 layout | |||||
--------- | |||||
As ``.bin`` files are meant to be accessed randomly, they are uncompressed on disk. | |||||
However, this is undesirable for at-rest/long-term storage such as S3, because | |||||
some of them are very sparse (e.g. :file:`graph.property.committer_timestamp.bin` can be | |||||
quickly compressed from 300GB to 1GB). | |||||
Therefore, these files are compressed to ``.bin.zst``, and need to be decompressed | |||||
when downloading. | |||||
The layout is otherwise the same as the file layout. | |||||
""" | """ | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
from typing import List | from typing import List | ||||
import luigi | import luigi | ||||
from swh.dataset.luigi import Format, LocalExport, ObjectType | from swh.dataset.luigi import Format, LocalExport, ObjectType, S3PathParameter | ||||
class CompressGraph(luigi.Task): | class CompressGraph(luigi.Task): | ||||
local_export_path = luigi.PathParameter() | local_export_path = luigi.PathParameter(significant=False) | ||||
local_graph_path = luigi.PathParameter() | local_graph_path = luigi.PathParameter() | ||||
batch_size = luigi.IntParameter( | batch_size = luigi.IntParameter( | ||||
default=0, | default=0, | ||||
significant=False, | significant=False, | ||||
description=""" | description=""" | ||||
Size of work batches to use while compressing. | Size of work batches to use while compressing. | ||||
Larger is faster, but consumes more resources. | Larger is faster, but consumes more resources. | ||||
""", | """, | ||||
▲ Show 20 Lines • Show All 99 Lines • ▼ Show 20 Lines | def run(self): | ||||
"tool": { | "tool": { | ||||
"name": "swh.graph", | "name": "swh.graph", | ||||
"version": pkg_resources.get_distribution("swh.graph").version, | "version": pkg_resources.get_distribution("swh.graph").version, | ||||
}, | }, | ||||
} | } | ||||
) | ) | ||||
with self._compression_meta().open("w") as fd: | with self._compression_meta().open("w") as fd: | ||||
json.dump(meta, fd, indent=4) | json.dump(meta, fd, indent=4) | ||||
class UploadGraphToS3(luigi.Task):
    """Uploads a local compressed graph to S3; the local graph is created
    automatically (through :class:`CompressGraph`) if it does not exist.

    ``.bin`` files are compressed with ``zstdmt`` and stored as ``.bin.zst``;
    every other file is uploaded unchanged.

    Example invocation::

        luigi --local-scheduler --module swh.graph.luigi UploadGraphToS3 \
                --local-graph-path=graph/ \
                --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
    """

    local_graph_path = luigi.PathParameter(significant=False)
    s3_graph_path = S3PathParameter()

    def requires(self) -> List[luigi.Task]:
        """Returns the :class:`CompressGraph` task producing the local files
        this task uploads."""
        compress_task = CompressGraph(
            local_graph_path=self.local_graph_path,
        )
        return [compress_task]

    def output(self) -> List[luigi.Target]:
        """Returns the S3 target of :file:`meta/compression.json`, which is
        uploaded last and therefore acts as a stamp."""
        return [self._meta()]

    def _meta(self):
        """Builds the S3 target for :file:`meta/compression.json`."""
        # Imported lazily to keep CLI startup time under control.
        import luigi.contrib.s3

        return luigi.contrib.s3.S3Target(f"{self.s3_graph_path}/meta/compression.json")

    def run(self) -> None:
        """Copies all files: first the graph itself, then :file:`meta/compression.json`."""
        import subprocess
        import tempfile

        import luigi.contrib.s3
        import tqdm

        s3 = luigi.contrib.s3.S3Client()

        stamp_path = self.local_graph_path / "meta" / "compression.json"
        found_stamp = False

        # Walk the whole local tree; the stamp is held back until the end.
        entries = list(self.local_graph_path.glob("**/*"))
        for (index, entry) in tqdm.tqdm(
            list(enumerate(entries)),
            desc="Uploading compressed graph",
        ):
            if entry == stamp_path:
                # Uploaded after the loop, once everything else is on S3.
                found_stamp = True
                continue
            if entry.is_dir():
                continue

            rel = entry.relative_to(self.local_graph_path)
            self.set_progress_percentage(int(index * 100 / len(entries)))

            if entry.suffix != ".bin":
                self.set_status_message(f"Uploading {rel}")
                s3.put_multipart(
                    entry, f"{self.s3_graph_path}/{rel}", ACL="public-read"
                )
                continue

            # Large sparse file; store it compressed on S3.
            with tempfile.NamedTemporaryFile(
                prefix=entry.stem, suffix=".bin.zst"
            ) as tmp:
                self.set_status_message(f"Compressing {rel}")
                subprocess.run(
                    ["zstdmt", "--force", "--keep", entry, "-o", tmp.name], check=True
                )
                self.set_status_message(f"Uploading {rel} (compressed)")
                s3.put_multipart(
                    tmp.name,
                    f"{self.s3_graph_path}/{rel}.zst",
                    ACL="public-read",
                )

        assert (
            found_stamp
        ), "did not see meta/compression.json in directory listing"

        # Write it last, to act as a stamp
        s3.put(
            stamp_path,
            self._meta().path,
            ACL="public-read",
        )
class DownloadGraphFromS3(luigi.Task):
    """Downloads a compressed graph from S3 to the local filesystem.

    This performs the inverse operation of :class:`UploadGraphToS3`:
    ``.bin.zst`` files are decompressed with ``zstdmt`` into ``.bin`` files,
    every other file is downloaded unchanged.

    Example invocation::

        luigi --local-scheduler --module swh.graph.luigi DownloadGraphFromS3 \
                --local-graph-path=graph/ \
                --s3-graph-path=s3://softwareheritage/graph/swh_2022-11-08/compressed/
    """

    local_graph_path = luigi.PathParameter()
    s3_graph_path = S3PathParameter(significant=False)

    def requires(self) -> List[luigi.Task]:
        """Returns a :class:`UploadGraphToS3` task that writes the graph to S3
        at the expected location."""
        return [
            UploadGraphToS3(
                local_graph_path=self.local_graph_path,
                s3_graph_path=self.s3_graph_path,
            )
        ]

    def output(self) -> List[luigi.Target]:
        """Returns the local target of :file:`meta/compression.json`, which is
        downloaded last and therefore acts as a stamp."""
        return [self._meta()]

    def _meta(self):
        """Builds the local target for :file:`meta/compression.json`."""
        # This must be the file run() writes last; it is also the file
        # LocalGraph uses as its output.
        return luigi.LocalTarget(self.local_graph_path / "meta" / "compression.json")

    def run(self) -> None:
        """Copies all files: first the graph itself, then :file:`meta/compression.json`."""
        import subprocess
        import tempfile

        import luigi.contrib.s3
        import tqdm

        client = luigi.contrib.s3.S3Client()

        # Entries of the listing are used below as keys relative to
        # s3_graph_path (they are joined to both the local and the S3 root),
        # so the stamp has to be recognised by its relative key as well.
        compression_metadata_key = "meta/compression.json"
        compression_metadata_path = f"{self.s3_graph_path}/{compression_metadata_key}"
        seen_compression_metadata = False

        # recursively copy files from S3, and end with compression metadata
        files = list(client.list(self.s3_graph_path))
        for (i, file_) in tqdm.tqdm(
            list(enumerate(files)),
            desc="Downloading",
        ):
            if file_ == compression_metadata_key:
                # Will copy it last
                seen_compression_metadata = True
                continue

            self.set_progress_percentage(int(i * 100 / len(files)))

            local_path = self.local_graph_path / file_
            local_path.parent.mkdir(parents=True, exist_ok=True)

            if file_.endswith(".bin.zst"):
                # The file was compressed before uploading to S3, we need it
                # to be decompressed locally
                with tempfile.NamedTemporaryFile(
                    prefix=local_path.stem, suffix=".bin.zst"
                ) as fd:
                    self.set_status_message(f"Downloading {file_} (compressed)")
                    client.get(
                        f"{self.s3_graph_path}/{file_}",
                        fd.name,
                    )

                    self.set_status_message(f"Decompressing {file_}")
                    subprocess.run(
                        [
                            "zstdmt",
                            "--force",
                            "-d",
                            fd.name,
                            "-o",
                            # Drop the trailing ".zst" to get the ".bin" path.
                            str(local_path.with_suffix("")),
                        ],
                        check=True,
                    )
            else:
                self.set_status_message(f"Downloading {file_}")
                client.get(
                    f"{self.s3_graph_path}/{file_}",
                    str(local_path),
                )

        assert (
            seen_compression_metadata
        ), "did not see meta/compression.json in directory listing"

        # The stamp may be the only file under meta/, so make sure its
        # directory exists before downloading into it.
        (self.local_graph_path / "meta").mkdir(parents=True, exist_ok=True)

        # Write it last, to act as a stamp
        client.get(
            compression_metadata_path,
            self._meta().path,
        )
class LocalGraph(luigi.Task):
    """Task that depends on a local compressed graph being present -- either
    directly from :class:`CompressGraph` or via :class:`DownloadGraphFromS3`.
    """

    local_graph_path = luigi.PathParameter()
    compression_task_type = luigi.TaskParameter(
        default=DownloadGraphFromS3,
        significant=False,
        description="""The task used to get the compressed graph if it is not present.
        Should be either ``swh.graph.luigi.CompressGraph`` or
        ``swh.graph.luigi.DownloadGraphFromS3``.""",
    )

    def requires(self) -> List[luigi.Task]:
        """Returns a :class:`CompressGraph` or :class:`DownloadGraphFromS3`
        instance, selected by :attr:`compression_task_type`.

        Raises :exc:`ValueError` when :attr:`compression_task_type` is a
        subclass of neither.
        """
        task_type = self.compression_task_type

        if issubclass(task_type, CompressGraph):
            return [
                CompressGraph(
                    local_graph_path=self.local_graph_path,
                )
            ]

        if issubclass(task_type, DownloadGraphFromS3):
            return [
                DownloadGraphFromS3(
                    local_graph_path=self.local_graph_path,
                )
            ]

        raise ValueError(
            f"Unexpected compression_task_type: "
            f"{self.compression_task_type.__name__}"
        )

    def output(self) -> List[luigi.Target]:
        """Returns the local target of :file:`meta/compression.json`."""
        return [self._meta()]

    def _meta(self):
        """Builds the local target for :file:`meta/compression.json`."""
        meta_path = self.local_graph_path / "meta" / "compression.json"
        return luigi.LocalTarget(meta_path)