diff --git a/seirl/swh-dedup/deduper/__main__.py b/seirl/swh-dedup/deduper/__main__.py
index 29d7d94..7df9647 100644
--- a/seirl/swh-dedup/deduper/__main__.py
+++ b/seirl/swh-dedup/deduper/__main__.py
@@ -1,19 +1,30 @@
+#!/usr/bin/python3
+
+"""compute Rabin fingerprints of Software Heritage content objects
+
+Read a list of Software Heritage content object IDs on standard input, fetch
+each of them from a (local) object storage and apply Rabin fingerprinting to
+its content. Store in a (postgres) DB the mapping between content objects and
+(Rabin-delimited) chunks.
+
+"""
+
 import sys

 from swh.model.hashutil import hash_to_bytes

 from deduper.deduper import Deduper

-OBJS_ROOT = '/home/seirl/swh-storage'
-OBJS_SLICING = '0:2/2:4/4:6'
+OBJS_ROOT = '/home/seirl/content-samples'
+OBJS_SLICING = '0:2/2:4'
 DB_SERVICE = 'swh-dedup'  # postgres service name


 def main():
     deduper = Deduper(DB_SERVICE, OBJS_ROOT, OBJS_SLICING)
     for line in sys.stdin:  # schedule tasks
         content_id = line.rstrip()
         deduper.dedup(hash_to_bytes(content_id))


 if __name__ == '__main__':
     main()
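Note: __main__.py now reads hex content IDs from stdin and resolves them through the path-slicing object storage. For a quick sanity check the same flow can be driven directly from Python; a minimal sketch, reusing the root/slicing/service values from this patch (the sample SHA1 is a placeholder, not taken from the corpus):

    # Drive Deduper directly, bypassing the stdin pipeline (sketch).
    # The SHA1 below is illustrative; paths and service name come from the patch.
    from swh.model.hashutil import hash_to_bytes
    from deduper.deduper import Deduper

    deduper = Deduper('swh-dedup', '/home/seirl/content-samples', '0:2/2:4')
    for hex_id in ['34973274ccef6ab4dfaaf86599792fa9c3fe4689']:
        deduper.dedup(hash_to_bytes(hex_id))
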
diff --git a/seirl/swh-dedup/deduper/deduper.py b/seirl/swh-dedup/deduper/deduper.py
index 59274d1..342e79b 100755
--- a/seirl/swh-dedup/deduper/deduper.py
+++ b/seirl/swh-dedup/deduper/deduper.py
@@ -1,87 +1,105 @@
-#!/usr/bin/python3
-
-"""compute Rabin fingerprints of Software Heritage content objects
-
-Read a list of Software Heritage content object IDs on standard input, fetch
-each of them from a (local) object storage and apply Rabin fingerprinting to
-its content. Store in a (postgres) DB the mapping between content objects and
-(Rabin-delimited) chunks.
-
-"""
-
 import magic
 import psycopg2
+import random
+import time
 import zlib

 from psycopg2.extras import execute_values, RealDictCursor
 from swh.objstorage import PathSlicingObjStorage as ObjStorage

 from deduper.chunkers import chunk

-
 class Deduper:
     def __init__(self, db_service, objs_root, objs_slicing):
         self.db_conn = psycopg2.connect('service=%s' % db_service)
         self.db_conn.autocommit = True
         self.obj_storage = ObjStorage(objs_root, objs_slicing)

     def dedup(self, content_id):
         content = self.obj_storage.get(content_id)

         self._insert_content(content_id, content)

         # get list of methods not yet sweeped
         with self.db_conn.cursor(cursor_factory=RealDictCursor) as c:
             c.execute("""SELECT id, algo, min_block_size,
                                 average_block_size, max_block_size, window_size
                          FROM chunking_method
                          WHERE NOT EXISTS (
                             SELECT 1 FROM chunked_content
                             WHERE content_id = %s
                          )""", (content_id,))
-            methods = c.fetchall()
+            methods = list(c.fetchall())
+
+        random.shuffle(methods)
+        _ = max(content)  # Force read and cache content  # noqa

         chunked_content_queue = []
         for method in methods:
             method_id = method['id']
             algo = method['algo']
             params = {
                 'min_block_size': method['min_block_size'],
                 'average_block_size': method['average_block_size'],
                 'max_block_size': method['max_block_size'],
                 'window_size': method['window_size'],
             }
+            t0 = int(time.monotonic() * 1000000)
             chunks = list(chunk(algo, params, content))
-            chunked_content_queue.append((content_id, method_id, chunks))
+            t = int(time.monotonic() * 1000000)
+            duration = t - t0
+
+            chunked_content_queue.append(
+                (content_id, method_id, duration, chunks))

         self._insert_chunks(chunked_content_queue)

     def _insert_content(self, content_id, content):
         size = len(content)
         compressed_size = len(zlib.compress(content))
-        ftype = magic.from_buffer(content)
+        try:
+            ftype = magic.from_buffer(content)
+        except Exception:  # libmagic can fail on unusual content
+            ftype = ''

         with self.db_conn.cursor() as cur:
             cur.execute("""INSERT INTO content
                            (id, length, compressed_length, file_type)
                            VALUES (%s, %s, %s, %s)
                            ON CONFLICT DO NOTHING""",
                         (content_id, size, compressed_size, ftype))

     def _insert_chunks(self, chunked_content_queue):
-        chunk_values = []
         chunked_content_values = []
-        for content_id, method_id, chunks in chunked_content_queue:
-            for (chunk_id, position, length, compressed_length) in chunks:
-                chunk_values.append((chunk_id, length, compressed_length))
-                chunked_content_values.append((content_id, chunk_id, method_id,
-                                               position))
+        for content_id, method_id, duration, chunks in chunked_content_queue:
+            chunked_content_values.append((content_id, method_id, duration))
+
         with self.db_conn.cursor() as cur:
+            execute_values(cur, """INSERT INTO chunked_content
+                                   (content_id, method_id, duration_us)
+                                   VALUES %s
+                                   RETURNING id, content_id, method_id""",
+                           chunked_content_values)
+            chunked_content_ids = {
+                (bytes(content_id), method_id): id
+                for id, content_id, method_id in cur.fetchall()
+            }
+
+            chunk_values = []
+            chunked_content_chunks_values = []
+            for content_id, method_id, _, chunks in chunked_content_queue:
+                for (chunk_id, position, length, compressed_length) in chunks:
+                    chunk_values.append((chunk_id, length, compressed_length))
+                    chunked_content_id = chunked_content_ids[
+                        (content_id, method_id)]
+                    chunked_content_chunks_values.append(
+                        (chunked_content_id, chunk_id, position))
+
             execute_values(cur, """INSERT INTO chunk
                                    (id, length, compressed_length)
                                    VALUES %s
                                    ON CONFLICT DO NOTHING""",
                            chunk_values)
-            execute_values(cur, """INSERT INTO chunked_content
-                                   (content_id, chunk_id, method_id, position)
+            execute_values(cur, """INSERT INTO chunked_content_chunk
+                                   (chunked_content_id, chunk_id, position)
                                    VALUES %s""",
-                           chunked_content_values)
+                           chunked_content_chunks_values)
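Note: psycopg2's execute_values splits its argument list into pages (page_size defaults to 100), and a later cur.fetchall() only returns the RETURNING rows of the last page. That is safe here, since _insert_chunks inserts one chunked_content row per chunking method for a single content (well under 100), but the fetch=True form (psycopg2 >= 2.8) collects the rows from every page. A sketch of that variant, using the same table and columns as the patch:

    # Same INSERT as _insert_chunks, but letting execute_values gather the
    # RETURNING rows across all pages instead of relying on cur.fetchall().
    from psycopg2.extras import execute_values

    def insert_chunked_contents(cur, chunked_content_values):
        rows = execute_values(cur, """INSERT INTO chunked_content
                                      (content_id, method_id, duration_us)
                                      VALUES %s
                                      RETURNING id, content_id, method_id""",
                              chunked_content_values, fetch=True)
        # Map (content id, method id) back to the generated surrogate keys.
        return {(bytes(content_id), method_id): id
                for id, content_id, method_id in rows}
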
diff --git a/seirl/swh-dedup/requirements.txt b/seirl/swh-dedup/requirements.txt
index d33f841..4f867b6 100644
--- a/seirl/swh-dedup/requirements.txt
+++ b/seirl/swh-dedup/requirements.txt
@@ -1,3 +1,4 @@
 borgbackup
 pyrabin
 python-magic
+psycopg2-binary
diff --git a/seirl/swh-dedup/run.sh b/seirl/swh-dedup/run.sh
index b6d93e3..845bdd3 100755
--- a/seirl/swh-dedup/run.sh
+++ b/seirl/swh-dedup/run.sh
@@ -1,15 +1,13 @@
 #!/bin/bash

-db_name=swh-dedup
-db_service=$db_name
+#db_name=swh-dedup
+#db_service=$db_name
+#
+#dropdb $db_name
+#createdb $db_name
+#psql service=$db_service -f swh-dedup-blocks.sql
+#psql service=$db_service -f swh-dedup-blocks-methods.sql

-dropdb $db_name
-createdb $db_name
-psql service=$db_service -f swh-dedup-blocks.sql
-psql service=$db_service -f swh-dedup-blocks-methods.sql
-
-time \
-find ~/swh-storage -type f -printf "%f\n" \
-    | head -n200 \
-    | sort \
-    | python -m deduper
+time xzcat ~/content-samples/content-sample.0.1pct.txt.xz \
+    | parallel --spreadstdin --pipe -L 10000 -P 24 python -m deduper
+    #| head -n1000000 \
diff --git a/seirl/swh-dedup/swh-dedup-blocks-methods.sql b/seirl/swh-dedup/swh-dedup-blocks-methods.sql
index 752ea52..286566f 100644
--- a/seirl/swh-dedup/swh-dedup-blocks-methods.sql
+++ b/seirl/swh-dedup/swh-dedup-blocks-methods.sql
@@ -1,28 +1,28 @@
 INSERT INTO chunking_method (algo, min_block_size, average_block_size, max_block_size, window_size)
 VALUES
-    ('rabin', 2<<11, 2<<14, 2<<17, 48*1024),
-    ('rabin', 2<<12, 2<<14, 2<<16, 48*1024),
-    ('rabin', 2<<13, 2<<14, 2<<15, 32*1024),
+    ('buzhash', 2<<11, 2<<14, 2<<17, 48*1024),
+    ('buzhash', 2<<12, 2<<14, 2<<16, 48*1024),
+    ('buzhash', 2<<13, 2<<14, 2<<15, 32*1024),

-    ('rabin', 2<<11, 2<<13, 2<<15, 16*1024),
-    ('rabin', 2<<12, 2<<13, 2<<14, 16*1024),
+    ('buzhash', 2<<11, 2<<13, 2<<15, 16*1024),
+    ('buzhash', 2<<12, 2<<13, 2<<14, 16*1024),

-    ('rabin', 2<<10, 2<<12, 2<<14, 8*1024),
-    ('rabin', 2<<11, 2<<12, 2<<13, 8*1024),
+    ('buzhash', 2<<10, 2<<12, 2<<14, 8*1024),
+    ('buzhash', 2<<11, 2<<12, 2<<13, 8*1024),

-    ('rabin', 2<<9, 2<<11, 2<<13, 4*1024),
-    ('rabin', 2<<10, 2<<11, 2<<12, 4*1024),
+    ('buzhash', 2<<9, 2<<11, 2<<13, 4*1024),
+    ('buzhash', 2<<10, 2<<11, 2<<12, 4*1024),

-    ('rabin', 2<<8, 2<<10, 2<<12, 2*1024),
-    ('rabin', 2<<9, 2<<10, 2<<11, 2*1024),
+    ('buzhash', 2<<8, 2<<10, 2<<12, 2*1024),
+    ('buzhash', 2<<9, 2<<10, 2<<11, 2*1024),

-    ('rabin', 2<<7, 2<<9, 2<<11, 1*1024),
-    ('rabin', 2<<8, 2<<9, 2<<10, 1*1024),
+    ('buzhash', 2<<7, 2<<9, 2<<11, 1*1024),
+    ('buzhash', 2<<8, 2<<9, 2<<10, 1*1024),

-    ('rabin', 2<<6, 2<<8, 2<<10, 512),
-    ('rabin', 2<<7, 2<<8, 2<<9, 512)
+    ('buzhash', 2<<6, 2<<8, 2<<10, 512),
+    ('buzhash', 2<<7, 2<<8, 2<<9, 512)
 ON CONFLICT DO NOTHING;

-INSERT INTO chunking_method (algo, min_block_size, average_block_size, max_block_size, window_size)
-    SELECT 'buzhash', min_block_size, average_block_size, max_block_size, window_size
-    FROM chunking_method WHERE algo = 'rabin'
-ON CONFLICT DO NOTHING;
+--INSERT INTO chunking_method (algo, min_block_size, average_block_size, max_block_size, window_size)
+--    SELECT 'rabin', min_block_size, average_block_size, max_block_size, window_size
+--    FROM chunking_method WHERE algo = 'buzhash'
+--ON CONFLICT DO NOTHING;
diff --git a/seirl/swh-dedup/swh-dedup-blocks.sql b/seirl/swh-dedup/swh-dedup-blocks.sql
index 3ecb00e..1146311 100644
--- a/seirl/swh-dedup/swh-dedup-blocks.sql
+++ b/seirl/swh-dedup/swh-dedup-blocks.sql
@@ -1,38 +1,48 @@
 CREATE DOMAIN sha1 AS bytea CHECK (length(value) = 20);

 CREATE TABLE content (
     id sha1 PRIMARY KEY,  -- SHA1 checksum
     length integer,
     compressed_length integer,
     file_type text
 );

 CREATE TABLE chunk (
     id sha1 PRIMARY KEY,  -- SHA1 checksum
     length integer,
     compressed_length integer
 );

 CREATE TYPE chunking_algo as enum ('rabin', 'buzhash');

 CREATE TABLE chunking_method (
     id serial PRIMARY KEY,
     algo chunking_algo,
     min_block_size integer,
     average_block_size integer,
     max_block_size integer,
     window_size integer
 );

 CREATE TABLE chunked_content (
-    content_id sha1 REFERENCES content(id),
-    chunk_id sha1 REFERENCES chunk(id),
-    method_id integer REFERENCES chunking_method(id),
-    position integer
+    id serial PRIMARY KEY,
+    content_id sha1 REFERENCES content(id),
+    method_id integer REFERENCES chunking_method(id),
+    duration_us integer
+);
+
+CREATE TABLE chunked_content_chunk (
+    chunked_content_id integer REFERENCES chunked_content,
+    chunk_id sha1 REFERENCES chunk(id),
+    position integer
 );

 CREATE UNIQUE INDEX ON chunking_method (algo, min_block_size, average_block_size, max_block_size, window_size);

 CREATE INDEX ON chunked_content (content_id);
 CREATE INDEX ON chunked_content (method_id);
-CREATE UNIQUE INDEX ON chunked_content (content_id, chunk_id, method_id, position);
+CREATE UNIQUE INDEX ON chunked_content (content_id, method_id);
+
+CREATE INDEX ON chunked_content_chunk (chunked_content_id);
+CREATE INDEX ON chunked_content_chunk (chunk_id);
+CREATE UNIQUE INDEX ON chunked_content_chunk (chunked_content_id, chunk_id, position);
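Note: with chunked_content split from chunked_content_chunk, each method's results can be compared against the raw corpus, e.g. the compressed size of a method's distinct chunks versus the compressed size of all contents. A minimal sketch, assuming the postgres service name from the patch and using only the tables defined above:

    # Per-method deduplication ratio: unique compressed chunk bytes
    # relative to the compressed size of the whole content corpus.
    import psycopg2

    conn = psycopg2.connect('service=swh-dedup')
    with conn.cursor() as cur:
        cur.execute("SELECT sum(compressed_length) FROM content")
        (baseline,) = cur.fetchone()
        cur.execute("""
            SELECT method_id, sum(compressed_length)
            FROM (SELECT DISTINCT cc.method_id, k.id, k.compressed_length
                  FROM chunked_content cc
                  JOIN chunked_content_chunk ccc
                    ON ccc.chunked_content_id = cc.id
                  JOIN chunk k ON k.id = ccc.chunk_id) AS uniq
            GROUP BY method_id
        """)
        for method_id, chunk_bytes in cur.fetchall():
            print('method %d: %.1f%% of corpus' %
                  (method_id, 100 * chunk_bytes / baseline))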