Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/origin.py
from datetime import datetime, timezone | |||||
from itertools import islice | from itertools import islice | ||||
import logging | import logging | ||||
import time | import time | ||||
from typing import Iterable, Iterator, List, Optional, Tuple | from typing import Iterable, Iterator, List, Optional, Tuple | ||||
import iso8601 | from swh.model.model import Sha1Git | ||||
from .archive import ArchiveInterface | from .archive import ArchiveInterface | ||||
from .graph import HistoryNode, build_history_graph | from .graph import HistoryNode, build_history_graph | ||||
from .model import OriginEntry, RevisionEntry | from .model import OriginEntry, RevisionEntry | ||||
from .provenance import ProvenanceInterface | from .provenance import ProvenanceInterface | ||||
class CSVOriginIterator: | class CSVOriginIterator: | ||||
"""Iterator over origin visit statuses typically present in the given CSV | """Iterator over origin visit statuses typically present in the given CSV | ||||
file. | file. | ||||
The input is an iterator that produces 3 elements per row: | The input is an iterator that produces 2 elements per row: | ||||
(url, date, snap) | (url, snap) | ||||
where: | where: | ||||
- url: is the origin url of the visit | - url: is the origin url of the visit | ||||
- date: is the date of the visit | |||||
- snap: sha1_git of the snapshot pointed by the visit status | - snap: sha1_git of the snapshot pointed by the visit status | ||||
""" | """ | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
statuses: Iterable[Tuple[str, datetime, bytes]], | statuses: Iterable[Tuple[str, Sha1Git]], | ||||
limit: Optional[int] = None, | limit: Optional[int] = None, | ||||
): | ): | ||||
self.statuses: Iterator[Tuple[str, datetime, bytes]] | self.statuses: Iterator[Tuple[str, Sha1Git]] | ||||
if limit is not None: | if limit is not None: | ||||
self.statuses = islice(statuses, limit) | self.statuses = islice(statuses, limit) | ||||
else: | else: | ||||
self.statuses = iter(statuses) | self.statuses = iter(statuses) | ||||
def __iter__(self): | def __iter__(self): | ||||
for url, date, snap in self.statuses: | return (OriginEntry(url, snapshot) for url, snapshot in self.statuses) | ||||
date = iso8601.parse_date(date, default_timezone=timezone.utc) | |||||
yield OriginEntry(url, date, snap) | |||||
def origin_add( | def origin_add( | ||||
provenance: ProvenanceInterface, | provenance: ProvenanceInterface, | ||||
archive: ArchiveInterface, | archive: ArchiveInterface, | ||||
origins: List[OriginEntry], | origins: List[OriginEntry], | ||||
): | ): | ||||
start = time.time() | start = time.time() | ||||
for origin in origins: | for origin in origins: | ||||
provenance.origin_add(origin) | |||||
origin.retrieve_revisions(archive) | origin.retrieve_revisions(archive) | ||||
for revision in origin.revisions: | for revision in origin.revisions: | ||||
graph = build_history_graph(archive, provenance, revision) | graph = build_history_graph(archive, provenance, revision) | ||||
origin_add_revision(provenance, origin, graph) | origin_add_revision(provenance, origin, graph) | ||||
done = time.time() | done = time.time() | ||||
provenance.commit() | provenance.commit() | ||||
stop = time.time() | stop = time.time() | ||||
logging.debug( | logging.debug( | ||||
"Origins " | "Origins " | ||||
";".join([origin.url + ":" + origin.snapshot.hex() for origin in origins]) | ";".join([origin.id.hex() + ":" + origin.snapshot.hex() for origin in origins]) | ||||
+ f" were processed in {stop - start} secs (commit took {stop - done} secs)!" | + f" were processed in {stop - start} secs (commit took {stop - done} secs)!" | ||||
) | ) | ||||
def origin_add_revision( | def origin_add_revision( | ||||
provenance: ProvenanceInterface, | provenance: ProvenanceInterface, | ||||
origin: OriginEntry, | origin: OriginEntry, | ||||
graph: HistoryNode, | graph: HistoryNode, | ||||
Show All 34 Lines |