Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9749620
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
16 KB
Subscribers
None
View Options
diff --git a/requirements-swh.txt b/requirements-swh.txt
index d7661c0..01f1bd4 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,2 @@
swh.core >= 0.0.75
-swh.storage[schemata] >= 0.0.122
swh.scheduler >= 0.0.58
diff --git a/swh/lister/debian/lister.py b/swh/lister/debian/lister.py
index 6763df2..6f5f487 100644
--- a/swh/lister/debian/lister.py
+++ b/swh/lister/debian/lister.py
@@ -1,241 +1,241 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import bz2
from collections import defaultdict
import datetime
import gzip
import lzma
import logging
from debian.deb822 import Sources
from sqlalchemy.orm import joinedload, load_only
from sqlalchemy.schema import CreateTable, DropTable
-from swh.storage.schemata.distribution import (
+from swh.lister.debian.models import (
AreaSnapshot, Distribution, DistributionSnapshot, Package,
TempPackage,
)
from swh.lister.core.lister_base import ListerBase, FetchError
from swh.lister.core.lister_transports import ListerHttpTransport
def _open_gzip(fileobj):
    """Wrap the binary file object *fileobj* in a gzip-decompressing reader."""
    return gzip.GzipFile(fileobj=fileobj)


# Compression extension of a package index -> callable wrapping a binary
# file object into a decompressing reader. Uncompressed indexes have no
# entry (transport_request looks them up with .get(None)).
decompressors = dict(
    gz=_open_gzip,
    bz2=bz2.BZ2File,
    xz=lzma.LZMAFile,
)

logger = logging.getLogger(__name__)
class DebianLister(ListerHttpTransport, ListerBase):
    """Lister for the source package indexes of a Debian-like distribution.

    `run` iterates over the active areas of a registered distribution,
    lists the packages of each area's index and attaches them to a new
    `DistributionSnapshot`, then creates loading tasks for that snapshot.
    """
    MODEL = Package
    PATH_TEMPLATE = None
    LISTER_NAME = 'debian'
    instance = 'debian'

    def __init__(self, override_config=None):
        # The transport URL is a placeholder: transport_request computes the
        # full index URI itself and request_uri passes it through unchanged.
        ListerHttpTransport.__init__(self, url="notused")
        ListerBase.__init__(self, override_config=override_config)

    def transport_request(self, identifier):
        """Subvert ListerHttpTransport.transport_request, to try several
        index URIs in turn.

        The Debian repository format supports several compression algorithms
        across the ages, so we try several URIs.

        Once we have found a working URI, we break and set `self.decompressor`
        to the one that matched.

        Args:
            identifier: ignored; the candidate URIs come from `self.area`.

        Returns:
            a requests Response object.

        Raises:
            FetchError: when all the URIs failed to be retrieved.
        """
        for uri, compression in self.area.index_uris():
            response = super().transport_request(uri)
            if response.status_code == 200:
                break
        else:
            raise FetchError(
                "Could not retrieve index for %s" % self.area
            )

        # None (uncompressed index) maps to no decompressor.
        self.decompressor = decompressors.get(compression)
        return response

    def request_uri(self, identifier):
        # In the overridden transport_request, we pass
        # ListerBase.transport_request() the full URI as identifier, so we
        # need to return it here.
        return identifier

    def request_params(self, identifier):
        # Enable streaming to allow wrapping the response in the decompressor
        # in transport_response_simplified.
        params = super().request_params(identifier)
        params['stream'] = True
        return params

    def transport_response_simplified(self, response):
        """Decompress and parse the package index fetched in `transport_request`.

        For each package, we "pivot" the file list entries (Files,
        Checksums-Sha1, Checksums-Sha256), to return a files dict mapping
        filenames to their checksums.

        Yields:
            dict: one per source package, with keys ``name``, ``version``,
            ``directory`` and ``files``. ``files`` maps each file name to a
            dict holding the file's ``name``, ``size`` and one entry per
            checksum algorithm.
        """
        if self.decompressor:
            # Close the decompressing wrapper once the index is read; this
            # does not close the underlying response.raw stream.
            with self.decompressor(response.raw) as data:
                lines = data.readlines()
        else:
            lines = response.raw.readlines()

        for src_pkg in Sources.iter_paragraphs(lines):
            files = defaultdict(dict)
            for field in src_pkg._multivalued_fields:
                # 'checksums-sha256' -> 'sha256'. Any other multivalued field
                # is treated as 'Files', whose entries carry an md5sum.
                if field.startswith('checksums-'):
                    sum_name = field[len('checksums-'):]
                else:
                    sum_name = 'md5sum'
                if field in src_pkg:
                    for entry in src_pkg[field]:
                        name = entry['name']
                        files[name]['name'] = entry['name']
                        files[name]['size'] = int(entry['size'], 10)
                        files[name][sum_name] = entry[sum_name]

            yield {
                'name': src_pkg['Package'],
                'version': src_pkg['Version'],
                'directory': src_pkg['Directory'],
                'files': files,
            }

    def inject_repo_data_into_db(self, models_list):
        """Generate the Package entries that didn't previously exist.

        The listed (area, name, version) triples are written to a temporary
        table. That table is used to split the listing into packages missing
        from the Package table and packages already known for this area. The
        temporary table is dropped afterwards.

        The new Package objects are added to the session; `run` commits
        them. The already-known packages are stored in
        `self.old_area_packages` for `run` to attach to the area snapshot.

        Args:
            models_list: the package dicts yielded by
                `transport_response_simplified`.

        Returns:
            list[Package]: the newly created Package objects.
        """
        by_name_version = {}
        temp_packages = []

        area_id = self.area.id

        for model in models_list:
            name = model['name']
            version = model['version']
            temp_packages.append({
                'area_id': area_id,
                'name': name,
                'version': version,
            })
            by_name_version[name, version] = model

        # Add all the listed packages to a temporary table
        self.db_session.execute(CreateTable(TempPackage.__table__))
        self.db_session.bulk_insert_mappings(TempPackage, temp_packages)

        def exists_tmp_pkg(db_session, model):
            # EXISTS subquery over `model` matching Package and TempPackage
            # rows with the same (area_id, name, version). The table not
            # selected here is correlated with the enclosing query (relies on
            # SQLAlchemy auto-correlation).
            return (
                db_session.query(model)
                .filter(Package.area_id == TempPackage.area_id)
                .filter(Package.name == TempPackage.name)
                .filter(Package.version == TempPackage.version)
                .exists()
            )

        # Filter out the packages that already exist in the main Package table
        new_packages = (
            self.db_session
            .query(TempPackage)
            .options(load_only('name', 'version'))
            .filter(~exists_tmp_pkg(self.db_session, Package))
            .all()
        )

        self.old_area_packages = self.db_session.query(Package).filter(
            exists_tmp_pkg(self.db_session, TempPackage)
        ).all()

        self.db_session.execute(DropTable(TempPackage.__table__))

        added_packages = [
            Package(area=self.area,
                    **by_name_version[package.name, package.version])
            for package in new_packages
        ]

        self.db_session.add_all(added_packages)
        return added_packages

    def schedule_missing_tasks(self, models_list, added_packages):
        """We create tasks at the end of the full snapshot processing"""
        # Intentionally a no-op: `run` calls create_tasks_for_snapshot once
        # every area has been processed.
        return

    def create_tasks_for_snapshot(self, snapshot):
        """Create one scheduler task per package name listed in `snapshot`.

        Returns:
            whatever `self.scheduler.create_tasks` returns.
        """
        tasks = [
            snapshot.task_for_package(name, versions)
            for name, versions in snapshot.get_packages().items()
        ]

        return self.scheduler.create_tasks(tasks)

    def run(self, distribution, date=None):
        """Run the lister on every active area of a distribution.

        Args:
            distribution (str): name of the distribution (e.g. "Debian")
            date (datetime.datetime): date the snapshot is taken (defaults to
              now)

        Returns:
            True once the snapshot has been committed.

        Raises:
            ValueError: if the distribution is not registered, or is not of
              type 'deb'.
        """
        # Keep the requested name: `distribution` is rebound to the ORM object
        # (or None) below, and the error message must still name what was
        # asked for.
        distribution_name = distribution
        distribution = (
            self.db_session
            .query(Distribution)
            .options(joinedload(Distribution.areas))
            .filter(Distribution.name == distribution_name)
            .one_or_none()
        )

        if distribution is None:
            raise ValueError("Distribution %s is not registered" %
                             distribution_name)

        if distribution.type != 'deb':
            raise ValueError("Distribution %s is not a Debian derivative" %
                             distribution)

        date = date or datetime.datetime.now(tz=datetime.timezone.utc)

        logger.debug('Creating snapshot for distribution %s on date %s',
                     distribution, date)

        snapshot = DistributionSnapshot(date=date, distribution=distribution)

        self.db_session.add(snapshot)

        for area in distribution.areas:
            if not area.active:
                continue

            self.area = area

            logger.debug('Processing area %s', area)

            _, new_area_packages = self.ingest_data(None)
            area_snapshot = AreaSnapshot(snapshot=snapshot, area=area)
            self.db_session.add(area_snapshot)
            area_snapshot.packages.extend(new_area_packages)
            # Set by inject_repo_data_into_db while ingesting this area.
            area_snapshot.packages.extend(self.old_area_packages)

        self.create_tasks_for_snapshot(snapshot)

        self.db_session.commit()

        return True
diff --git a/swh/lister/debian/models.py b/swh/lister/debian/models.py
new file mode 100644
index 0000000..7ddb7a2
--- /dev/null
+++ b/swh/lister/debian/models.py
@@ -0,0 +1,251 @@
+# Copyright (C) 2017-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import binascii
+from collections import defaultdict
+import datetime
+
+from sqlalchemy import (
+ Boolean,
+ Column,
+ DateTime,
+ Enum,
+ ForeignKey,
+ Integer,
+ LargeBinary,
+ String,
+ Table,
+ UniqueConstraint,
+)
+
+try:
+ from sqlalchemy import JSON
+except ImportError:
+ # SQLAlchemy < 1.1
+ from sqlalchemy.dialects.postgresql import JSONB as JSON
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship
+
# Declarative base class shared by every ORM model defined in this module.
SQLBase = declarative_base()
+
+
class Distribution(SQLBase):
    """A distribution (e.g. Debian, Ubuntu, Fedora, ...)

    A distribution has a unique name, a packaging type (``'deb'`` or
    ``'rpm'``) and the base URI of the mirror its indexes are fetched from.
    """
    __tablename__ = 'distribution'

    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True, nullable=False)
    type = Column(Enum('deb', 'rpm', name='distribution_types'),
                  nullable=False)
    mirror_uri = Column(String, nullable=False)

    areas = relationship('Area', back_populates='distribution')

    def origin_for_package(self, package_name, package_versions):
        """Build the origin dictionary of one package of this distribution.

        Args:
            package_name (str): name of the package
            package_versions: not used to build the origin

        Returns:
            dict: ``type`` is the distribution type; ``url`` has the form
            ``<type>://<distribution name>/packages/<package name>``.
        """
        origin_url = '%s://%s/packages/%s' % (
            self.type, self.name, package_name,
        )
        return {'type': self.type, 'url': origin_url}

    def __repr__(self):
        details = (self.name, self.type, self.mirror_uri)
        return 'Distribution(%s (%s) on %s)' % details
