
diff --git a/swh/loader/mercurial/bundle20_loader.py b/swh/loader/mercurial/bundle20_loader.py
index fe8ece8..c824f9f 100644
--- a/swh/loader/mercurial/bundle20_loader.py
+++ b/swh/loader/mercurial/bundle20_loader.py
@@ -1,504 +1,508 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This document contains a SWH loader for ingesting repository data
from Mercurial version 2 bundle files.
"""
# NOTE: The code here does expensive work twice in places because of the
# intermediate need to check for what is missing before sending to the database
# and the desire to not juggle very large amounts of data.
# TODO: Decide whether to also serialize to disk and read back more quickly
# from there. Maybe only for very large repos and fast drives.
# - Avi
import datetime
import hglib
import os
import random
import re
from dateutil import parser
from shutil import rmtree
from tempfile import mkdtemp
from swh.model import hashutil, identifiers
from swh.loader.core.loader import SWHStatelessLoader
from swh.loader.core.converters import content_for_storage
from . import converters
from .archive_extract import tmp_extract
from .bundle20_reader import Bundle20Reader
from .converters import PRIMARY_ALGO as ALGO
from .objects import SelectiveCache, SimpleTree
TAG_PATTERN = re.compile('[0-9A-Fa-f]{40}')
class HgBundle20Loader(SWHStatelessLoader):
"""Mercurial loader able to deal with remote or local repository.
"""
CONFIG_BASE_FILENAME = 'loader/hg'
ADDITIONAL_CONFIG = {
'bundle_filename': ('str', 'HG20_none_bundle'),
'reduce_effort': ('bool', True), # default: Try to be smart about time
'temp_directory': ('str', '/tmp'),
+ 'cache_size': ('int', 2*1024*1024*1024),
}
def __init__(self, logging_class='swh.loader.mercurial.Bundle20Loader'):
super().__init__(logging_class=logging_class)
self.content_max_size_limit = self.config['content_size_limit']
self.bundle_filename = self.config['bundle_filename']
self.reduce_effort_flag = self.config['reduce_effort']
self.empty_repository = None
self.temp_directory = self.config['temp_directory']
+ self.cache_size = self.config['cache_size']
def cleanup(self):
"""Clean temporary working directory
"""
if self.bundle_path and os.path.exists(self.bundle_path):
self.log.debug('Cleaning up working bundle %s' % self.bundle_path)
os.unlink(self.bundle_path)
if self.working_directory and os.path.exists(self.working_directory):
self.log.debug('Cleaning up working directory %s' % (
self.working_directory, ))
rmtree(self.working_directory)
def get_heads(self, repo):
"""Read the closed branches heads (branch, bookmarks) and returns a
dict with branch_name (bytes) and mercurial's node id
(bytes). Those needs conversion to swh-ids. This is taken
care of in get_revisions.
"""
b = {}
for _, node_hash_id, _, branch_name, *_ in repo.heads():
b[branch_name] = hashutil.hash_to_bytes(
node_hash_id.decode())
bookmarks = repo.bookmarks()
if bookmarks and bookmarks[0]:
for bookmark_name, _, target_short in bookmarks[0]:
target = repo[target_short].node()
b[bookmark_name] = hashutil.hash_to_bytes(
target.decode())
return b
def prepare(self, origin_url, visit_date, directory=None):
"""Prepare the necessary steps to load an actual remote or local
repository.
To load a local repository, set the optional directory
parameter to the path of an existing local folder.
To load a remote repository, leave the optional directory
parameter as None.
Args:
origin_url (str): Origin url to load
visit_date (str/datetime): Date of the visit
directory (str/None): The local directory to load
"""
self.origin_url = origin_url
self.origin = self.get_origin()
if isinstance(visit_date, str): # visit_date
visit_date = parser.parse(visit_date)
self.visit_date = visit_date
self.working_directory = None
self.bundle_path = None
self.branches = {}
self.tags = []
self.releases = {}
try:
if not directory: # remote repository
self.working_directory = mkdtemp(
suffix='.tmp',
prefix='swh.loader.mercurial.',
dir=self.temp_directory)
os.makedirs(self.working_directory, exist_ok=True)
self.hgdir = self.working_directory
self.log.debug('Cloning %s to %s' % (
self.origin_url, self.hgdir))
hglib.clone(source=self.origin_url, dest=self.hgdir)
else: # local repository
self.working_directory = None
self.hgdir = directory
self.bundle_path = os.path.join(self.hgdir, self.bundle_filename)
self.log.debug('Bundling at %s' % self.bundle_path)
with hglib.open(self.hgdir) as repo:
self.heads = self.get_heads(repo)
repo.bundle(bytes(self.bundle_path, 'utf-8'),
all=True,
type=b'none-v2')
+ self.cache_filename = os.path.join(
+ self.hgdir, 'sqldict%s' % (
+ hex(random.randint(0, 0xffffff))[2:], ))
+
except Exception:
self.cleanup()
raise
try:
self.br = Bundle20Reader(self.bundle_path)
except FileNotFoundError as e:
# Empty repository! Still a successful visit targeting an
# empty snapshot
self.log.warn('%s is an empty repository!' % self.hgdir)
self.empty_repository = True
else:
self.reduce_effort = set()
if self.reduce_effort_flag:
now = datetime.datetime.now(tz=datetime.timezone.utc)
if (now - self.visit_date).days > 1:
# Assuming that self.visit_date would be today for
# a new visit, treat older visit dates as
# indication of wanting to skip some processing
# effort.
for header, commit in self.br.yield_all_changesets():
ts = commit['time'].timestamp()
if ts < self.visit_date.timestamp():
self.reduce_effort.add(header['node'])
def has_contents(self):
return not self.empty_repository
def has_directories(self):
return not self.empty_repository
def has_revisions(self):
return not self.empty_repository
def has_releases(self):
return not self.empty_repository
def get_origin(self):
"""Get the origin that is currently being loaded in format suitable for
swh.storage."""
return {
'type': 'hg',
'url': self.origin_url
}
def fetch_data(self):
"""Fetch the data from the data source."""
pass
def get_contents(self):
"""Get the contents that need to be loaded."""
# NOTE: This method generates blobs twice to reduce memory usage
# without generating disk writes.
self.file_node_to_hash = {}
hash_to_info = {}
self.num_contents = 0
contents = {}
missing_contents = set()
for blob, node_info in self.br.yield_all_blobs():
self.num_contents += 1
file_name = node_info[0]
header = node_info[2]
if header['linknode'] in self.reduce_effort:
content = hashutil.hash_data(blob, algorithms=[ALGO],
with_length=True)
else:
content = hashutil.hash_data(blob, with_length=True)
blob_hash = content[ALGO]
self.file_node_to_hash[header['node']] = blob_hash
if header['linknode'] in self.reduce_effort:
continue
hash_to_info[blob_hash] = node_info
contents[blob_hash] = content
missing_contents.add(blob_hash)
if file_name == b'.hgtags':
# https://www.mercurial-scm.org/wiki/GitConcepts#Tag_model
# overwrite until the last one
self.tags = (t for t in blob.split(b'\n') if t != b'')
if contents:
missing_contents = set(
self.storage.content_missing(
list(contents.values()),
key_hash=ALGO
)
)
# Clusters needed blobs by file offset and then only fetches the
# groups at the needed offsets.
focs = {} # "file/offset/contents"
for blob_hash in missing_contents:
_, file_offset, header = hash_to_info[blob_hash]
focs.setdefault(file_offset, {})
focs[file_offset][header['node']] = blob_hash
hash_to_info = None
for offset, node_hashes in sorted(focs.items()):
for header, data, *_ in self.br.yield_group_objects(
group_offset=offset
):
node = header['node']
if node in node_hashes:
blob, meta = self.br.extract_meta_from_blob(data)
content = contents.pop(node_hashes[node], None)
if content:
content['data'] = blob
content['length'] = len(blob)
yield content_for_storage(
content,
log=self.log,
max_content_size=self.content_max_size_limit,
origin_id=self.origin_id
)
def load_directories(self):
"""This is where the work is done to convert manifest deltas from the
repository bundle into SWH directories.
"""
self.mnode_to_tree_id = {}
cache_hints = self.br.build_manifest_hints()
def tree_size(t):
return t.size()
- cache_filename = os.path.join(self.hgdir, 'sqldict%s' % (
- hex(random.randint(0, 0xffffff))[2:]))
-
self.trees = SelectiveCache(cache_hints=cache_hints,
size_function=tree_size,
- filename=cache_filename)
+ filename=self.cache_filename,
+ max_size=self.cache_size)
tree = SimpleTree()
for header, added, removed in self.br.yield_all_manifest_deltas(
cache_hints
):
node = header['node']
basenode = header['basenode']
tree = self.trees.fetch(basenode) or tree # working tree
for path in removed.keys():
tree = tree.remove_tree_node_for_path(path)
for path, info in added.items():
file_node, is_symlink, perms_code = info
tree = tree.add_blob(
path,
self.file_node_to_hash[file_node],
is_symlink,
perms_code
)
if header['linknode'] in self.reduce_effort:
self.trees.store(node, tree)
else:
new_dirs = []
self.mnode_to_tree_id[node] = tree.hash_changed(new_dirs)
self.trees.store(node, tree)
yield header, tree, new_dirs
def get_directories(self):
"""Get the directories that need to be loaded."""
dirs = {}
self.num_directories = 0
for _, _, new_dirs in self.load_directories():
for d in new_dirs:
self.num_directories += 1
dirs[d['id']] = d
missing_dirs = list(dirs.keys())
if missing_dirs:
missing_dirs = self.storage.directory_missing(missing_dirs)
for _id in missing_dirs:
yield dirs[_id]
dirs = {}
def get_revisions(self):
"""Get the revisions that need to be loaded."""
node_2_rev = {}
revisions = {}
self.num_revisions = 0
for header, commit in self.br.yield_all_changesets():
if header['node'] in self.reduce_effort:
continue
self.num_revisions += 1
date_dict = identifiers.normalize_timestamp(
int(commit['time'].timestamp())
)
author_dict = converters.parse_author(commit['user'])
if commit['manifest'] == Bundle20Reader.NAUGHT_NODE:
directory_id = SimpleTree().hash_changed()
else:
directory_id = self.mnode_to_tree_id[commit['manifest']]
extra_meta = []
extra = commit.get('extra')
if extra:
for e in extra.split(b'\x00'):
k, v = e.split(b':', 1)
k = k.decode('utf-8')
extra_meta.append([k, v])
revision = {
'author': author_dict,
'date': date_dict,
'committer': author_dict,
'committer_date': date_dict,
'type': 'hg',
'directory': directory_id,
'message': commit['message'],
'metadata': {
'node': hashutil.hash_to_hex(header['node']),
'extra_headers': [
['time_offset_seconds',
str(commit['time_offset_seconds']).encode('utf-8')],
] + extra_meta
},
'synthetic': False,
'parents': []
}
p1 = node_2_rev.get(header['p1'])
p2 = node_2_rev.get(header['p2'])
if p1:
revision['parents'].append(p1)
if p2:
revision['parents'].append(p2)
revision['id'] = hashutil.hash_to_bytes(
identifiers.revision_identifier(revision)
)
node_2_rev[header['node']] = revision['id']
revisions[revision['id']] = revision
# Converts heads to use swh ids
self.heads = {
branch_name: node_2_rev[node_id]
for branch_name, node_id in self.heads.items()
}
node_2_rev = None
missing_revs = revisions.keys()
if missing_revs:
missing_revs = set(
self.storage.revision_missing(list(missing_revs))
)
for r in missing_revs:
yield revisions[r]
self.mnode_to_tree_id = None
def _read_tag(self, tag, split_byte=b' '):
node, *name = tag.split(split_byte)
name = split_byte.join(name)
return node, name
def get_releases(self):
"""Get the releases that need to be loaded."""
self.num_releases = 0
releases = {}
missing_releases = []
for t in self.tags:
self.num_releases += 1
node, name = self._read_tag(t)
node = node.decode()
if not TAG_PATTERN.match(node):
self.log.warn('Wrong pattern (%s) found in tags. Skipping' % (
node, ))
continue
release = {
'name': name,
'target': hashutil.hash_to_bytes(node),
'target_type': 'revision',
'message': None,
'metadata': None,
'synthetic': False,
'author': {'name': None, 'email': None, 'fullname': b''},
'date': None
}
id_hash = hashutil.hash_to_bytes(
identifiers.release_identifier(release))
release['id'] = id_hash
missing_releases.append(id_hash)
releases[id_hash] = release
self.releases[name] = id_hash
if missing_releases:
missing_releases = set(
self.storage.release_missing(missing_releases))
for _id in missing_releases:
yield releases[_id]
def get_snapshot(self):
"""Get the snapshot that need to be loaded."""
self.num_snapshot = 1
branches = {}
for name, target in self.heads.items():
branches[name] = {'target': target, 'target_type': 'revision'}
for name, target in self.releases.items():
branches[name] = {'target': target, 'target_type': 'release'}
snap = {
'id': None,
'branches': branches,
}
snap['id'] = identifiers.identifier_to_bytes(
identifiers.snapshot_identifier(snap))
return snap
def get_fetch_history_result(self):
"""Return the data to store in fetch_history."""
return {
'contents': self.num_contents,
'directories': self.num_directories,
'revisions': self.num_revisions,
'releases': self.num_releases,
'snapshot': self.num_snapshot
}
class HgArchiveBundle20Loader(HgBundle20Loader):
"""Mercurial loader for repository wrapped within archives.
"""
def __init__(self):
super().__init__(
logging_class='swh.loader.mercurial.HgArchiveBundle20Loader')
def prepare(self, origin_url, archive_path, visit_date):
self.temp_dir = tmp_extract(archive=archive_path,
dir=self.temp_directory,
prefix='swh.loader.mercurial.',
log=self.log,
source=origin_url)
repo_name = os.listdir(self.temp_dir)[0]
directory = os.path.join(self.temp_dir, repo_name)
try:
super().prepare(origin_url, visit_date, directory=directory)
except Exception:
self.cleanup()
raise
def cleanup(self):
if os.path.exists(self.temp_dir):
rmtree(self.temp_dir)
super().cleanup()
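
Note on the loader-side changes above: prepare() now derives the SQLite spill-file path, __init__() reads the new 'cache_size' setting, and load_directories() simply forwards both to SelectiveCache (diffed below). The snippet that follows is a minimal sketch of that wiring outside the loader; the temp path, the bare config dict and the lambda size_function are illustrative stand-ins rather than the loader's actual runtime objects.

import os
import random

from swh.loader.mercurial.objects import SelectiveCache

config = {
    'temp_directory': '/tmp',
    'cache_size': 2 * 1024 * 1024 * 1024,   # same default as ADDITIONAL_CONFIG
}

hgdir = config['temp_directory']             # stand-in for self.hgdir
cache_filename = os.path.join(
    hgdir, 'sqldict%s' % (hex(random.randint(0, 0xffffff))[2:], ))

trees = SelectiveCache(
    cache_hints=None,                        # the loader passes build_manifest_hints()
    size_function=lambda tree: tree.size(),  # same shape as tree_size() in the loader
    filename=cache_filename,
    max_size=config['cache_size'],
)

Computing the path in prepare() keeps the spill file under self.hgdir next to the bundle, so for remote origins it is removed together with the working directory by cleanup().
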
diff --git a/swh/loader/mercurial/objects.py b/swh/loader/mercurial/objects.py
index 153e1a6..dd6b11f 100644
--- a/swh/loader/mercurial/objects.py
+++ b/swh/loader/mercurial/objects.py
@@ -1,401 +1,401 @@
-# Copyright (C) 2017 The Software Heritage developers
+# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""This document contains various helper classes used in converting Mercurial
bundle files into SWH Contents, Directories, etc.
"""
import binascii
import copy
import os
import sys
from collections import OrderedDict
from sqlitedict import SqliteDict
from swh.model import identifiers
OS_PATH_SEP = os.path.sep.encode('utf-8')
class SimpleBlob:
"""Stores basic metadata of a blob object.when constructing deep trees from
commit file manifests.
args:
file_hash: unique hash of the file contents
is_symlink: (bool) is this file a symlink?
file_perms: (string) 3 digit permission code as a string or bytestring,
e.g. '755' or b'755'
"""
kind = 'file'
def __init__(self, file_hash, is_symlink, file_perms):
self.hash = file_hash
self.perms = 0o100000 + int(file_perms, 8)
if is_symlink:
self.perms += 0o020000
def __str__(self):
return ('SimpleBlob: ' + str(self.hash) + ' -- ' + str(self.perms))
def __eq__(self, other):
return ((self.perms == other.perms) and (self.hash == other.hash))
def size(self):
"""Return the size in byte."""
return sys.getsizeof(self) + sys.getsizeof(self.__dict__)
class SimpleTree(dict):
""" Stores data for a nested directory object. Uses shallow cloning to stay
compact after forking and change monitoring for efficient re-hashing.
"""
kind = 'dir'
perms = 0o040000
def __init__(self):
self.hash = None
self._size = None
def __eq__(self, other):
return ((self.hash == other.hash) and (self.items() == other.items()))
def _new_tree_node(self, path):
"""Deeply nests SimpleTrees according to a given subdirectory path and
returns a reference to the deepest one.
args:
path: bytestring containing a relative path from self to a deep
subdirectory. e.g. b'foodir/bardir/bazdir'
returns:
the new node
"""
node = self
for d in path.split(OS_PATH_SEP):
if node.get(d):
if node[d].hash is not None:
node[d] = copy.copy(node[d])
node[d].hash = None
node[d]._size = None
else:
node[d] = SimpleTree()
node = node[d]
return node
def remove_tree_node_for_path(self, path):
"""Deletes a SimpleBlob or SimpleTree from inside nested SimpleTrees
according to the given relative file path, and then recursively removes
any newly depopulated SimpleTrees. It keeps the old history by doing a
shallow clone before any change.
args:
path: bytestring containing a relative path from self to a nested
file or directory. e.g. b'foodir/bardir/bazdir/quxfile.txt'
returns:
the new root node
"""
node = self
if node.hash is not None:
node = copy.copy(node)
node.hash = None
node._size = None
first, sep, rest = path.partition(OS_PATH_SEP)
if rest:
node[first] = node[first].remove_tree_node_for_path(rest)
if len(node[first]) == 0:
del node[first]
else:
del node[first]
return node
def add_blob(self, file_path, file_hash, is_symlink, file_perms):
"""Shallow clones the root node and then deeply nests a SimpleBlob
inside nested SimpleTrees according to the given file path, shallow
cloning all intermediate nodes and marking them as changed and
in need of new hashes.
args:
file_path: bytestring containing the relative path from self to a
nested file
file_hash: primary identifying hash computed from the blob contents
is_symlink: True/False whether this item is a symbolic link
file_perms: int or string representation of file permissions
returns:
the new root node
"""
root = self
if root.hash is not None:
root = copy.copy(root)
root.hash = None
root._size = None
node = root
fdir, fbase = os.path.split(file_path)
if fdir:
node = root._new_tree_node(fdir)
node[fbase] = SimpleBlob(file_hash, is_symlink, file_perms)
return root
def yield_swh_directories(self):
"""Converts nested SimpleTrees into a stream of SWH Directories.
yields:
an SWH Directory for every node in the tree
"""
for k, v in sorted(self.items()):
if isinstance(v, SimpleTree):
yield from v.yield_swh_directories()
yield {
'id': self.hash,
'entries': [
{
'name': k,
'perms': v.perms,
'type': v.kind,
'target': v.hash
}
for k, v in sorted(self.items())
]
}
def hash_changed(self, new_dirs=None):
"""Computes and sets primary indentifier hashes for unhashed subtrees.
args:
new_dirs (optional): an empty list to be populated with the SWH
Directories for all of the new (not previously
hashed) nodes
returns:
the top level hash of the whole tree
"""
if self.hash is None:
directory = {
'entries': [
{
'name': k,
'perms': v.perms,
'type': v.kind,
'target': (v.hash if v.hash is not None
else v.hash_changed(new_dirs))
}
for k, v in sorted(self.items())
]
}
self.hash = binascii.unhexlify(
identifiers.directory_identifier(directory)
)
directory['id'] = self.hash
if new_dirs is not None:
new_dirs.append(directory)
return self.hash
def flatten(self, _curpath=None, _files=None):
"""Converts nested sub-SimpleTrees and SimpleBlobs into a list of
file paths. Useful for counting the number of files in a manifest.
returns:
a flat list of all of the contained file paths
"""
_curpath = _curpath or b''
_files = _files or {}
for k, v in sorted(self.items()):
p = os.path.join(_curpath, k)
if isinstance(v, SimpleBlob):
_files[p] = (v.hash, v.perms)
else:
v.flatten(p, _files)
return _files
def size(self):
"""Return the (approximate?) memory utilization in bytes of the nested
structure.
"""
if self._size is None:
self._size = (
sys.getsizeof(self) + sys.getsizeof(self.__dict__)
+ sum([
sys.getsizeof(k)+v.size()
for k, v in self.items()
])
)
return self._size
class SelectiveCache(OrderedDict):
"""Special cache for storing past data upon which new data is known to be
dependent. Optional hinting of how many instances of which keys will be
needed down the line makes utilization more efficient. And, because the
distance between related data can be arbitrarily long and the data
fragments can be arbitrarily large, a disk-based secondary storage is used
if the primary RAM-based storage area is filled to the designated capacity.
Storage is occupied in three phases:
1) The most recent key/value pair is always held, regardless of other
factors, until the next entry replaces it.
2) Stored key/value pairs are pushed into a randomly accessible
expanding buffer in memory with a stored size function, maximum size
value, and special hinting about which keys to store for how long
optionally declared at instantiation.
3) The in-memory buffer pickles into a randomly accessible disk-backed
secondary buffer when it becomes full.
Occupied space is calculated by default as whatever sys.getsizeof() returns
on the values being stored. This can be changed by passing in a new
size_function at instantiation.
The cache_hints parameter is a dict of key/int pairs recording how many
subsequent fetches that particular key's value should stay in storage for
before being erased. If you provide a set of hints and then try to store a
key that is not in that set of hints, the cache will store it only while it
is the most recent entry, and will bypass storage phases 2 and 3.
"""
- DEFAULT_SIZE = 800*1024*1024 # bytes or whatever
+ DEFAULT_SIZE = 2*1024*1024*1024 # bytes or whatever
def __init__(self, max_size=None, cache_hints=None,
size_function=None, filename=None):
"""args:
max_size: integer value indicating the maximum size of the part
of storage held in memory
cache_hints: dict of key/int pairs as described in the class
description
size_function: callback function that accepts one parameter and
returns one int, which should probably be the
calculated size of the parameter
"""
self._max_size = max_size or SelectiveCache.DEFAULT_SIZE
self._disk = None
if size_function is None:
self._size_function = sys.getsizeof
else:
self._size_function = size_function
self._latest = None
self._cache_size = 0
self._cache_hints = copy.copy(cache_hints) or None
self.filename = filename
def store(self, key, data):
"""Primary method for putting data into the cache.
args:
key: any hashable value
data: any python object (preferably one that is measurable)
"""
self._latest = (key, data)
if (self._cache_hints is not None) and (key not in self._cache_hints):
return
# cache the completed data...
self._cache_size += self._size_function(data) + 53
# ...but limit memory expenditure for the cache by offloading to disk
should_commit = False
while (
self._cache_size > self._max_size
and len(self) > 0
):
should_commit = True
k, v = self.popitem(last=False)
self._cache_size -= self._size_function(v) + 53  # undo the "+ 53" added above
self._diskstore(k, v)
if should_commit:
self._disk.commit(blocking=False)
self[key] = data
def _diskstore(self, key, value):
if self._disk is None:
self._disk = SqliteDict(
autocommit=False, journal_mode='OFF', filename=self.filename)
self._disk.in_temp = True # necessary to force the disk clean up
self._disk[key] = value
def has(self, key):
"""Tests whether the data for the provided key is being stored.
args:
key: the key of the data whose storage membership property you wish
to discover
returns:
True or False
"""
return (
(self._latest and self._latest[0] == key)
or (key in self)
or (self._disk and (key in self._disk))
)
def fetch(self, key):
"""Pulls a value out of storage and decrements the hint counter for the
given key.
args:
key: the key of the data that you want to retrieve
returns:
the retrieved value or None
"""
retval = None
if self._latest and self._latest[0] == key:
retval = self._latest[1]
if retval is None:
retval = self.get(key)
if (retval is None) and self._disk:
self._disk.commit(blocking=False)
retval = self._disk.get(key) or None
self.dereference(key)
return retval
def dereference(self, key):
"""Remove one instance of expected future retrieval of the data for the
given key. This is called automatically by fetch requests that aren't
satisfied by phase 1 of storage.
args:
key: the key of the data for which the future retrievals hint is to
be decremented
"""
newref = self._cache_hints and self._cache_hints.get(key)
if newref:
newref -= 1
if newref == 0:
del self._cache_hints[key]
if key in self:
item = self[key]
self._cache_size -= self._size_function(item)
del self[key]
else:
if self._disk:
del self._disk[key]
else:
self._cache_hints[key] = newref
def keys(self):
yield from super().keys()  # not self.keys(): avoid infinite recursion
if self._disk:
yield from self._disk.keys()
def values(self):
yield from super().values()
if self._disk:
yield from self._disk.values()
def items(self):
yield from super().items()
if self._disk:
yield from self._disk.items()
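
The SelectiveCache docstring above describes storage in three phases. The following self-contained sketch exercises all three; the keys, payload sizes, hint counts and the deliberately tiny max_size are illustrative values chosen only to force the spill-to-SQLite path, not anything the loader actually uses.

import os
import tempfile

from swh.loader.mercurial.objects import SelectiveCache

spill_file = os.path.join(tempfile.mkdtemp(), 'sqldict-demo')

cache = SelectiveCache(
    max_size=200,                        # tiny on purpose, to force spilling
    cache_hints={b'a': 2, b'c': 1},      # expected number of future fetches
    size_function=len,
    filename=spill_file,
)

cache.store(b'a', b'x' * 150)   # hinted: cached in memory
cache.store(b'b', b'y' * 150)   # unhinted: only kept while it is the latest entry
cache.store(b'c', b'z' * 150)   # pushes b'a' out of memory onto disk

assert cache.fetch(b'c') == b'z' * 150   # served from memory (most recent entry)
assert cache.fetch(b'a') == b'x' * 150   # read back from the SQLite spill file
assert cache.fetch(b'b') is None         # unhinted and no longer the latest entry

In load_directories() above, cache_hints comes from build_manifest_hints(), so only manifest nodes expected to be fetched again are kept beyond the most-recent-entry phase.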
