Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/api/server.py
Show All 25 Lines | |||||
from swh.core import config | from swh.core import config | ||||
from swh.core.api.serializers import encode_data_client as encode_data | from swh.core.api.serializers import encode_data_client as encode_data | ||||
from swh.core.api.serializers import msgpack_loads as decode_data | from swh.core.api.serializers import msgpack_loads as decode_data | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from .. import get_provenance_storage | from .. import get_provenance_storage | ||||
from ..interface import EntityType, RelationData, RelationType, RevisionData | from ..interface import ( | ||||
DirectoryData, | |||||
EntityType, | |||||
RelationData, | |||||
RelationType, | |||||
RevisionData, | |||||
) | |||||
from ..util import path_id | from ..util import path_id | ||||
from .serializers import DECODERS, ENCODERS | from .serializers import DECODERS, ENCODERS | ||||
LOG_FORMAT = ( | LOG_FORMAT = ( | ||||
"%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " | "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " | ||||
"-35s %(lineno) -5d: %(message)s" | "-35s %(lineno) -5d: %(message)s" | ||||
) | ) | ||||
LOGGER = logging.getLogger(__name__) | LOGGER = logging.getLogger(__name__) | ||||
TERMINATE = object() | TERMINATE = object() | ||||
class ServerCommand(Enum): | class ServerCommand(Enum): | ||||
TERMINATE = "terminate" | TERMINATE = "terminate" | ||||
CONSUMING = "consuming" | CONSUMING = "consuming" | ||||
class TerminateSignal(BaseException): | class TerminateSignal(BaseException): | ||||
pass | pass | ||||
def resolve_dates( | def resolve_dates(dates: Iterable[Tuple[Sha1Git, datetime]]) -> Dict[Sha1Git, datetime]: | ||||
dates: Iterable[Union[Tuple[Sha1Git, Optional[datetime]], Tuple[Sha1Git]]] | result: Dict[Sha1Git, datetime] = {} | ||||
) -> Dict[Sha1Git, Optional[datetime]]: | for sha1, date in dates: | ||||
result: Dict[Sha1Git, Optional[datetime]] = {} | known = result.setdefault(sha1, date) | ||||
for row in dates: | if date < known: | ||||
sha1 = row[0] | |||||
date = ( | |||||
cast(Tuple[Sha1Git, Optional[datetime]], row)[1] if len(row) > 1 else None | |||||
) | |||||
known = result.setdefault(sha1, None) | |||||
if date is not None and (known is None or date < known): | |||||
result[sha1] = date | result[sha1] = date | ||||
return result | return result | ||||
def resolve_directory( | |||||
data: Iterable[Tuple[Sha1Git, DirectoryData]] | |||||
) -> Dict[Sha1Git, DirectoryData]: | |||||
result: Dict[Sha1Git, DirectoryData] = {} | |||||
for sha1, dir in data: | |||||
known = result.setdefault(sha1, dir) | |||||
value = known | |||||
if dir.date < known.date: | |||||
value = DirectoryData(date=dir.date, flat=value.flat) | |||||
if dir.flat: | |||||
value = DirectoryData(date=value.date, flat=dir.flat) | |||||
if value != known: | |||||
result[sha1] = value | |||||
return result | |||||
def resolve_revision( | def resolve_revision( | ||||
data: Iterable[Union[Tuple[Sha1Git, RevisionData], Tuple[Sha1Git]]] | data: Iterable[Union[Tuple[Sha1Git, RevisionData], Tuple[Sha1Git]]] | ||||
) -> Dict[Sha1Git, RevisionData]: | ) -> Dict[Sha1Git, RevisionData]: | ||||
result: Dict[Sha1Git, RevisionData] = {} | result: Dict[Sha1Git, RevisionData] = {} | ||||
for row in data: | for row in data: | ||||
sha1 = row[0] | sha1 = row[0] | ||||
rev = ( | rev = ( | ||||
cast(Tuple[Sha1Git, RevisionData], row)[1] | cast(Tuple[Sha1Git, RevisionData], row)[1] | ||||
▲ Show 20 Lines • Show All 374 Lines • ▼ Show 20 Lines | def stop(self) -> None: | ||||
self.stop_consuming() | self.stop_consuming() | ||||
self._connection.ioloop.start() | self._connection.ioloop.start() | ||||
else: | else: | ||||
self._connection.ioloop.stop() | self._connection.ioloop.stop() | ||||
LOGGER.info("Stopped") | LOGGER.info("Stopped") | ||||
@staticmethod | @staticmethod | ||||
def get_conflicts_func(meth_name: str) -> Callable[[Iterable[Any]], Any]: | def get_conflicts_func(meth_name: str) -> Callable[[Iterable[Any]], Any]: | ||||
if meth_name in ["content_add", "directory_add"]: | if meth_name == "content_add": | ||||
return resolve_dates | return resolve_dates | ||||
elif meth_name == "directory_add": | |||||
return resolve_directory | |||||
elif meth_name == "location_add": | elif meth_name == "location_add": | ||||
return lambda data: set(data) # just remove duplicates | return lambda data: set(data) # just remove duplicates | ||||
elif meth_name == "origin_add": | elif meth_name == "origin_add": | ||||
return lambda data: dict(data) # last processed value is good enough | return lambda data: dict(data) # last processed value is good enough | ||||
elif meth_name == "revision_add": | elif meth_name == "revision_add": | ||||
return resolve_revision | return resolve_revision | ||||
elif meth_name == "relation_add": | elif meth_name == "relation_add": | ||||
return resolve_relation | return resolve_relation | ||||
▲ Show 20 Lines • Show All 248 Lines • Show Last 20 Lines |