Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9749348
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
26 KB
Subscribers
None
View Options
diff --git a/PKG-INFO b/PKG-INFO
index a3e4f67..3fd3e6d 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.0
Name: swh.loader.git
-Version: 0.0.29
+Version: 0.0.30
Summary: Software Heritage git loader
Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/README b/README
index 9098a3c..d5c000b 100644
--- a/README
+++ b/README
@@ -1,83 +1,83 @@
The Software Heritage Git Loader is a tool and a library to walk a local
Git repository and inject into the SWH dataset all contained files that
weren't known before.
License
=======
This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.
See top-level LICENSE file for the full text of the GNU General Public
License along with this program.
Dependencies
============
Runtime
-------
- python3
- python3-dulwich
- python3-retrying
- python3-swh.core
- python3-swh.model
- python3-swh.storage
- python3-swh.scheduler
Test
----
- python3-nose
Requirements
============
- implementation language, Python3
- coding guidelines: conform to PEP8
- Git access: via dulwich
Configuration
=============
You can run the loader or the updater directly by calling python3 -m swh.loader.git.{loader,updater}.
Both tools expect a configuration file in .ini format to be present in ~/.config/swh/loader/git-{loader,updater}.ini
The configuration file contains the following directives:
```
[main]
# the storage class used. one of remote_storage, local_storage
storage_class = remote_storage
# arguments passed to the storage class
# for remote_storage: URI of the storage server
-storage_args = http://localhost:5000/
+storage_args = http://localhost:5002/
# for local_storage: database connection string and root of the
# storage, comma separated
# storage_args = dbname=softwareheritage-dev, /tmp/swh/storage
# Whether to send the given types of objects
send_contents = True
send_directories = True
send_revisions = True
send_releases = True
send_occurrences = True
# The size of the packets sent to storage for each kind of object
content_packet_size = 100000
content_packet_size_bytes = 1073741824
directory_packet_size = 25000
revision_packet_size = 100000
release_packet_size = 100000
occurrence_packet_size = 100000
```
diff --git a/swh.loader.git.egg-info/PKG-INFO b/swh.loader.git.egg-info/PKG-INFO
index a3e4f67..3fd3e6d 100644
--- a/swh.loader.git.egg-info/PKG-INFO
+++ b/swh.loader.git.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.0
Name: swh.loader.git
-Version: 0.0.29
+Version: 0.0.30
Summary: Software Heritage git loader
Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py
index 0623757..a810eaa 100644
--- a/swh/loader/git/base.py
+++ b/swh/loader/git/base.py
@@ -1,453 +1,453 @@
# Copyright (C) 2016-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import logging
import os
import traceback
import uuid
import psycopg2
import requests
from retrying import retry
from swh.core import config
from swh.storage import get_storage
def send_in_packets(objects, sender, packet_size, packet_size_bytes=None):
    """Forward `objects` to `sender` in batches.

    A batch is flushed as soon as it holds at least `packet_size` objects
    or, when `packet_size_bytes` is set, as soon as the summed ``'length'``
    of its objects exceeds `packet_size_bytes`. Falsy objects are skipped.
    Whatever remains after the last object is sent as a final batch.

    """
    byte_limit = packet_size_bytes or 0

    batch = []
    batch_bytes = 0
    for item in objects:
        if not item:
            continue

        batch.append(item)
        if byte_limit:
            # Sizes are only tracked when a byte limit was requested.
            batch_bytes += item['length']

        if len(batch) < packet_size and batch_bytes <= byte_limit:
            continue

        sender(batch)
        # Start a fresh list: the sender may keep a reference to the old one.
        batch, batch_bytes = [], 0

    if batch:
        sender(batch)
