Changeset View
Changeset View
Standalone View
Standalone View
swh/web/save_origin_webhooks/sourceforge.py
- This file was added.
# Copyright (C) 2022 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU Affero General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from typing import Tuple | |||||
import requests | |||||
from rest_framework.request import Request | |||||
from swh.web.api.apidoc import api_doc, format_docstring, set_docstring | |||||
from swh.web.api.apiurls import api_route | |||||
from swh.web.save_origin_webhooks.generic_receiver import origin_save_webhook_receiver | |||||
FORGE_TYPE = "SourceForge" | |||||
SOURCE_FORGE_API_PROJECT_URL_PATTERN = "https://sourceforge.net/rest/p/{project_name}" | |||||
def is_sourceforge_request(request: Request) -> bool: | |||||
return ( | |||||
request.headers.get("User-Agent", "") | |||||
== "Allura Webhook (https://allura.apache.org/)" | |||||
) | |||||
def is_sourceforge_push_event(request: Request) -> bool: | |||||
# SourceForge only support webhooks for push events | |||||
return True | |||||
def extract_sourceforge_repo_url_and_visit_type(request: Request) -> Tuple[str, str]: | |||||
repo_url = "" | |||||
visit_type = "" | |||||
project_full_name = request.data.get("repository", {}).get("full_name") | |||||
if project_full_name: | |||||
project_name = project_full_name.split("/")[2] | |||||
project_api_url = SOURCE_FORGE_API_PROJECT_URL_PATTERN.format( | |||||
project_name=project_name | |||||
) | |||||
response = requests.get(project_api_url) | |||||
if response.ok: | |||||
project_data = response.json() | |||||
for tool in project_data.get("tools", []): | |||||
if tool.get("mount_point") == "code" and tool.get("url", "").endswith( | |||||
project_full_name | |||||
): | |||||
repo_url = tool.get( | |||||
"clone_url_https_anon", tool.get("clone_url_ro", "") | |||||
) | |||||
visit_type = tool.get("name", "") | |||||
return repo_url, visit_type | |||||
@api_route( | |||||
f"/origin/save/webhook/{FORGE_TYPE.lower()}/", | |||||
f"api-1-origin-save-webhook-{FORGE_TYPE.lower()}", | |||||
methods=["POST"], | |||||
) | |||||
@api_doc(f"/origin/save/webhook/{FORGE_TYPE.lower()}/", category="Request archival") | |||||
@format_docstring( | |||||
forge_type_lower=FORGE_TYPE.lower(), | |||||
forge_type=FORGE_TYPE, | |||||
repo_types="git, hg or svn", | |||||
webhook_guide_url=( | |||||
"https://sourceforge.net/blog/" | |||||
"how-to-use-webhooks-for-git-mercurial-and-svn-repositories/" | |||||
), | |||||
) | |||||
@set_docstring(origin_save_webhook_receiver.__doc__) | |||||
def api_origin_save_webhook_sourceforge(request: Request): | |||||
return origin_save_webhook_receiver( | |||||
request, | |||||
forge_type=FORGE_TYPE, | |||||
is_forge_request=is_sourceforge_request, | |||||
is_push_event=is_sourceforge_push_event, | |||||
extract_repo_url_and_visit_type=extract_sourceforge_repo_url_and_visit_type, | |||||
) |