Changeset View
Changeset View
Standalone View
Standalone View
swh/web/save_origin_webhooks/generic_receiver.py
Show All 27 Lines | class OriginSaveWebhookReceiver(abc.ABC): | ||||
def is_ping_event(self, request: Request) -> bool: | def is_ping_event(self, request: Request) -> bool: | ||||
return False | return False | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def is_push_event(self, request: Request) -> bool: | def is_push_event(self, request: Request) -> bool: | ||||
... | ... | ||||
@abc.abstractmethod | @abc.abstractmethod | ||||
def extract_repo_url_and_visit_type(self, request: Request) -> Tuple[str, str]: | def extract_repo_info(self, request: Request) -> Tuple[str, str, bool]: | ||||
"""Extract and return a tuple (repository_url, visit_type, private) from | |||||
the forge webhook payload.""" | |||||
... | ... | ||||
def __init__(self): | def __init__(self): | ||||
self.__doc__ = f""" | self.__doc__ = f""" | ||||
.. http:post:: /api/1/origin/save/webhook/{self.FORGE_TYPE.lower()}/ | .. http:post:: /api/1/origin/save/webhook/{self.FORGE_TYPE.lower()}/ | ||||
Webhook receiver for {self.FORGE_TYPE} to request or update the archival of | Webhook receiver for {self.FORGE_TYPE} to request or update the archival of | ||||
a repository when new commits are pushed to it. | a repository when new commits are pushed to it. | ||||
▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | ) -> Dict[str, Any]: | ||||
content_type = request.headers.get("Content-Type") | content_type = request.headers.get("Content-Type") | ||||
if content_type and not content_type.startswith("application/json"): | if content_type and not content_type.startswith("application/json"): | ||||
raise BadInputExc( | raise BadInputExc( | ||||
f"Invalid content type '{content_type}' for the POST request sent by " | f"Invalid content type '{content_type}' for the POST request sent by " | ||||
f"{self.FORGE_TYPE} webhook, it should be 'application/json'." | f"{self.FORGE_TYPE} webhook, it should be 'application/json'." | ||||
) | ) | ||||
repo_url, visit_type = self.extract_repo_url_and_visit_type(request) | repo_url, visit_type, private = self.extract_repo_info(request) | ||||
if not repo_url: | if not repo_url: | ||||
raise BadInputExc( | raise BadInputExc( | ||||
f"Repository URL could not be extracted from {self.FORGE_TYPE} webhook " | f"Repository URL could not be extracted from {self.FORGE_TYPE} webhook " | ||||
f"payload." | f"payload." | ||||
) | ) | ||||
if not visit_type: | if not visit_type: | ||||
raise BadInputExc( | raise BadInputExc( | ||||
f"Visit type could not be determined for repository {repo_url}." | f"Visit type could not be determined for repository {repo_url}." | ||||
) | ) | ||||
if private: | |||||
raise BadInputExc( | |||||
f"Repository {repo_url} is private and cannot be cloned without authentication." | |||||
) | |||||
save_request = create_save_origin_request( | save_request = create_save_origin_request( | ||||
visit_type=visit_type, origin_url=repo_url | visit_type=visit_type, origin_url=repo_url | ||||
) | ) | ||||
return { | return { | ||||
"origin_url": save_request["origin_url"], | "origin_url": save_request["origin_url"], | ||||
"visit_type": save_request["visit_type"], | "visit_type": save_request["visit_type"], | ||||
"save_request_date": save_request["save_request_date"], | "save_request_date": save_request["save_request_date"], | ||||
"save_request_status": save_request["save_request_status"], | "save_request_status": save_request["save_request_status"], | ||||
} | } |