def retry_loading(error):
    """Decide whether `error` is recoverable and the batch should be retried.

    Returns True (after logging a warning) for the recoverable exception
    types listed below, and False for anything else.

    """
    recoverable = (
        # raised when two parallel insertions insert the same data.
        psycopg2.IntegrityError,
        # raised when uWSGI restarts and hangs up on the worker.
        requests.exceptions.ConnectionError,
    )
    if not isinstance(error, recoverable):
        return False

    error_class = error.__class__
    formatted = traceback.format_exception(
        error_class,
        error,
        error.__traceback__,
    )
    log_extra = {
        'swh_type': 'storage_retry',
        'swh_exception_type': error.__module__ + '.' + error_class.__name__,
        'swh_exception': formatted,
    }
    logging.getLogger('swh.loader.git.BulkLoader').warning(
        'Retry loading a batch', exc_info=False, extra=log_extra)
    return True
class BaseLoader(config.SWHConfig):
    """This base class is a pattern for loaders.

    The external calling convention is as such:

    - instantiate the class once (loads storage and the configuration)
    - for each origin, call load with the origin-specific arguments (for
      instance, an origin URL).

    load calls several methods that must be implemented in subclasses:

    - prepare(*args, **kwargs) prepares the loader for the new origin
    - get_origin gets the origin object associated to the current loader
    - fetch_data downloads the necessary data from the origin
    - get_{contents,directories,revisions,releases,occurrences} retrieve each
      kind of object from the origin
    - has_* checks whether there are some objects to load for that object type
    - get_fetch_history_result retrieves the data to insert in the
      fetch_history table once the load was successful
    - cleanup cleans up an eventual state installed for computations
    - eventful returns whether the load was eventful or not

    """
    CONFIG_BASE_FILENAME = None

    DEFAULT_CONFIG = {
        'storage': ('dict', {
            'cls': 'remote',
            'args': {
                'url': 'http://localhost:5002/'
            },
        }),

        'send_contents': ('bool', True),
        'send_directories': ('bool', True),
        'send_revisions': ('bool', True),
        'send_releases': ('bool', True),
        'send_occurrences': ('bool', True),

        'save_data': ('bool', False),
        'save_data_path': ('str', ''),

        'content_packet_size': ('int', 10000),
        'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
        'directory_packet_size': ('int', 25000),
        'revision_packet_size': ('int', 100000),
        'release_packet_size': ('int', 100000),
        'occurrence_packet_size': ('int', 100000),
    }

    ADDITIONAL_CONFIG = {}

    def __init__(self):
        """Parse the configuration, validate it and open the storage.

        Raises:
            OSError: if `save_data` is enabled and `save_data_path` cannot
                be stat()ed.
            PermissionError: if `save_data` is enabled and `save_data_path`
                is not both readable and writable.

        """
        self.config = self.parse_config_file(
            additional_configs=[self.ADDITIONAL_CONFIG])

        # Make sure the config is sane
        if self.config['save_data']:
            path = self.config['save_data_path']
            # os.stat raises if the path does not exist.
            os.stat(path)
            if not os.access(path, os.R_OK | os.W_OK):
                raise PermissionError("Permission denied: %r" % path)

        self.storage = get_storage(**self.config['storage'])

        self.log = logging.getLogger('swh.loader.git.BulkLoader')
        self.fetch_date = None  # possibly overridden in self.prepare method

    def prepare(self, *args, **kwargs):
        """Prepare the data source to be loaded"""
        raise NotImplementedError

    def cleanup(self):
        """Clean up an eventual state installed for computations."""
        pass

    def get_origin(self):
        """Get the origin that is currently being loaded"""
        raise NotImplementedError

    def fetch_data(self):
        """Fetch the data from the data source"""
        raise NotImplementedError

    def has_contents(self):
        """Checks whether we need to load contents"""
        return True

    def get_contents(self):
        """Get the contents that need to be loaded"""
        raise NotImplementedError

    def has_directories(self):
        """Checks whether we need to load directories"""
        return True

    def get_directories(self):
        """Get the directories that need to be loaded"""
        raise NotImplementedError

    def has_revisions(self):
        """Checks whether we need to load revisions"""
        return True

    def get_revisions(self):
        """Get the revisions that need to be loaded"""
        raise NotImplementedError

    def has_releases(self):
        """Checks whether we need to load releases"""
        return True

    def get_releases(self):
        """Get the releases that need to be loaded"""
        raise NotImplementedError

    def has_occurrences(self):
        """Checks whether we need to load occurrences"""
        return True

    def get_occurrences(self):
        """Get the occurrences that need to be loaded"""
        raise NotImplementedError

    def get_fetch_history_result(self):
        """Return the data to store in fetch_history for the current loader"""
        raise NotImplementedError

    def eventful(self):
        """Whether the load was eventful"""
        raise NotImplementedError

    def save_data(self):
        """Save the data associated to the current load"""
        raise NotImplementedError

    def get_save_data_path(self):
        """The path to which we save the data.

        The directory is derived from the configured `save_data_path`, the
        current `origin_id` and the year of `fetch_date`, and is created if
        it does not exist yet.

        The path is recomputed on every call on purpose: it depends on the
        origin being loaded, and one loader instance is reused across
        origins, so a per-instance cache would go stale.

        """
        origin_id = self.origin_id
        year = str(self.fetch_date.year)

        path = os.path.join(
            self.config['save_data_path'],
            "%04d" % (origin_id % 10000),
            "%08d" % origin_id,
            year,
        )

        os.makedirs(path, exist_ok=True)
        return path

    def _send_batch(self, objects, object_type, plural, add):
        """Send one batch of `objects` through `add`, logging start and end.

        `object_type` is the value logged as `swh_content_type`; `plural`
        is the noun used in the human-readable log message. Both log
        records share the same random `swh_id` so they can be correlated.

        """
        num_objects = len(objects)
        log_id = str(uuid.uuid4())

        def extra(step):
            return {
                'swh_type': step,
                'swh_content_type': object_type,
                'swh_num': num_objects,
                'swh_id': log_id,
            }

        self.log.debug("Sending %d %s" % (num_objects, plural),
                       extra=extra('storage_send_start'))
        add(objects)
        self.log.debug("Done sending %d %s" % (num_objects, plural),
                       extra=extra('storage_send_end'))

    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
    def send_contents(self, content_list):
        """Actually send properly formatted contents to the database"""
        self._send_batch(content_list, 'content', 'contents',
                         self.storage.content_add)

    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
    def send_directories(self, directory_list):
        """Actually send properly formatted directories to the database"""
        self._send_batch(directory_list, 'directory', 'directories',
                         self.storage.directory_add)

    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
    def send_revisions(self, revision_list):
        """Actually send properly formatted revisions to the database"""
        self._send_batch(revision_list, 'revision', 'revisions',
                         self.storage.revision_add)

    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
    def send_releases(self, release_list):
        """Actually send properly formatted releases to the database"""
        self._send_batch(release_list, 'release', 'releases',
                         self.storage.release_add)

    @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
    def send_occurrences(self, occurrence_list):
        """Actually send properly formatted occurrences to the database"""
        self._send_batch(occurrence_list, 'occurrence', 'occurrences',
                         self.storage.occurrence_add)

    def send_origin(self, origin):
        """Register `origin` in the storage and return its identifier.

        `origin` must provide the 'type' and 'url' keys, which are used in
        the log messages.

        """
        log_id = str(uuid.uuid4())
        self.log.debug('Creating %s origin for %s' % (origin['type'],
                                                      origin['url']),
                       extra={
                           'swh_type': 'storage_send_start',
                           'swh_content_type': 'origin',
                           'swh_num': 1,
                           'swh_id': log_id
                       })
        origin_id = self.storage.origin_add_one(origin)
        self.log.debug('Done creating %s origin for %s' % (origin['type'],
                                                           origin['url']),
                       extra={
                           'swh_type': 'storage_send_end',
                           'swh_content_type': 'origin',
                           'swh_num': 1,
                           'swh_id': log_id
                       })

        return origin_id

    def send_all_contents(self, contents):
        """Send all the contents to the database"""
        packet_size = self.config['content_packet_size']
        packet_size_bytes = self.config['content_packet_size_bytes']

        send_in_packets(contents, self.send_contents, packet_size,
                        packet_size_bytes=packet_size_bytes)

    def send_all_directories(self, directories):
        """Send all the directories to the database"""
        packet_size = self.config['directory_packet_size']

        send_in_packets(directories, self.send_directories, packet_size)

    def send_all_revisions(self, revisions):
        """Send all the revisions to the database"""
        packet_size = self.config['revision_packet_size']

        send_in_packets(revisions, self.send_revisions, packet_size)

    def send_all_releases(self, releases):
        """Send all the releases to the database"""
        packet_size = self.config['release_packet_size']

        send_in_packets(releases, self.send_releases, packet_size)

    def send_all_occurrences(self, occurrences):
        """Send all the occurrences to the database"""
        packet_size = self.config['occurrence_packet_size']

        send_in_packets(occurrences, self.send_occurrences, packet_size)

    def open_fetch_history(self):
        """Start a fetch_history entry for the current origin"""
        return self.storage.fetch_history_start(self.origin_id)

    def close_fetch_history_success(self, fetch_history_id, result):
        """Close the fetch_history entry, recording `result` as a success"""
        data = {
            'status': True,
            'result': result,
        }
        return self.storage.fetch_history_end(fetch_history_id, data)

    def close_fetch_history_failure(self, fetch_history_id):
        """Close the fetch_history entry as a failure.

        Must be called while an exception is being handled: the traceback
        of that exception is stored in the 'stderr' field.

        """
        data = {
            'status': False,
            'stderr': traceback.format_exc(),
        }
        return self.storage.fetch_history_end(fetch_history_id, data)

    def load(self, *args, **kwargs):
        """Load one origin; `args` and `kwargs` are passed to `prepare`.

        The origin is registered, a fetch_history entry and an origin visit
        are opened, then each kind of object enabled in the configuration
        is fetched and sent. The visit ends up 'full' on success and
        'partial' on failure, in which case the exception is re-raised.
        `cleanup` runs in both cases.

        Returns:
            the value of `eventful()`.

        """
        self.prepare(*args, **kwargs)
        origin = self.get_origin()
        self.origin_id = self.send_origin(origin)

        fetch_history_id = self.open_fetch_history()
        if self.fetch_date:  # overwriting the visit_date the fetching date
            date_visit = self.fetch_date
        else:
            date_visit = datetime.datetime.now(tz=datetime.timezone.utc)

        origin_visit = self.storage.origin_visit_add(
            self.origin_id,
            date_visit)
        self.visit = origin_visit['visit']

        try:
            self.fetch_data()

            if self.config['save_data']:
                self.save_data()

            if self.config['send_contents'] and self.has_contents():
                self.send_all_contents(self.get_contents())

            if self.config['send_directories'] and self.has_directories():
                self.send_all_directories(self.get_directories())

            if self.config['send_revisions'] and self.has_revisions():
                self.send_all_revisions(self.get_revisions())

            if self.config['send_releases'] and self.has_releases():
                self.send_all_releases(self.get_releases())

            if self.config['send_occurrences'] and self.has_occurrences():
                self.send_all_occurrences(self.get_occurrences())

            self.close_fetch_history_success(fetch_history_id,
                                             self.get_fetch_history_result())
            self.storage.origin_visit_update(
                self.origin_id, self.visit, status='full')

        except BaseException:
            # Deliberately catch everything (including interruptions) so the
            # failure is always recorded; the exception is re-raised below.
            self.close_fetch_history_failure(fetch_history_id)
            self.storage.origin_visit_update(
                self.origin_id, self.visit, status='partial')
            raise
        finally:
            self.cleanup()

        return self.eventful()
diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py
index 64e5049..b84116c 100644
--- a/swh/loader/git/converters.py
+++ b/swh/loader/git/converters.py
@@ -1,214 +1,216 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Convert dulwich objects to dictionaries suitable for swh.storage"""
from swh.core import hashutil
# Hashes computed from the raw blob data; 'sha1_git' is left out because
# dulwich_blob_to_content takes it from the blob's own sha() instead.
HASH_ALGORITHMS = hashutil.ALGORITHMS - {'sha1_git'}
def origin_url_to_origin(origin_url):
    """Build the origin dictionary for the git repository at `origin_url`.

    The result has the 'type' key set to 'git' and the 'url' key set to
    `origin_url`.

    """
    return dict(type='git', url=origin_url)
def dulwich_blob_to_content(blob, log=None, max_content_size=None,
                            origin_id=None):
    """Turn a dulwich blob into a Software Heritage content dictionary.

    Returns None when `blob` is not a blob object.

    When `max_content_size` is set and the blob is larger than it, the
    content is returned with status 'absent', a 'reason' and the given
    `origin_id` as 'origin', without any data; the skip is reported through
    `log` if one is provided. Otherwise the raw data and its hashes are
    included and the status is 'visible'.

    """
    if blob.type_name != b'blob':
        return

    length = blob.raw_length()
    content = {
        'sha1_git': blob.sha().digest(),
        'length': length,
        'status': 'absent'
    }

    if max_content_size and length > max_content_size:
        hex_id = hashutil.hash_to_hex(content['sha1_git'])
        if log:
            log.info('Skipping content %s, too large (%s > %s)' %
                     (hex_id, length, max_content_size), extra={
                         'swh_type': 'loader_git_content_skip',
                         'swh_id': hex_id,
                         'swh_size': length,
                     })
        content['reason'] = 'Content too large'
        content['origin'] = origin_id
        return content

    raw_data = blob.as_raw_string()
    content.update(hashutil.hashdata(raw_data, HASH_ALGORITHMS))
    content['data'] = raw_data
    content['status'] = 'visible'

    return content
def dulwich_tree_to_directory(tree, log=None):
    """Turn a dulwich tree into a Software Heritage directory dictionary.

    Returns None when `tree` is not a tree object. Each entry records its
    type ('dir', 'rev' or 'file', falling back to 'file' for modes that are
    not in the table below), its mode as 'perms', its name and its target.

    """
    if tree.type_name != b'tree':
        return

    type_by_mode = {
        0o040000: 'dir',
        0o160000: 'rev',
        0o100644: 'file',
        0o100755: 'file',
        0o120000: 'file',
    }

    directory = {'id': tree.sha().digest()}
    directory['entries'] = [
        {
            'type': type_by_mode.get(item.mode, 'file'),
            'perms': item.mode,
            'name': item.path,
            'target': hashutil.hex_to_hash(item.sha.decode('ascii')),
        }
        for item in tree.iteritems()
    ]
    return directory
def parse_author(name_email):
    """Split a raw ``name <email>`` bytes line into its components.

    Returns None for a None input. Otherwise returns a dictionary with the
    'name' and 'email' parts (each None when it cannot be extracted) and the
    untouched input as 'fullname'. Without an opening bracket, both name and
    email are None.

    """
    if name_email is None:
        return None

    before, opening, after = name_email.partition(b'<')
    if not opening:
        name = email = None
    else:
        if not before:
            name = None
        elif before.endswith(b' '):
            # Drop the single space separating the name from the bracket.
            name = before[:-1]
        else:
            name = before

        address, closing, _ = after.partition(b'>')
        email = address if closing else None

    return {
        'name': name,
        'email': email,
        'fullname': name_email,
    }
def dulwich_tsinfo_to_timestamp(timestamp, timezone, timezone_neg_utc):
    """Build a Software Heritage timestamp dictionary from the dulwich
    timestamp information.

    `timezone` is integer-divided by 60 to give the 'offset'.
    `timezone_neg_utc` is only reported (as 'negative_utc') when `timezone`
    is zero; otherwise 'negative_utc' is None.

    """
    negative_utc = None
    if timezone == 0:
        negative_utc = timezone_neg_utc

    return dict(
        timestamp=timestamp,
        offset=timezone // 60,
        negative_utc=negative_utc,
    )
def dulwich_commit_to_revision(commit, log=None):
    """Turn a dulwich commit into a Software Heritage revision dictionary.

    Returns None when `commit` is not a commit object. `log` is accepted
    but not used.

    The encoding, merge tags, extra headers and GPG signature of the
    commit, when present, are collected in that order under
    ``metadata['extra_headers']``; 'metadata' is None when there are none.

    """
    if commit.type_name != b'commit':
        return

    ret = {
        'id': commit.sha().digest(),
        'author': parse_author(commit.author),
        'date': dulwich_tsinfo_to_timestamp(
            commit.author_time,
            commit.author_timezone,
            commit._author_timezone_neg_utc,
        ),
        'committer': parse_author(commit.committer),
        'committer_date': dulwich_tsinfo_to_timestamp(
            commit.commit_time,
            commit.commit_timezone,
            commit._commit_timezone_neg_utc,
        ),
        'type': 'git',
        'directory': bytes.fromhex(commit.tree.decode()),
        'message': commit.message,
        'metadata': None,
        'synthetic': False,
        'parents': [bytes.fromhex(p.decode()) for p in commit.parents],
    }

    git_metadata = []
    if commit.encoding is not None:
        git_metadata.append(['encoding', commit.encoding])

    if commit.mergetag:
        for mergetag in commit.mergetag:
            # The serialized tag is expected to end with a newline, which
            # is dropped before storing the header value.
            raw_string = mergetag.as_raw_string()
            assert raw_string.endswith(b'\n')
            git_metadata.append(['mergetag', raw_string[:-1]])

    if commit.extra:
        git_metadata.extend([k.decode('utf-8'), v] for k, v in commit.extra)

    if commit.gpgsig:
        git_metadata.append(['gpgsig', commit.gpgsig])

    if git_metadata:
        ret['metadata'] = {
            'extra_headers': git_metadata,
        }

    return ret
# Maps a dulwich object type name to the matching Software Heritage object
# type; used by dulwich_tag_to_release to fill in 'target_type'.
DULWICH_TYPES = {
b'blob': 'content',
b'tree': 'directory',
b'commit': 'revision',
b'tag': 'release',
}
def dulwich_tag_to_release(tag, log=None):
    """Turn a dulwich tag into a Software Heritage release dictionary.

    Returns None when `tag` is not a tag object. `log` is accepted but not
    used. 'author' and 'date' are both None when the tag has no tagger.

    """
    if tag.type_name != b'tag':
        return

    target_type, target = tag.object
    release = {
        'id': tag.sha().digest(),
        'name': tag.name,
        'target': bytes.fromhex(target.decode()),
        'target_type': DULWICH_TYPES[target_type.type_name],
        'message': tag._message,
        'metadata': None,
        'synthetic': False,
    }

    if not tag.tagger:
        release['author'] = release['date'] = None
        return release

    release['author'] = parse_author(tag.tagger)
    release['date'] = dulwich_tsinfo_to_timestamp(
        tag.tag_time,
        tag.tag_timezone,
        tag._tag_timezone_neg_utc,
    )
    return release
diff --git a/version.txt b/version.txt
index b39233c..011d10a 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.29-0-gdbd5ca3
\ No newline at end of file
+v0.0.30-0-gfb03140
\ No newline at end of file
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Aug 25, 4:52 PM (6 d, 11 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3346777
Attached To
rDLDG Git loader
Event Timeline
Log In to Comment