Page Menu · Home · Software Heritage

No OneTemporary

diff --git a/requirements-swh.txt b/requirements-swh.txt
index d7661c0..01f1bd4 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,2 @@
swh.core >= 0.0.75
-swh.storage[schemata] >= 0.0.122
swh.scheduler >= 0.0.58
diff --git a/swh/lister/debian/lister.py b/swh/lister/debian/lister.py
index 6763df2..6f5f487 100644
--- a/swh/lister/debian/lister.py
+++ b/swh/lister/debian/lister.py
@@ -1,241 +1,241 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import bz2
from collections import defaultdict
import datetime
import gzip
import lzma
import logging
from debian.deb822 import Sources
from sqlalchemy.orm import joinedload, load_only
from sqlalchemy.schema import CreateTable, DropTable
-from swh.storage.schemata.distribution import (
+from swh.lister.debian.models import (
AreaSnapshot, Distribution, DistributionSnapshot, Package,
TempPackage,
)
from swh.lister.core.lister_base import ListerBase, FetchError
from swh.lister.core.lister_transports import ListerHttpTransport
# Decompression wrappers keyed by the compression extension of the package
# index ('gz', 'bz2' or 'xz'); each maps a raw (streamed) response file
# object to a file-like object yielding decompressed bytes.
decompressors = {
    'gz': lambda f: gzip.GzipFile(fileobj=f),
    'bz2': bz2.BZ2File,
    'xz': lzma.LZMAFile,
}

logger = logging.getLogger(__name__)
class DebianLister(ListerHttpTransport, ListerBase):
    """List the source packages of a Debian (or derivative) distribution.

    For each active area of the distribution, the lister downloads and
    parses the Sources index, records the packages that were not already
    known in the lister database, and creates one loading task per listed
    package in the scheduler.
    """
    MODEL = Package
    PATH_TEMPLATE = None
    LISTER_NAME = 'debian'
    instance = 'debian'

    def __init__(self, override_config=None):
        # The URL is unused: transport_request builds the index URIs from
        # the distribution areas instead of PATH_TEMPLATE.
        ListerHttpTransport.__init__(self, url="notused")
        ListerBase.__init__(self, override_config=override_config)

    def transport_request(self, identifier):
        """Subvert ListerHttpTransport.transport_request, to try several
        index URIs in turn.

        The Debian repository format supports several compression algorithms
        across the ages, so we try several URIs.

        Once we have found a working URI, we break and set `self.decompressor`
        to the one that matched.

        Returns:
            a requests Response object.

        Raises:
            FetchError: when all the URIs failed to be retrieved.
        """
        response = None
        compression = None

        for uri, compression in self.area.index_uris():
            response = super().transport_request(uri)
            if response.status_code == 200:
                break
        else:
            # for/else: no URI answered with 200
            raise FetchError(
                "Could not retrieve index for %s" % self.area
            )

        self.decompressor = decompressors.get(compression)
        return response

    def request_uri(self, identifier):
        # In the overridden transport_request, we pass
        # ListerBase.transport_request() the full URI as identifier, so we
        # need to return it here.
        return identifier

    def request_params(self, identifier):
        # Enable streaming to allow wrapping the response in the decompressor
        # in transport_response_simplified.
        params = super().request_params(identifier)
        params['stream'] = True
        return params

    def transport_response_simplified(self, response):
        """Decompress and parse the package index fetched in
        `transport_request`.

        For each package, we "pivot" the file list entries (Files,
        Checksums-Sha1, Checksums-Sha256), to return a files dict mapping
        filenames to their checksums.

        Yields:
            dicts with keys name, version, directory and files, one per
            source package of the index.
        """
        if self.decompressor:
            data = self.decompressor(response.raw)
        else:
            data = response.raw

        for src_pkg in Sources.iter_paragraphs(data.readlines()):
            files = defaultdict(dict)

            for field in src_pkg._multivalued_fields:
                # 'checksums-sha1' -> 'sha1'; the legacy 'Files' field
                # carries md5 sums.
                if field.startswith('checksums-'):
                    sum_name = field[len('checksums-'):]
                else:
                    sum_name = 'md5sum'
                if field in src_pkg:
                    for entry in src_pkg[field]:
                        name = entry['name']
                        files[name]['name'] = entry['name']
                        files[name]['size'] = int(entry['size'], 10)
                        files[name][sum_name] = entry[sum_name]

            yield {
                'name': src_pkg['Package'],
                'version': src_pkg['Version'],
                'directory': src_pkg['Directory'],
                'files': files,
            }

    def inject_repo_data_into_db(self, models_list):
        """Generate the Package entries that didn't previously exist.

        Contrary to ListerBase, we don't actually insert the data in
        database. `schedule_missing_tasks` does it once we have the
        origin and task identifiers.
        """
        by_name_version = {}
        temp_packages = []

        area_id = self.area.id

        for model in models_list:
            name = model['name']
            version = model['version']
            temp_packages.append({
                'area_id': area_id,
                'name': name,
                'version': version,
            })
            by_name_version[name, version] = model

        # Add all the listed packages to a temporary table
        self.db_session.execute(CreateTable(TempPackage.__table__))
        self.db_session.bulk_insert_mappings(TempPackage, temp_packages)

        def exists_tmp_pkg(db_session, model):
            # EXISTS subquery correlating `model` rows with TempPackage rows
            # on (area_id, name, version).
            return (
                db_session.query(model)
                .filter(Package.area_id == TempPackage.area_id)
                .filter(Package.name == TempPackage.name)
                .filter(Package.version == TempPackage.version)
                .exists()
            )

        # Filter out the packages that already exist in the main Package table
        new_packages = self.db_session\
            .query(TempPackage)\
            .options(load_only('name', 'version'))\
            .filter(~exists_tmp_pkg(self.db_session, Package))\
            .all()

        # Packages already known from a previous run; they are attached to
        # the new area snapshot by run().
        self.old_area_packages = self.db_session.query(Package).filter(
            exists_tmp_pkg(self.db_session, TempPackage)
        ).all()

        self.db_session.execute(DropTable(TempPackage.__table__))

        added_packages = []
        for package in new_packages:
            model = by_name_version[package.name, package.version]
            added_packages.append(Package(area=self.area, **model))

        self.db_session.add_all(added_packages)
        return added_packages

    def schedule_missing_tasks(self, models_list, added_packages):
        """We create tasks at the end of the full snapshot processing"""
        return

    def create_tasks_for_snapshot(self, snapshot):
        """Create one loading task per package of the snapshot in the
        scheduler and return the created tasks."""
        tasks = [
            snapshot.task_for_package(name, versions)
            for name, versions in snapshot.get_packages().items()
        ]

        return self.scheduler.create_tasks(tasks)

    def run(self, distribution, date=None):
        """Run the lister for a given (distribution, area) tuple.

        Args:
            distribution (str): name of the distribution (e.g. "Debian")
            date (datetime.datetime): date the snapshot is taken (defaults to
                now)

        Raises:
            ValueError: if the distribution is unknown or is not a Debian
                derivative.
        """
        # Keep the name around: `distribution` is rebound to the ORM object
        # (or None) just below.
        distribution_name = distribution
        distribution = self.db_session\
                           .query(Distribution)\
                           .options(joinedload(Distribution.areas))\
                           .filter(Distribution.name == distribution_name)\
                           .one_or_none()

        if not distribution:
            # Bug fix: previously this message interpolated the rebound
            # variable, reporting "Distribution None is not registered".
            raise ValueError("Distribution %s is not registered" %
                             distribution_name)

        if distribution.type != 'deb':
            raise ValueError("Distribution %s is not a Debian derivative" %
                             distribution)

        date = date or datetime.datetime.now(tz=datetime.timezone.utc)

        # Lazy %-args: the reprs are only built if debug logging is enabled.
        logger.debug('Creating snapshot for distribution %s on date %s',
                     distribution, date)

        snapshot = DistributionSnapshot(date=date, distribution=distribution)
        self.db_session.add(snapshot)

        for area in distribution.areas:
            if not area.active:
                continue

            self.area = area

            logger.debug('Processing area %s', area)

            _, new_area_packages = self.ingest_data(None)
            area_snapshot = AreaSnapshot(snapshot=snapshot, area=area)
            self.db_session.add(area_snapshot)
            area_snapshot.packages.extend(new_area_packages)
            area_snapshot.packages.extend(self.old_area_packages)

        self.create_tasks_for_snapshot(snapshot)

        self.db_session.commit()

        return True
diff --git a/swh/lister/debian/models.py b/swh/lister/debian/models.py
new file mode 100644
index 0000000..7ddb7a2
--- /dev/null
+++ b/swh/lister/debian/models.py
@@ -0,0 +1,251 @@
+# Copyright (C) 2017-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import binascii
+from collections import defaultdict
+import datetime
+
+from sqlalchemy import (
+ Boolean,
+ Column,
+ DateTime,
+ Enum,
+ ForeignKey,
+ Integer,
+ LargeBinary,
+ String,
+ Table,
+ UniqueConstraint,
+)
+
+try:
+ from sqlalchemy import JSON
+except ImportError:
+ # SQLAlchemy < 1.1
+ from sqlalchemy.dialects.postgresql import JSONB as JSON
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship
+
+SQLBase = declarative_base()
+
+
class Distribution(SQLBase):
    """A distribution (e.g. Debian, Ubuntu, Fedora, ...)"""
    __tablename__ = 'distribution'

    id = Column(Integer, primary_key=True)
    # Distribution name, e.g. "Debian"; unique across the table
    name = Column(String, unique=True, nullable=False)
    # Packaging flavour; drives URI construction and task typing
    type = Column(Enum('deb', 'rpm', name='distribution_types'),
                  nullable=False)
    # Base URI of the mirror the indexes and packages are fetched from
    mirror_uri = Column(String, nullable=False)

    areas = relationship('Area', back_populates='distribution')

    def origin_for_package(self, package_name, package_versions):
        """Return the origin dictionary for the given package"""
        # NOTE(review): package_versions is currently unused here — the
        # origin URL identifies the package across all its versions.
        return {
            'type': self.type,
            'url': '%s://%s/packages/%s' % (
                self.type, self.name, package_name
            ),
        }

    def __repr__(self):
        return 'Distribution(%s (%s) on %s)' % (
            self.name,
            self.type,
            self.mirror_uri,
        )
+
+
class Area(SQLBase):
    """An area (component) of a distribution, e.g. main, contrib, non-free."""
    __tablename__ = 'area'
    __table_args__ = (
        UniqueConstraint('distribution_id', 'name'),
    )

    id = Column(Integer, primary_key=True)
    distribution_id = Column(Integer, ForeignKey('distribution.id'),
                             nullable=False)
    name = Column(String, nullable=False)
    # Inactive areas are skipped by the lister
    active = Column(Boolean, nullable=False, default=True)

    distribution = relationship('Distribution', back_populates='areas')

    def index_uris(self):
        """Get possible URIs for this component's package index.

        Yields:
            (uri, compression) pairs, where compression is the extension
            of the compression scheme ('xz', 'bz2', 'gz') or None for the
            uncompressed index, tried in that order.

        Raises:
            NotImplementedError: for distribution types other than 'deb'.
        """
        if self.distribution.type == 'deb':
            compression_exts = ('xz', 'bz2', 'gz', None)
            base_uri = '%s/dists/%s/source/Sources' % (
                self.distribution.mirror_uri,
                self.name,
            )
            for ext in compression_exts:
                if ext:
                    yield (base_uri + '.' + ext, ext)
                else:
                    yield (base_uri, None)
        else:
            # Bug fix: this raise used to sit after (not inside an else of)
            # the 'deb' branch, so exhausting the generator for a 'deb'
            # distribution raised NotImplementedError instead of letting
            # the caller report a fetch failure.
            raise NotImplementedError(
                'Do not know how to build index URI for Distribution type %s' %
                self.distribution.type
            )

    def __repr__(self):
        return 'Area(%s of %s)' % (
            self.name,
            self.distribution.name,
        )
+
+
class Package(SQLBase):
    """A source package, uniquely identified by (area, name, version)."""
    __tablename__ = 'package'
    __table_args__ = (
        UniqueConstraint('area_id', 'name', 'version'),
    )

    id = Column(Integer, primary_key=True)
    area_id = Column(Integer, ForeignKey('area.id'), nullable=False)
    name = Column(String, nullable=False)
    version = Column(String, nullable=False)
    # Directory of the package on the mirror, relative to mirror_uri
    directory = Column(String, nullable=False)
    # Mapping of file name -> checksums/size dict, as parsed from the index
    files = Column(JSON, nullable=False)

    # Identifiers in the external origin/task databases, filled in once the
    # package has been scheduled for loading
    origin_id = Column(Integer)
    task_id = Column(Integer)

    # 20-byte binary id of the revision resulting from loading this package
    # (presumably a sha1_git — confirm against the loader)
    revision_id = Column(LargeBinary(20))

    area = relationship('Area')

    @property
    def distribution(self):
        """The distribution this package belongs to, through its area."""
        return self.area.distribution

    def fetch_uri(self, filename):
        """Get the URI to fetch the `filename` file associated with the
        package"""
        if self.distribution.type == 'deb':
            return '%s/%s/%s' % (
                self.distribution.mirror_uri,
                self.directory,
                filename,
            )
        else:
            raise NotImplementedError(
                'Do not know how to build fetch URI for Distribution type %s' %
                self.distribution.type
            )

    def loader_dict(self):
        """Build the argument dict handed to the package loader: either the
        hex revision id when the package was already loaded, or the files
        (with fetch URIs) to load."""
        ret = {
            'id': self.id,
            'name': self.name,
            'version': self.version,
        }
        if self.revision_id:
            ret['revision_id'] = binascii.hexlify(self.revision_id).decode()
        else:
            # Copy the checksum dicts so adding 'uri' does not mutate the
            # JSON column value.
            files = {
                name: checksums.copy()
                for name, checksums in self.files.items()
            }
            for name in files:
                files[name]['uri'] = self.fetch_uri(name)

            ret.update({
                'revision_id': None,
                'files': files,
            })
        return ret

    def __repr__(self):
        return 'Package(%s_%s of %s %s)' % (
            self.name,
            self.version,
            self.distribution.name,
            self.area.name,
        )
+
+
class DistributionSnapshot(SQLBase):
    """The set of packages listed for a distribution at a given date."""
    __tablename__ = 'distribution_snapshot'

    id = Column(Integer, primary_key=True)
    date = Column(DateTime, nullable=False, index=True)
    distribution_id = Column(Integer,
                             ForeignKey('distribution.id'),
                             nullable=False)

    distribution = relationship('Distribution')
    areas = relationship('AreaSnapshot', back_populates='snapshot')

    def task_for_package(self, package_name, package_versions):
        """Return the task dictionary for the given list of package versions"""
        origin = self.distribution.origin_for_package(
            package_name, package_versions,
        )

        # One-shot scheduler task of type load-<dist type>-package, carrying
        # the origin, the snapshot date and the versions to load.
        return {
            'policy': 'oneshot',
            'type': 'load-%s-package' % self.distribution.type,
            'next_run': datetime.datetime.now(tz=datetime.timezone.utc),
            'arguments': {
                'args': [],
                'kwargs': {
                    'origin': origin,
                    'date': self.date.isoformat(),
                    'packages': package_versions,
                },
            }
        }

    def get_packages(self):
        """Return the packages of this snapshot, as a dict mapping each
        package name to a {'<area>/<version>': loader_dict} mapping."""
        packages = defaultdict(dict)
        for area_snapshot in self.areas:
            area_name = area_snapshot.area.name
            for package in area_snapshot.packages:
                ref_name = '%s/%s' % (area_name, package.version)
                packages[package.name][ref_name] = package.loader_dict()

        return packages
+
+
# Many-to-many association between an area snapshot and the packages listed
# in that area at snapshot time (secondary table of AreaSnapshot.packages).
area_snapshot_package_assoc = Table(
    'area_snapshot_package', SQLBase.metadata,
    Column('area_snapshot_id', Integer, ForeignKey('area_snapshot.id'),
           nullable=False),
    Column('package_id', Integer, ForeignKey('package.id'),
           nullable=False),
)
+
+
class AreaSnapshot(SQLBase):
    """The packages listed in one area, as part of a distribution snapshot."""
    __tablename__ = 'area_snapshot'

    id = Column(Integer, primary_key=True)
    snapshot_id = Column(Integer,
                         ForeignKey('distribution_snapshot.id'),
                         nullable=False)
    area_id = Column(Integer,
                     ForeignKey('area.id'),
                     nullable=False)

    snapshot = relationship('DistributionSnapshot', back_populates='areas')
    area = relationship('Area')
    packages = relationship('Package', secondary=area_snapshot_package_assoc)
+
+
class TempPackage(SQLBase):
    """Session-local TEMPORARY table used to diff a freshly listed set of
    (area_id, name, version) tuples against the persistent package table;
    created and dropped explicitly by its user within one session."""
    __tablename__ = 'temp_package'
    __table_args__ = {
        # Emit CREATE TEMPORARY TABLE instead of CREATE TABLE
        'prefixes': ['TEMPORARY'],
    }

    id = Column(Integer, primary_key=True)
    # No foreign keys or constraints: rows are transient scratch data
    area_id = Column(Integer)
    name = Column(String)
    version = Column(String)

File Metadata

Mime Type
text/x-diff
Expires
Mon, Aug 25, 6:03 PM (3 d, 22 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3247446

Event Timeline