+
+
class Area(SQLBase):
    """A named area of a distribution's archive.

    For 'deb' distributions, the area's source index lives under
    ``<mirror_uri>/dists/<name>/source/``. The pair (distribution_id, name)
    is unique.
    """
    __tablename__ = 'area'
    __table_args__ = (
        UniqueConstraint('distribution_id', 'name'),
    )

    id = Column(Integer, primary_key=True)
    distribution_id = Column(Integer, ForeignKey('distribution.id'),
                             nullable=False)
    name = Column(String, nullable=False)
    # Inactive areas are skipped by the lister.
    active = Column(Boolean, nullable=False, default=True)

    distribution = relationship('Distribution', back_populates='areas')

    def index_uris(self):
        """Get possible URIs for this component's package index

        Candidates are yielded in order of preference: xz, bz2, gz, then
        uncompressed.

        Yields:
            tuple: ``(uri, compression)``, where ``compression`` is the file
            extension (``'xz'``, ``'bz2'``, ``'gz'``) or None.

        Raises:
            NotImplementedError: for distribution types other than 'deb'
              (raised when the generator is first advanced).
        """
        # Guard clause: previously the raise sat after the 'deb' loop, so
        # exhausting the 'deb' candidates also raised NotImplementedError.
        # Callers that try every URI then never reached their own "no index
        # found" handling.
        if self.distribution.type != 'deb':
            raise NotImplementedError(
                'Do not know how to build index URI for Distribution type %s' %
                self.distribution.type
            )

        compression_exts = ('xz', 'bz2', 'gz', None)
        base_uri = '%s/dists/%s/source/Sources' % (
            self.distribution.mirror_uri,
            self.name,
        )
        for ext in compression_exts:
            if ext:
                yield (base_uri + '.' + ext, ext)
            else:
                yield (base_uri, None)

    def __repr__(self):
        return 'Area(%s of %s)' % (
            self.name,
            self.distribution.name,
        )
