diff --git a/swh/graph/cli.py b/swh/graph/cli.py
--- a/swh/graph/cli.py
+++ b/swh/graph/cli.py
@@ -47,7 +47,7 @@
 
 
 def dump_int2pid(filename):
-    for (int, pid) in enumerate(IntToPidMap(filename)):
+    for (int, pid) in IntToPidMap(filename):
         print('{}\t{}'.format(int, pid))
 
 
@@ -62,16 +62,18 @@
             PidToIntMap.write_record(dst, str_pid, int(str_int))
 
 
-def restore_int2pid(filename):
+def restore_int2pid(filename, length):
     """read a textual int->PID map from stdin and write its binary version to
     filename
 
     """
-    with open(filename, 'wb') as dst:
-        for line in sys.stdin:
-            (str_int, str_pid) = line.split()
-            dst.seek(int(str_int) * PID_BIN_SIZE)
-            IntToPidMap.write_record(dst, str_pid)
+    int2pid = IntToPidMap(filename, mode='wb', length=length)
+    try:
+        for line in sys.stdin:
+            (str_int, str_pid) = line.split()
+            int2pid[int(str_int)] = str_pid
+    finally:
+        int2pid.close()
 
 
 @map.command('dump')
@@ -95,14 +97,21 @@
 @click.option('--type', '-t', 'map_type', required=True,
               type=click.Choice(['pid2int', 'int2pid']),
               help='type of map to dump')
+@click.option('--length', '-l', type=int,
+              help='map size in number of logical records '
+              '(required for int2pid maps)')
 @click.argument('filename', required=True, type=click.Path())
 @click.pass_context
-def restore_map(ctx, map_type, filename):
+def restore_map(ctx, map_type, length, filename):
     """restore a binary PID<->int map from textual format"""
     if map_type == 'pid2int':
         restore_pid2int(filename)
     elif map_type == 'int2pid':
-        restore_int2pid(filename)
+        if length is None:
+            raise click.UsageError(
+                'map length is required when restoring {} maps'.format(
+                    map_type), ctx)
+        restore_int2pid(filename, length)
     else:
         raise ValueError('invalid map type: ' + map_type)
 
diff --git a/swh/graph/pid.py b/swh/graph/pid.py
--- a/swh/graph/pid.py
+++ b/swh/graph/pid.py
@@ -7,8 +7,9 @@
 import os
 import struct
 
-from collections.abc import Mapping, Sequence
+from collections.abc import MutableMapping
 from enum import Enum
+from mmap import MAP_PRIVATE, MAP_SHARED, PROT_READ, PROT_WRITE
 
 from swh.model.identifiers import PersistentId, parse_persistent_identifier
 
@@ -79,15 +80,39 @@
 
     """
 
-    def __init__(self, record_size, fname):
+    def __init__(self, record_size, fname, mode='rb', length=None):
         """open an existing on-disk map
 
         Args:
             record_size (int): size of each record in bytes
             fname (str): path to the on-disk map
+            mode (str): file open mode, usually either 'rb' for read-only maps,
+                'wb' for creating new maps, or 'rb+' for updating existing ones
+                (default: 'rb')
+            length (int): map size in number of logical records; used to
+                initialize writable maps at creation time. Must be given when
+                mode is 'wb' (any pre-existing file is truncated); ignored
+                otherwise
 
         """
+        os_modes = {
+            'rb': os.O_RDONLY,
+            'wb': os.O_RDWR | os.O_CREAT | os.O_TRUNC,
+            'rb+': os.O_RDWR,
+        }
+        if mode not in os_modes:
+            raise ValueError('invalid file open mode: ' + mode)
+        new_map = (mode == 'wb')
+        writable_map = mode in ['wb', 'rb+']
+        # validate before opening the file, so that no fd is leaked on error
+        if new_map and length is None:
+            raise ValueError('missing length when creating new map')
+
         self.record_size = record_size
+        self.fd = os.open(fname, os_modes[mode], 0o666)  # perms like open()
+        if new_map:
+            os.ftruncate(self.fd, length * self.record_size)
+
         self.size = os.path.getsize(fname)
         (self.length, remainder) = divmod(self.size, record_size)
         if remainder:
@@ -95,8 +120,11 @@
                 'map size {} is not a multiple of the record size {}'.format(
                     self.size, record_size))
 
-        self.f = open(fname, 'rb')
-        self.mm = mmap.mmap(self.f.fileno(), self.size, mmap.MAP_PRIVATE)
+        if writable_map:
+            (flags, prot) = (MAP_SHARED, PROT_READ | PROT_WRITE)
+        else:  # read-only: make writes fail, rather than stay in memory only
+            (flags, prot) = (MAP_PRIVATE, PROT_READ)
+        self.mm = mmap.mmap(self.fd, self.size, flags=flags, prot=prot)
 
     def close(self):
         """close the map
