Page MenuHomeSoftware Heritage

D4771.diff
No OneTemporary

D4771.diff

diff --git a/swh/model/identifiers.py b/swh/model/identifiers.py
--- a/swh/model/identifiers.py
+++ b/swh/model/identifiers.py
@@ -29,6 +29,16 @@
SWHID_TYPES = ["ori", "snp", "rel", "rev", "dir", "cnt"]
SWHID_SEP = ":"
SWHID_CTXT_SEP = ";"
+SWHID_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"}
+
+SWHID_RE_RAW = (
+ f"(?P<scheme>{SWHID_NAMESPACE})"
+ f"{SWHID_SEP}(?P<version>{SWHID_VERSION})"
+ f"{SWHID_SEP}(?P<object_type>{'|'.join(SWHID_TYPES)})"
+ f"{SWHID_SEP}(?P<object_id>[0-9a-f]{{40}})"
+ f"({SWHID_CTXT_SEP}(?P<qualifiers>\\S+))?"
+)
+SWHID_RE = re.compile(SWHID_RE_RAW)
@lru_cache()
@@ -677,6 +687,15 @@
CONTENT: {"short_name": "cnt", "key_id": "sha1_git"},
}
+_swhid_type_map = {
+ "ori": ORIGIN,
+ "snp": SNAPSHOT,
+ "rel": RELEASE,
+ "rev": REVISION,
+ "dir": DIRECTORY,
+ "cnt": CONTENT,
+}
+
@attr.s(frozen=True)
class SWHID:
@@ -717,8 +736,8 @@
# 'swh:1:cnt:8ff44f081d43176474b267de5451f2c2e88089d0'
"""
- namespace = attr.ib(type=str, default="swh")
- scheme_version = attr.ib(type=int, default=1)
+ namespace = attr.ib(type=str, default=SWHID_NAMESPACE)
+ scheme_version = attr.ib(type=int, default=SWHID_VERSION)
object_type = attr.ib(type=str, default="")
object_id = attr.ib(type=str, converter=hash_to_hex, default="") # type: ignore
metadata = attr.ib(
@@ -729,28 +748,43 @@
def check_namespace(self, attribute, value):
if value != SWHID_NAMESPACE:
raise ValidationError(
- f"Invalid SWHID: namespace is '{value}' but must be '{SWHID_NAMESPACE}'"
+ "Invalid SWHID: invalid namespace: %(namespace)s",
+ params={"namespace": value},
)
@scheme_version.validator
def check_scheme_version(self, attribute, value):
if value != SWHID_VERSION:
raise ValidationError(
- f"Invalid SWHID: version is {value} but must be {SWHID_VERSION}"
+ "Invalid SWHID: invalid version: %(version)s", params={"version": value}
)
@object_type.validator
def check_object_type(self, attribute, value):
if value not in _object_type_map:
- supported_types = ", ".join(_object_type_map.keys())
raise ValidationError(
- f"Invalid SWHID: object type is {value} but must be "
- f"one of {supported_types}"
+ "Invalid SWHID: invalid type: %(object_type)s",
+ params={"object_type": value},
)
@object_id.validator
def check_object_id(self, attribute, value):
- validate_sha1(value) # can raise if invalid hash
+ try:
+ validate_sha1(value) # can raise if invalid hash
+ except ValidationError:
+ raise ValidationError(
+ "Invalid SWHID: invalid checksum: %(object_id)s",
+ params={"object_id": value},
+ ) from None
+
+ @metadata.validator
+ def check_qualifiers(self, attribute, value):
+ for k in value:
+ if k not in SWHID_QUALIFIERS:
+ raise ValidationError(
+ "Invalid SWHID: unknown qualifier: %(qualifier)s",
+ params={"qualifier": k},
+ )
def to_dict(self) -> Dict[str, Any]:
return attr.asdict(self)
@@ -801,77 +835,44 @@
return str(swhid)
-CONTEXT_QUALIFIERS = {"origin", "anchor", "visit", "path", "lines"}
-
-
def parse_swhid(swhid: str) -> SWHID:
- """Parse :ref:`persistent-identifiers`.
+ """Parse a Software Heritage identifier (SWHID) from a string (see
+ :ref:`persistent-identifiers`).
Args:
swhid (str): A persistent identifier
- Raises:
- swh.model.exceptions.ValidationError: in case of:
-
- * missing mandatory values (4)
- * invalid namespace supplied
- * invalid version supplied
- * invalid type supplied
- * missing hash
- * invalid hash identifier supplied
-
Returns:
a named tuple holding the parsing result
- """
- if re.search(r"[ \t\n\r\f\v]", swhid):
- raise ValidationError("Invalid SwHID: SWHIDs cannot contain whitespaces")
-
- # <swhid>;<contextual-information>
- swhid_parts = swhid.split(SWHID_CTXT_SEP)
- swhid_data = swhid_parts.pop(0).split(":")
-
- if len(swhid_data) != 4:
- raise ValidationError(
- "Invalid SWHID, format must be 'swh:1:OBJECT_TYPE:OBJECT_ID'"
- )
-
- # Checking for parsing errors
- _ns, _version, _type, _id = swhid_data
-
- for otype, data in _object_type_map.items():
- if _type == data["short_name"]:
- _type = otype
- break
+ Raises:
+ swh.model.exceptions.ValidationError: if passed string is not a valid SWHID
- if not _id:
+ """
+ m = SWHID_RE.fullmatch(swhid)
+ if not m:
raise ValidationError(
- "Invalid SWHID: missing OBJECT_ID (as a 40 hex digit string)"
- )
-
- _metadata = {}
- for part in swhid_parts:
- try:
- qualifier, val = part.split("=")
- _metadata[qualifier] = val
- except Exception:
- raise ValidationError(
- "Invalid SWHID: contextual data must be a ;-separated list of "
- "key=value pairs"
- )
-
- wrong_qualifiers = set(_metadata) - set(CONTEXT_QUALIFIERS)
- if wrong_qualifiers:
- error_msg = (
- f"Invalid SWHID: Wrong qualifiers {', '.join(wrong_qualifiers)}. "
- f"The qualifiers must be one of {', '.join(CONTEXT_QUALIFIERS)}"
+ "Invalid SWHID: invalid syntax: %(swhid)s", params={"swhid": swhid}
)
- raise ValidationError(error_msg)
+ parts = m.groupdict()
+
+ _qualifiers = {}
+ qualifiers_raw = parts["qualifiers"]
+ if qualifiers_raw:
+ for qualifier in qualifiers_raw.split(SWHID_CTXT_SEP):
+ try:
+ k, v = qualifier.split("=")
+ except ValueError:
+ raise ValidationError(
+ "Invalid SWHID: invalid qualifier: %(qualifier)s",
+ params={"qualifier": qualifier},
+ )
+ _qualifiers[k] = v
return SWHID(
- _ns,
- int(_version),
- _type,
- _id,
- _metadata, # type: ignore # mypy can't properly unify types
+ parts["scheme"],
+ int(parts["version"]),
+ _swhid_type_map[parts["object_type"]],
+ parts["object_id"],
+ _qualifiers, # type: ignore # mypy can't properly unify types
)
diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py
--- a/swh/model/tests/test_identifiers.py
+++ b/swh/model/tests/test_identifiers.py
@@ -218,6 +218,8 @@
],
}
+dummy_qualifiers = {"origin": "https://example.com", "lines": "42"}
+
class DirectoryIdentifier(unittest.TestCase):
def setUp(self):
@@ -843,7 +845,7 @@
for _type, _hash in [
(SNAPSHOT, _snapshot_id),
(SNAPSHOT, _snapshot),
- ("foo", ""),
+ ("lines", "42"),
]:
with self.assertRaises(ValidationError):
identifiers.swhid(_type, _hash)
@@ -1117,17 +1119,9 @@
)
assert hash(
- SWHID(
- object_type="directory",
- object_id=object_id,
- metadata={"foo": "bar", "baz": "qux"},
- )
+ SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,)
) == hash(
- SWHID(
- object_type="directory",
- object_id=object_id,
- metadata={"foo": "bar", "baz": "qux"},
- )
+ SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,)
)
# Different order of the dictionary, so the underlying order of the tuple in
@@ -1136,13 +1130,13 @@
SWHID(
object_type="directory",
object_id=object_id,
- metadata={"foo": "bar", "baz": "qux"},
+ metadata={"origin": "https://example.com", "lines": "42"},
)
) == hash(
SWHID(
object_type="directory",
object_id=object_id,
- metadata={"baz": "qux", "foo": "bar"},
+ metadata={"lines": "42", "origin": "https://example.com"},
)
)
@@ -1155,21 +1149,9 @@
)
assert SWHID(
- object_type="directory",
- object_id=object_id,
- metadata={"foo": "bar", "baz": "qux"},
- ) == SWHID(
- object_type="directory",
- object_id=object_id,
- metadata={"foo": "bar", "baz": "qux"},
- )
+ object_type="directory", object_id=object_id, metadata=dummy_qualifiers,
+ ) == SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,)
assert SWHID(
- object_type="directory",
- object_id=object_id,
- metadata={"foo": "bar", "baz": "qux"},
- ) == SWHID(
- object_type="directory",
- object_id=object_id,
- metadata={"baz": "qux", "foo": "bar"},
- )
+ object_type="directory", object_id=object_id, metadata=dummy_qualifiers,
+ ) == SWHID(object_type="directory", object_id=object_id, metadata=dummy_qualifiers,)
diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py
--- a/swh/model/tests/test_model.py
+++ b/swh/model/tests/test_model.py
@@ -782,12 +782,13 @@
_metadata_fetcher = MetadataFetcher(name="test-fetcher", version="0.0.1",)
_content_swhid = parse_swhid("swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2")
_origin_url = "https://forge.softwareheritage.org/source/swh-model.git"
+_dummy_qualifiers = {"origin": "https://example.com", "lines": "42"}
_common_metadata_fields = dict(
discovery_date=datetime.datetime.now(tz=datetime.timezone.utc),
authority=_metadata_authority,
fetcher=_metadata_fetcher,
format="json",
- metadata=b'{"foo": "bar"}',
+ metadata=b'{"origin": "https://example.com", "lines": "42"}',
)
@@ -815,7 +816,7 @@
"fetcher": {"name": "test-fetcher", "version": "0.0.1",},
"discovery_date": _common_metadata_fields["discovery_date"],
"format": "json",
- "metadata": b'{"foo": "bar"}',
+ "metadata": b'{"origin": "https://example.com", "lines": "42"}',
}
m = RawExtrinsicMetadata(
@@ -893,7 +894,7 @@
target=SWHID(
object_type="content",
object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
- metadata={"foo": "bar"},
+ metadata=_dummy_qualifiers,
),
**_common_metadata_fields,
)
@@ -1018,7 +1019,7 @@
snapshot=SWHID(
object_type="snapshot",
object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
- metadata={"foo": "bar"},
+ metadata=_dummy_qualifiers,
),
**_common_metadata_fields,
)
@@ -1073,7 +1074,7 @@
release=SWHID(
object_type="release",
object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
- metadata={"foo": "bar"},
+ metadata=_dummy_qualifiers,
),
**_common_metadata_fields,
)
@@ -1128,7 +1129,7 @@
revision=SWHID(
object_type="revision",
object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
- metadata={"foo": "bar"},
+ metadata=_dummy_qualifiers,
),
**_common_metadata_fields,
)
@@ -1205,7 +1206,7 @@
directory=SWHID(
object_type="directory",
object_id="94a9ed024d3859793618152ea559a168bbcbb5e2",
- metadata={"foo": "bar"},
+ metadata=_dummy_qualifiers,
),
**_common_metadata_fields,
)

File Metadata

Mime Type
text/plain
Expires
Thu, Jan 30, 4:07 PM (1 h, 49 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3224223

Event Timeline