diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -20,5 +20,8 @@ [mypy-pyorc.*] ignore_missing_imports = True +[mypy-plyvel.*] +ignore_missing_imports = True + # [mypy-add_your_lib_here.*] # ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ click tqdm pyorc +plyvel diff --git a/swh/dataset/cli.py b/swh/dataset/cli.py --- a/swh/dataset/cli.py +++ b/swh/dataset/cli.py @@ -74,7 +74,7 @@ if not export_id: export_id = str(uuid.uuid4()) - exclude_obj_types = {o.strip() for o in exclude.split(",")} + exclude_obj_types = {o.strip() for o in (exclude.split(",") if exclude else [])} export_formats = [c.strip() for c in formats.split(",")] for f in export_formats: if f not in AVAILABLE_EXPORTERS: diff --git a/swh/dataset/journalprocessor.py b/swh/dataset/journalprocessor.py --- a/swh/dataset/journalprocessor.py +++ b/swh/dataset/journalprocessor.py @@ -18,7 +18,7 @@ import tqdm from swh.dataset.exporter import Exporter -from swh.dataset.utils import SQLiteSet +from swh.dataset.utils import LevelDBSet from swh.journal.client import JournalClient from swh.journal.serializers import kafka_to_value from swh.model.identifiers import origin_identifier @@ -280,7 +280,7 @@ self.node_sets_path = node_sets_path self.node_sets_path.mkdir(exist_ok=True, parents=True) - self.node_sets: Dict[Tuple[int, str], SQLiteSet] = {} + self.node_sets: Dict[Tuple[int, str], LevelDBSet] = {} self.exporters = [ exporter_class(config, **kwargs) for exporter_class, kwargs in exporters @@ -316,8 +316,8 @@ / ("part-{}".format(str(partition_id))) ) node_set_dir.mkdir(exist_ok=True, parents=True) - node_set_file = node_set_dir / "nodes-{}.sqlite".format(obj_id_prefix) - node_set = SQLiteSet(node_set_file) + node_set_file = node_set_dir / "nodes-{}.db".format(obj_id_prefix) + node_set = LevelDBSet(node_set_file) self.exit_stack.enter_context(node_set) self.node_sets[shard_id] = node_set return 
class LevelDBSet:
    """
    On-disk Set object for hashes using LevelDB as an indexer backend. Used to
    deduplicate objects when processing large queues with duplicates.

    The database is opened when the context manager is entered and closed when
    it is left; ``add()`` must only be called in between.
    """

    def __init__(self, db_path):
        """
        Args:
            db_path: Location of the LevelDB database. It is converted with
                ``str()`` when the database is opened.

        Raises:
            ImportError: If the optional plyvel module is not available.
        """
        # plyvel is an optional dependency: fail at construction time rather
        # than when the database is first opened.
        if plyvel is None:
            raise ImportError("Plyvel library not found, required for LevelDBSet")
        self.db_path = db_path

    def __enter__(self):
        """Open the database, creating it if it is missing, and return self."""
        self.db = plyvel.DB(str(self.db_path), create_if_missing=True)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the database. Exceptions from the with-body are not suppressed."""
        self.db.close()

    def add(self, v: bytes) -> bool:
        """
        Add an item to the set.

        Args:
            v: The value to add to the set.

        Returns:
            True if the value was added to the set, False if it was already present.
        """
        # Membership is decided by key presence alone. A plain truthiness
        # test on the stored value would treat a key holding an empty value
        # as absent and overwrite it.
        if self.db.get(v) is not None:
            return False
        self.db.put(v, b"T")
        return True