Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/postgresql/converters.py
Show All 33 Lines | DEFAULT_AUTHOR = { | ||||
"name": None, | "name": None, | ||||
"email": None, | "email": None, | ||||
} | } | ||||
# Column values standing in for an absent date; ``date_to_db`` falls back to
# these values when it is given ``None``.
DEFAULT_DATE = {
    "timestamp": None,
    "offset": 0,
    "neg_utc_offset": None,
    "offset_bytes": None,
}
def author_to_db(author: Optional[Person]) -> Dict[str, Any]: | def author_to_db(author: Optional[Person]) -> Dict[str, Any]: | ||||
"""Convert a swh-model author to its DB representation. | """Convert a swh-model author to its DB representation. | ||||
Args: | Args: | ||||
author: a :mod:`swh.model` compatible author | author: a :mod:`swh.model` compatible author | ||||
Show All 29 Lines | |||||
def db_to_git_headers(db_git_headers):
    """Convert git headers read from the database to their in-memory form.

    Args:
        db_git_headers: an iterable of ``(key, value)`` pairs; each ``key``
            must support ``.encode("utf-8")``.

    Returns:
        a list of two-item lists ``[key, value]``, where ``key`` is the UTF-8
        encoding of the stored key and ``value`` is whatever
        ``encode_with_unescape`` returns for the stored value.

    """
    return [
        [key.encode("utf-8"), encode_with_unescape(value)]
        for key, value in db_git_headers
    ]
def db_to_date(
    date: Optional[datetime.datetime],
    offset: int,
    neg_utc_offset: Optional[bool],
    offset_bytes: Optional[bytes],
) -> Optional[TimestampWithTimezone]:
    """Build a swh-model date from the columns stored in the database.

    Args:
        date: a date pulled out of the database
        offset: an integer number of minutes representing an UTC offset
        neg_utc_offset: whether an utc offset is negative; ``None`` is
            treated as ``False``
        offset_bytes: byte form of the UTC offset; only forwarded to the
            model when it is truthy

    Returns:
        a TimestampWithTimezone, or None if the date is None.

    """
    if date is None:
        return None

    model_kwargs: Dict[str, Any] = {
        "timestamp": Timestamp(
            # we use floor() instead of int() to round down, because of negative dates
            seconds=math.floor(date.timestamp()),
            microseconds=date.microsecond,
        ),
        "offset": offset,
        # For older versions of the database that were not migrated to schema v160
        "negative_utc": False if neg_utc_offset is None else neg_utc_offset,
    }

    if offset_bytes:
        # TODO: remove the conditional after migration is complete.
        model_kwargs["offset_bytes"] = offset_bytes

    return TimestampWithTimezone(**model_kwargs)
def date_to_db(ts_with_tz: Optional[TimestampWithTimezone]) -> Dict[str, Any]:
    """Convert a swh-model date_offset to its DB representation.

    Args:
        ts_with_tz: a TimestampWithTimezone object, or None

    Returns:
        dict: a dictionary with these keys:

            - timestamp: a date in ISO format
            - offset: the UTC offset in minutes
            - neg_utc_offset: a boolean indicating whether a null offset is
              negative or positive.
            - offset_bytes: a byte representation of the latter two, usually
              as "+HHMM" or "-HHMM"

        When ``ts_with_tz`` is None, a fresh copy of ``DEFAULT_DATE`` is
        returned.

    """
    if ts_with_tz is None:
        # Return a copy: handing out the module-level dict itself would let a
        # caller that mutates the result corrupt the default for later calls.
        return dict(DEFAULT_DATE)

    ts = ts_with_tz.timestamp

    # Build an aware UTC datetime from the whole seconds, then put the
    # sub-second part back in.
    timestamp = datetime.datetime.fromtimestamp(ts.seconds, datetime.timezone.utc)
    timestamp = timestamp.replace(microsecond=ts.microseconds)

    return {
        # PostgreSQL supports isoformatted timestamps
        "timestamp": timestamp.isoformat(),
        "offset": ts_with_tz.offset,
        "neg_utc_offset": ts_with_tz.negative_utc,
        "offset_bytes": ts_with_tz.offset_bytes,
    }
def revision_to_db(revision: Revision) -> Dict[str, Any]:
    """Flatten a swh-model revision into the dict of values stored in the DB.

    Author, committer and both dates are expanded into their individual
    columns; ``parents`` becomes a list of rows, one per parent, carrying the
    parent's position in ``revision.parents``.

    """
    author_row = author_to_db(revision.author)
    date_row = date_to_db(revision.date)
    committer_row = author_to_db(revision.committer)
    committer_date_row = date_to_db(revision.committer_date)

    # Stored as a plain dict, or left as None when the revision has none.
    metadata = revision.metadata
    if metadata is not None:
        metadata = dict(metadata)

    parent_rows = [
        {"id": revision.id, "parent_id": parent_id, "parent_rank": rank}
        for rank, parent_id in enumerate(revision.parents)
    ]

    return {
        "id": revision.id,
        "author_fullname": author_row["fullname"],
        "author_name": author_row["name"],
        "author_email": author_row["email"],
        "date": date_row["timestamp"],
        "date_offset": date_row["offset"],
        "date_neg_utc_offset": date_row["neg_utc_offset"],
        "date_offset_bytes": date_row["offset_bytes"],
        "committer_fullname": committer_row["fullname"],
        "committer_name": committer_row["name"],
        "committer_email": committer_row["email"],
        "committer_date": committer_date_row["timestamp"],
        "committer_date_offset": committer_date_row["offset"],
        "committer_date_neg_utc_offset": committer_date_row["neg_utc_offset"],
        "committer_date_offset_bytes": committer_date_row["offset_bytes"],
        "type": revision.type.value,
        "directory": revision.directory,
        "message": revision.message,
        "metadata": metadata,
        "synthetic": revision.synthetic,
        "extra_headers": revision.extra_headers,
        "raw_manifest": revision.raw_manifest,
        "parents": parent_rows,
    }
