diff --git a/seirl/swh-dedup/deduper/__main__.py b/seirl/swh-dedup/deduper/__main__.py
index fee90c9..29d7d94 100644
--- a/seirl/swh-dedup/deduper/__main__.py
+++ b/seirl/swh-dedup/deduper/__main__.py
@@ -1,64 +1,19 @@
-import logging
 import sys
-from multiprocessing import Process, Queue
 
 from swh.model.hashutil import hash_to_bytes
-from swh.objstorage.exc import ObjNotFoundError
 
 from deduper.deduper import Deduper
 
-
-NUM_WORKERS = 1
-
-
-def dedup_worker(task_queue, result_queue):
-    while True:
-        content_id = task_queue.get()
-        if content_id is None:  # no more tasks
-            break
-
-        try:
-            Deduper().dedup(hash_to_bytes(content_id))
-            result_queue.put((content_id, True))
-        except ObjNotFoundError:
-            logging.warning('cannot find object "%s", skipping' % content_id)
-            result_queue.put((content_id, False))
-
-
-def progress_monitor(result_queue):
-    obj_count = 0
-    while True:
-        (content_id, _success) = result_queue.get()
-        obj_count += 1
-        if obj_count % 1000 == 0:
-            logging.info('processed %d objects, currently at %s' %
-                         (obj_count, content_id))
+OBJS_ROOT = '/home/seirl/swh-storage'
+OBJS_SLICING = '0:2/2:4/4:6'
+DB_SERVICE = 'swh-dedup'  # postgres service name
 
 
 def main():
-    task_queue = Queue()
-    result_queue = Queue()
-
-    workers = []
-    for i in range(0, NUM_WORKERS):
-        p = Process(target=dedup_worker,
-                    args=(task_queue, result_queue))
-        workers.append(p)
-        p.start()
-
-    monitor = Process(target=progress_monitor, args=(result_queue,))
-    monitor.start()
-
+    deduper = Deduper(DB_SERVICE, OBJS_ROOT, OBJS_SLICING)
     for line in sys.stdin:  # schedule tasks
         content_id = line.rstrip()
-        task_queue.put(content_id)
-
-    for i in range(0, NUM_WORKERS):  # tell workers we're done
-        task_queue.put(None)
-
-    for p in workers:  # wait for completion
-        p.join()
-    monitor.terminate()
+        deduper.dedup(hash_to_bytes(content_id))
 
 
 if __name__ == '__main__':
-    logging.basicConfig(level=logging.INFO)
     main()
diff --git a/seirl/swh-dedup/deduper/chunkers.py b/seirl/swh-dedup/deduper/chunkers.py
index 1f8d820..7a2d53b 100644
--- a/seirl/swh-dedup/deduper/chunkers.py
+++ b/seirl/swh-dedup/deduper/chunkers.py
@@ -1,68 +1,63 @@
 import borg.chunker
 import io
+import logging
 import math
 import rabin
 import zlib
 
 from hashlib import sha1
 
 
 def buzhash_chunk(content, params):
     args = {
         'seed': params.get('seed', 0),
         'chunk_min_exp': int(math.log2(params.get('min_block_size'))),
         'chunk_max_exp': int(math.log2(params.get('max_block_size'))),
         'hash_mask_bits': int(math.log2(params.get('average_block_size'))),
         'hash_window_size': params.get('window_size')
     }
     chunker = borg.chunker.Chunker(**args)
 
-    buf = bytearray()
-    for data in content:
-        buf.extend(data)
-
     pos = 0
-    for chunk in chunker.chunkify(io.BytesIO(buf)):
+    for chunk in chunker.chunkify(io.BytesIO(content)):
         yield pos, len(chunk)
         pos += len(chunk)
 
 
 def rabin_chunk(content, params):
     if 'prime' in params:
         rabin.set_prime(params['prime'])
     if 'window_size' in params:
         rabin.set_window_size(params['window_size'])
     if 'min_block_size' in params:
         rabin.set_min_block_size(params['min_block_size'])
-    if 'average_block_size' in params:
-        rabin.set_average_block_size(params['average_block_size'])
     if 'max_block_size' in params:
         rabin.set_max_block_size(params['max_block_size'])
+    if 'average_block_size' in params:
+        rabin.set_average_block_size(params['average_block_size'])
 
     r = rabin.Rabin()
-    buf = bytearray()
-    for data in content:
-        buf.extend(data)  # TODO avoid loading the entire content in memory
-        r.update(data)
+    r.update(content)
 
-    if buf:  # r.fingerprints() invoked on empty objects segfaults :-(
+    if content:  # r.fingerprints() invoked on empty objects segfaults :-(
         for position, length, _fpr in r.fingerprints():
             yield position, length
     r.clear()
 
 
 ALGOS = {
     'rabin': rabin_chunk,
     'buzhash': buzhash_chunk
 }
 
 
 def chunk(algo, params, content):
+    logging.debug('Chunking with algo %s, params %s', algo, repr(params))
     f = ALGOS[algo]
     for position, length in f(content, params):
         chunk = content[position:(position+length)]
         chunk_id = sha1(chunk).digest()
         compressed_size = len(zlib.compress(chunk))
         yield chunk_id, position, length, compressed_size
diff --git a/seirl/swh-dedup/deduper/deduper.py b/seirl/swh-dedup/deduper/deduper.py
index fa34753..59274d1 100755
--- a/seirl/swh-dedup/deduper/deduper.py
+++ b/seirl/swh-dedup/deduper/deduper.py
@@ -1,86 +1,87 @@
 #!/usr/bin/python3
 
 """compute Rabin fingerprints of Software Heritage content objects
 
 Read a list of Software Heritage content object IDs on standard input, fetch
 each of them from a (local) object storage and apply Rabin fingerprinting to
 its content. Store in a (postgres) DB the mapping between content objects and
 (Rabin-delimited) chunks.
 
 """
 
 import magic
 import psycopg2
 import zlib
 
 from psycopg2.extras import execute_values, RealDictCursor
 from swh.objstorage import PathSlicingObjStorage as ObjStorage
 
 from deduper.chunkers import chunk
 
-OBJS_ROOT = '/srv/softwareheritage/objects'
-OBJS_SLICING = '0:2/2:4'
-DB_SERVICE = 'swh-dedup'  # postgres service name
-
 
 class Deduper:
-    def __init__(self):
-        self.db_conn = psycopg2.connect('service=%s' % DB_SERVICE)
-        self.obj_storage = ObjStorage(OBJS_ROOT, OBJS_SLICING)
+    def __init__(self, db_service, objs_root, objs_slicing):
+        self.db_conn = psycopg2.connect('service=%s' % db_service)
+        self.db_conn.autocommit = True
+        self.obj_storage = ObjStorage(objs_root, objs_slicing)
 
     def dedup(self, content_id):
         content = self.obj_storage.get(content_id)
 
         self._insert_content(content_id, content)
 
         # get list of methods not yet swept
         with self.db_conn.cursor(cursor_factory=RealDictCursor) as c:
             c.execute("""SELECT id, algo, min_block_size, average_block_size,
                                 max_block_size, window_size
                          FROM chunking_method
-                         LEFT JOIN chunked_content
-                             ON method_id = chunking_method.id
-                         WHERE content_id = %s AND method_id IS NULL""",
+                         WHERE NOT EXISTS (
+                             SELECT 1 FROM chunked_content
+                             WHERE content_id = %s
+                               AND method_id = chunking_method.id)""",
                       (content_id,))
             methods = c.fetchall()
 
+        chunked_content_queue = []
         for method in methods:
             method_id = method['id']
             algo = method['algo']
             params = {
                 'min_block_size': method['min_block_size'],
                 'average_block_size': method['average_block_size'],
                 'max_block_size': method['max_block_size'],
                 'window_size': method['window_size'],
             }
             chunks = list(chunk(algo, params, content))
-            self._insert_chunks(content_id, method_id, chunks)
+            chunked_content_queue.append((content_id, method_id, chunks))
+        self._insert_chunks(chunked_content_queue)
 
     def _insert_content(self, content_id, content):
         size = len(content)
         compressed_size = len(zlib.compress(content))
         ftype = magic.from_buffer(content)
 
         with self.db_conn.cursor() as cur:
             cur.execute("""INSERT INTO content
                            (id, length, compressed_length, file_type)
                            VALUES (%s, %s, %s, %s)
                            ON CONFLICT DO NOTHING""",
                         (content_id, size, compressed_size, ftype))
 
-    def _insert_chunks(self, content_id, method_id, chunks):
+    def _insert_chunks(self, chunked_content_queue):
         chunk_values = []
         chunked_content_values = []
-        for (chunk_id, position, length, compressed_length) in chunks:
-            chunk_values.append((chunk_id, length, compressed_length))
-            chunked_content_values.append((content_id, chunk_id, method_id,
-                                           position))
+        for content_id, method_id, chunks in chunked_content_queue:
+            for (chunk_id, position, length, compressed_length) in chunks:
+                chunk_values.append((chunk_id, length, compressed_length))
+                chunked_content_values.append((content_id, chunk_id, method_id,
+                                               position))
 
         with self.db_conn.cursor() as cur:
             execute_values(cur,
                            """INSERT INTO chunk (id, length, compressed_length)
                               VALUES %s
                               ON CONFLICT DO NOTHING""",
                            chunk_values)
             execute_values(cur,
                            """INSERT INTO chunked_content
                               (content_id, chunk_id, method_id, position)
                               VALUES %s""",
                            chunked_content_values)
diff --git a/seirl/swh-dedup/run.sh b/seirl/swh-dedup/run.sh
index a5210de..b6d93e3 100755
--- a/seirl/swh-dedup/run.sh
+++ b/seirl/swh-dedup/run.sh
@@ -1,14 +1,15 @@
 #!/bin/bash
 
-db_name=swh-dedup2
+db_name=swh-dedup
 db_service=$db_name
 
-sudo -u postgres dropdb -p 5433 $db_name
-sudo -u postgres createdb -p 5433 -O swhdev $db_name
+dropdb $db_name
+createdb $db_name
 psql service=$db_service -f swh-dedup-blocks.sql
+psql service=$db_service -f swh-dedup-blocks-methods.sql
 
 time \
-find /srv/softwareheritage/objects -type f \
+find ~/swh-storage -type f -printf "%f\n" \
+    | head -n200 \
     | sort \
-    | cut -f 7 -d/ \
-    | ./swh-dedup-blocks.py
+    | python -m deduper
diff --git a/seirl/swh-dedup/swh-dedup-blocks.sql b/seirl/swh-dedup/swh-dedup-blocks.sql
index d6a6641..3ecb00e 100644
--- a/seirl/swh-dedup/swh-dedup-blocks.sql
+++ b/seirl/swh-dedup/swh-dedup-blocks.sql
@@ -1,36 +1,38 @@
 CREATE DOMAIN sha1 AS bytea CHECK (length(value) = 20);
 
 CREATE TABLE content (
     id                 sha1 PRIMARY KEY,  -- SHA1 checksum
     length             integer,
-    compressed_length  integer
-    file_type          text,
+    compressed_length  integer,
+    file_type          text
 );
 
 CREATE TABLE chunk (
     id                 sha1 PRIMARY KEY,  -- SHA1 checksum
     length             integer,
     compressed_length  integer
 );
 
 CREATE TYPE chunking_algo as enum ('rabin', 'buzhash');
 
 CREATE TABLE chunking_method (
     id                  serial PRIMARY KEY,
     algo                chunking_algo,
     min_block_size      integer,
     average_block_size  integer,
     max_block_size      integer,
-    window_size         integer,
+    window_size         integer
 );
 
 CREATE TABLE chunked_content (
     content_id  sha1 REFERENCES content(id),
     chunk_id    sha1 REFERENCES chunk(id),
     method_id   integer REFERENCES chunking_method(id),
     position    integer
 );
 
+CREATE UNIQUE INDEX ON chunking_method (algo, min_block_size, average_block_size, max_block_size, window_size);
+
 CREATE INDEX ON chunked_content (content_id);
 CREATE INDEX ON chunked_content (method_id);
 CREATE UNIQUE INDEX ON chunked_content (content_id, chunk_id, method_id, position);
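
Not part of the patch: the per-content and per-chunk compressed lengths stored
above are what make it possible to compare deduplication gains across chunking
methods. A sketch of one such measurement query against this schema (the
aggregation strategy is an assumption about intended use, not code from the
repository):

-- For each chunking method, compare the compressed size of the distinct
-- chunks it produced with the compressed size of the contents it chunked.
SELECT m.id,
       m.algo,
       m.average_block_size,
       (SELECT sum(c.compressed_length)
          FROM chunk c
         WHERE c.id IN (SELECT chunk_id
                          FROM chunked_content
                         WHERE method_id = m.id)) AS deduped_length,
       (SELECT sum(co.compressed_length)
          FROM content co
         WHERE co.id IN (SELECT content_id
                           FROM chunked_content
                          WHERE method_id = m.id)) AS original_length
  FROM chunking_method m;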