diff --git a/compact.py b/compact.py
index b90cdb0..dae855a 100644
--- a/compact.py
+++ b/compact.py
@@ -1,195 +1,176 @@
 # import aiohttp
 # import asyncio
 import io
 import os
 import psycopg2

 from configparser import ConfigParser
 # from isochrone import IsochroneGraph
 from iterator import RevisionIterator
 # from swh.core.api import RemoteException
 from swh.model.identifiers import (
     # identifier_to_bytes,
     identifier_to_str
 )
 # from swh.storage.api.client import RemoteStorage
 # from swh.storage.backfill import fetch
 from swh.storage.db import Db


 def config(filename, section):
     # create a parser
     parser = ConfigParser()
     # read config file
     parser.read(filename)

     # get section, default to postgresql
     db = {}
     if parser.has_section(section):
         params = parser.items(section)
         for param in params:
             db[param[0]] = param[1]
     else:
         raise Exception('Section {0} not found in the {1} file'.format(section, filename))

     return db


 def connect(filename, section):
     """ Connect to the PostgreSQL database server """
     conn = None
     try:
         # read connection parameters
         params = config(filename, section)

         # connect to the PostgreSQL server
         print('Connecting to the PostgreSQL database...')
         conn = psycopg2.connect(**params)
     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

     return conn


 def create_tables(conn, filename='compact.sql'):
     with io.open(filename) as file:
         cur = conn.cursor()
         cur.execute(file.read())
         cur.close()
         conn.commit()


 def make_record(elem):
     return {'type' : elem[1], 'id' : elem[2], 'path' : elem[3].decode('utf-8')}


 # TODO: refactor this method to take the whole directory structure as parameter
 # and avoid multiple queries (using swh.storage.db.directory_walk prior to
 # calling the function, instead of swh.storage.db.directory_walk_one within it)
 def walk_directory(cursor, storage, revision, directory, relative, name='./', ingraph=True):
     # print("dir: ", identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(relative), name, ingraph)
     if ingraph:
         cursor.execute('SELECT date FROM directory WHERE id=%s', (directory,))
         row = cursor.fetchone()

-        if row is None:
+        if row is None or row[0] > revision['date']:
             # This directory belongs to the isochrone graph of the revision.
             # Add directory with the current revision's timestamp as date, and
             # walk recursively looking for new content.
-            cursor.execute('INSERT INTO directory VALUES (%s,%s)', (directory, revision['date']))
+            cursor.execute('''INSERT INTO directory VALUES (%s,%s)
+                              ON CONFLICT (id) DO UPDATE
+                              SET date=%s''',
+                           (directory, revision['date'], revision['date']))

             for entry in storage.directory_walk_one(directory):
                 child = make_record(entry)
                 path = os.path.join(name, child['path'])
                 if child['type'] == 'dir':
                     walk_directory(cursor, storage, revision, child['id'], relative, name=path)
                 elif child['type'] == 'file':
                     process_file(cursor, storage, revision, relative, child['id'], path)

-        elif row[0] > revision['date']:
-            # This directory belongs to the isochrone graph of the revision.
-            # Update its date to match the current revision's timestamp.
-            cursor.execute('UPDATE directory SET date=%s WHERE id=%s', (revision['date'], directory))
-            # TODO: update entries from 'directory_in_rev' pointing to this
-            #       directory to now point to their children? If any children
-            #       of the old directory appears in the 'directory' table,
-            #       their date and entries in 'directory_in_rev' should be
-            #       updated as well!! (same for blobs!!)
-
         else:
             # This directory is just beyond the isochrone graph
             # frontier. Add an entry to the 'directory_in_rev' relation
             # with the path relative to 'name', and continue to walk
             # recursively looking only for blobs (ie. 'ingraph=False').
             cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)', (directory, revision['id'], name))

             for entry in storage.directory_walk_one(directory):
                 child = make_record(entry)
                 # From now on path is relative to current directory (ie. relative=directory)
                 path = os.path.join('.', child['path'])
                 if child['type'] == 'dir':
                     walk_directory(cursor, storage, revision, child['id'], directory, name=path, ingraph=False)
                 elif child['type'] == 'file':
                     process_file(cursor, storage, revision, directory, child['id'], path)

     else:
         # This directory is completely outside the isochrone graph (far
         # from the frontier). We are just looking for blobs here.
         for entry in storage.directory_walk_one(directory):
             child = make_record(entry)
             path = os.path.join(name, child['path'])
             if child['type'] == 'dir':
                 walk_directory(cursor, storage, revision, child['id'], relative, name=path, ingraph=False)
             elif child['type'] == 'file':
                 process_file(cursor, storage, revision, relative, child['id'], path)


 def process_file(cursor, storage, revision, directory, blob, name):
     # TODO: add logging support!
     # print("blob:", identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
     cursor.execute('SELECT date FROM content WHERE id=%s', (blob,))
     row = cursor.fetchone()

-    if row is None:
-        # print('row = None:', row, identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
-        # This blob was never seen before. Add blob with the current revision's
-        # timestamp as date, and set a record for 'content_early_in_rev' with
-        # the 'path = name'.
-        cursor.execute('INSERT INTO content VALUES (%s,%s)', (blob, revision['date']))
-        cursor.execute('INSERT INTO content_early_in_rev VALUES (%s,%s,%s)', (blob, revision['id'], name))
-
-    elif row[0] > revision['date']:
-        # print('row > date:', row, identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
-        # This is an earlier occurrance of an already seen blob. Update its
-        # date to match the current revision's timestamp.
-        cursor.execute('UPDATE content SET date=%s WHERE id=%s', (revision['date'], blob))
-        # TODO: update entries from 'content_early_in_rev' with current path,
-        #       and move previous entry to 'content_in_rev' with its path now
-        #       relative to the parent directory in the isochrone graph
-        #       frontier?
-        cursor.execute('SELECT path FROM content_early_in_rev WHERE blob=%s', (blob,))
-        print("new blob:", revision['date'], name)
-        for entry in cursor.fetchall():
-            print("old blob:", row[0], entry[0].tobytes().decode('utf-8'))
+    if row is None or row[0] > revision['date']:
+        # This is an earlier occurrence of the blob. Add it with the current
+        # revision's timestamp as date, and record the revision and
+        # 'path = name' directly in 'content'.
+        cursor.execute('''INSERT INTO content VALUES (%s,%s,%s,%s)
+                          ON CONFLICT (id) DO UPDATE
+                          SET date=%s, rev=%s, path=%s''',
+                       (blob, revision['date'], revision['id'], name, revision['date'], revision['id'], name))
     else:
-        # print('otherwise: ', row, identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
         # This blob was seen before, but this occurrence is not an earlier
         # one. Add an entry to the 'content_in_dir' relation with the path
         # relative to the parent directory in the isochrone graph frontier.
-        cursor.execute('INSERT INTO content_in_dir VALUES (%s,%s,%s) ON CONFLICT DO NOTHING', (blob, directory, name))
+        cursor.execute('''INSERT INTO content_in_dir VALUES (%s,%s,%s)
+                          ON CONFLICT DO NOTHING''',
+                       (blob, directory, name))
         # WARNING: There seem to be duplicated directories within the same
         #          revision. Hence, their blobs may appear many times with the
         #          same directory ID and 'relative' path. That's why we need
         #          the 'ON CONFLICT DO NOTHING' statement.


 if __name__ == "__main__":
     archive = connect('database.conf', 'archive')
     compact = connect('database.conf', 'compact')

     create_tables(compact)

     # This call changes the way bytes are decoded in the connection
     storage = Db(archive)
     cursor = compact.cursor()

     revisions = RevisionIterator(archive, limit=1000)
     for idx, revision in enumerate(revisions):
         print(f'{idx} - id: {identifier_to_str(revision["id"])} - date: {revision["date"]} - dir: {identifier_to_str(revision["dir"])}')

         # Add current revision to the compact DB and start walking its root directory
         cursor.execute('INSERT INTO revision VALUES (%s,%s)', (revision['id'], revision['date']))
         walk_directory(cursor, storage, revision, revision["dir"], revision["dir"])

     compact.commit()
     compact.close()
     archive.close()
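The walk above keeps, for each directory and each blob, the earliest revision date seen so far; the comparison with the stored date happens in Python before the upsert is issued. Below is a minimal sketch (not part of this patch, and the helper name is invented for illustration) of how that comparison could instead be folded into the statement itself, assuming the 'directory' schema defined in compact.sql:

def upsert_directory_date(cursor, directory_id, revision_date):
    # Insert the directory, or keep the existing row and only lower its date
    # when the incoming revision date is earlier than the one already stored.
    cursor.execute('''INSERT INTO directory VALUES (%s,%s)
                      ON CONFLICT (id) DO UPDATE
                      SET date=EXCLUDED.date
                      WHERE directory.date > EXCLUDED.date''',
                   (directory_id, revision_date))

With this form the date comparison no longer needs to happen in Python; walk_directory() would still query 'directory' to decide which branch of the isochrone frontier applies. The same idea applies to the 'content' upsert in process_file().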
diff --git a/compact.sql b/compact.sql
index 2c631df..68e5212 100644
--- a/compact.sql
+++ b/compact.sql
@@ -1,89 +1,93 @@
 -- a Git object ID, i.e., a Git-style salted SHA1 checksum
 drop domain if exists sha1_git cascade;
 create domain sha1_git as bytea check (length(value) = 20);

 -- UNIX path (absolute, relative, individual path component, etc.)
 drop domain if exists unix_path cascade;
 create domain unix_path as bytea;


 drop table if exists content;
 create table content
 (
     id      sha1_git primary key,
-    date    timestamptz not null
+    date    timestamptz not null,
+    rev     sha1_git not null,   -- id of the revision where the blob appears for the first time
+    path    unix_path not null   -- path to the content relative to the revision root directory
 );

 comment on column content.id is 'Git object sha1 hash';
 comment on column content.date is 'First seen time';
+comment on column content.rev is 'Revision identifier';
+comment on column content.path is 'Path to content in revision';


 drop table if exists directory;
 create table directory
 (
     id      sha1_git primary key,
     date    timestamptz not null
 );

 comment on column directory.id is 'Git object sha1 hash';
 comment on column directory.date is 'First seen time';


 drop table if exists revision;
 create table revision
 (
     id      sha1_git primary key,
     date    timestamptz not null
 );

 comment on column revision.id is 'Git object sha1 hash';
 comment on column revision.date is 'First seen time';


 -- TODO: consider merging this table with 'content'
-drop table if exists content_early_in_rev;
-create table content_early_in_rev
-(
-    blob    sha1_git not null,  -- id of the content blob
-    rev     sha1_git not null,  -- id of the revision where the blob appears for the first time
-    path    unix_path not null, -- path to the content relative to the revision root directory
-    primary key (blob, rev, path)
-    -- foreign key (blob) references content (id),
-    -- foreign key (rev) references revision (id)
-);
-
-comment on column content_early_in_rev.blob is 'Content identifier';
-comment on column content_early_in_rev.rev is 'Revision identifier';
-comment on column content_early_in_rev.path is 'Path to content in revision';
+-- drop table if exists content_early_in_rev;
+-- create table content_early_in_rev
+-- (
+--     blob    sha1_git not null,  -- id of the content blob
+--     rev     sha1_git not null,  -- id of the revision where the blob appears for the first time
+--     path    unix_path not null, -- path to the content relative to the revision root directory
+--     primary key (blob, rev, path)
+--     -- foreign key (blob) references content (id),
+--     -- foreign key (rev) references revision (id)
+-- );
+
+-- comment on column content_early_in_rev.blob is 'Content identifier';
+-- comment on column content_early_in_rev.rev is 'Revision identifier';
+-- comment on column content_early_in_rev.path is 'Path to content in revision';


 drop table if exists content_in_dir;
 create table content_in_dir
 (
     blob    sha1_git not null,  -- id of the content blob
     dir     sha1_git not null,  -- id of the directory containing the blob
     path    unix_path not null, -- path name (TODO: relative to parent or absolute (wrt. revision)?)
     primary key (blob, dir, path)
     -- foreign key (blob) references content (id),
     -- foreign key (dir) references directory (id)
 );

 comment on column content_in_dir.blob is 'Content identifier';
 comment on column content_in_dir.dir is 'Directory identifier';
 -- comment on column content_in_dir.path is 'Path to content in directory';


 drop table if exists directory_in_rev;
 create table directory_in_rev
 (
     dir     sha1_git not null,  -- id of the directory appearing in the revision
     rev     sha1_git not null,  -- id of the revision containing the directory
     path    unix_path not null, -- path to the directory relative to the revision root directory
     primary key (dir, rev, path)
     -- foreign key (dir) references directory (id),
     -- foreign key (rev) references revision (id)
 );

 comment on column directory_in_rev.dir is 'Directory identifier';
 comment on column directory_in_rev.rev is 'Revision identifier';
 comment on column directory_in_rev.path is 'Path to directory in revision';
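A sketch (not part of this patch, function name invented for illustration) of how the compact model above might be queried to recover the occurrences of a blob: the earliest occurrence is now stored directly in 'content', while later occurrences can be reconstructed by joining 'content_in_dir' with 'directory_in_rev'. Table and column names are the ones created above; the path of a later occurrence is returned in two pieces (directory within the revision, blob within the directory).

def blob_occurrences(cursor, blob_id):
    # Earliest known occurrence: revision, date and path recorded in 'content'.
    cursor.execute('SELECT rev, date, path FROM content WHERE id=%s', (blob_id,))
    earliest = cursor.fetchone()

    # Occurrences reached through the isochrone frontier: blobs recorded in a
    # frontier directory ('content_in_dir') joined with the revisions in which
    # that directory appears ('directory_in_rev').
    cursor.execute('''SELECT dr.rev, dr.path, cd.path
                      FROM content_in_dir AS cd
                      JOIN directory_in_rev AS dr ON dr.dir = cd.dir
                      WHERE cd.blob = %s''', (blob_id,))
    return earliest, cursor.fetchall()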
diff --git a/iterator.py b/iterator.py
index c0affe5..d14fe90 100644
--- a/iterator.py
+++ b/iterator.py
@@ -1,70 +1,70 @@
 # import psycopg2
 from swh.model.identifiers import identifier_to_str
-
 # def typecast_bytea(value, cur):
 #     if value is not None:
 #         data = psycopg2.BINARY(value, cur)
 #         return data.tobytes()


 class RevisionIterator:
     """Iterator over revisions present in the given database."""

     # def adapt_conn(self, conn):
     #     """Makes psycopg2 use 'bytes' to decode bytea instead of
     #     'memoryview', for this connection."""
     #     t_bytes = psycopg2.extensions.new_type((17,), "bytea", typecast_bytea)
     #     psycopg2.extensions.register_type(t_bytes, conn)
     #     t_bytes_array = psycopg2.extensions.new_array_type((1001,), "bytea[]", t_bytes)
     #     psycopg2.extensions.register_type(t_bytes_array, conn)

     def __init__(self, conn, limit=None, chunksize=100):
         # self.adapt_conn(conn)
         self.cur = conn.cursor()
         self.chunksize = chunksize
         self.limit = limit
         self.records = []
         self.aliases = ['id', 'date', 'dir']

     def __del__(self):
         self.cur.close()

     def __iter__(self):
         self.records.clear()
         if self.limit is None:
-            self.cur.execute('''SELECT id, date, directory
+            self.cur.execute('''SELECT id, date, committer_date, directory
                                 FROM revision''')
-            # self.cur.execute('''SELECT id, date, directory
-            #                     FROM revision ORDER BY date''')
         else:
-            self.cur.execute('''SELECT id, date, directory
+            self.cur.execute('''SELECT id, date, committer_date, directory
                                 FROM revision LIMIT %s''', (self.limit,))
-            # self.cur.execute('''SELECT id, date, directory
-            #                     FROM revision ORDER BY date
-            #                     LIMIT %s''', (self.limit,))
         for row in self.cur.fetchmany(self.chunksize):
-            record = dict(zip(self.aliases, row))
-            self.records.append(record)
+            record = self.make_record(row)
+            if record is not None:
+                self.records.append(record)
         return self

     def __next__(self):
         if not self.records:
             self.records.clear()
             for row in self.cur.fetchmany(self.chunksize):
-                record = dict(zip(self.aliases, row))
-                self.records.append(record)
-                # self.records.append((
-                #     identifier_to_str(rev[0]),
-                #     rev[1],
-                #     identifier_to_str(rev[2])
-                # ))
+                record = self.make_record(row)
+                if record is not None:
+                    self.records.append(record)

         if self.records:
             revision, *self.records = self.records
             return revision
         else:
             raise StopIteration
+
+    def make_record(self, row):
+        # Only revisions with an author or committer date are considered
+        if row[1] is not None:
+            # If the revision has an author date, it takes precedence
+            return dict(zip(self.aliases, (row[0], row[1], row[3])))
+        elif row[2] is not None:
+            # If not, we use the committer date
+            return dict(zip(self.aliases, (row[0], row[2], row[3])))
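A small usage sketch (not part of this patch) mirroring the main loop in compact.py: it assumes the same 'database.conf' file with an 'archive' section, and shows the effect of make_record() — the author date is used when present, the committer date otherwise, and revisions with neither date are skipped.

from swh.model.identifiers import identifier_to_str

from compact import connect
from iterator import RevisionIterator

if __name__ == "__main__":
    archive = connect('database.conf', 'archive')
    for revision in RevisionIterator(archive, limit=10):
        # 'date' already holds the author date or, failing that, the committer date.
        print(identifier_to_str(revision['id']), revision['date'], identifier_to_str(revision['dir']))
    archive.close()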