+
+
class Package(SQLBase):
    """One version of a source package listed in an area.

    The triple (area_id, name, version) is unique.
    """
    __tablename__ = 'package'
    __table_args__ = (
        UniqueConstraint('area_id', 'name', 'version'),
    )

    id = Column(Integer, primary_key=True)
    area_id = Column(Integer, ForeignKey('area.id'), nullable=False)
    name = Column(String, nullable=False)
    version = Column(String, nullable=False)
    # Directory holding the package's files, joined to the mirror URI by
    # fetch_uri().
    directory = Column(String, nullable=False)
    # Mapping of file name to a dict of that file's metadata (e.g. checksums).
    files = Column(JSON, nullable=False)

    origin_id = Column(Integer)
    task_id = Column(Integer)

    # Binary revision identifier; hex-encoded by loader_dict().
    revision_id = Column(LargeBinary(20))

    area = relationship('Area')

    @property
    def distribution(self):
        """The distribution owning this package's area."""
        return self.area.distribution

    def fetch_uri(self, filename):
        """Get the URI to fetch the `filename` file associated with the
        package

        Raises:
            NotImplementedError: for distribution types other than 'deb'.
        """
        dist = self.distribution
        if dist.type != 'deb':
            raise NotImplementedError(
                'Do not know how to build fetch URI for Distribution type %s' %
                dist.type
            )
        return '%s/%s/%s' % (dist.mirror_uri, self.directory, filename)

    def loader_dict(self):
        """Describe this package for a loader task.

        Returns:
            dict: ``id``, ``name``, ``version`` and ``revision_id``. If a
            revision id is known, it is given as a hex string. Otherwise
            ``revision_id`` is None and ``files`` holds a copy of each file's
            metadata with an added ``uri`` key.
        """
        loader_info = {
            'id': self.id,
            'name': self.name,
            'version': self.version,
        }
        if not self.revision_id:
            loader_info['revision_id'] = None
            loader_info['files'] = {
                filename: dict(checksums, uri=self.fetch_uri(filename))
                for filename, checksums in self.files.items()
            }
            return loader_info

        loader_info['revision_id'] = binascii.hexlify(
            self.revision_id).decode()
        return loader_info

    def __repr__(self):
        details = (
            self.name, self.version, self.distribution.name, self.area.name,
        )
        return 'Package(%s_%s of %s %s)' % details