def db_to_revision(db_revision: Dict[str, Any]) -> Optional[Revision]: | def db_to_revision(db_revision: Dict[str, Any]) -> Optional[Revision]: | ||||
Show All 9 Lines | author = db_to_author( | ||||
db_revision["author_fullname"], | db_revision["author_fullname"], | ||||
db_revision["author_name"], | db_revision["author_name"], | ||||
db_revision["author_email"], | db_revision["author_email"], | ||||
) | ) | ||||
date = db_to_date( | date = db_to_date( | ||||
db_revision["date"], | db_revision["date"], | ||||
db_revision["date_offset"], | db_revision["date_offset"], | ||||
db_revision["date_neg_utc_offset"], | db_revision["date_neg_utc_offset"], | ||||
db_revision["date_offset_bytes"], | |||||
) | ) | ||||
committer = db_to_author( | committer = db_to_author( | ||||
db_revision["committer_fullname"], | db_revision["committer_fullname"], | ||||
db_revision["committer_name"], | db_revision["committer_name"], | ||||
db_revision["committer_email"], | db_revision["committer_email"], | ||||
) | ) | ||||
committer_date = db_to_date( | committer_date = db_to_date( | ||||
db_revision["committer_date"], | db_revision["committer_date"], | ||||
db_revision["committer_date_offset"], | db_revision["committer_date_offset"], | ||||
db_revision["committer_date_neg_utc_offset"], | db_revision["committer_date_neg_utc_offset"], | ||||
db_revision["committer_date_offset_bytes"], | |||||
) | ) | ||||
assert author, "author is None" | assert author, "author is None" | ||||
assert committer, "committer is None" | assert committer, "committer is None" | ||||
parents = [] | parents = [] | ||||
if "parents" in db_revision: | if "parents" in db_revision: | ||||
for parent in db_revision["parents"]: | for parent in db_revision["parents"]: | ||||
Show All 17 Lines | return Revision( | ||||
committer_date=committer_date, | committer_date=committer_date, | ||||
type=RevisionType(db_revision["type"]), | type=RevisionType(db_revision["type"]), | ||||
directory=db_revision["directory"], | directory=db_revision["directory"], | ||||
message=db_revision["message"], | message=db_revision["message"], | ||||
metadata=metadata, | metadata=metadata, | ||||
synthetic=db_revision["synthetic"], | synthetic=db_revision["synthetic"], | ||||
extra_headers=extra_headers, | extra_headers=extra_headers, | ||||
parents=tuple(parents), | parents=tuple(parents), | ||||
raw_manifest=db_revision["raw_manifest"], | |||||
) | ) | ||||
def release_to_db(release: Release) -> Dict[str, Any]:
    """Flatten a swh-model release into the dict of values stored in the DB.

    The author and the date are expanded into their individual columns; the
    release message is stored under the ``comment`` key.

    """
    author_row = author_to_db(release.author)
    date_row = date_to_db(release.date)

    return {
        "id": release.id,
        "author_fullname": author_row["fullname"],
        "author_name": author_row["name"],
        "author_email": author_row["email"],
        "date": date_row["timestamp"],
        "date_offset": date_row["offset"],
        "date_neg_utc_offset": date_row["neg_utc_offset"],
        "date_offset_bytes": date_row["offset_bytes"],
        "name": release.name,
        "target": release.target,
        "target_type": release.target_type.value,
        "comment": release.message,
        "synthetic": release.synthetic,
        "raw_manifest": release.raw_manifest,
    }
def db_to_release(db_release: Dict[str, Any]) -> Optional[Release]:
    """Rebuild a swh-model release from its database row.

    Args:
        db_release: the row, as a mapping from column name to value

    Returns:
        the Release, or None when the row's ``target_type`` is None (in which
        case every column other than ``id`` is asserted to be None as well).

    """
    if db_release["target_type"] is None:
        # An empty row may only carry its id.
        assert all(
            value is None for (column, value) in db_release.items() if column != "id"
        )
        return None

    return Release(
        author=db_to_author(
            db_release["author_fullname"],
            db_release["author_name"],
            db_release["author_email"],
        ),
        date=db_to_date(
            db_release["date"],
            db_release["date_offset"],
            db_release["date_neg_utc_offset"],
            db_release["date_offset_bytes"],
        ),
        id=db_release["id"],
        name=db_release["name"],
        # the message lives in the "comment" column
        message=db_release["comment"],
        synthetic=db_release["synthetic"],
        target=db_release["target"],
        target_type=ObjectType(db_release["target_type"]),
        raw_manifest=db_release["raw_manifest"],
    )
def db_to_raw_extrinsic_metadata(row) -> RawExtrinsicMetadata: | def db_to_raw_extrinsic_metadata(row) -> RawExtrinsicMetadata: | ||||
target = row["raw_extrinsic_metadata.target"] | target = row["raw_extrinsic_metadata.target"] | ||||
if not target.startswith("swh:1:"): | if not target.startswith("swh:1:"): | ||||
warnings.warn( | warnings.warn( | ||||
"Fetching raw_extrinsic_metadata row with URL target", DeprecationWarning | "Fetching raw_extrinsic_metadata row with URL target", DeprecationWarning | ||||
Show All 35 Lines |