diff --git a/requirements-swh.txt b/requirements-swh.txt
index d7661c0..01f1bd4 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,2 @@
 swh.core >= 0.0.75
-swh.storage[schemata] >= 0.0.122
 swh.scheduler >= 0.0.58
diff --git a/swh/lister/debian/lister.py b/swh/lister/debian/lister.py
index 6763df2..6f5f487 100644
--- a/swh/lister/debian/lister.py
+++ b/swh/lister/debian/lister.py
@@ -1,241 +1,241 @@
 # Copyright (C) 2017-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import bz2
 from collections import defaultdict
 import datetime
 import gzip
 import lzma
 import logging
 
 from debian.deb822 import Sources
 from sqlalchemy.orm import joinedload, load_only
 from sqlalchemy.schema import CreateTable, DropTable
 
-from swh.storage.schemata.distribution import (
+from swh.lister.debian.models import (
     AreaSnapshot, Distribution, DistributionSnapshot, Package,
     TempPackage,
 )
 
 from swh.lister.core.lister_base import ListerBase, FetchError
 from swh.lister.core.lister_transports import ListerHttpTransport
 
 decompressors = {
     'gz': lambda f: gzip.GzipFile(fileobj=f),
     'bz2': bz2.BZ2File,
     'xz': lzma.LZMAFile,
 }
 
 logger = logging.getLogger(__name__)
 
 
 class DebianLister(ListerHttpTransport, ListerBase):
     MODEL = Package
     PATH_TEMPLATE = None
     LISTER_NAME = 'debian'
     instance = 'debian'
 
     def __init__(self, override_config=None):
         ListerHttpTransport.__init__(self, url="notused")
         ListerBase.__init__(self, override_config=override_config)
 
     def transport_request(self, identifier):
         """Subvert ListerHttpTransport.transport_request, to try several
         index URIs in turn.
 
         The Debian repository format supports several compression
         algorithms across the ages, so we try several URIs.
 
         Once we have found a working URI, we break and set
         `self.decompressor` to the one that matched.
 
         Returns:
             a requests Response object.
 
         Raises:
             FetchError: when all the URIs failed to be retrieved.
         """
         response = None
         compression = None
 
         for uri, compression in self.area.index_uris():
             response = super().transport_request(uri)
             if response.status_code == 200:
                 break
         else:
             raise FetchError(
                 "Could not retrieve index for %s" % self.area
             )
 
         self.decompressor = decompressors.get(compression)
 
         return response
 
     def request_uri(self, identifier):
         # In the overridden transport_request, we pass
         # ListerBase.transport_request() the full URI as identifier, so we
         # need to return it here.
         return identifier
 
     def request_params(self, identifier):
         # Enable streaming to allow wrapping the response in the
         # decompressor in transport_response_simplified.
         params = super().request_params(identifier)
         params['stream'] = True
         return params
 
     def transport_response_simplified(self, response):
         """Decompress and parse the package index fetched in
         `transport_request`.
 
         For each package, we "pivot" the file list entries (Files,
         Checksums-Sha1, Checksums-Sha256), to return a files dict mapping
         filenames to their checksums.
         """
         if self.decompressor:
             data = self.decompressor(response.raw)
         else:
             data = response.raw
 
         for src_pkg in Sources.iter_paragraphs(data.readlines()):
             files = defaultdict(dict)
 
             for field in src_pkg._multivalued_fields:
                 if field.startswith('checksums-'):
                     sum_name = field[len('checksums-'):]
                 else:
                     sum_name = 'md5sum'
                 if field in src_pkg:
                     for entry in src_pkg[field]:
                         name = entry['name']
                         files[name]['name'] = entry['name']
                         files[name]['size'] = int(entry['size'], 10)
                         files[name][sum_name] = entry[sum_name]
 
             yield {
                 'name': src_pkg['Package'],
                 'version': src_pkg['Version'],
                 'directory': src_pkg['Directory'],
                 'files': files,
             }
 
     def inject_repo_data_into_db(self, models_list):
         """Generate the Package entries that didn't previously exist.
 
         Contrary to ListerBase, we don't actually insert the data in
         database. `schedule_missing_tasks` does it once we have the origin
         and task identifiers.
         """
         by_name_version = {}
         temp_packages = []
 
         area_id = self.area.id
 
         for model in models_list:
             name = model['name']
             version = model['version']
             temp_packages.append({
                 'area_id': area_id,
                 'name': name,
                 'version': version,
             })
             by_name_version[name, version] = model
 
         # Add all the listed packages to a temporary table
         self.db_session.execute(CreateTable(TempPackage.__table__))
         self.db_session.bulk_insert_mappings(TempPackage, temp_packages)
 
         def exists_tmp_pkg(db_session, model):
             return (
                 db_session.query(model)
                 .filter(Package.area_id == TempPackage.area_id)
                 .filter(Package.name == TempPackage.name)
                 .filter(Package.version == TempPackage.version)
                 .exists()
             )
 
         # Filter out the packages that already exist in the main Package
         # table
         new_packages = self.db_session\
             .query(TempPackage)\
             .options(load_only('name', 'version'))\
             .filter(~exists_tmp_pkg(self.db_session, Package))\
             .all()
 
         self.old_area_packages = self.db_session.query(Package).filter(
             exists_tmp_pkg(self.db_session, TempPackage)
         ).all()
 
         self.db_session.execute(DropTable(TempPackage.__table__))
 
         added_packages = []
         for package in new_packages:
             model = by_name_version[package.name, package.version]
             added_packages.append(Package(area=self.area, **model))
 
         self.db_session.add_all(added_packages)
         return added_packages
 
     def schedule_missing_tasks(self, models_list, added_packages):
         """We create tasks at the end of the full snapshot processing"""
         return
 
     def create_tasks_for_snapshot(self, snapshot):
         tasks = [
             snapshot.task_for_package(name, versions)
             for name, versions in snapshot.get_packages().items()
         ]
 
         return self.scheduler.create_tasks(tasks)
 
     def run(self, distribution, date=None):
         """Run the lister for a given (distribution, area) tuple.
 
         Args:
             distribution (str): name of the distribution (e.g. "Debian")
             date (datetime.datetime): date the snapshot is taken (defaults
                 to now)
         """
         distribution = self.db_session\
             .query(Distribution)\
             .options(joinedload(Distribution.areas))\
             .filter(Distribution.name == distribution)\
             .one_or_none()
 
         if not distribution:
             raise ValueError("Distribution %s is not registered" %
                              distribution)
 
         if not distribution.type == 'deb':
             raise ValueError("Distribution %s is not a Debian derivative" %
                              distribution)
 
         date = date or datetime.datetime.now(tz=datetime.timezone.utc)
 
         logger.debug('Creating snapshot for distribution %s on date %s' %
                      (distribution, date))
 
         snapshot = DistributionSnapshot(date=date, distribution=distribution)
         self.db_session.add(snapshot)
 
         for area in distribution.areas:
             if not area.active:
                 continue
 
             self.area = area
 
             logger.debug('Processing area %s' % area)
 
             _, new_area_packages = self.ingest_data(None)
             area_snapshot = AreaSnapshot(snapshot=snapshot, area=area)
             self.db_session.add(area_snapshot)
             area_snapshot.packages.extend(new_area_packages)
             area_snapshot.packages.extend(self.old_area_packages)
 
         self.create_tasks_for_snapshot(snapshot)
 
         self.db_session.commit()
 
         return True
diff --git a/swh/lister/debian/models.py b/swh/lister/debian/models.py
new file mode 100644
index 0000000..7ddb7a2
--- /dev/null
+++ b/swh/lister/debian/models.py
@@ -0,0 +1,251 @@
+# Copyright (C) 2017-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import binascii
+from collections import defaultdict
+import datetime
+
+from sqlalchemy import (
+    Boolean,
+    Column,
+    DateTime,
+    Enum,
+    ForeignKey,
+    Integer,
+    LargeBinary,
+    String,
+    Table,
+    UniqueConstraint,
+)
+
+try:
+    from sqlalchemy import JSON
+except ImportError:
+    # SQLAlchemy < 1.1
+    from sqlalchemy.dialects.postgresql import JSONB as JSON
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship
+
+SQLBase = declarative_base()
+
+
+class Distribution(SQLBase):
+    """A distribution (e.g. Debian, Ubuntu, Fedora, ...)"""
+    __tablename__ = 'distribution'
+
+    id = Column(Integer, primary_key=True)
+    name = Column(String, unique=True, nullable=False)
+    type = Column(Enum('deb', 'rpm', name='distribution_types'),
+                  nullable=False)
+    mirror_uri = Column(String, nullable=False)
+
+    areas = relationship('Area', back_populates='distribution')
+
+    def origin_for_package(self, package_name, package_versions):
+        """Return the origin dictionary for the given package"""
+        return {
+            'type': self.type,
+            'url': '%s://%s/packages/%s' % (
+                self.type, self.name, package_name
+            ),
+        }
+
+    def __repr__(self):
+        return 'Distribution(%s (%s) on %s)' % (
+            self.name,
+            self.type,
+            self.mirror_uri,
+        )
+
+
+class Area(SQLBase):
+    __tablename__ = 'area'
+    __table_args__ = (
+        UniqueConstraint('distribution_id', 'name'),
+    )
+
+    id = Column(Integer, primary_key=True)
+    distribution_id = Column(Integer, ForeignKey('distribution.id'),
+                             nullable=False)
+    name = Column(String, nullable=False)
+    active = Column(Boolean, nullable=False, default=True)
+
+    distribution = relationship('Distribution', back_populates='areas')
+
+    def index_uris(self):
+        """Get possible URIs for this component's package index"""
+        if self.distribution.type == 'deb':
+            compression_exts = ('xz', 'bz2', 'gz', None)
+            base_uri = '%s/dists/%s/source/Sources' % (
+                self.distribution.mirror_uri,
+                self.name,
+            )
+            for ext in compression_exts:
+                if ext:
+                    yield (base_uri + '.' + ext, ext)
+                else:
+                    yield (base_uri, None)
+
+        raise NotImplementedError(
+            'Do not know how to build index URI for Distribution type %s' %
+            self.distribution.type
+        )
+
+    def __repr__(self):
+        return 'Area(%s of %s)' % (
+            self.name,
+            self.distribution.name,
+        )
+
+
+class Package(SQLBase):
+    __tablename__ = 'package'
+    __table_args__ = (
+        UniqueConstraint('area_id', 'name', 'version'),
+    )
+
+    id = Column(Integer, primary_key=True)
+    area_id = Column(Integer, ForeignKey('area.id'), nullable=False)
+    name = Column(String, nullable=False)
+    version = Column(String, nullable=False)
+    directory = Column(String, nullable=False)
+    files = Column(JSON, nullable=False)
+
+    origin_id = Column(Integer)
+    task_id = Column(Integer)
+
+    revision_id = Column(LargeBinary(20))
+
+    area = relationship('Area')
+
+    @property
+    def distribution(self):
+        return self.area.distribution
+
+    def fetch_uri(self, filename):
+        """Get the URI to fetch the `filename` file associated with the
+        package"""
+        if self.distribution.type == 'deb':
+            return '%s/%s/%s' % (
+                self.distribution.mirror_uri,
+                self.directory,
+                filename,
+            )
+        else:
+            raise NotImplementedError(
+                'Do not know how to build fetch URI for Distribution type %s'
+                % self.distribution.type
+            )
+
+    def loader_dict(self):
+        ret = {
+            'id': self.id,
+            'name': self.name,
+            'version': self.version,
+        }
+        if self.revision_id:
+            ret['revision_id'] = binascii.hexlify(self.revision_id).decode()
+        else:
+            files = {
+                name: checksums.copy()
+                for name, checksums in self.files.items()
+            }
+            for name in files:
+                files[name]['uri'] = self.fetch_uri(name)
+
+            ret.update({
+                'revision_id': None,
+                'files': files,
+            })
+        return ret
+
+    def __repr__(self):
+        return 'Package(%s_%s of %s %s)' % (
+            self.name,
+            self.version,
+            self.distribution.name,
+            self.area.name,
+        )
+
+
+class DistributionSnapshot(SQLBase):
+    __tablename__ = 'distribution_snapshot'
+
+    id = Column(Integer, primary_key=True)
+    date = Column(DateTime, nullable=False, index=True)
+    distribution_id = Column(Integer,
+                             ForeignKey('distribution.id'),
+                             nullable=False)
+
+    distribution = relationship('Distribution')
+    areas = relationship('AreaSnapshot', back_populates='snapshot')
+
+    def task_for_package(self, package_name, package_versions):
+        """Return the task dictionary for the given list of package versions"""
+        origin = self.distribution.origin_for_package(
+            package_name, package_versions,
+        )
+
+        return {
+            'policy': 'oneshot',
+            'type': 'load-%s-package' % self.distribution.type,
+            'next_run': datetime.datetime.now(tz=datetime.timezone.utc),
+            'arguments': {
+                'args': [],
+                'kwargs': {
+                    'origin': origin,
+                    'date': self.date.isoformat(),
+                    'packages': package_versions,
+                },
+            }
+        }
+
+    def get_packages(self):
+        packages = defaultdict(dict)
+        for area_snapshot in self.areas:
+            area_name = area_snapshot.area.name
+            for package in area_snapshot.packages:
+                ref_name = '%s/%s' % (area_name, package.version)
+                packages[package.name][ref_name] = package.loader_dict()
+
+        return packages
+
+
+area_snapshot_package_assoc = Table(
+    'area_snapshot_package', SQLBase.metadata,
+    Column('area_snapshot_id', Integer, ForeignKey('area_snapshot.id'),
+           nullable=False),
+    Column('package_id', Integer, ForeignKey('package.id'),
+           nullable=False),
+)
+
+
+class AreaSnapshot(SQLBase):
+    __tablename__ = 'area_snapshot'
+
+    id = Column(Integer, primary_key=True)
+    snapshot_id = Column(Integer,
+                         ForeignKey('distribution_snapshot.id'),
+                         nullable=False)
+    area_id = Column(Integer,
+                     ForeignKey('area.id'),
+                     nullable=False)
+
+    snapshot = relationship('DistributionSnapshot', back_populates='areas')
+    area = relationship('Area')
+    packages = relationship('Package', secondary=area_snapshot_package_assoc)
+
+
+class TempPackage(SQLBase):
+    __tablename__ = 'temp_package'
+    __table_args__ = {
+        'prefixes': ['TEMPORARY'],
+    }
+
+    id = Column(Integer, primary_key=True)
+    area_id = Column(Integer)
+    name = Column(String)
+    version = Column(String)