
diff --git a/compact.py b/compact.py
index b90cdb0..dae855a 100644
--- a/compact.py
+++ b/compact.py
@@ -1,195 +1,176 @@
# import aiohttp
# import asyncio
import io
import os
import psycopg2
from configparser import ConfigParser
# from isochrone import IsochroneGraph
from iterator import RevisionIterator
# from swh.core.api import RemoteException
from swh.model.identifiers import (
    # identifier_to_bytes,
    identifier_to_str
)
# from swh.storage.api.client import RemoteStorage
# from swh.storage.backfill import fetch
from swh.storage.db import Db

def config(filename, section):
    # create a parser
    parser = ConfigParser()
    # read config file
    parser.read(filename)
    # get the requested section's parameters
    db = {}
    if parser.has_section(section):
        params = parser.items(section)
        for param in params:
            db[param[0]] = param[1]
    else:
        raise Exception('Section {0} not found in the {1} file'.format(section, filename))
    return db

def connect(filename, section):
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # read connection parameters
        params = config(filename, section)
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    return conn

def create_tables(conn, filename='compact.sql'):
    with io.open(filename) as file:
        cur = conn.cursor()
        cur.execute(file.read())
        cur.close()
    conn.commit()

def make_record(elem):
    return {'type': elem[1], 'id': elem[2], 'path': elem[3].decode('utf-8')}

# TODO: refactor this method to take the whole directory structure as parameter
# and avoid multiple queries (using swh.storage.db.directory_walk prior to
# calling the function, instead of swh.storage.db.directory_walk_one within it)
def walk_directory(cursor, storage, revision, directory, relative, name='./', ingraph=True):
    # print("dir: ", identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(relative), name, ingraph)
    if ingraph:
        cursor.execute('SELECT date FROM directory WHERE id=%s', (directory,))
        row = cursor.fetchone()
-        if row is None:
+        if row is None or row[0] > revision['date']:
            # This directory belongs to the isochrone graph of the revision.
            # Add directory with the current revision's timestamp as date, and
            # walk recursively looking for new content.
-            cursor.execute('INSERT INTO directory VALUES (%s,%s)', (directory, revision['date']))
+            cursor.execute('''INSERT INTO directory VALUES (%s,%s)
+                              ON CONFLICT (id) DO UPDATE
+                              SET date=%s''',
+                           (directory, revision['date'], revision['date']))
            for entry in storage.directory_walk_one(directory):
                child = make_record(entry)
                path = os.path.join(name, child['path'])
                if child['type'] == 'dir':
                    walk_directory(cursor, storage, revision, child['id'], relative, name=path)
                elif child['type'] == 'file':
                    process_file(cursor, storage, revision, relative, child['id'], path)
-        elif row[0] > revision['date']:
-            # This directory belongs to the isochrone graph of the revision.
-            # Update its date to match the current revision's timestamp.
-            cursor.execute('UPDATE directory SET date=%s WHERE id=%s', (revision['date'], directory))
-            # TODO: update entries from 'directory_in_rev' pointing to this
-            #       directory to now point to their children? If any children
-            #       of the old directory appears in the 'directory' table,
-            #       their date and entries in 'directory_in_rev' should be
-            #       updated as well!! (same for blobs!!)
-
        else:
            # This directory is just beyond the isochrone graph
            # frontier. Add an entry to the 'directory_in_rev' relation
            # with the path relative to 'name', and continue to walk
            # recursively looking only for blobs (ie. 'ingraph=False').
            cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)', (directory, revision['id'], name))
            for entry in storage.directory_walk_one(directory):
                child = make_record(entry)
                # From now on path is relative to current directory (ie. relative=directory)
                path = os.path.join('.', child['path'])
                if child['type'] == 'dir':
                    walk_directory(cursor, storage, revision, child['id'], directory, name=path, ingraph=False)
                elif child['type'] == 'file':
                    process_file(cursor, storage, revision, directory, child['id'], path)
    else:
        # This directory is completely outside the isochrone graph (far
        # from the frontier). We are just looking for blobs here.
        for entry in storage.directory_walk_one(directory):
            child = make_record(entry)
            path = os.path.join(name, child['path'])
            if child['type'] == 'dir':
                walk_directory(cursor, storage, revision, child['id'], relative, name=path, ingraph=False)
            elif child['type'] == 'file':
                process_file(cursor, storage, revision, relative, child['id'], path)

def process_file(cursor, storage, revision, directory, blob, name):
    # TODO: add logging support!
    # print("blob:", identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
    cursor.execute('SELECT date FROM content WHERE id=%s', (blob,))
    row = cursor.fetchone()
-    if row is None:
-        # print('row = None:', row, identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
-        # This blob was never seen before. Add blob with the current revision's
-        # timestamp as date, and set a record for 'content_early_in_rev' with
-        # the 'path = name'.
-        cursor.execute('INSERT INTO content VALUES (%s,%s)', (blob, revision['date']))
-        cursor.execute('INSERT INTO content_early_in_rev VALUES (%s,%s,%s)', (blob, revision['id'], name))
-
-    elif row[0] > revision['date']:
-        # print('row > date:', row, identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
-        # This is an earlier occurrance of an already seen blob. Update its
-        # date to match the current revision's timestamp.
-        cursor.execute('UPDATE content SET date=%s WHERE id=%s', (revision['date'], blob))
-        # TODO: update entries from 'content_early_in_rev' with current path,
-        #       and move previous entry to 'content_in_rev' with its path now
-        #       relative to the parent directory in the isochrone graph
-        #       frontier?
-        cursor.execute('SELECT path FROM content_early_in_rev WHERE blob=%s', (blob,))
-        print("new blob:", revision['date'], name)
-        for entry in cursor.fetchall():
-            print("old blob:", row[0], entry[0].tobytes().decode('utf-8'))
+    if row is None or row[0] > revision['date']:
+        # This is an earlier occurrence of the blob. Add it with the current
+        # revision's timestamp as date, and set a record for
+        # 'content_early_in_rev' with the 'path = name'.
+        cursor.execute('''INSERT INTO content VALUES (%s,%s,%s,%s)
+                          ON CONFLICT (id) DO UPDATE
+                          SET date=%s, rev=%s, path=%s''',
+                       (blob, revision['date'], revision['id'], name, revision['date'], revision['id'], name))
    else:
-        # print('otherwise: ', row, identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
        # This blob was seen before but this occurrence is older. Add
        # an entry to the 'content_in_dir' relation with the path
        # relative to the parent directory in the isochrone graph
        # frontier.
-        cursor.execute('INSERT INTO content_in_dir VALUES (%s,%s,%s) ON CONFLICT DO NOTHING', (blob, directory, name))
+        cursor.execute('''INSERT INTO content_in_dir VALUES (%s,%s,%s)
+                          ON CONFLICT DO NOTHING''',
+                       (blob, directory, name))
        # WARNING: There seem to be duplicated directories within the same
        #          revision. Hence, their blobs may appear many times with the
        #          same directory ID and 'relative' path. That's why we need
        #          the 'ON CONFLICT DO NOTHING' statement.

if __name__ == "__main__":
    archive = connect('database.conf', 'archive')
    compact = connect('database.conf', 'compact')
    create_tables(compact)
    # This call changes the way bytes are decoded in the connection
    storage = Db(archive)
    cursor = compact.cursor()
    revisions = RevisionIterator(archive, limit=1000)
    for idx, revision in enumerate(revisions):
        print(f'{idx} - id: {identifier_to_str(revision["id"])} - date: {revision["date"]} - dir: {identifier_to_str(revision["dir"])}')
        # Add current revision to the compact DB and start walking its root directory
        cursor.execute('INSERT INTO revision VALUES (%s,%s)', (revision['id'], revision['date']))
        walk_directory(cursor, storage, revision, revision["dir"], revision["dir"])
    compact.commit()
    compact.close()
    archive.close()
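
The main change above collapses the previous SELECT-then-INSERT-or-UPDATE branches into a single PostgreSQL upsert. Below is a minimal, self-contained sketch of that pattern, assuming a throwaway local database named 'compact_test' (the name and connection details are illustrative, not part of this diff); the table mirrors the 'directory' table from compact.sql:

    # Sketch of the INSERT ... ON CONFLICT (id) DO UPDATE pattern used above.
    import psycopg2

    conn = psycopg2.connect(dbname='compact_test')  # hypothetical test database
    cur = conn.cursor()
    cur.execute('''CREATE TABLE IF NOT EXISTS directory
                   (id bytea PRIMARY KEY, date timestamptz NOT NULL)''')

    dir_id = b'\x01' * 20  # example 20-byte sha1_git identifier
    for date in ('2020-01-02', '2020-01-01'):
        # The first iteration inserts the row; the second hits the primary-key
        # conflict and overwrites the date, as walk_directory does after its
        # SELECT has established that the incoming timestamp is older.
        cur.execute('''INSERT INTO directory VALUES (%s,%s)
                       ON CONFLICT (id) DO UPDATE SET date=%s''',
                    (dir_id, date, date))

    cur.execute('SELECT date FROM directory WHERE id=%s', (dir_id,))
    print(cur.fetchone()[0])  # prints the earlier timestamp, 2020-01-01
    conn.rollback()  # throwaway sketch: persist nothing

Note that PostgreSQL also exposes the incoming row as EXCLUDED, so SET date=EXCLUDED.date would avoid passing the timestamp twice.
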
diff --git a/compact.sql b/compact.sql
index 2c631df..68e5212 100644
--- a/compact.sql
+++ b/compact.sql
@@ -1,89 +1,93 @@
-- a Git object ID, i.e., a Git-style salted SHA1 checksum
drop domain if exists sha1_git cascade;
create domain sha1_git as bytea check (length(value) = 20);

-- UNIX path (absolute, relative, individual path component, etc.)
drop domain if exists unix_path cascade;
create domain unix_path as bytea;

drop table if exists content;
create table content
(
    id sha1_git primary key,
-    date timestamptz not null
+    date timestamptz not null,
+    rev sha1_git not null, -- id of the revision where the blob appears for the first time
+    path unix_path not null -- path to the content relative to the revision root directory
);

comment on column content.id is 'Git object sha1 hash';
comment on column content.date is 'First seen time';
+comment on column content.rev is 'Revision identifier';
+comment on column content.path is 'Path to content in revision';

drop table if exists directory;
create table directory
(
    id sha1_git primary key,
    date timestamptz not null
);

comment on column directory.id is 'Git object sha1 hash';
comment on column directory.date is 'First seen time';

drop table if exists revision;
create table revision
(
    id sha1_git primary key,
    date timestamptz not null
);

comment on column revision.id is 'Git object sha1 hash';
comment on column revision.date is 'First seen time';

-- TODO: consider merging this table with 'content'
-drop table if exists content_early_in_rev;
-create table content_early_in_rev
-(
-    blob sha1_git not null, -- id of the content blob
-    rev sha1_git not null, -- id of the revision where the blob appears for the first time
-    path unix_path not null, -- path to the content relative to the revision root directory
-    primary key (blob, rev, path)
-    -- foreign key (blob) references content (id),
-    -- foreign key (rev) references revision (id)
-);
-
-comment on column content_early_in_rev.blob is 'Content identifier';
-comment on column content_early_in_rev.rev is 'Revision identifier';
-comment on column content_early_in_rev.path is 'Path to content in revision';
+-- drop table if exists content_early_in_rev;
+-- create table content_early_in_rev
+-- (
+--     blob sha1_git not null, -- id of the content blob
+--     rev sha1_git not null, -- id of the revision where the blob appears for the first time
+--     path unix_path not null, -- path to the content relative to the revision root directory
+--     primary key (blob, rev, path)
+--     -- foreign key (blob) references content (id),
+--     -- foreign key (rev) references revision (id)
+-- );
+
+-- comment on column content_early_in_rev.blob is 'Content identifier';
+-- comment on column content_early_in_rev.rev is 'Revision identifier';
+-- comment on column content_early_in_rev.path is 'Path to content in revision';

drop table if exists content_in_dir;
create table content_in_dir
(
    blob sha1_git not null, -- id of the content blob
    dir sha1_git not null, -- id of the directory containing the blob
    path unix_path not null, -- path name (TODO: relative to parent or absolute (wrt. revision)?)
    primary key (blob, dir, path)
    -- foreign key (blob) references content (id),
    -- foreign key (dir) references directory (id)
);

comment on column content_in_dir.blob is 'Content identifier';
comment on column content_in_dir.dir is 'Directory identifier';
-- comment on column content_in_dir.path is 'Path to content in directory';

drop table if exists directory_in_rev;
create table directory_in_rev
(
    dir sha1_git not null, -- id of the directory appearing in the revision
    rev sha1_git not null, -- id of the revision containing the directory
    path unix_path not null, -- path to the directory relative to the revision root directory
    primary key (dir, rev, path)
    -- foreign key (dir) references directory (id),
    -- foreign key (rev) references revision (id)
);

comment on column directory_in_rev.dir is 'Directory identifier';
comment on column directory_in_rev.rev is 'Revision identifier';
comment on column directory_in_rev.path is 'Path to directory in revision';
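
With 'rev' and 'path' folded into 'content', the first known occurrence of a blob becomes a single-row lookup instead of a join against the old content_early_in_rev table. A hedged sketch of such a lookup, reusing the hypothetical 'compact_test' database from the earlier example:

    # Fetch the earliest recorded occurrence of a blob from the merged table.
    import psycopg2

    conn = psycopg2.connect(dbname='compact_test')  # hypothetical test database
    cur = conn.cursor()
    blob_id = b'\x02' * 20  # example 20-byte sha1_git identifier
    cur.execute('SELECT date, rev, path FROM content WHERE id=%s', (blob_id,))
    row = cur.fetchone()
    if row is not None:
        date, rev, path = row
        # psycopg2 returns bytea columns as memoryview by default
        print(date, bytes(rev).hex(), bytes(path).decode('utf-8'))
    conn.close()
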
diff --git a/iterator.py b/iterator.py
index c0affe5..d14fe90 100644
--- a/iterator.py
+++ b/iterator.py
@@ -1,70 +1,70 @@
# import psycopg2
from swh.model.identifiers import identifier_to_str
-
# def typecast_bytea(value, cur):
#     if value is not None:
#         data = psycopg2.BINARY(value, cur)
#         return data.tobytes()

class RevisionIterator:
    """Iterator over revisions present in the given database."""

    # def adapt_conn(self, conn):
    #     """Makes psycopg2 use 'bytes' to decode bytea instead of
    #     'memoryview', for this connection."""
    #     t_bytes = psycopg2.extensions.new_type((17,), "bytea", typecast_bytea)
    #     psycopg2.extensions.register_type(t_bytes, conn)
    #     t_bytes_array = psycopg2.extensions.new_array_type((1001,), "bytea[]", t_bytes)
    #     psycopg2.extensions.register_type(t_bytes_array, conn)

    def __init__(self, conn, limit=None, chunksize=100):
        # self.adapt_conn(conn)
        self.cur = conn.cursor()
        self.chunksize = chunksize
        self.limit = limit
        self.records = []
        self.aliases = ['id', 'date', 'dir']

    def __del__(self):
        self.cur.close()

    def __iter__(self):
        self.records.clear()
        if self.limit is None:
-            self.cur.execute('''SELECT id, date, directory
+            self.cur.execute('''SELECT id, date, committer_date, directory
                                FROM revision''')
-            # self.cur.execute('''SELECT id, date, directory
-            #                     FROM revision ORDER BY date''')
        else:
-            self.cur.execute('''SELECT id, date, directory
+            self.cur.execute('''SELECT id, date, committer_date, directory
                                FROM revision
                                LIMIT %s''', (self.limit,))
-            # self.cur.execute('''SELECT id, date, directory
-            #                     FROM revision ORDER BY date
-            #                     LIMIT %s''', (self.limit,))
        for row in self.cur.fetchmany(self.chunksize):
-            record = dict(zip(self.aliases, row))
-            self.records.append(record)
+            record = self.make_record(row)
+            if record is not None:
+                self.records.append(record)
        return self

    def __next__(self):
        if not self.records:
            self.records.clear()
            for row in self.cur.fetchmany(self.chunksize):
-                record = dict(zip(self.aliases, row))
-                self.records.append(record)
-                # self.records.append((
-                #     identifier_to_str(rev[0]),
-                #     rev[1],
-                #     identifier_to_str(rev[2])
-                # ))
+                record = self.make_record(row)
+                if record is not None:
+                    self.records.append(record)
        if self.records:
            revision, *self.records = self.records
            return revision
        else:
            raise StopIteration
+
+    def make_record(self, row):
+        # Only revisions with an author or committer date are considered
+        if row[1] is not None:
+            # If the revision has an author date, it takes precedence
+            return dict(zip(self.aliases, (row[0], row[1], row[3])))
+        elif row[2] is not None:
+            # If not, we use the committer date
+            return dict(zip(self.aliases, (row[0], row[2], row[3])))
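
The new make_record applies a simple precedence rule: the author date wins, committer_date is the fallback, and rows with neither are dropped (make_record implicitly returns None, which both callers filter out). A standalone sketch of just that rule:

    # Date-precedence rule extracted from make_record; pick_date is a
    # hypothetical helper, not part of the diff.
    def pick_date(author_date, committer_date):
        # Author date takes precedence; committer date is the fallback.
        # None means the revision carries no usable date and gets skipped.
        if author_date is not None:
            return author_date
        return committer_date

    assert pick_date('2021-03-01', '2021-03-05') == '2021-03-01'  # author wins
    assert pick_date(None, '2021-03-05') == '2021-03-05'          # fallback
    assert pick_date(None, None) is None                          # skipped upstream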