+
+
class DistributionSnapshot(SQLBase):
    """The packages of a distribution at a given date, grouped by area."""
    __tablename__ = 'distribution_snapshot'

    id = Column(Integer, primary_key=True)
    # NOTE(review): declared without timezone=True, while the lister passes
    # a timezone-aware UTC datetime — confirm the intended storage semantics.
    date = Column(DateTime, nullable=False, index=True)
    distribution_id = Column(Integer,
                             ForeignKey('distribution.id'),
                             nullable=False)

    distribution = relationship('Distribution')
    areas = relationship('AreaSnapshot', back_populates='snapshot')

    def task_for_package(self, package_name, package_versions):
        """Return the task dictionary for the given list of package versions

        Builds a one-shot ``load-<distribution type>-package`` task scheduled
        for now (UTC). The package origin, the snapshot date (ISO format) and
        `package_versions` are passed as keyword arguments.
        """
        dist = self.distribution
        origin = dist.origin_for_package(package_name, package_versions)
        task_kwargs = {
            'origin': origin,
            'date': self.date.isoformat(),
            'packages': package_versions,
        }
        return {
            'policy': 'oneshot',
            'type': 'load-%s-package' % dist.type,
            'next_run': datetime.datetime.now(tz=datetime.timezone.utc),
            'arguments': {'args': [], 'kwargs': task_kwargs},
        }

    def get_packages(self):
        """Group the snapshot's packages by name.

        Returns:
            defaultdict: package name -> {``'<area name>/<version>'``:
            ``Package.loader_dict()``}.
        """
        by_name = defaultdict(dict)
        for snap in self.areas:
            prefix = snap.area.name
            for pkg in snap.packages:
                versions = by_name[pkg.name]
                versions['%s/%s' % (prefix, pkg.version)] = pkg.loader_dict()
        return by_name
