Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9124922
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
59 KB
Subscribers
None
View Options
diff --git a/PKG-INFO b/PKG-INFO
index de95f8f..832dc70 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.0
Name: swh.archiver
-Version: 0.0.1
+Version: 0.0.2
Summary: Software Heritage archiver
Home-page: https://forge.softwareheritage.org/diffusion/DARC/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/docs/.gitignore b/docs/.gitignore
new file mode 100644
index 0000000..58a761e
--- /dev/null
+++ b/docs/.gitignore
@@ -0,0 +1,3 @@
+_build/
+apidoc/
+*-stamp
diff --git a/docs/Makefile b/docs/Makefile
new file mode 100644
index 0000000..c30c50a
--- /dev/null
+++ b/docs/Makefile
@@ -0,0 +1 @@
+include ../../swh-docs/Makefile.sphinx
diff --git a/docs/_static/.placeholder b/docs/_static/.placeholder
new file mode 100644
index 0000000..e69de29
diff --git a/docs/_templates/.placeholder b/docs/_templates/.placeholder
new file mode 100644
index 0000000..e69de29
diff --git a/docs/conf.py b/docs/conf.py
new file mode 100644
index 0000000..190deb7
--- /dev/null
+++ b/docs/conf.py
@@ -0,0 +1 @@
+from swh.docs.sphinx.conf import * # NoQA
diff --git a/docs/index.rst b/docs/index.rst
new file mode 100644
index 0000000..8b64117
--- /dev/null
+++ b/docs/index.rst
@@ -0,0 +1,15 @@
+Software Heritage - Development Documentation
+=============================================
+
+.. toctree::
+ :maxdepth: 2
+ :caption: Contents:
+
+
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
diff --git a/requirements.txt b/requirements.txt
index 7758480..bc752ac 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
click
+kafka-python
psycopg2
vcversioner
diff --git a/setup.py b/setup.py
index 18c1aed..dccd330 100755
--- a/setup.py
+++ b/setup.py
@@ -1,32 +1,29 @@
#!/usr/bin/env python3
-from setuptools import setup
+from setuptools import setup, find_packages
def parse_requirements():
    """Collect the install requirements listed in the requirement files.

    Reads ``requirements.txt`` then ``requirements-swh.txt`` (relative to the
    current working directory), ignoring blank lines and lines starting
    with ``#``.

    Returns:
        list of stripped requirement lines, in file order.
    """
    collected = []
    for filename in ('requirements.txt', 'requirements-swh.txt'):
        with open(filename) as handle:
            stripped = (raw.strip() for raw in handle.readlines())
            collected.extend(
                entry for entry in stripped
                if entry and not entry.startswith('#'))
    return collected
setup(
name='swh.archiver',
description='Software Heritage archiver',
author='Software Heritage developers',
author_email='swh-devel@inria.fr',
url='https://forge.softwareheritage.org/diffusion/DARC/',
- packages=[
- 'swh.archiver',
- 'swh.archiver.tests',
- ],
+ packages=find_packages(),
install_requires=parse_requirements(),
setup_requires=['vcversioner'],
vcversioner={},
include_package_data=True,
)
diff --git a/sql/.gitignore b/sql/.gitignore
new file mode 100644
index 0000000..83f9ed8
--- /dev/null
+++ b/sql/.gitignore
@@ -0,0 +1,3 @@
+*-stamp
+autodoc/
+swh.dump
diff --git a/sql/Makefile b/sql/Makefile
index c132dbc..ab596ae 100644
--- a/sql/Makefile
+++ b/sql/Makefile
@@ -1,42 +1,42 @@
# Depends: postgresql-client, postgresql-autodoc
DBNAME = softwareheritage-archiver-dev
DOCDIR = autodoc
-SQL_INIT = ../swh-init.sql
+SQL_INIT = swh-init.sql
SQL_SCHEMA = swh-archiver-schema.sql
SQL_FUNC = swh-archiver-func.sql
SQL_DATA = swh-archiver-data.sql
SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_FUNC) $(SQL_DATA)
PSQL_BIN = psql
PSQL_FLAGS = --single-transaction --echo-all -X
PSQL = $(PSQL_BIN) $(PSQL_FLAGS)
all:
createdb: createdb-stamp
createdb-stamp: $(SQL_INIT)
createdb $(DBNAME)
touch $@
filldb: filldb-stamp
filldb-stamp: createdb-stamp
cat $(SQLS) | $(PSQL) $(DBNAME)
touch $@
dropdb:
-dropdb $(DBNAME)
# The dump file is swh.dump: that is the name of the rule below, of the
# .gitignore entry and of the file removed by distclean.
dumpdb: swh.dump

# Custom-format (-Fc) dump of the filled database.
swh.dump: filldb-stamp
	pg_dump -Fc $(DBNAME) > $@

# Remove the stamp files and the generated documentation directory.
clean:
	rm -rf *-stamp $(DOCDIR)/

# clean, plus drop the database and delete the dump.
distclean: clean dropdb
	rm -f swh.dump

