diff --git a/seirl/swh-dedup/deduper/__init__.py b/seirl/swh-dedup/deduper/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/seirl/swh-dedup/deduper/__main__.py b/seirl/swh-dedup/deduper/__main__.py
new file mode 100644
index 0000000..fee90c9
--- /dev/null
+++ b/seirl/swh-dedup/deduper/__main__.py
@@ -0,0 +1,65 @@
+import logging
+import sys
+from multiprocessing import Process, Queue
+from swh.model.hashutil import hash_to_bytes
+from swh.objstorage.exc import ObjNotFoundError
+
+from deduper.deduper import Deduper
+
+
+NUM_WORKERS = 1
+
+
+def dedup_worker(task_queue, result_queue):
+    deduper = Deduper()  # one Deduper (and DB connection) per worker, not per task
+    while True:
+        content_id = task_queue.get()
+        if content_id is None:  # no more tasks
+            break
+
+        try:
+            deduper.dedup(hash_to_bytes(content_id))
+            result_queue.put((content_id, True))
+        except ObjNotFoundError:
+            logging.warning('cannot find object "%s", skipping', content_id)
+            result_queue.put((content_id, False))
+
+
+def progress_monitor(result_queue):
+    obj_count = 0
+    while True:
+        (content_id, _success) = result_queue.get()
+        obj_count += 1
+        if obj_count % 1000 == 0:
+            logging.info('processed %d objects, currently at %s',
+                         obj_count, content_id)
+
+
+def main():
+    task_queue = Queue()
+    result_queue = Queue()
+
+    workers = []
+    for i in range(NUM_WORKERS):
+        p = Process(target=dedup_worker,
+                    args=(task_queue, result_queue))
+        workers.append(p)
+        p.start()
+
+    monitor = Process(target=progress_monitor, args=(result_queue,))
+    monitor.start()
+
+    for line in sys.stdin:  # schedule tasks
+        content_id = line.rstrip()
+        task_queue.put(content_id)
+    for i in range(NUM_WORKERS):  # tell workers we're done
+        task_queue.put(None)
+
+    for p in workers:  # wait for completion
+        p.join()
+    monitor.terminate()
+
+
+if __name__ == '__main__':
+    logging.basicConfig(level=logging.INFO)
+    main()
diff --git a/seirl/swh-dedup/deduper/chunkers.py b/seirl/swh-dedup/deduper/chunkers.py
new file mode 100644
index 0000000..1f8d820
--- /dev/null
+++ b/seirl/swh-dedup/deduper/chunkers.py
@@ -0,0 +1,60 @@
+import borg.chunker
+import io
+import math
+import rabin
+import zlib
+
+from hashlib import sha1
+
+
+def buzhash_chunk(content, params):
+    # borg's buzhash chunker takes min/max/average sizes as powers of two
+    args = {
+        'seed': params.get('seed', 0),
+        'chunk_min_exp': int(math.log2(params['min_block_size'])),
+        'chunk_max_exp': int(math.log2(params['max_block_size'])),
+        'hash_mask_bits': int(math.log2(params['average_block_size'])),
+        'hash_window_size': params['window_size'],
+    }
+    chunker = borg.chunker.Chunker(**args)
+
+    pos = 0
+    for chunk in chunker.chunkify(io.BytesIO(content)):
+        yield pos, len(chunk)
+        pos += len(chunk)
+
+
+def rabin_chunk(content, params):
+    if 'prime' in params:
+        rabin.set_prime(params['prime'])
+    if 'window_size' in params:
+        rabin.set_window_size(params['window_size'])
+    if 'min_block_size' in params:
+        rabin.set_min_block_size(params['min_block_size'])
+    if 'average_block_size' in params:
+        rabin.set_average_block_size(params['average_block_size'])
+    if 'max_block_size' in params:
+        rabin.set_max_block_size(params['max_block_size'])
+
+    r = rabin.Rabin()
+    r.update(content)  # TODO avoid loading the entire content in memory
+
+    if content:  # r.fingerprints() invoked on empty objects segfaults :-(
+        for position, length, _fpr in r.fingerprints():
+            yield position, length
+
+    r.clear()
+
+
+ALGOS = {
+    'rabin': rabin_chunk,
+    'buzhash': buzhash_chunk,
+}
+
+
+def chunk(algo, params, content):
+    f = ALGOS[algo]
+    for position, length in f(content, params):
+        chunk = content[position:(position + length)]
+        chunk_id = sha1(chunk).digest()
+        compressed_size = len(zlib.compress(chunk))
+        yield chunk_id, position, length, compressed_size
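For reference, a minimal sketch of driving deduper.chunkers.chunk directly, assuming the pyrabin and borgbackup dependencies from requirements.txt are installed; the file path and parameter values below are illustrative placeholders, not the settings benchmarked by this experiment:

    from deduper.chunkers import chunk

    params = {
        'min_block_size': 2 ** 12,      # 4 KiB, placeholder value
        'average_block_size': 2 ** 14,  # 16 KiB, placeholder value
        'max_block_size': 2 ** 16,      # 64 KiB, placeholder value
        'window_size': 48,              # placeholder value
    }

    with open('/tmp/sample-object', 'rb') as f:  # any local file will do
        content = f.read()

    for chunk_id, position, length, compressed_size in chunk('rabin', params, content):
        print(chunk_id.hex(), position, length, compressed_size)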
diff --git a/seirl/swh-dedup/deduper/deduper.py b/seirl/swh-dedup/deduper/deduper.py
new file mode 100755
index 0000000..fa34753
--- /dev/null
+++ b/seirl/swh-dedup/deduper/deduper.py
@@ -0,0 +1,91 @@
+#!/usr/bin/python3
+
+"""chunk Software Heritage content objects for deduplication
+
+Read a list of Software Heritage content object IDs on standard input, fetch
+each of them from a (local) object storage, and split it into chunks with each
+configured content-defined chunking method (Rabin fingerprinting, buzhash).
+Store in a (postgres) DB the mapping between content objects and chunks.
+
+"""
+
+import magic
+import psycopg2
+import zlib
+
+from psycopg2.extras import execute_values, RealDictCursor
+
+from swh.objstorage import PathSlicingObjStorage as ObjStorage
+from deduper.chunkers import chunk
+
+OBJS_ROOT = '/srv/softwareheritage/objects'
+OBJS_SLICING = '0:2/2:4'
+DB_SERVICE = 'swh-dedup'  # postgres service name
+
+
+class Deduper:
+    def __init__(self):
+        self.db_conn = psycopg2.connect('service=%s' % DB_SERVICE)
+        self.obj_storage = ObjStorage(OBJS_ROOT, OBJS_SLICING)
+
+    def dedup(self, content_id):
+        content = self.obj_storage.get(content_id)
+        self._insert_content(content_id, content)
+
+        # get the chunking methods not yet applied to this content; the
+        # content_id restriction must be part of the LEFT JOIN condition:
+        # in the WHERE clause it would discard the NULL rows we filter on
+        with self.db_conn.cursor(cursor_factory=RealDictCursor) as c:
+            c.execute("""SELECT id, algo, min_block_size, average_block_size,
+                                max_block_size, window_size
+                         FROM chunking_method
+                         LEFT JOIN chunked_content
+                         ON method_id = chunking_method.id
+                            AND content_id = %s
+                         WHERE method_id IS NULL""",
+                      (content_id,))
+            methods = c.fetchall()
+
+        for method in methods:
+            method_id = method['id']
+            algo = method['algo']
+            params = {
+                'min_block_size': method['min_block_size'],
+                'average_block_size': method['average_block_size'],
+                'max_block_size': method['max_block_size'],
+                'window_size': method['window_size'],
+            }
+            chunks = list(chunk(algo, params, content))
+            self._insert_chunks(content_id, method_id, chunks)
+
+        self.db_conn.commit()  # persist both content and chunk rows
+
+    def _insert_content(self, content_id, content):
+        size = len(content)
+        compressed_size = len(zlib.compress(content))
+        ftype = magic.from_buffer(content)
+
+        with self.db_conn.cursor() as cur:
+            cur.execute("""INSERT INTO content
+                           (id, length, compressed_length, file_type)
+                           VALUES (%s, %s, %s, %s)
+                           ON CONFLICT DO NOTHING""",
+                        (content_id, size, compressed_size, ftype))
+
+    def _insert_chunks(self, content_id, method_id, chunks):
+        chunk_values = []
+        chunked_content_values = []
+        for (chunk_id, position, length, compressed_length) in chunks:
+            chunk_values.append((chunk_id, length, compressed_length))
+            chunked_content_values.append((content_id, chunk_id, method_id,
+                                           position))
+        with self.db_conn.cursor() as cur:
+            execute_values(cur, """INSERT INTO chunk
+                                   (id, length, compressed_length)
+                                   VALUES %s
+                                   ON CONFLICT DO NOTHING""",
+                           chunk_values)
+            execute_values(cur, """INSERT INTO chunked_content
+                                   (content_id, chunk_id, method_id, position)
+                                   VALUES %s""",
+                           chunked_content_values)
diff --git a/seirl/swh-dedup/packages.txt b/seirl/swh-dedup/packages.txt
new file mode 100644
index 0000000..0080156
--- /dev/null
+++ b/seirl/swh-dedup/packages.txt
@@ -0,0 +1,3 @@
+cython
+python3-dev
+libacl1-dev
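A quick way to sanity-check the Deduper class on a single object, assuming the postgres service and the schema below are already in place; the object ID here is a placeholder, substitute any sha1 present under OBJS_ROOT:

    from swh.model.hashutil import hash_to_bytes
    from deduper.deduper import Deduper

    deduper = Deduper()
    # placeholder ID: use a sha1 that exists in your object storage
    deduper.dedup(hash_to_bytes('0123456789abcdef0123456789abcdef01234567'))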
diff --git a/seirl/swh-dedup/requirements.txt b/seirl/swh-dedup/requirements.txt
new file mode 100644
index 0000000..d33f841
--- /dev/null
+++ b/seirl/swh-dedup/requirements.txt
@@ -0,0 +1,3 @@
+borgbackup
+pyrabin
+python-magic
diff --git a/seirl/swh-dedup/run.sh b/seirl/swh-dedup/run.sh
new file mode 100755
index 0000000..a5210de
--- /dev/null
+++ b/seirl/swh-dedup/run.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+db_name=swh-dedup2
+db_service=$db_name  # note: deduper/deduper.py connects to service 'swh-dedup'
+
+sudo -u postgres dropdb -p 5433 $db_name
+sudo -u postgres createdb -p 5433 -O swhdev $db_name
+psql service=$db_service -f swh-dedup-blocks.sql
+
+time \
+find /srv/softwareheritage/objects -type f \
+    | sort \
+    | cut -f 7 -d/ \
+    | python3 -m deduper
diff --git a/seirl/swh-dedup/swh-dedup-blocks.sql b/seirl/swh-dedup/swh-dedup-blocks.sql
new file mode 100644
index 0000000..d6a6641
--- /dev/null
+++ b/seirl/swh-dedup/swh-dedup-blocks.sql
@@ -0,0 +1,36 @@
+CREATE DOMAIN sha1 AS bytea CHECK (length(value) = 20);
+
+CREATE TABLE content (
+    id sha1 PRIMARY KEY,  -- SHA1 checksum
+    length integer,
+    compressed_length integer,
+    file_type text
+);
+
+CREATE TABLE chunk (
+    id sha1 PRIMARY KEY,  -- SHA1 checksum
+    length integer,
+    compressed_length integer
+);
+
+CREATE TYPE chunking_algo AS ENUM ('rabin', 'buzhash');
+
+CREATE TABLE chunking_method (
+    id serial PRIMARY KEY,
+    algo chunking_algo,
+    min_block_size integer,
+    average_block_size integer,
+    max_block_size integer,
+    window_size integer
+);
+
+CREATE TABLE chunked_content (
+    content_id sha1 REFERENCES content(id),
+    chunk_id sha1 REFERENCES chunk(id),
+    method_id integer REFERENCES chunking_method(id),
+    position integer
+);
+
+CREATE INDEX ON chunked_content (content_id);
+CREATE INDEX ON chunked_content (method_id);
+CREATE UNIQUE INDEX ON chunked_content (content_id, chunk_id, method_id, position);
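Once a run completes, this schema supports simple space-saving estimates. A hypothetical analysis snippet (connecting to the same postgres service as deduper/deduper.py) that compares, per chunking method, the compressed size of distinct chunks against the compressed size of all contents:

    import psycopg2

    with psycopg2.connect('service=swh-dedup') as db:
        with db.cursor() as cur:
            # total compressed size of the content objects themselves
            cur.execute('SELECT sum(compressed_length) FROM content')
            (content_bytes,) = cur.fetchone()

            # per method: compressed size of *distinct* chunks only, i.e.
            # what a chunk-level deduplicated store would actually keep
            cur.execute("""
                SELECT cc.method_id, sum(c.compressed_length)
                FROM (SELECT DISTINCT method_id, chunk_id
                      FROM chunked_content) AS cc
                JOIN chunk c ON c.id = cc.chunk_id
                GROUP BY cc.method_id
            """)
            for method_id, chunk_bytes in cur.fetchall():
                print(method_id, chunk_bytes, content_bytes)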