@@ -106,11 +134,18 @@
         """
         if not self.mm.closed:
             self.mm.close()
-        if not self.f.closed:
-            self.f.close()
+        if self.fd is not None:  # never close twice: fd number may be reused
+            os.close(self.fd)
+            self.fd = None
+
+    def __len__(self):
+        return self.length
 
+    def __delitem__(self, pos):
+        raise NotImplementedError('cannot delete records from fixed-size map')
 
-class PidToIntMap(_OnDiskMap, Mapping):
+
+class PidToIntMap(_OnDiskMap, MutableMapping):
     """memory mapped map from PID (:ref:`persistent-identifiers`) to a
     continuous range 0..N of (8-byte long) integers
 
@@ -127,20 +162,33 @@
     performed via binary search. Hence a huge map with, say, 11 B entries,
     will require ~30 disk seeks.
 
+    Note that, due to fixed size + ordering, it is not possible to create these
+    maps by random writing. Hence, __setitem__ can be used only to *update* the
+    value associated to an existing key, rather than to add a missing item. To
+    create an entire map from scratch, you should do so *sequentially*, using
+    static method :meth:`write_record` (or, at your own risk, by hand via the
+    mmap :attr:`mm`).
+
     """
 
     # record binary format: PID + a big endian 8-byte big endian integer
     RECORD_BIN_FMT = '>' + PID_BIN_FMT + 'q'
     RECORD_SIZE = PID_BIN_SIZE + INT_BIN_SIZE
 
-    def __init__(self, fname):
+    def __init__(self, fname, mode='rb', length=None):
         """open an existing on-disk map
 
         Args:
             fname (str): path to the on-disk map
+            mode (str): file open mode, usually either 'rb' for read-only maps,
+                'wb' for creating new maps, or 'rb+' for updating existing ones
+                (default: 'rb')
+            length (int): map size in number of logical records; used to
+                initialize read-write maps at creation time. Must be given when
+                mode is 'wb'; ignored otherwise
 
         """
-        super().__init__(self.RECORD_SIZE, fname)
+        super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length)
 
     def _get_bin_record(self, pos):
         """seek and return the (binary) record at a given (logical) position
@@ -192,14 +240,15 @@
         f.write(str_to_bytes(pid))
         f.write(struct.pack(INT_BIN_FMT, int))
 
-    def __getitem__(self, pid_str):
-        """lookup the integer identifier of a PID
+    def _find(self, pid_str):
+        """lookup the integer identifier of a pid and its position
 
         Args:
-            pid (str): the PID as a string
+            pid_str (str): the pid as a string
 
         Returns:
-            int: the integer identifier of pid
+            tuple: a pair `(int, pos)` with the pid integer identifier and
+                its logical record position in the map
 
         """
         if not isinstance(pid_str, str):
@@ -219,19 +268,40 @@
             elif pid > target:
                 max = mid - 1
             else:  # pid == target
-                return struct.unpack(INT_BIN_FMT, int)[0]
+                return (struct.unpack(INT_BIN_FMT, int)[0], mid)
 
         raise KeyError(pid_str)
 
+    def __getitem__(self, pid_str):
+        """lookup the integer identifier of a PID
+
+        Args:
+            pid_str (str): the PID as a string
+
+        Returns:
+            int: the integer identifier of pid
+
+        """
+        return self._find(pid_str)[0]  # return element, ignore position
+
+    def __setitem__(self, pid_str, int):
+        """update the integer identifier of a PID already in the map
+
+        Raises:
+            KeyError: if the PID is missing (new items cannot be added)
+
+        """
+        pos = self._find(pid_str)[1]  # ignore the old value, keep position
+        # the PID part of the record is unchanged: only rewrite the integer
+        int_pos = pos * self.RECORD_SIZE + PID_BIN_SIZE
+        self.mm[int_pos:int_pos+INT_BIN_SIZE] = struct.pack(INT_BIN_FMT, int)
+
     def __iter__(self):
         for pos in range(self.length):
             yield self._get_record(pos)
 
-    def __len__(self):
-        return self.length
-
 
-class IntToPidMap(_OnDiskMap, Sequence):
+class IntToPidMap(_OnDiskMap, MutableMapping):
     """memory mapped map from a continuous range of 0..N (8-byte long) integers
     to PIDs (:ref:`persistent-identifiers`)
 
