diff --git a/compact.py b/compact.py
index b90cdb0..dae855a 100644
--- a/compact.py
+++ b/compact.py
@@ -1,195 +1,176 @@
# import aiohttp
# import asyncio
import io
import os
import psycopg2
from configparser import ConfigParser
# from isochrone import IsochroneGraph
from iterator import RevisionIterator
# from swh.core.api import RemoteException
from swh.model.identifiers import (
# identifier_to_bytes,
identifier_to_str
)
# from swh.storage.api.client import RemoteStorage
# from swh.storage.backfill import fetch
from swh.storage.db import Db
def config(filename, section):
# create a parser
parser = ConfigParser()
# read config file
parser.read(filename)
# get section, default to postgresql
db = {}
if parser.has_section(section):
params = parser.items(section)
for param in params:
db[param[0]] = param[1]
else:
raise Exception('Section {0} not found in the {1} file'.format(section, filename))
return db
def connect(filename, section):
""" Connect to the PostgreSQL database server """
conn = None
try:
# read connection parameters
params = config(filename, section)
# connect to the PostgreSQL server
print('Connecting to the PostgreSQL database...')
conn = psycopg2.connect(**params)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
return conn
def create_tables(conn, filename='compact.sql'):
with io.open(filename) as file:
cur = conn.cursor()
cur.execute(file.read())
cur.close()
conn.commit()
def make_record(elem):
return {'type' : elem[1], 'id' : elem[2], 'path' : elem[3].decode('utf-8')}
# TODO: refactor this method to take the whole directory structure as parameter
# and avoid multiple queries (using swh.storage.db.directory_walk prior to
# calling the function, instead of swh.storage.db.directory_walk_one within it)
def walk_directory(cursor, storage, revision, directory, relative, name='./', ingraph=True):
# print("dir: ", identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(relative), name, ingraph)
if ingraph:
cursor.execute('SELECT date FROM directory WHERE id=%s', (directory,))
row = cursor.fetchone()
- if row is None:
+ if row is None or row[0] > revision['date']:
# This directory belongs to the isochrone graph of the revision.
# Add directory with the current revision's timestamp as date, and
# walk recursively looking for new content.
- cursor.execute('INSERT INTO directory VALUES (%s,%s)', (directory, revision['date']))
+ cursor.execute('''INSERT INTO directory VALUES (%s,%s)
+ ON CONFLICT (id) DO UPDATE
+ SET date=%s''',
+ (directory, revision['date'], revision['date']))
for entry in storage.directory_walk_one(directory):
child = make_record(entry)
path = os.path.join(name, child['path'])
if child['type'] == 'dir':
walk_directory(cursor, storage, revision, child['id'], relative, name=path)
elif child['type'] == 'file':
process_file(cursor, storage, revision, relative, child['id'], path)
- elif row[0] > revision['date']:
- # This directory belongs to the isochrone graph of the revision.
- # Update its date to match the current revision's timestamp.
- cursor.execute('UPDATE directory SET date=%s WHERE id=%s', (revision['date'], directory))
- # TODO: update entries from 'directory_in_rev' pointing to this
- # directory to now point to their children? If any children
- # of the old directory appears in the 'directory' table,
- # their date and entries in 'directory_in_rev' should be
- # updated as well!! (same for blobs!!)
-
else:
# This directory is just beyond the isochrone graph
# frontier. Add an entry to the 'directory_in_rev' relation
# with the path relative to 'name', and continue to walk
# recursively looking only for blobs (ie. 'ingraph=False').
cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)', (directory, revision['id'], name))
for entry in storage.directory_walk_one(directory):
child = make_record(entry)
# From now on path is relative to current directory (ie. relative=directory)
path = os.path.join('.', child['path'])
if child['type'] == 'dir':
walk_directory(cursor, storage, revision, child['id'], directory, name=path, ingraph=False)
elif child['type'] == 'file':
process_file(cursor, storage, revision, directory, child['id'], path)
else:
# This directory is completely outside the isochrone graph (far
# from the frontier). We are just looking for blobs here.
for entry in storage.directory_walk_one(directory):
child = make_record(entry)
path = os.path.join(name, child['path'])
if child['type'] == 'dir':
walk_directory(cursor, storage, revision, child['id'], relative, name=path, ingraph=False)
elif child['type'] == 'file':
process_file(cursor, storage, revision, relative, child['id'], path)
def process_file(cursor, storage, revision, directory, blob, name):
# TODO: add logging support!
# print("blob:", identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
cursor.execute('SELECT date FROM content WHERE id=%s', (blob,))
row = cursor.fetchone()
- if row is None:
- # print('row = None:', row, identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
- # This blob was never seen before. Add blob with the current revision's
- # timestamp as date, and set a record for 'content_early_in_rev' with
- # the 'path = name'.
- cursor.execute('INSERT INTO content VALUES (%s,%s)', (blob, revision['date']))
- cursor.execute('INSERT INTO content_early_in_rev VALUES (%s,%s,%s)', (blob, revision['id'], name))
-
- elif row[0] > revision['date']:
- # print('row > date:', row, identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
- # This is an earlier occurrance of an already seen blob. Update its
- # date to match the current revision's timestamp.
- cursor.execute('UPDATE content SET date=%s WHERE id=%s', (revision['date'], blob))
- # TODO: update entries from 'content_early_in_rev' with current path,
- # and move previous entry to 'content_in_rev' with its path now
- # relative to the parent directory in the isochrone graph
- # frontier?
- cursor.execute('SELECT path FROM content_early_in_rev WHERE blob=%s', (blob,))
- print("new blob:", revision['date'], name)
- for entry in cursor.fetchall():
- print("old blob:", row[0], entry[0].tobytes().decode('utf-8'))
+ if row is None or row[0] > revision['date']:
+ # Either the blob was never seen, or this occurrence is earlier
+ # than the one recorded. Upsert it into 'content' with the current
+ # revision's timestamp as date, the revision id, and 'path = name'.
+ cursor.execute('''INSERT INTO content VALUES (%s,%s,%s,%s)
+ ON CONFLICT (id) DO UPDATE
+ SET date=%s, rev=%s, path=%s''',
+ (blob, revision['date'], revision['id'], name, revision['date'], revision['id'], name))
else:
- # print('otherwise: ', row, identifier_to_str(revision['id']), revision['date'], identifier_to_str(directory), identifier_to_str(blob), name)
# This blob was seen before but this occurrence is older. Add
# an entry to the 'content_in_dir' relation with the path
# relative to the parent directory in the isochrone graph
# frontier.
- cursor.execute('INSERT INTO content_in_dir VALUES (%s,%s,%s) ON CONFLICT DO NOTHING', (blob, directory, name))
+ cursor.execute('''INSERT INTO content_in_dir VALUES (%s,%s,%s)
+ ON CONFLICT DO NOTHING''',
+ (blob, directory, name))
# WARNING: There seem to be duplicated directories within the same
# revision. Hence, their blobs may appear many times with the
# same directory ID and 'relative' path. That's why we need
# the 'ON CONFLICT DO NOTHING' statement.
if __name__ == "__main__":
archive = connect('database.conf', 'archive')
compact = connect('database.conf', 'compact')
create_tables(compact)
# This call changes the way bytes are encoded in the connection
storage = Db(archive)
cursor = compact.cursor()
revisions = RevisionIterator(archive, limit=1000)
for idx, revision in enumerate(revisions):
print(f'{idx} - id: {identifier_to_str(revision["id"])} - date: {revision["date"]} - dir: {identifier_to_str(revision["dir"])}')
# Add current revision to the compact DB and start walking its root directory
cursor.execute('INSERT INTO revision VALUES (%s,%s)', (revision['id'], revision['date']))
walk_directory(cursor, storage, revision, revision["dir"], revision["dir"])
compact.commit()
compact.close()
archive.close()
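Note on the compact.py hunk: the separate INSERT/UPDATE branches for directories and blobs are replaced by a SELECT followed by an upsert (INSERT ... ON CONFLICT (id) DO UPDATE). For reference, PostgreSQL can also collapse that pair into a single statement via the EXCLUDED pseudo-table; a minimal sketch of that alternative form, assuming the directory table from compact.sql (the function name is hypothetical, and this is not what the patch itself does):

def upsert_directory(cursor, directory_id, date):
    # Insert the directory; on conflict, only move the stored
    # timestamp backwards in time, never forwards.
    cursor.execute('''INSERT INTO directory VALUES (%s,%s)
                      ON CONFLICT (id) DO UPDATE
                      SET date=EXCLUDED.date
                      WHERE directory.date > EXCLUDED.date''',
                   (directory_id, date))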
diff --git a/compact.sql b/compact.sql
index 2c631df..68e5212 100644
--- a/compact.sql
+++ b/compact.sql
@@ -1,89 +1,93 @@
-- a Git object ID, i.e., a Git-style salted SHA1 checksum
drop domain if exists sha1_git cascade;
create domain sha1_git as bytea check (length(value) = 20);
-- UNIX path (absolute, relative, individual path component, etc.)
drop domain if exists unix_path cascade;
create domain unix_path as bytea;
drop table if exists content;
create table content
(
id sha1_git primary key,
- date timestamptz not null
+ date timestamptz not null,
+ rev sha1_git not null, -- id of the revision where the blob appears for the first time
+ path unix_path not null -- path to the content relative to the revision root directory
);
comment on column content.id is 'Git object sha1 hash';
comment on column content.date is 'First seen time';
+comment on column content.rev is 'Revision identifier';
+comment on column content.path is 'Path to content in revision';
drop table if exists directory;
create table directory
(
id sha1_git primary key,
date timestamptz not null
);
comment on column directory.id is 'Git object sha1 hash';
comment on column directory.date is 'First seen time';
drop table if exists revision;
create table revision
(
id sha1_git primary key,
date timestamptz not null
);
comment on column revision.id is 'Git object sha1 hash';
comment on column revision.date is 'First seen time';
-- TODO: consider merging this table with 'content'
-drop table if exists content_early_in_rev;
-create table content_early_in_rev
-(
- blob sha1_git not null, -- id of the content blob
- rev sha1_git not null, -- id of the revision where the blob appears for the first time
- path unix_path not null, -- path to the content relative to the revision root directory
- primary key (blob, rev, path)
- -- foreign key (blob) references content (id),
- -- foreign key (rev) references revision (id)
-);
-
-comment on column content_early_in_rev.blob is 'Content identifier';
-comment on column content_early_in_rev.rev is 'Revision identifier';
-comment on column content_early_in_rev.path is 'Path to content in revision';
+-- drop table if exists content_early_in_rev;
+-- create table content_early_in_rev
+-- (
+-- blob sha1_git not null, -- id of the content blob
+-- rev sha1_git not null, -- id of the revision where the blob appears for the first time
+-- path unix_path not null, -- path to the content relative to the revision root directory
+-- primary key (blob, rev, path)
+-- -- foreign key (blob) references content (id),
+-- -- foreign key (rev) references revision (id)
+-- );
+
+-- comment on column content_early_in_rev.blob is 'Content identifier';
+-- comment on column content_early_in_rev.rev is 'Revision identifier';
+-- comment on column content_early_in_rev.path is 'Path to content in revision';
drop table if exists content_in_dir;
create table content_in_dir
(
blob sha1_git not null, -- id of the content blob
dir sha1_git not null, -- id of the directory containing the blob
path unix_path not null, -- path name (TODO: relative to parent or absolute (wrt. revision)?)
primary key (blob, dir, path)
-- foreign key (blob) references content (id),
-- foreign key (dir) references directory (id)
);
comment on column content_in_dir.blob is 'Content identifier';
comment on column content_in_dir.dir is 'Directory identifier';
-- comment on column content_early_in_rev.path is 'Path to content in directory';
drop table if exists directory_in_rev;
create table directory_in_rev
(
dir sha1_git not null, -- id of the directory appearing in the revision
rev sha1_git not null, -- id of the revision containing the directory
path unix_path not null, -- path to the directory relative to the revision root directory
primary key (dir, rev, path)
-- foreign key (dir) references directory (id),
-- foreign key (rev) references revision (id)
);
comment on column directory_in_rev.dir is 'Directory identifier';
comment on column directory_in_rev.rev is 'Revision identifier';
comment on column directory_in_rev.path is 'Path to directory in revision';
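With content_early_in_rev folded into content, looking up the earliest known occurrence of a blob becomes a single-row read instead of a join. A minimal sketch, assuming a cursor on the compact database and the columns defined above (the function name is hypothetical):

def earliest_occurrence(cursor, blob_id):
    # Returns (date, rev, path) for the first known occurrence of the
    # blob, or None if the blob has never been seen.
    cursor.execute('SELECT date, rev, path FROM content WHERE id=%s',
                   (blob_id,))
    return cursor.fetchone()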
diff --git a/iterator.py b/iterator.py
index c0affe5..d14fe90 100644
--- a/iterator.py
+++ b/iterator.py
@@ -1,70 +1,70 @@
# import psycopg2
from swh.model.identifiers import identifier_to_str
-
# def typecast_bytea(value, cur):
# if value is not None:
# data = psycopg2.BINARY(value, cur)
# return data.tobytes()
class RevisionIterator:
"""Iterator over revisions present in the given database."""
# def adapt_conn(self, conn):
# """Makes psycopg2 use 'bytes' to decode bytea instead of
# 'memoryview', for this connection."""
# t_bytes = psycopg2.extensions.new_type((17,), "bytea", typecast_bytea)
# psycopg2.extensions.register_type(t_bytes, conn)
# t_bytes_array = psycopg2.extensions.new_array_type((1001,), "bytea[]", t_bytes)
# psycopg2.extensions.register_type(t_bytes_array, conn)
def __init__(self, conn, limit=None, chunksize=100):
# self.adapt_conn(conn)
self.cur = conn.cursor()
self.chunksize = chunksize
self.limit = limit
self.records = []
self.aliases = ['id', 'date', 'dir']
def __del__(self):
self.cur.close()
def __iter__(self):
self.records.clear()
if self.limit is None:
- self.cur.execute('''SELECT id, date, directory
+ self.cur.execute('''SELECT id, date, committer_date, directory
FROM revision''')
- # self.cur.execute('''SELECT id, date, directory
- # FROM revision ORDER BY date''')
else:
- self.cur.execute('''SELECT id, date, directory
+ self.cur.execute('''SELECT id, date, committer_date, directory
FROM revision
LIMIT %s''', (self.limit,))
- # self.cur.execute('''SELECT id, date, directory
- # FROM revision ORDER BY date
- # LIMIT %s''', (self.limit,))
for row in self.cur.fetchmany(self.chunksize):
- record = dict(zip(self.aliases, row))
- self.records.append(record)
+ record = self.make_record(row)
+ if record is not None:
+ self.records.append(record)
return self
def __next__(self):
if not self.records:
self.records.clear()
for row in self.cur.fetchmany(self.chunksize):
- record = dict(zip(self.aliases, row))
- self.records.append(record)
- # self.records.append((
- # identifier_to_str(rev[0]),
- # rev[1],
- # identifier_to_str(rev[2])
- # ))
+ record = self.make_record(row)
+ if record is not None:
+ self.records.append(record)
if self.records:
revision, *self.records = self.records
return revision
else:
raise StopIteration
+
+ def make_record(self, row):
+ # Only revisions with an author or committer date are considered
+ if row[1] is not None:
+ # If the revision has an author date, it takes precedence
+ return dict(zip(self.aliases, (row[0], row[1], row[3])))
+ elif row[2] is not None:
+ # If not, we use the committer date
+ return dict(zip(self.aliases, (row[0], row[2], row[3])))
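The new make_record keeps only revisions that carry a usable timestamp: the author date takes precedence, the committer date is the fallback, and rows with neither make make_record fall through and return None, which the 'if record is not None' checks above then filter out. A standalone sketch of the same logic, with the implicit final return written out:

def make_record(row, aliases=('id', 'date', 'dir')):
    # row = (id, author_date, committer_date, directory), in the
    # column order selected by RevisionIterator.
    if row[1] is not None:    # author date takes precedence
        return dict(zip(aliases, (row[0], row[1], row[3])))
    if row[2] is not None:    # fall back to the committer date
        return dict(zip(aliases, (row[0], row[2], row[3])))
    return None               # no usable date: skip this revision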