.PHONY: all initdb createdb filldb dropdb dumpdb doc clean distclean
diff --git a/sql/swh-init.sql b/sql/swh-init.sql
new file mode 100644
index 0000000..1165283
--- /dev/null
+++ b/sql/swh-init.sql
@@ -0,0 +1 @@
+create or replace language plpgsql;
diff --git a/PKG-INFO b/swh.archiver.egg-info/PKG-INFO
similarity index 94%
copy from PKG-INFO
copy to swh.archiver.egg-info/PKG-INFO
index de95f8f..832dc70 100644
--- a/PKG-INFO
+++ b/swh.archiver.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
Metadata-Version: 1.0
Name: swh.archiver
-Version: 0.0.1
+Version: 0.0.2
Summary: Software Heritage archiver
Home-page: https://forge.softwareheritage.org/diffusion/DARC/
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
License: UNKNOWN
Description: UNKNOWN
Platform: UNKNOWN
diff --git a/swh.archiver.egg-info/SOURCES.txt b/swh.archiver.egg-info/SOURCES.txt
new file mode 100644
index 0000000..a8ca13b
--- /dev/null
+++ b/swh.archiver.egg-info/SOURCES.txt
@@ -0,0 +1,57 @@
+.gitignore
+MANIFEST.in
+Makefile
+Makefile.local
+README.db_testing
+requirements-swh.txt
+requirements.txt
+setup.py
+version.txt
+debian/changelog
+debian/compat
+debian/control
+debian/copyright
+debian/rules
+debian/source/format
+docs/.gitignore
+docs/Makefile
+docs/archiver-blueprint.md
+docs/conf.py
+docs/index.rst
+docs/_static/.placeholder
+docs/_templates/.placeholder
+sql/.gitignore
+sql/Makefile
+sql/createdb-stamp
+sql/filldb-stamp
+sql/swh-archiver-data.sql
+sql/swh-archiver-func.sql
+sql/swh-archiver-schema.sql
+sql/swh-init.sql
+sql/bin/db-upgrade
+sql/upgrades/002.sql
+sql/upgrades/003.sql
+sql/upgrades/004.sql
+sql/upgrades/005.sql
+sql/upgrades/006.sql
+sql/upgrades/007.sql
+sql/upgrades/008.sql
+sql/upgrades/009.sql
+swh/__init__.py
+swh.archiver.egg-info/PKG-INFO
+swh.archiver.egg-info/SOURCES.txt
+swh.archiver.egg-info/dependency_links.txt
+swh.archiver.egg-info/requires.txt
+swh.archiver.egg-info/top_level.txt
+swh/archiver/__init__.py
+swh/archiver/checker.py
+swh/archiver/copier.py
+swh/archiver/db.py
+swh/archiver/director.py
+swh/archiver/storage.py
+swh/archiver/tasks.py
+swh/archiver/updater.py
+swh/archiver/worker.py
+swh/archiver/tests/__init__.py
+swh/archiver/tests/test_archiver.py
+swh/archiver/tests/test_checker.py
\ No newline at end of file
diff --git a/swh.archiver.egg-info/dependency_links.txt b/swh.archiver.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/swh.archiver.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/swh.archiver.egg-info/requires.txt b/swh.archiver.egg-info/requires.txt
new file mode 100644
index 0000000..539798d
--- /dev/null
+++ b/swh.archiver.egg-info/requires.txt
@@ -0,0 +1,10 @@
+click
+kafka-python
+psycopg2
+swh.core>=0.0.28
+swh.journal>=0.0.2
+swh.model>=0.0.15
+swh.objstorage>=0.0.17
+swh.scheduler>=0.0.14
+swh.storage>=0.0.88
+vcversioner
diff --git a/swh.archiver.egg-info/top_level.txt b/swh.archiver.egg-info/top_level.txt
new file mode 100644
index 0000000..0cb0f8f
--- /dev/null
+++ b/swh.archiver.egg-info/top_level.txt
@@ -0,0 +1 @@
+swh
diff --git a/swh/__init__.py b/swh/__init__.py
new file mode 100644
index 0000000..69e3be5
--- /dev/null
+++ b/swh/__init__.py
@@ -0,0 +1 @@
+__path__ = __import__('pkgutil').extend_path(__path__, __name__)
diff --git a/swh/archiver/db.py b/swh/archiver/db.py
index d9e5e43..df26142 100644
--- a/swh/archiver/db.py
+++ b/swh/archiver/db.py
@@ -1,224 +1,225 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from swh.storage.db import BaseDb, cursor_to_bytes, stored_procedure
def utcnow():
    """Return the current time as a timezone-aware datetime in UTC."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
class ArchiverDb(BaseDb):
"""Proxy to the SWH's archiver DB
"""
def archive_ls(self, cur=None):
    """List the archives registered in the archiver database.

    Args:
        cur: optional cursor to run the query with; it is resolved through
            ``self._cursor``.

    Yields:
        one row of the ``archive`` table per archive server, as produced
        by ``cursor_to_bytes`` (the previous documentation describes each
        row as a tuple (server_id, server_url) -- TODO confirm against
        the schema).
    """
    cursor = self._cursor(cur)
    cursor.execute("SELECT * FROM archive")
    for row in cursor_to_bytes(cursor):
        yield row
def content_archive_get(self, content_id, cur=None):
    """Look up the archival status of one content across the archives.

    Args:
        content_id: the sha1 of the content.
        cur: optional cursor to run the query with; it is resolved through
            ``self._cursor``.

    Returns:
        None when no copy is recorded for the content. Otherwise a tuple
        (content_id, present_copies, ongoing_copies), where present_copies
        is the list of archive names whose copy is 'present' and
        ongoing_copies is a dict mapping the archive name of each
        'ongoing' copy to its mtime. Copies in any other status are left
        out of both.
    """
    query = """select archive.name, status, mtime
               from content_copies
               left join archive on content_copies.archive_id = archive.id
               where content_copies.content_id = (
                     select id from content where sha1 = %s)
            """
    cursor = self._cursor(cur)
    cursor.execute(query, (content_id,))
    copies = cursor.fetchall()
    if not copies:
        return None

    present = [name for name, status, _ in copies if status == 'present']
    ongoing = {name: mtime
               for name, status, mtime in copies
               if status == 'ongoing'}
    return (content_id, present, ongoing)
def content_archive_get_copies(self, last_content=None, limit=1000,
                               cur=None):
    """Page through the contents having at least one 'present' copy.

    Args:
        last_content: sha1 of the last content retrieved. May be None
            to start at the beginning.
        limit: number of contents to retrieve. Can be None to retrieve all
            objects (will be slow).
        cur: optional cursor to run the query with; it is resolved through
            ``self._cursor``.

    Yields:
        A tuple (content_id, present_copies, ongoing_copies). Only copies
        in status 'present' are queried, so ongoing_copies is always an
        empty dict here.
    """
    params = {'limit': limit}
    if last_content is not None:
        # Resume strictly after the row matching the given sha1.
        paging_clause = """content_id > (
                           select id from content
                           where sha1 = %(last_content)s)"""
        params['last_content'] = last_content
    else:
        paging_clause = 'true'

    # Only the constant paging clause is interpolated here; values go
    # through the driver's parameter binding (hence the escaped %%).
    query = """select
                   (select sha1 from content where id = content_id),
                   array_agg((select name from archive
                              where id = archive_id))
               from content_copies
               where status = 'present' and %s
               group by content_id
               order by content_id
               limit %%(limit)s""" % paging_clause

    cursor = self._cursor(cur)
    cursor.execute(query, params)
    for sha1, archives in cursor_to_bytes(cursor):
        yield (sha1, archives, {})
def content_archive_get_unarchived_copies(
        self, retention_policy, last_content=None,
        limit=1000, cur=None):
    """Page through the contents that lack 'present' copies.

    Same paging as content_archive_get_copies, restricted to the contents
    whose number of 'present' copies is smaller than `retention_policy`.

    Args:
        retention_policy: number of required present copies
        last_content: sha1 of the last content retrieved. May be None
            to start at the beginning.
        limit: number of contents to retrieve. Can be None to retrieve all
            objects (will be slow).
        cur: optional cursor to run the query with; it is resolved through
            ``self._cursor``.

    Yields:
        A tuple (content_id, present_copies, ongoing_copies). Only copies
        in status 'present' are queried, so ongoing_copies is always an
        empty dict here.
    """
    params = {
        'limit': limit,
        'retention_policy': retention_policy,
    }
    if last_content is not None:
        # Resume strictly after the row matching the given sha1.
        paging_clause = """content_id > (
                           select id from content
                           where sha1 = %(last_content)s)"""
        params['last_content'] = last_content
    else:
        paging_clause = 'true'

    # Only the constant paging clause is interpolated here; values go
    # through the driver's parameter binding (hence the escaped %%).
    query = """select
                   (select sha1 from content where id = content_id),
                   array_agg((select name from archive
                              where id = archive_id))
               from content_copies
               where status = 'present' and %s
               group by content_id
               having count(archive_id) < %%(retention_policy)s
               order by content_id
               limit %%(limit)s""" % paging_clause

    cursor = self._cursor(cur)
    cursor.execute(query, params)
    for sha1, archives in cursor_to_bytes(cursor):
        yield (sha1, archives, {})
@stored_procedure('swh_mktemp_content_archive')
def mktemp_content_archive(self, cur=None):
    """Create the temporary table tmp_content_archive, which lives for
    the duration of the transaction.

    The method has no body of its own; presumably the
    ``stored_procedure`` decorator issues the call to
    ``swh_mktemp_content_archive`` -- confirm in swh.storage.db.

    """
@stored_procedure('swh_content_archive_add')
def content_archive_add_from_temp(self, cur=None):
"""Add new content archive entries from temporary table.
- Use from archiver.storage module:
+ Use from archiver.storage module::
+
self.db.mktemp_content_archive()
# copy data over to the temp table
self.db.copy_to([{'colname': id0}, {'colname': id1}],
'tmp_cache_content',
['colname'], cur)
# insert into the main table
self.db.add_content_archive_from_temp(cur)
"""
pass
def content_archive_get_missing(self, backend_name, cur=None):
    """List the contents missing from the backend `backend_name`.

    Args:
        backend_name: archive name handed to the
            ``swh_content_archive_missing`` SQL function.
        cur: optional cursor to run the query with; it is resolved through
            ``self._cursor``.

    Yields:
        the rows returned by ``swh_content_archive_missing``, passed
        through ``cursor_to_bytes``.

    """
    cursor = self._cursor(cur)
    query = "select * from swh_content_archive_missing(%s)"
    cursor.execute(query, (backend_name,))
    for row in cursor_to_bytes(cursor):
        yield row
def content_archive_get_unknown(self, cur=None):
    """List the sha1 that are unknown to the archiver db.

    Args:
        cur: optional cursor to run the query with; it is resolved through
            ``self._cursor``.

    Yields:
        the rows returned by the ``swh_content_archive_unknown`` SQL
        function, passed through ``cursor_to_bytes``.

    """
    cursor = self._cursor(cur)
    query = 'select * from swh_content_archive_unknown()'
    cursor.execute(query)
    for row in cursor_to_bytes(cursor):
        yield row
def content_archive_update(self, content_id, archive_id,
                           new_status=None, cur=None):
    """Record the status of a content copy and refresh its mtime.

    Inserts the (archive, content) row into ``content_copies`` or, when
    that row already exists, overwrites its status; in both cases mtime
    is set to the current time.

    Args:
        content_id (bytes): content sha1
        archive_id (str): name of the archive
        new_status (str): expected to be one of 'missing', 'present' or
            'ongoing'. It replaces the previous status. The argument is
            mandatory despite its None default, which is only kept for
            signature compatibility.
        cur: optional cursor to run the query with; it is resolved through
            ``self._cursor``.

    Raises:
        TypeError: if content_id is not a bytes object.
        ValueError: if new_status is None.

    """
    # Explicit checks instead of `assert`, which is stripped under -O and
    # would let invalid values reach the database.
    if not isinstance(content_id, bytes):
        raise TypeError('content_id must be bytes, not %s'
                        % type(content_id).__name__)
    if new_status is None:
        raise ValueError('new_status is required')

    query = """insert into content_copies (archive_id, content_id, status, mtime)
               values ((select id from archive where name=%s),
                       (select id from content where sha1=%s),
                       %s, %s)
               on conflict (archive_id, content_id) do
               update set status = excluded.status, mtime = excluded.mtime
            """

    cur = self._cursor(cur)
    cur.execute(query, (archive_id, content_id, new_status, utcnow()))
diff --git a/swh/archiver/director.py b/swh/archiver/director.py
index 9e569bb..9077cbb 100644
--- a/swh/archiver/director.py
+++ b/swh/archiver/director.py
@@ -1,339 +1,341 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import logging
import sys
import time
import click
from swh.core import config, utils
from swh.model import hashutil
from swh.objstorage import get_objstorage
from swh.scheduler.utils import get_task
from . import tasks # noqa
from .storage import get_archiver_storage
class ArchiverDirectorBase(config.SWHConfig, metaclass=abc.ABCMeta):
"""Abstract Director class
An archiver director is in charge of dispatching batch of
contents to archiver workers (for them to archive).
Inherit from this class and provide:
+
- ADDITIONAL_CONFIG: Some added configuration needed for the
director to work
- CONFIG_BASE_FILENAME: relative path to lookup for the
configuration file
- def get_contents_to_archive(self): Implementation method to read
contents to archive
"""
DEFAULT_CONFIG = {
'batch_max_size': ('int', 1500),
'asynchronous': ('bool', True),
'max_queue_length': ('int', 100000),
'queue_throttling_delay': ('int', 120),
'archiver_storage': ('dict', {
'cls': 'db',
'args': {
'dbconn': 'dbname=softwareheritage-archiver-dev user=guest',
},
}),
}
# Destined to be overridden by subclass
ADDITIONAL_CONFIG = {}
# We use the same configuration file as the worker
CONFIG_BASE_FILENAME = 'archiver/worker'
# The worker's task queue name to use
TASK_NAME = None
def __init__(self):
    """Set up the director from its configuration file.

    The constructor takes no argument. The configuration is read with
    ``parse_config_file``, passing ADDITIONAL_CONFIG as additional
    configuration; the archiver storage is then built from the
    'archiver_storage' entry and the task is resolved from TASK_NAME.

    """
    super().__init__()
    config = self.parse_config_file(
        additional_configs=[self.ADDITIONAL_CONFIG])
    self.config = config
    self.archiver_storage = get_archiver_storage(
        **config['archiver_storage'])
    self.task = get_task(self.TASK_NAME)
    # Throttling settings read by run_async_worker
    self.max_queue_length = config['max_queue_length']
    self.throttling_delay = config['queue_throttling_delay']
def run(self):
    """Dispatch every batch of contents to a worker.

    Batches come from read_batch_contents. Depending on the
    'asynchronous' configuration entry, each one is handed to
    run_async_worker or to run_sync_worker.

    """
    dispatch = (self.run_async_worker if self.config['asynchronous']
                else self.run_sync_worker)
    for batch in self.read_batch_contents():
        dispatch(batch)
def run_async_worker(self, batch):
    """Queue a task for the batch, waiting while the queue is full.

    As long as the task queue holds at least ``max_queue_length``
    entries, log the situation and sleep ``throttling_delay`` seconds
    before checking again; then submit the batch with ``task.delay``.

    """
    max_length = self.max_queue_length
    throttling_delay = self.throttling_delay

    def queue_length():
        return self.task.app.get_queue_length(self.task.task_queue)

    length = queue_length()
    while length >= max_length:
        logging.info(
            'queue length %s >= %s, throttling for %s seconds' % (
                length,
                max_length,
                throttling_delay,
            )
        )
        time.sleep(throttling_delay)
        length = queue_length()

    self.task.delay(batch=batch)
def run_sync_worker(self, batch):
    """Process the batch right away by calling the task directly.

    """
    task = self.task
    task(batch=batch)
def read_batch_contents(self):
    """Group the contents that need to be archived into batches.

    Yields:
        non-empty lists of at most ``batch_max_size`` (configuration
        entry) items, taken in order from get_contents_to_archive. Only
        the last list may be shorter.

    """
    batch_max_size = self.config['batch_max_size']
    batch = []
    for content in self.get_contents_to_archive():
        batch.append(content)
        # `>=`, not `>`: a batch must never exceed the configured maximum
        # (the strict comparison let batch_max_size + 1 items through).
        if len(batch) >= batch_max_size:
            yield batch
            batch = []

    # Flush the trailing, partially filled batch.
    if batch:
        yield batch
@abc.abstractmethod
def get_contents_to_archive(self):
    """Produce the contents to dispatch; to be provided by subclasses.

    This base implementation is abstract and does nothing.

    Yields:
        sha1 to archive

    """
class ArchiverWithRetentionPolicyDirector(ArchiverDirectorBase):
    """Director feeding the workers with contents lacking archive copies.

    It queries the archiver storage for the contents whose number of
    present copies is below the configured ``retention_policy`` and
    delegates their archival to archiver workers.

    """

    ADDITIONAL_CONFIG = {
        'retention_policy': ('int', 2),
    }

    TASK_NAME = 'swh.archiver.tasks.SWHArchiverWithRetentionPolicyTask'

    def __init__(self, start_id):
        """Initialize the director and its starting point.

        Args:
            start_id: hash of the content to resume after, in any form
                ``hashutil.hash_to_bytes`` accepts, or None to start from
                the beginning.

        """
        super().__init__()
        self.start_id = (None if start_id is None
                         else hashutil.hash_to_bytes(start_id))

    def get_contents_to_archive(self):
        """Walk the contents that need more archive copies.

        The archiver storage is read page by page (``batch_max_size``
        entries at a time), each page resuming after the last content id
        seen, until an empty page comes back.

        Yields:
            the content id of each (content_id, present_copies,
            ongoing_copies) tuple returned by the archiver storage.

        """
        last_content = self.start_id

        def next_page():
            # Reads `last_content` at call time, so every page resumes
            # after the last id yielded so far.
            return list(
                self.archiver_storage.content_archive_get_unarchived_copies(
                    last_content=last_content,
                    retention_policy=self.config['retention_policy'],
                    limit=self.config['batch_max_size']))

        # iter(callable, sentinel) stops on the first empty page.
        for page in iter(next_page, []):
            for content_id, _present, _ongoing in page:
                last_content = content_id
                yield content_id
def read_sha1_from_stdin():
    """Parse the hashes given on standard input, one per line.

    A line for which the conversion raises is reported on stderr and
    skipped.

    Yields:
        the result of ``hashutil.hash_to_bytes`` for each line, stripped
        of its surrounding whitespace.

    """
    for raw_line in sys.stdin:
        candidate = raw_line.strip()
        try:
            yield hashutil.hash_to_bytes(candidate)
        except Exception:
            print("%s is not a valid sha1 hash, continuing" % repr(candidate),
                  file=sys.stderr)
class ArchiverStdinToBackendDirector(ArchiverDirectorBase):
"""A cloud archiver director in charge of reading contents and send
them in batch in the cloud.
The archiver director, in order:
+
- Reads sha1 to send to a specific backend.
- Checks if those sha1 are known in the archiver. If they are not,
add them
- if the sha1 are missing, they are sent for the worker to archive
If the flag force_copy is set, this will force the copy to be sent
for archive even though it has already been done.
"""
ADDITIONAL_CONFIG = {
'destination': ('str', 'azure'),
'force_copy': ('bool', False),
'source': ('str', 'uffizi'),
'storages': ('list[dict]',
[
{'host': 'uffizi',
'cls': 'pathslicing',
'args': {'root': '/tmp/softwareheritage/objects',
'slicing': '0:2/2:4/4:6'}},
{'host': 'banco',
'cls': 'remote',
'args': {'base_url': 'http://banco:5003/'}}
])
}
CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'
TASK_NAME = 'swh.archiver.tasks.SWHArchiverToBackendTask'
def __init__(self):
    """Initialize the director and the objstorages it reads from.

    On top of the base initialization, keep the 'destination',
    'force_copy' and 'source' configuration entries and instantiate one
    objstorage per entry of 'storages', indexed by its 'host'.

    """
    super().__init__()
    config = self.config
    self.destination = config['destination']
    self.force_copy = config['force_copy']

    objstorages = {}
    for storage in config.get('storages', []):
        host = storage['host']
        objstorages[host] = get_objstorage(storage['cls'], storage['args'])
    self.objstorages = objstorages

    # Fallback objstorage
    self.source = config['source']
def _add_unknown_content_ids(self, content_ids):
"""Check whether some content_id are unknown.
If they are, add them to the archiver db.
Args:
content_ids: List of dict with one key content_id
"""
source_objstorage = self.objstorages[self.source]
self.archiver_storage.content_archive_add(
(h
for h in content_ids
if h in source_objstorage),
sources_present=[self.source])
def get_contents_to_archive(self):
    """Read the content ids from stdin and select the ones to copy.

    Ids are processed by groups of ``batch_max_size``. Each group is first
    declared to the archiver storage (see _add_unknown_content_ids), then:

    - with ``force_copy``, every id has its status on the destination
      reset to 'missing' and is yielded;
    - otherwise only the ids reported missing on the destination by the
      archiver storage are yielded.

    Yields:
        content ids to send to the destination

    """
    groups = utils.grouper(read_sha1_from_stdin(),
                           self.config['batch_max_size'])
    force_copy = self.force_copy

    for group in groups:
        content_ids = list(group)
        if force_copy:
            if not content_ids:
                continue
            # Add missing entries in archiver table
            self._add_unknown_content_ids(content_ids)
        else:
            # Add missing entries in archiver table
            self._add_unknown_content_ids(content_ids)
            # Filter already copied data
            content_ids = list(
                self.archiver_storage.content_archive_get_missing(
                    content_ids=content_ids,
                    backend_name=self.destination))
            if not content_ids:
                continue

        print('Send %s contents to archive' % len(content_ids))

        for content_id in content_ids:
            if force_copy:
                # force its status to missing
                self.archiver_storage.content_archive_update(
                    content_id, self.destination, 'missing')
            yield content_id
def run_async_worker(self, batch):
    """Queue a task copying the batch to the configured destination.

    """
    task_arguments = {'destination': self.destination, 'batch': batch}
    self.task.delay(**task_arguments)
def run_sync_worker(self, batch):
    """Copy the batch to the configured destination right away, by
    calling the task directly.

    """
    task_arguments = {'destination': self.destination, 'batch': batch}
    self.task(**task_arguments)
@click.command()
@click.option('--direct', is_flag=True,
              help="""The archiver sends content for backup to
              one storage.""")
@click.option('--start-id', default=None, help="The first id to process")
def launch(direct, start_id):
    """Run an archiver director from the command line.

    With --direct, the contents to copy are read from stdin and sent to a
    single backend (--start-id is then ignored); otherwise the
    retention-policy director runs, starting at --start-id.

    """
    if not direct:
        director = ArchiverWithRetentionPolicyDirector(start_id)
    else:
        director = ArchiverStdinToBackendDirector()

    director.run()
if __name__ == '__main__':
launch()
diff --git a/swh/archiver/tests/__init__.py b/swh/archiver/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/swh/archiver/tests/test_archiver.py b/swh/archiver/tests/test_archiver.py
index 67794a5..67cb0f7 100644
--- a/swh/archiver/tests/test_archiver.py
+++ b/swh/archiver/tests/test_archiver.py
@@ -1,475 +1,465 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import glob
import tempfile
import shutil
import unittest
import os
from nose.tools import istest
from nose.plugins.attrib import attr
-from swh.core.tests.db_testing import DbsTestFixture
+from swh.core.tests.db_testing import SingleDbTestFixture
from swh.archiver.storage import get_archiver_storage
from swh.archiver import ArchiverWithRetentionPolicyDirector
from swh.archiver import ArchiverWithRetentionPolicyWorker
from swh.archiver.db import utcnow
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
from swh.objstorage.api.server import make_app as app
from swh.storage.tests.server_testing import ServerTestFixtureAsync
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
@attr('db')
-class TestArchiver(DbsTestFixture, ServerTestFixtureAsync,
+class TestArchiver(SingleDbTestFixture, ServerTestFixtureAsync,
unittest.TestCase):
""" Test the objstorage archiver.
"""
- TEST_DB_NAMES = [
- 'softwareheritage-archiver-test',
- ]
- TEST_DB_DUMPS = [
- os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'),
- ]
- TEST_DB_DUMP_TYPES = [
- 'pg_dump',
- ]
+ TEST_DB_NAME = 'softwareheritage-archiver-test'
+ TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump')
+ TEST_DB_DUMP_TYPE = 'pg_dump'
def setUp(self):
# Launch the backup server
self.dest_root = tempfile.mkdtemp(prefix='remote')
self.config = {
'cls': 'pathslicing',
'args': {
'root': self.dest_root,
'slicing': '0:2/2:4/4:6',
}
}
self.app = app(self.config)
super().setUp()
- # Retrieve connection (depends on the order in TEST_DB_NAMES)
- self.conn = self.conns[0] # archiver db's connection
- self.cursor = self.cursors[0]
-
# Create source storage
self.src_root = tempfile.mkdtemp()
src_config = {
'cls': 'pathslicing',
'args': {
'root': self.src_root,
'slicing': '0:2/2:4/4:6'
}
}
self.src_storage = get_objstorage(**src_config)
# Create destination storage
dest_config = {
'cls': 'remote',
'args': {
'url': self.url()
}
}
self.dest_storage = get_objstorage(**dest_config)
# Keep mapped the id to the storages
self.storages = {
'uffizi': self.src_storage,
'banco': self.dest_storage
}
# Override configurations
src_archiver_conf = {'host': 'uffizi'}
dest_archiver_conf = {'host': 'banco'}
src_archiver_conf.update(src_config)
dest_archiver_conf.update(dest_config)
self.archiver_storages = [src_archiver_conf, dest_archiver_conf]
self._override_director_config()
self._override_worker_config()
# Create the base archiver
self.archiver = self._create_director()
def tearDown(self):
self.empty_tables()
shutil.rmtree(self.src_root)
shutil.rmtree(self.dest_root)
super().tearDown()
def empty_tables(self):
# Remove all content
self.cursor.execute('DELETE FROM content')
self.cursor.execute('DELETE FROM content_copies')
self.conn.commit()
def _override_director_config(self, retention_policy=2):
""" Override the default config of the Archiver director
to allow the tests to use the *-test db instead of the default one as
there is no configuration file for now.
"""
ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: { # noqa
'archiver_storage': {
'cls': 'db',
'args': {
'dbconn': self.conn,
},
},
'batch_max_size': 5000,
'archival_max_age': 3600,
'retention_policy': retention_policy,
'asynchronous': False,
'max_queue_length': 100000,
'queue_throttling_delay': 120,
}
def _override_worker_config(self):
""" Override the default config of the Archiver worker
to allow the tests to use the *-test db instead of the default one as
there is no configuration file for now.
"""
ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: { # noqa
'retention_policy': 2,
'archival_max_age': 3600,
'archiver_storage': {
'cls': 'db',
'args': {
'dbconn': self.conn,
},
},
'storages': self.archiver_storages,
'source': 'uffizi',
'sources': ['uffizi'],
}
def _create_director(self):
return ArchiverWithRetentionPolicyDirector(start_id=None)
def _create_worker(self, batch=None):
    """Instantiate a worker for `batch` (an empty dict when omitted)."""
    # A `{}` default is created once and shared by every call, so workers
    # built without an explicit batch would all hold the same dict.
    if batch is None:
        batch = {}
    return ArchiverWithRetentionPolicyWorker(batch)
def _add_content(self, storage_name, content_data):
""" Add really a content to the given objstorage
This put an empty status for the added content.
Args:
storage_name: the concerned storage
content_data: the data to insert
with_row_insert: to insert a row entry in the db or not
"""
# Add the content to the storage
obj_id = self.storages[storage_name].add(content_data)
self.cursor.execute(""" INSERT INTO content (sha1)
VALUES (%s)
""", (obj_id,))
return obj_id
def _update_status(self, obj_id, storage_name, status, date=None):
""" Update the db status for the given id/storage_name.
This does not create the content in the storage.
"""
self.cursor.execute("""insert into archive (name)
values (%s)
on conflict do nothing""", (storage_name,))
self.archiver.archiver_storage.content_archive_update(
obj_id, storage_name, status
)
# Integration test
@istest
def archive_missing_content(self):
    """ Run archiver on a missing content should archive it.
    """
    obj_data = b'archive_missing_content'
    obj_id = self._add_content('uffizi', obj_data)
    self._update_status(obj_id, 'uffizi', 'present')
    # Content is missing on banco (entry not present in the db)
    with self.assertRaises(
            ObjNotFoundError,
            msg='Content should not be present before archival'):
        self.dest_storage.get(obj_id)

    self.archiver.run()
    # now the content should be present on remote objstorage
    remote_data = self.dest_storage.get(obj_id)
    # assertEqual: the assertEquals alias is deprecated and was removed
    # in Python 3.12.
    self.assertEqual(obj_data, remote_data)
@istest
def archive_present_content(self):
""" A content that is not 'missing' shouldn't be archived.
"""
obj_id = self._add_content('uffizi', b'archive_present_content')
self._update_status(obj_id, 'uffizi', 'present')
self._update_status(obj_id, 'banco', 'present')
# After the run, the content should NOT be in the archive.
# As the archiver believe it was already in.
self.archiver.run()
with self.assertRaises(ObjNotFoundError):
self.dest_storage.get(obj_id)
@istest
def archive_already_enough(self):
""" A content missing with enough copies shouldn't be archived.
"""
obj_id = self._add_content('uffizi', b'archive_alread_enough')
self._update_status(obj_id, 'uffizi', 'present')
self._override_director_config(retention_policy=1)
director = self._create_director()
# Obj is present in only one archive but only one copy is required.
director.run()
with self.assertRaises(ObjNotFoundError):
self.dest_storage.get(obj_id)
@istest
def content_archive_get_copies(self):
self.assertCountEqual(
self.archiver.archiver_storage.content_archive_get_copies(),
[],
)
obj_id = self._add_content('uffizi', b'archive_alread_enough')
self._update_status(obj_id, 'uffizi', 'present')
self.assertCountEqual(
self.archiver.archiver_storage.content_archive_get_copies(),
[(obj_id, ['uffizi'], {})],
)
# Unit tests for archive worker
def archival_elapsed(self, mtime):
return self._create_worker()._is_archival_delay_elapsed(mtime)
@istest
def vstatus_ongoing_remaining(self):
self.assertFalse(self.archival_elapsed(utcnow()))
@istest
def vstatus_ongoing_elapsed(self):
past_time = (utcnow()
- datetime.timedelta(
seconds=self._create_worker().archival_max_age))
self.assertTrue(self.archival_elapsed(past_time))
@istest
def need_archival_missing(self):
""" A content should need archival when it is missing.
"""
status_copies = {'present': ['uffizi'], 'missing': ['banco']}
worker = self._create_worker()
self.assertEqual(worker.need_archival(status_copies),
True)
@istest
def need_archival_present(self):
""" A content present everywhere shouldn't need archival
"""
status_copies = {'present': ['uffizi', 'banco']}
worker = self._create_worker()
self.assertEqual(worker.need_archival(status_copies),
False)
def _compute_copies_status(self, status):
""" A content with a given status should be detected correctly
"""
obj_id = self._add_content(
'banco', b'compute_copies_' + bytes(status, 'utf8'))
self._update_status(obj_id, 'banco', status)
worker = self._create_worker()
self.assertIn('banco', worker.compute_copies(
set(worker.objstorages), obj_id)[status])
@istest
def compute_copies_present(self):
""" A present content should be detected with correct status
"""
self._compute_copies_status('present')
@istest
def compute_copies_missing(self):
""" A missing content should be detected with correct status
"""
self._compute_copies_status('missing')
@istest
def compute_copies_extra_archive(self):
obj_id = self._add_content('banco', b'foobar')
self._update_status(obj_id, 'banco', 'present')
self._update_status(obj_id, 'random_archive', 'present')
worker = self._create_worker()
copies = worker.compute_copies(set(worker.objstorages), obj_id)
self.assertEqual(copies['present'], {'banco'})
self.assertEqual(copies['missing'], {'uffizi'})
def _get_backups(self, present, missing):
""" Return a list of the pair src/dest from the present and missing
"""
worker = self._create_worker()
return list(worker.choose_backup_servers(present, missing))
@istest
def choose_backup_servers(self):
self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0)
self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1)
# Even with more possible destinations, do not take more than the
# retention_policy require
self.assertEqual(
len(self._get_backups(['uffizi'], ['banco', 's3'])),
1
)
class TestArchiverStorageStub(unittest.TestCase):
    """Tests for the archiver storage obtained with the 'stub' class.

    The storage is configured with one archive listed as present
    ('present_archive') and one listed as missing ('missing_archive').
    """

    def setUp(self):
        """Create the temporary directories, objstorages and stub storage."""
        self.src_root = tempfile.mkdtemp(prefix='swh.storage.archiver.local')
        self.dest_root = tempfile.mkdtemp(prefix='swh.storage.archiver.remote')
        self.log_root = tempfile.mkdtemp(prefix='swh.storage.archiver.log')

        # Create source storage
        src_config = {
            'cls': 'pathslicing',
            'args': {
                'root': self.src_root,
                'slicing': '0:2/2:4/4:6'
            }
        }
        self.src_storage = get_objstorage(**src_config)

        # Create destination storage
        dest_config = {
            'cls': 'pathslicing',
            'args': {
                'root': self.dest_root,
                'slicing': '0:2/2:4/4:6'
            }
        }
        self.dest_storage = get_objstorage(**dest_config)

        self.config = {
            'cls': 'stub',
            'args': {
                'archives': {
                    'present_archive': 'http://uffizi:5003',
                    'missing_archive': 'http://banco:5003',
                },
                'present': ['present_archive'],
                'missing': ['missing_archive'],
                'logfile_base': os.path.join(self.log_root, 'log_'),
            }
        }

        # Generated with:
        #
        # id_length = 20
        # random.getrandbits(8 * id_length).to_bytes(id_length, 'big')
        #
        self.content_ids = [
            b"\xc7\xc9\x8dlk!'k\x81+\xa9\xc1lg\xc2\xcbG\r`f",
            b'S\x03:\xc9\xd0\xa7\xf2\xcc\x8f\x86v$0\x8ccq\\\xe3\xec\x9d',
            b'\xca\x1a\x84\xcbi\xd6co\x14\x08\\8\x9e\xc8\xc2|\xd0XS\x83',
            b'O\xa9\xce(\xb4\x95_&\xd2\xa2e\x0c\x87\x8fw\xd0\xdfHL\xb2',
            b'\xaaa \xd1vB\x15\xbd\xf2\xf0 \xd7\xc4_\xf4\xb9\x8a;\xb4\xcc',
        ]

        self.archiver_storage = get_archiver_storage(**self.config)
        super().setUp()

    def tearDown(self):
        """Remove the temporary directories created by setUp."""
        shutil.rmtree(self.src_root)
        shutil.rmtree(self.dest_root)
        shutil.rmtree(self.log_root)
        super().tearDown()

    @istest
    def archive_ls(self):
        """archive_ls should list the configured (name, url) pairs."""
        self.assertCountEqual(
            self.archiver_storage.archive_ls(),
            self.config['args']['archives'].items()
        )

    @istest
    def content_archive_get(self):
        """Any content should be reported present on the 'present' archives."""
        for content_id in self.content_ids:
            self.assertEqual(
                self.archiver_storage.content_archive_get(content_id),
                (content_id, set(self.config['args']['present']), {}),
            )

    @istest
    def content_archive_get_copies(self):
        """content_archive_get_copies should yield nothing."""
        self.assertCountEqual(
            self.archiver_storage.content_archive_get_copies(),
            [],
        )

    @istest
    def content_archive_get_unarchived_copies(self):
        """content_archive_get_unarchived_copies should yield nothing."""
        retention_policy = 2
        self.assertCountEqual(
            self.archiver_storage.content_archive_get_unarchived_copies(
                retention_policy),
            [],
        )

    @istest
    def content_archive_get_missing(self):
        """Contents are all missing from 'missing_archive' and none from
        'present_archive'; an unknown archive raises ValueError.
        """
        self.assertCountEqual(
            self.archiver_storage.content_archive_get_missing(
                self.content_ids,
                'missing_archive'
            ),
            self.content_ids,
        )

        self.assertCountEqual(
            self.archiver_storage.content_archive_get_missing(
                self.content_ids,
                'present_archive'
            ),
            [],
        )

        # The result is wrapped in list() so that a lazily evaluated
        # result is actually consumed inside the assertRaises block.
        with self.assertRaises(ValueError):
            list(self.archiver_storage.content_archive_get_missing(
                self.content_ids,
                'unknown_archive'
            ))

    @istest
    def content_archive_get_unknown(self):
        """No content should be reported as unknown."""
        self.assertCountEqual(
            self.archiver_storage.content_archive_get_unknown(
                self.content_ids,
            ),
            [],
        )

    @istest
    def content_archive_update(self):
        """Each update should add one line to a single logfile."""
        for content_id in self.content_ids:
            self.archiver_storage.content_archive_update(
                content_id, 'present_archive', 'present')
            self.archiver_storage.content_archive_update(
                content_id, 'missing_archive', 'present')

        self.archiver_storage.close_logfile()

        # Make sure we created a logfile
        files = glob.glob('%s*' % self.config['args']['logfile_base'])
        self.assertEqual(len(files), 1)

        # make sure the logfile contains all our lines; the context manager
        # closes the file handle instead of leaking it until collection.
        with open(files[0]) as logfile:
            lines = logfile.readlines()
        self.assertEqual(len(lines), 2 * len(self.content_ids))
diff --git a/swh/archiver/worker.py b/swh/archiver/worker.py
index c94d6f1..257cfdc 100644
--- a/swh/archiver/worker.py
+++ b/swh/archiver/worker.py
@@ -1,429 +1,431 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import datetime
import logging
import random
from collections import defaultdict
from celery import group
from swh.core import config, utils
from swh.objstorage import get_objstorage
from swh.objstorage.exc import Error, ObjNotFoundError
from swh.model import hashutil
from swh.scheduler.utils import get_task
from .storage import get_archiver_storage
from .copier import ArchiverCopier
# Module-level logger named 'archiver.worker'.
logger = logging.getLogger('archiver.worker')
class BaseArchiveWorker(config.SWHConfig, metaclass=abc.ABCMeta):
    """Base archive worker.

    Inherit from this class and override:

    - ADDITIONAL_CONFIG: Some added configuration needed for the
      worker to work
    - CONFIG_BASE_FILENAME: relative path to lookup for the
      configuration file
    - def need_archival(self, content_data): Determine if a content
      needs archival or not
    - def choose_backup_servers(self, present, missing): Choose
      which backup server to send copies to

    """
    DEFAULT_CONFIG = {
        'archiver_storage': ('dict', {
            'cls': 'db',
            'args': {
                'dbconn': 'dbname=softwareheritage-archiver-dev user=guest',
            },
        }),
        'storages': ('list[dict]',
                     [
                         {'host': 'uffizi',
                          'cls': 'pathslicing',
                          'args': {'root': '/tmp/softwareheritage/objects',
                                   'slicing': '0:2/2:4/4:6'}},
                         {'host': 'banco',
                          'cls': 'remote',
                          'args': {'base_url': 'http://banco:5003/'}}
                     ])
    }

    ADDITIONAL_CONFIG = {}

    CONFIG_BASE_FILENAME = 'archiver/worker'

    # Class-level default; every instance replaces it in __init__.
    objstorages = {}

    def __init__(self, batch):
        """Read the configuration and instantiate the storages.

        Args:
            batch: iterable of content ids to process in run().

        """
        super().__init__()
        self.config = self.parse_config_file(
            additional_configs=[self.ADDITIONAL_CONFIG])
        self.batch = batch
        self.archiver_db = get_archiver_storage(
            **self.config['archiver_storage'])
        # Map each configured host name to its objstorage instance.
        self.objstorages = {
            storage['host']: get_objstorage(storage['cls'], storage['args'])
            for storage in self.config.get('storages', [])
        }
        self.set_objstorages = set(self.objstorages)

    def run(self):
        """Do the task expected from the archiver worker.

        Process the contents in self.batch, ensure that the elements
        still need an archival (using archiver db), and spawn copiers
        to copy files in each destination according to the
        archiver-worker's policy.

        """
        # Content ids to copy, grouped by (source, destination) pair.
        transfers = defaultdict(list)
        for obj_id in self.batch:
            # Get dict {'missing': [servers], 'present': [servers]}
            # for contents ignoring those who don't need archival.
            copies = self.compute_copies(self.set_objstorages, obj_id)
            if not copies:  # could not happen if using .director module
                logger.warning('Unknown content %s',
                               hashutil.hash_to_hex(obj_id))
                continue

            if not self.need_archival(copies):
                continue

            present = copies.get('present', set())
            missing = copies.get('missing', set())
            if not present:
                # No archive holds the content: nothing to copy from.
                logger.critical('Lost content %s',
                                hashutil.hash_to_hex(obj_id))
                continue

            # Choose servers to be used as srcs and dests.
            for src_dest in self.choose_backup_servers(present, missing):
                transfers[src_dest].append(obj_id)

        # Then run copiers for each of the required transfers.
        contents_copied = []
        for (src, dest), content_ids in transfers.items():
            contents_copied.extend(self.run_copier(src, dest, content_ids))

        # copy is done, eventually do something else with them
        self.copy_finished(contents_copied)

    def compute_copies(self, set_objstorages, content_id):
        """From a content_id, return present and missing copies.

        Args:
            set_objstorages (set): objstorage's id name
            content_id: the content concerned

        Returns:
            dict: A dictionary with the following keys:

            - present: set of archives where the content is present
            - missing: set of archives where the content is missing
            - ongoing: dict mapping the archive id with the time the copy
              supposedly started.

            None when the archiver storage returns nothing for the content.

        """
        result = self.archiver_db.content_archive_get(content_id)
        if not result:
            return None
        _, present, ongoing = result
        # Only archives listed in set_objstorages are considered.
        set_present = set_objstorages & set(present)
        set_ongoing = set_objstorages & set(ongoing)
        set_missing = set_objstorages - set_present - set_ongoing
        return {
            'present': set_present,
            'missing': set_missing,
            'ongoing': {archive: value
                        for archive, value in ongoing.items()
                        if archive in set_ongoing},
        }

    def run_copier(self, source, destination, content_ids):
        """Run a copier in order to archive the given contents.

        Upload the given contents from the source to the destination.
        If the process fails, the whole content is considered uncopied
        and remains 'ongoing', waiting to be rescheduled as there is a
        delay.

        Contents found in error on the source are not copied; their
        status on the source is updated instead. The `content_ids`
        argument itself is left untouched.

        Args:
            source (str): source storage's identifier
            destination (str): destination storage's identifier
            content_ids ([sha1]): list of content ids to archive.

        Returns:
            list: the content ids copied to the destination, or an empty
            list when the copier reports a failure.

        """
        # Check if there are any errors among the contents.
        content_status = self.get_contents_error(content_ids, source)

        # Update the status of faulty contents to reflect their real state.
        for content_id, real_status in content_status.items():
            self.archiver_db.content_archive_update(
                content_id, archive_id=source, new_status=real_status)

        # Keep only the contents that can be retrieved correctly. Filtering
        # (rather than list.remove) drops every occurrence of a faulty id,
        # even when it appears several times in the batch.
        valid_ids = [content_id for content_id in content_ids
                     if content_id not in content_status]

        # Now perform the copy on the remaining contents
        ac = ArchiverCopier(
            source=self.objstorages[source],
            destination=self.objstorages[destination],
            content_ids=valid_ids)

        if ac.run():
            # Once the archival complete, update the database.
            for content_id in valid_ids:
                self.archiver_db.content_archive_update(
                    content_id, archive_id=destination, new_status='present')

            return valid_ids
        return []

    def copy_finished(self, content_ids):
        """Hook to notify the content_ids archive copy is finished.

        (This is not an abstract method as this is optional.)

        Args:
            content_ids: content ids returned by the copiers.

        """
        pass

    def get_contents_error(self, content_ids, source_storage):
        """Indicates what is the error associated to a content when needed

        Check the given content on the given storage. If an error is detected,
        it will be reported through the returned dict.

        Args:
            content_ids ([sha1]): list of content ids to check
            source_storage (str): the source storage holding the
                contents to check.

        Returns:
            a dict that map {content_id -> error_status} for each content_id
            with an error. The `error_status` result may be 'missing' or
            'corrupted'.

        """
        content_status = {}
        storage = self.objstorages[source_storage]
        for content_id in content_ids:
            try:
                storage.check(content_id)
            # The most specific handler comes first.
            # NOTE(review): ObjNotFoundError presumably derives from Error
            # in swh.objstorage.exc; if so, catching Error first would make
            # the 'missing' branch unreachable -- confirm.
            except ObjNotFoundError:
                content_status[content_id] = 'missing'
                logger.error('%s missing!', hashutil.hash_to_hex(content_id))
            except Error:
                content_status[content_id] = 'corrupted'
                logger.error('%s corrupted!',
                             hashutil.hash_to_hex(content_id))
        return content_status

    @abc.abstractmethod
    def need_archival(self, content_data):
        """Indicate if the content needs to be archived.

        Args:
            content_data (dict): dict that contains two lists 'present' and
                'missing' with copies id corresponding to this status.

        Returns:
            True if there is not enough copies, False otherwise.

        """
        pass

    @abc.abstractmethod
    def choose_backup_servers(self, present, missing):
        """Choose and yield the required amount of couple source/destination

        For each required copy, choose a unique destination server
        among the missing copies and a source server among the
        presents.

        Args:
            present: set of objstorage source name where the content
                is present
            missing: set of objstorage destination name where the
                content is missing

        Yields:
            tuple (source (str), destination (str)) for each required copy.

        """
        pass
class ArchiverWithRetentionPolicyWorker(BaseArchiveWorker):
    """Do the required backups on a given batch of contents.

    Process the content of a content batch in order to do the needed
    backups on the backup servers, until `retention_policy` copies exist.

    """

    ADDITIONAL_CONFIG = {
        'retention_policy': ('int', 2),
        'archival_max_age': ('int', 3600),
        'sources': ('list[str]', ['uffizi', 'banco']),
    }

    def __init__(self, batch):
        """Constructor of the ArchiverWorker class.

        Args:
            batch: list of object's sha1 that potentially need archival.

        Raises:
            ValueError: if fewer objstorages are configured than the
                retention policy requires.

        """
        super().__init__(batch)
        config = self.config
        self.retention_policy = config['retention_policy']
        self.archival_max_age = config['archival_max_age']
        self.sources = config['sources']

        if len(self.objstorages) < self.retention_policy:
            raise ValueError('Retention policy is too high for the number of '
                             'provided servers')

    def need_archival(self, content_data):
        """Indicate if the content need to be archived.

        Copies still 'ongoing' whose archival delay has not elapsed are
        counted as if they were already present.

        Args:
            content_data (dict): dict that may contain 'present' (archives
                holding the content) and 'ongoing' (dict mapping an archive
                to the time its copy started).

        Returns:
            True if there is not enough copies, False otherwise.

        """
        nb_presents = len(content_data.get('present', []))
        for mtime in content_data.get('ongoing', {}).values():
            if not self._is_archival_delay_elapsed(mtime):
                nb_presents += 1
        return nb_presents < self.retention_policy

    def _is_archival_delay_elapsed(self, start_time):
        """Indicates if the archival delay is elapsed given the start_time

        Args:
            start_time (datetime.datetime): timezone-aware time at which
                the archival started (it is subtracted from an aware UTC
                'now').

        Returns:
            True if the archival delay is elapsed, False otherwise

        """
        elapsed = datetime.datetime.now(tz=datetime.timezone.utc) - start_time
        return elapsed > datetime.timedelta(seconds=self.archival_max_age)

    def choose_backup_servers(self, present, missing):
        """Choose and yield the required amount of couple source/destination

        For each required copy, choose a unique destination server
        among the missing copies and a source server among the
        presents.

        Each destination server is unique so after archival, the
        retention policy requirement will be fulfilled. However, the
        source server may be used multiple times.

        When fewer archives are missing than copies are required, only
        the available destinations are used. When none of the present
        archives is listed in the configured sources, a warning is logged
        and nothing is yielded.

        Args:
            present: set of objstorage source name where the content
                is present
            missing: set of objstorage destination name where the
                content is missing

        Yields:
            tuple (source, destination) for each required copy.

        """
        # Transform from set to list to allow random selections
        missing = list(missing)
        eligible_sources = [source for source in present
                            if source in self.sources]

        # random.sample raises ValueError on a negative count or a count
        # larger than the population, so clamp to what is achievable.
        nb_required = self.retention_policy - len(present)
        nb_copies = max(0, min(nb_required, len(missing)))
        if not nb_copies:
            return

        if not eligible_sources:
            # random.choice would raise IndexError on an empty list.
            logger.warning(
                'No allowed source among present archives %s',
                sorted(present))
            return

        for destination in random.sample(missing, nb_copies):
            yield random.choice(eligible_sources), destination
class ArchiverToBackendWorker(BaseArchiveWorker):
    """Worker copying a batch of contents to one fixed destination.

    The contents of the batch are copied from a source objstorage to the
    destination objstorage given at construction time.

    """

    CONFIG_BASE_FILENAME = 'archiver/worker-to-backend'

    ADDITIONAL_CONFIG = {
        'next_task': (
            'dict', {
                'queue': 'swh.indexer.tasks.SWHOrchestratorAllContentsTask',
                'batch_size': 10,
            }
        )
    }

    def __init__(self, destination, batch):
        """Set up the worker and its optional follow-up task.

        Args:
            destination: where to copy the objects to
            batch: sha1s to send to destination

        """
        super().__init__(batch)
        self.destination = destination

        follow_up = self.config['next_task']
        if not follow_up:
            # No follow-up task configured: copy_finished does nothing.
            self.task_destination = None
            self.batch_size = None
            return

        self.task_destination = get_task(follow_up['queue'])
        self.batch_size = int(follow_up['batch_size'])

    def need_archival(self, content_data):
        """Tell whether the content must be copied to the destination.

        Args:
            content_data (dict): copies status of the content; only its
                'missing' entry is looked at.

        Returns:
            True if the destination is among the missing archives, False
            otherwise

        """
        missing_archives = content_data.get('missing', {})
        return self.destination in missing_archives

    def choose_backup_servers(self, present, missing):
        """Pair a randomly chosen source with the fixed destination.

        Args:
            present: set of objstorage source name where the content
                is present
            missing: set of objstorage destination name where the
                content is missing (unused here)

        Yields:
            a single tuple (source, destination).

        """
        candidates = list(present)
        source = random.choice(candidates)
        yield source, self.destination

    def copy_finished(self, content_ids):
        """Send the copied contents, in batches, to the destination queue.

        Does nothing when no follow-up task is configured.

        """
        if not self.task_destination:
            return

        # One task signature per group of at most batch_size content ids.
        signatures = [
            self.task_destination.s(list(chunk))
            for chunk in utils.grouper(content_ids, self.batch_size)
        ]
        group(signatures).delay()
diff --git a/version.txt b/version.txt
new file mode 100644
index 0000000..0984193
--- /dev/null
+++ b/version.txt
@@ -0,0 +1 @@
+v0.0.2-0-g39dd7db
\ No newline at end of file
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jun 21, 7:44 PM (2 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3293305
Attached To
rDARC Software Heritage Archiver
Event Timeline
Log In to Comment