@@ -249,14 +319,20 @@
     RECORD_BIN_FMT = PID_BIN_FMT
     RECORD_SIZE = PID_BIN_SIZE
 
-    def __init__(self, fname):
+    def __init__(self, fname, mode='rb', length=None):
         """open an existing on-disk map
 
         Args:
             fname (str): path to the on-disk map
+            mode (str): file open mode, usually either 'rb' for read-only maps,
+                'wb' for creating new maps, or 'rb+' for updating existing ones
+                (default: 'rb')
+            length (int): map size in number of logical records; used to
+                initialize read-write maps at creation time. Must be given when
+                mode is 'wb'; ignored otherwise
 
         """
-        super().__init__(self.RECORD_SIZE, fname)
+        super().__init__(self.RECORD_SIZE, fname, mode=mode, length=length)
 
     def _get_bin_record(self, pos):
         """seek and return the (binary) PID at a given (logical) position
@@ -292,5 +368,18 @@
 
         return bytes_to_str(self._get_bin_record(pos))
 
-    def __len__(self):
-        return self.length
+    def __setitem__(self, pos, pid):
+        """update the PID stored at a given (logical) position
+
+        Raises:
+            IndexError: if pos is outside of the map range 0..N-1
+
+        """
+        if not 0 <= pos < self.length:  # mmap slicing would wrap negatives
+            raise IndexError('position out of range: {}'.format(pos))
+        rec_pos = pos * self.RECORD_SIZE
+        self.mm[rec_pos:rec_pos+self.RECORD_SIZE] = str_to_bytes(pid)
+
+    def __iter__(self):
+        for pos in range(self.length):
+            yield (pos, self[pos])
diff --git a/swh/graph/tests/test_pid.py b/swh/graph/tests/test_pid.py
--- a/swh/graph/tests/test_pid.py
+++ b/swh/graph/tests/test_pid.py
@@ -8,6 +8,8 @@
 import tempfile
 import unittest
 
+from itertools import islice
+
 from swh.graph.pid import str_to_bytes, bytes_to_str
 from swh.graph.pid import PidToIntMap, IntToPidMap
 
@@ -49,7 +51,7 @@
         self.assertEqual(pid_bytes, str_to_bytes(bytes_to_str(pid_bytes)))
 
 
-def gen_records(types=['ori', 'snp', 'rev', 'rel', 'dir', 'cnt'],
+def gen_records(types=['cnt', 'dir', 'rel', 'rev', 'ori', 'snp'],
                 length=10000):
     """generate sequential PID/int records, suitable for filling int<->pid maps
     for testing swh-graph on-disk binary databases
@@ -124,6 +126,25 @@
         with self.assertRaises(TypeError):
             self.map[1.2]
 
+    def test_update(self):
+        fname2 = self.fname + '.update'
+        shutil.copy(self.fname, fname2)  # fresh map copy
+        map2 = PidToIntMap(fname2, mode='rb+')
+        updated = {}
+        for (pid, int) in islice(map2, 11):  # update the first N items
+            new_int = int + 42
+            map2[pid] = new_int
+            self.assertEqual(map2[pid], new_int)  # check updated value
+            updated[pid] = new_int
+        map2.close()
+
+        map3 = PidToIntMap(fname2)  # check that updates reached the disk
+        for (pid, new_int) in updated.items():
+            self.assertEqual(map3[pid], new_int)
+        map3.close()
+
+        os.unlink(fname2)  # tmpdir will be cleaned even if we don't reach this
+
 
 class TestIntToPidMap(unittest.TestCase):
 
@@ -157,3 +178,26 @@
             self.map[1000000]
         with self.assertRaises(IndexError):
             self.map[-1000000]
+
+    def test_update(self):
+        fname2 = self.fname + '.update'
+        shutil.copy(self.fname, fname2)  # fresh map copy
+        map2 = IntToPidMap(fname2, mode='rb+')
+        updated = {}
+        for (int, pid) in islice(map2, 11):  # update the first N items
+            new_pid = pid.replace(':0', ':f')  # mangle first hex digit
+            map2[int] = new_pid
+            self.assertEqual(map2[int], new_pid)  # check updated value
+            updated[int] = new_pid
+        with self.assertRaises(IndexError):
+            map2[-2] = new_pid  # must not wrap around to the map end
+        with self.assertRaises(IndexError):
+            map2[len(map2)] = new_pid
+        map2.close()
+
+        map3 = IntToPidMap(fname2)  # check that updates reached the disk
+        for (int, new_pid) in updated.items():
+            self.assertEqual(map3[int], new_pid)
+        map3.close()
+
+        os.unlink(fname2)  # tmpdir will be cleaned even if we don't reach this