+
+
# Association table of the many-to-many relation between area snapshots and
# packages (used as `secondary` by AreaSnapshot.packages).
area_snapshot_package_assoc = Table(
    'area_snapshot_package',
    SQLBase.metadata,
    Column('area_snapshot_id', Integer,
           ForeignKey('area_snapshot.id'), nullable=False),
    Column('package_id', Integer,
           ForeignKey('package.id'), nullable=False),
)
+
+
class AreaSnapshot(SQLBase):
    """The packages listed in one area as part of a distribution snapshot."""
    __tablename__ = 'area_snapshot'

    id = Column(Integer, primary_key=True)
    snapshot_id = Column(
        Integer, ForeignKey('distribution_snapshot.id'), nullable=False,
    )
    area_id = Column(Integer, ForeignKey('area.id'), nullable=False)

    snapshot = relationship('DistributionSnapshot', back_populates='areas')
    area = relationship('Area')
    # Many-to-many link through the area_snapshot_package table.
    packages = relationship('Package', secondary=area_snapshot_package_assoc)
+
+
class TempPackage(SQLBase):
    """Scratch table of (area_id, name, version) rows.

    Its DDL carries the ``TEMPORARY`` prefix. The lister creates and drops it
    explicitly around each area's deduplication pass.
    """
    __tablename__ = 'temp_package'
    __table_args__ = dict(prefixes=['TEMPORARY'])

    id = Column(Integer, primary_key=True)
    area_id = Column(Integer)
    name = Column(String)
    version = Column(String)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Mon, Aug 25, 6:03 PM (3 d, 22 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3247446
Attached To
rDLS Listers
Event Timeline
Log In to Comment