diff --git a/compact.py b/compact.py
new file mode 100644
index 0000000..6fdf45c
--- /dev/null
+++ b/compact.py
@@ -0,0 +1,252 @@
+# import aiohttp
+# import asyncio
+import io
+import os
+import psycopg2
+from configparser import ConfigParser
+
+# from isochrone import IsochroneGraph
+from iterator import RevisionIterator
+
+# from swh.core.api import RemoteException
+from swh.model.identifiers import (
+    # identifier_to_bytes,
+    identifier_to_str
+)
+# from swh.storage.api.client import RemoteStorage
+# from swh.storage.backfill import fetch
+from swh.storage.db import Db
+
+
+def config(filename, section):
+    # create a parser
+    parser = ConfigParser()
+    # read config file
+    parser.read(filename)
+
+    # get the requested section as a plain dict
+    db = {}
+    if parser.has_section(section):
+        params = parser.items(section)
+        for param in params:
+            db[param[0]] = param[1]
+    else:
+        raise Exception(f'Section {section} not found in the {filename} file')
+
+    return db
+
+
+def connect(filename, section):
+    """Connect to the PostgreSQL database server. Returns None on failure."""
+    conn = None
+
+    try:
+        # read connection parameters
+        params = config(filename, section)
+
+        # connect to the PostgreSQL server
+        print('Connecting to the PostgreSQL database...')
+        conn = psycopg2.connect(**params)
+
+    except (Exception, psycopg2.DatabaseError) as error:
+        print(error)
+
+    return conn
+
+
+def create_tables(conn, filename='compact.sql'):
+    # (re)create the compact model schema from the SQL file
+    with io.open(filename) as file:
+        cur = conn.cursor()
+        cur.execute(file.read())
+        cur.close()
+        conn.commit()
+
+
+def make_record(elem):
+    # Flatten an entry tuple yielded by storage.directory_walk_one() into a
+    # dict; positions 1-3 hold the entry's type, target id, and name.
+    return {'type': elem[1], 'id': elem[2], 'path': elem[3].decode('utf-8')}
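+
+
+# A minimal usage sketch for the helpers above, assuming the
+# 'database.conf' file added in this patch (section names illustrative):
+#
+#     params = config('database.conf', 'compact')
+#     # params == {'host': 'localhost', 'database': 'compact', ...}
+#     conn = connect('database.conf', 'compact')
+#     if conn is not None:
+#         create_tables(conn)
+#         conn.close()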
+
+
+# def walk_directory(cursor, storage, revision, directory, prefix='./', ingraph=True):
+#     for entry in storage.directory_walk_one(directory):
+#         record = make_record(entry)
+
+#         if record['type'] == 'dir':
+#             if ingraph:
+#                 cursor.execute('SELECT date FROM directory WHERE id=%s', (record['id'],))
+
+#                 row = cursor.fetchone()
+#                 if row is None:
+#                     # This directory belongs to the isochrone graph of the
+#                     # revision. Add directory with the current revision's
+#                     # timestamp as date, and walk recursively looking for new
+#                     # content.
+#                     cursor.execute('INSERT INTO directory VALUES (%s,%s)',
+#                                    (record['id'], revision['date']))
+#                     path = os.path.join(prefix, record['path'])
+#                     walk_directory(cursor, storage, revision, record['id'], path)
+
+#                 elif row[0] > revision['date']:
+#                     # This directory belongs to the isochrone graph of the
+#                     # revision. Update its date to match the current revision's
+#                     # timestamp.
+#                     cursor.execute('UPDATE directory SET date=%s WHERE id=%s',
+#                                    (revision['date'], record['id']))
+#                     # TODO: update entries from 'directory_in_rev' pointing to
+#                     #       this directory to now point to its children?
+
+#                 else:
+#                     # This directory is just beyond the isochrone graph
+#                     # frontier. Add an entry to the 'directory_in_rev' relation
+#                     # with the path relative to 'prefix', and continue to walk
+#                     # recursively looking only for blobs (i.e. 'ingraph=False').
+#                     path = os.path.join(prefix, record['path'])
+#                     cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)',
+#                                    (record['id'], revision['id'], path))
+#                     # From now on prefix is relative to the current directory
+#                     walk_directory(cursor, storage, revision, record['id'], ingraph=False)
+
+#             else:
+#                 # This directory is completely outside the isochrone graph (far
+#                 # from the frontier). We are just looking for blobs here.
+#                 path = os.path.join(prefix, record['path'])
+#                 walk_directory(cursor, storage, revision, record['id'], path, ingraph=False)
+
+#         elif record['type'] == 'file':
+#             cursor.execute('SELECT date FROM content WHERE id=%s', (record['id'],))
+
+#             row = cursor.fetchone()
+#             if row is None:
+#                 # This blob was never seen before. Add blob with the current
+#                 # revision's timestamp as date, and set a record for
+#                 # 'content_early_in_rev' with the 'path = prefix + blob_name'.
+#                 pass
+
+#             elif row[0] > revision['date']:
+#                 # This is an earlier occurrence of an already seen blob. Update
+#                 # its date to match the current revision's timestamp.
+#                 # TODO: update entries from 'content_early_in_rev' with current
+#                 #       path, and move previous entry to 'content_in_rev' with
+#                 #       its path now relative to the parent directory in the
+#                 #       isochrone graph frontier?
+#                 pass
+
+#             else:
+#                 # This blob was seen before but this occurrence is older. Add
+#                 # an entry to the 'content_in_rev' relation with the path
+#                 # relative to the parent directory in the isochrone graph
+#                 # frontier.
+#                 pass
+
+
+def walk_directory(cursor, storage, revision, directory, name='./', ingraph=True):
+    if ingraph:
+        cursor.execute('SELECT date FROM directory WHERE id=%s', (directory,))
+
+        row = cursor.fetchone()
+        if row is None:
+            # This directory belongs to the isochrone graph of the
+            # revision. Add directory with the current revision's
+            # timestamp as date, and walk recursively looking for new
+            # content.
+            # cursor.execute('INSERT INTO directory VALUES (%s,%s)', (directory, revision['date']))
+
+            for entry in storage.directory_walk_one(directory):
+                record = make_record(entry)
+                path = os.path.join(name, record['path'])
+
+                if record['type'] == 'dir':
+                    walk_directory(cursor, storage, revision, record['id'], name=path)
+
+                elif record['type'] == 'file':
+                    process_file(cursor, storage, revision, record['id'], name=path)
+
+        elif row[0] > revision['date']:
+            # This directory belongs to the isochrone graph of the
+            # revision. Update its date to match the current revision's
+            # timestamp.
+            # cursor.execute('UPDATE directory SET date=%s WHERE id=%s', (revision['date'], directory))
+            pass
+            # TODO: update entries from 'directory_in_rev' pointing to
+            #       this directory to now point to its children?
+
+        else:
+            # This directory is just beyond the isochrone graph
+            # frontier. Add an entry to the 'directory_in_rev' relation
+            # with the path given by 'name', and continue to walk
+            # recursively looking only for blobs (i.e. 'ingraph=False').
+            # cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)', (directory, revision['id'], name))
+
+            for entry in storage.directory_walk_one(directory):
+                record = make_record(entry)
+
+                # From now on paths are relative to the current directory
+                path = os.path.join('.', record['path'])
+
+                if record['type'] == 'dir':
+                    walk_directory(cursor, storage, revision, record['id'], name=path, ingraph=False)
+
+                elif record['type'] == 'file':
+                    process_file(cursor, storage, revision, record['id'], name=path, ingraph=False)
+
+    else:
+        # This directory is completely outside the isochrone graph (far
+        # from the frontier). We are just looking for blobs here.
+        for entry in storage.directory_walk_one(directory):
+            record = make_record(entry)
+
+            # Paths remain relative to the enclosing frontier directory
+            path = os.path.join(name, record['path'])
+
+            if record['type'] == 'dir':
+                walk_directory(cursor, storage, revision, record['id'], name=path, ingraph=False)
+
+            elif record['type'] == 'file':
+                process_file(cursor, storage, revision, record['id'], name=path, ingraph=False)
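+
+
+# Summary of the three cases handled above (one per query outcome), plus a
+# hedged sketch of the entry point as used in __main__ below:
+#
+#   row is None         -> first sighting: the directory is inside the
+#                          isochrone graph; date it and keep recursing
+#   row[0] > rev date   -> known directory, but this revision is older:
+#                          push its timestamp back
+#   otherwise           -> the directory sits on the graph's frontier:
+#                          link it to the revision and only collect blobs
+#
+#     walk_directory(cursor, storage, revision, revision['dir'])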
+
+
+def process_file(cursor, storage, revision, blob, name='./', ingraph=True):
+    # NOTE: 'ingraph' is accepted for symmetry with walk_directory but is
+    # not used yet.
+    cursor.execute('SELECT date FROM content WHERE id=%s', (blob,))
+
+    row = cursor.fetchone()
+    if row is None:
+        # This blob was never seen before. Add blob with the current
+        # revision's timestamp as date, and set a record for
+        # 'content_early_in_rev' with the 'path = name'.
+        pass
+
+    elif row[0] > revision['date']:
+        # This is an earlier occurrence of an already seen blob. Update
+        # its date to match the current revision's timestamp.
+        # TODO: update entries from 'content_early_in_rev' with current
+        #       path, and move previous entry to 'content_in_rev' with
+        #       its path now relative to the parent directory in the
+        #       isochrone graph frontier?
+        pass
+
+    else:
+        # This blob was seen before but this occurrence is older. Add
+        # an entry to the 'content_in_rev' relation with the path
+        # relative to the parent directory in the isochrone graph
+        # frontier.
+        pass
+
+
+if __name__ == "__main__":
+    archive = connect('database.conf', 'archive')
+    compact = connect('database.conf', 'compact')
+
+    create_tables(compact)
+
+    # Wrapping the connection in Db changes how bytea values are decoded
+    # on that connection
+    storage = Db(archive)
+    cursor = compact.cursor()
+    revisions = RevisionIterator(archive, limit=1000)
+    for idx, revision in enumerate(revisions):
+        print(f'{idx} - id: {identifier_to_str(revision["id"])} - date: {revision["date"]} - dir: {identifier_to_str(revision["dir"])}')
+        # Add the current revision to the compact DB and walk its root directory
+        cursor.execute('INSERT INTO revision VALUES (%s,%s)', (revision['id'], revision['date']))
+        walk_directory(cursor, storage, revision, revision["dir"])
+        compact.commit()
+
+    compact.close()
+    archive.close()
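+
+
+# A hedged sketch of reading results back once the pending inserts in
+# process_file are implemented (schema in compact.sql below; 'blob_id' is
+# an illustrative 20-byte sha1_git value):
+#
+#     cur = compact.cursor()
+#     cur.execute('SELECT rev, path FROM content_early_in_rev WHERE blob=%s',
+#                 (blob_id,))
+#     for rev, path in cur.fetchall():
+#         print(identifier_to_str(rev), path)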
diff --git a/compact.sql b/compact.sql
new file mode 100644
index 0000000..2c631df
--- /dev/null
+++ b/compact.sql
@@ -0,0 +1,89 @@
+-- a Git object ID, i.e., a Git-style salted SHA1 checksum
+drop domain if exists sha1_git cascade;
+create domain sha1_git as bytea check (length(value) = 20);
+
+-- UNIX path (absolute, relative, individual path component, etc.)
+drop domain if exists unix_path cascade;
+create domain unix_path as bytea;
+
+
+drop table if exists content;
+create table content
+(
+    id      sha1_git primary key,
+    date    timestamptz not null
+);
+
+comment on column content.id is 'Git object sha1 hash';
+comment on column content.date is 'First seen time';
+
+
+drop table if exists directory;
+create table directory
+(
+    id      sha1_git primary key,
+    date    timestamptz not null
+);
+
+comment on column directory.id is 'Git object sha1 hash';
+comment on column directory.date is 'First seen time';
+
+
+drop table if exists revision;
+create table revision
+(
+    id      sha1_git primary key,
+    date    timestamptz not null
+);
+
+comment on column revision.id is 'Git object sha1 hash';
+comment on column revision.date is 'First seen time';
+
+
+-- TODO: consider merging this table with 'content'
+drop table if exists content_early_in_rev;
+create table content_early_in_rev
+(
+    blob    sha1_git not null,   -- id of the content blob
+    rev     sha1_git not null,   -- id of the revision where the blob appears for the first time
+    path    unix_path not null,  -- path to the content relative to the revision root directory
+    primary key (blob, rev, path)
+    -- foreign key (blob) references content (id),
+    -- foreign key (rev) references revision (id)
+);
+
+comment on column content_early_in_rev.blob is 'Content identifier';
+comment on column content_early_in_rev.rev is 'Revision identifier';
+comment on column content_early_in_rev.path is 'Path to content in revision';
+
+
+drop table if exists content_in_dir;
+create table content_in_dir
+(
+    blob    sha1_git not null,   -- id of the content blob
+    dir     sha1_git not null,   -- id of the directory containing the blob
+    path    unix_path not null,  -- path name (TODO: relative to the parent, or absolute wrt. the revision root?)
+    primary key (blob, dir, path)
+    -- foreign key (blob) references content (id),
+    -- foreign key (dir) references directory (id)
+);
+
+comment on column content_in_dir.blob is 'Content identifier';
+comment on column content_in_dir.dir is 'Directory identifier';
+comment on column content_in_dir.path is 'Path to content in directory';
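+
+
+-- A hedged example of a provenance lookup against the relations above and
+-- 'directory_in_rev' below (the '\x...' ids are placeholders): find every
+-- revision in which a given blob occurs, either directly or through a
+-- frontier directory, concatenating the two path segments with '/'.
+--
+--   select rev, path from content_early_in_rev where blob = '\x...'
+--   union
+--   select dr.rev, dr.path || '\x2f'::bytea || cd.path
+--   from content_in_dir cd
+--   join directory_in_rev dr on dr.dir = cd.dir
+--   where cd.blob = '\x...';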
+
+
+drop table if exists directory_in_rev;
+create table directory_in_rev
+(
+    dir     sha1_git not null,   -- id of the directory appearing in the revision
+    rev     sha1_git not null,   -- id of the revision containing the directory
+    path    unix_path not null,  -- path to the directory relative to the revision root directory
+    primary key (dir, rev, path)
+    -- foreign key (dir) references directory (id),
+    -- foreign key (rev) references revision (id)
+);
+
+comment on column directory_in_rev.dir is 'Directory identifier';
+comment on column directory_in_rev.rev is 'Revision identifier';
+comment on column directory_in_rev.path is 'Path to directory in revision';
diff --git a/database.conf b/database.conf
new file mode 100644
index 0000000..f8c7b6e
--- /dev/null
+++ b/database.conf
@@ -0,0 +1,11 @@
+[archive]
+database=softwareheritage
+host=somerset.internal.softwareheritage.org
+user=guest
+port=5433
+
+[compact]
+host=localhost
+database=compact
+user=postgres
+password=postgres
diff --git a/iterator.py b/iterator.py
new file mode 100644
index 0000000..c0affe5
--- /dev/null
+++ b/iterator.py
@@ -0,0 +1,70 @@
+# import psycopg2
+from swh.model.identifiers import identifier_to_str
+
+
+# def typecast_bytea(value, cur):
+#     if value is not None:
+#         data = psycopg2.BINARY(value, cur)
+#         return data.tobytes()
+
+
+class RevisionIterator:
+    """Iterator over revisions present in the given database."""
+
+    # def adapt_conn(self, conn):
+    #     """Makes psycopg2 use 'bytes' to decode bytea instead of
+    #     'memoryview', for this connection."""
+    #     t_bytes = psycopg2.extensions.new_type((17,), "bytea", typecast_bytea)
+    #     psycopg2.extensions.register_type(t_bytes, conn)
+
+    #     t_bytes_array = psycopg2.extensions.new_array_type((1001,), "bytea[]", t_bytes)
+    #     psycopg2.extensions.register_type(t_bytes_array, conn)
+
+    def __init__(self, conn, limit=None, chunksize=100):
+        # self.adapt_conn(conn)
+        self.cur = conn.cursor()
+        self.chunksize = chunksize
+        self.limit = limit
+        self.records = []
+        self.aliases = ['id', 'date', 'dir']
+
+    def __del__(self):
+        self.cur.close()
+
+    def __iter__(self):
+        self.records.clear()
+        if self.limit is None:
+            self.cur.execute('''SELECT id, date, directory
+                                FROM revision''')
+            # self.cur.execute('''SELECT id, date, directory
+            #                     FROM revision ORDER BY date''')
+        else:
+            self.cur.execute('''SELECT id, date, directory
+                                FROM revision
+                                LIMIT %s''', (self.limit,))
+            # self.cur.execute('''SELECT id, date, directory
+            #                     FROM revision ORDER BY date
+            #                     LIMIT %s''', (self.limit,))
+        for row in self.cur.fetchmany(self.chunksize):
+            record = dict(zip(self.aliases, row))
+            self.records.append(record)
+        return self
+
+    def __next__(self):
+        # refill the buffer with the next chunk when it runs empty
+        if not self.records:
+            for row in self.cur.fetchmany(self.chunksize):
+                record = dict(zip(self.aliases, row))
+                self.records.append(record)
+                # self.records.append((
+                #     identifier_to_str(rev[0]),
+                #     rev[1],
+                #     identifier_to_str(rev[2])
+                # ))
+
+        if self.records:
+            return self.records.pop(0)
+        else:
+            raise StopIteration
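+
+
+# A hedged usage sketch (assuming 'conn' is an open psycopg2 connection to
+# the archive DB, e.g. obtained via compact.connect()):
+#
+#     revisions = RevisionIterator(conn, limit=10)
+#     for rev in revisions:
+#         print(identifier_to_str(rev['id']), rev['date'],
+#               identifier_to_str(rev['dir']))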