diff --git a/origins/client.py b/origins/client.py
new file mode 100755
index 0000000..c48a0df
--- /dev/null
+++ b/origins/client.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python
+
+import logging
+import logging.handlers
+import multiprocessing
+import os
+import sys
+import time
+from typing import Any, Callable, Dict, List, Optional
+
+from swh.core import config
+from swh.model.hashutil import hash_to_bytes
+from swh.provenance import get_archive, get_provenance
+from swh.provenance.origin import OriginEntry, origin_add
+import yaml
+import zmq
+
+CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
+
+DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
+
+
+class Client(multiprocessing.Process):
+    def __init__(
+        self,
+        conf: Dict[str, Any],
+        group: None = None,
+        target: Optional[Callable[..., Any]] = None,
+        name: Optional[str] = None,
+    ) -> None:
+        super().__init__(group=group, target=target, name=name)
+        self.archive_conf = conf["archive"]
+        self.storage_conf = conf["storage"]
+        self.url = f"tcp://{conf['org_server']['host']}:{conf['org_server']['port']}"
+        logging.info(f"Client {self.name} created")
+
+    def run(self):
+        logging.info(f"Client {self.name} started")
+        # XXX: should we reconnect on each iteration to save resources?
+        archive = get_archive(**self.archive_conf)
+
+        context = zmq.Context()
+        socket: zmq.Socket = context.socket(zmq.REQ)
+        socket.connect(self.url)
+
+        with get_provenance(**self.storage_conf) as provenance:
+            while True:
+                socket.send(b"NEXT")
+                response = socket.recv_json()
+
+                if response is None:
+                    break
+
+                batch = []
+                for origin in response:
+                    batch.append(
+                        OriginEntry(origin["url"], hash_to_bytes(origin["snapshot"]))
+                    )
+                origin_add(provenance, archive, batch)
+        logging.info(f"Client {self.name} stopped")
+
+
+if __name__ == "__main__":
+    # Check parameters
+    if len(sys.argv) != 2:
+        print("usage: client <processes>")
+        exit(-1)
+
+    processes = int(sys.argv[1])
+
+    config_file = None  # TODO: add as a cli option
+    if (
+        config_file is None
+        and DEFAULT_PATH is not None
+        and config.config_exists(DEFAULT_PATH)
+    ):
+        config_file = DEFAULT_PATH
+
+    if config_file is None or not os.path.exists(config_file):
+        print("No configuration provided")
+        exit(-1)
+
+    conf = yaml.safe_load(open(config_file, "rb"))["provenance"]
+
+    # Start counter
+    start = time.time()
+
+    # Launch as many clients as requested
+    clients: List[Client] = []
+    for idx in range(processes):
+        logging.info(f"MAIN: launching process {idx}")
+        client = Client(conf, name=f"worker{idx}")
+        client.start()
+        clients.append(client)
+
+    # Wait for all processes to complete their work
+    for client in clients:
+        logging.info(f"MAIN: waiting for process {client.name} to finish")
+        client.join()
+        logging.info(f"MAIN: process {client.name} finished executing")
+
+    # Stop counter and report elapsed time
+    stop = time.time()
+    print("Elapsed time:", stop - start, "seconds")
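
Note: client.py reads the file pointed to by SWH_CONFIG_FILENAME and uses only its "provenance" section. A minimal sketch of the expected shape, inferred from the keys the scripts read; the archive and storage contents are placeholders, since they are forwarded verbatim to get_archive() and get_provenance():

    provenance:
      archive:
        # keyword arguments for swh.provenance.get_archive(); contents elided
        cls: ...
      storage:
        # keyword arguments for swh.provenance.get_provenance(); when cls is
        # "rabbitmq", server.py reads its stats connection from storage_config
        cls: ...
      org_server:
        host: localhost    # used by client.py to build the tcp:// URL
        port: 5555         # origins/server.py binds tcp://*:<port>
        batch_size: 100    # leftover keys (batch_size, limit, skip) go to OriginWorker
        stats:             # optional; popped by server.py to start a StatsWorker
          suffix: origins.csv
          timeout: 300
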
diff --git a/revisions/server.py b/origins/server.py
similarity index 83%
copy from revisions/server.py
copy to origins/server.py
index 9cf1a1a..2bf9bde 100755
--- a/revisions/server.py
+++ b/origins/server.py
@@ -1,242 +1,234 @@
 #!/usr/bin/env python

 from datetime import datetime, timezone
 from enum import Enum
 import gzip
 import io
 import os
 import queue
 import sys
 import threading
 import time
 from typing import Any, Callable, Dict, List, Optional

-import iso8601
 from swh.core import config
 from swh.provenance import get_provenance
 from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql
 import yaml
 import zmq

 CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"

 DEFAULT_BATCH_SIZE = 1
 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
 DEFAULT_PORT = 5555
 DEFAULT_STATS_RATE = 300
 DEFAULT_SKIP_VALUE = 0

 UTCEPOCH = datetime.fromtimestamp(0, timezone.utc)


 class Command(Enum):
     TERMINATE = "terminate"


 class StatsWorker(threading.Thread):
     def __init__(
         self,
         filename: str,
         storage_conf: Dict[str, Any],
         timeout: float = DEFAULT_STATS_RATE,
         group: None = None,
         target: Optional[Callable[..., Any]] = ...,
         name: Optional[str] = ...,
     ) -> None:
         super().__init__(group=group, target=target, name=name)
         self.filename = filename
         self.queue = queue.Queue()
         self.storage_conf = storage_conf
         self.timeout = timeout

     def get_tables_stats(self, tables: List[str]) -> Dict[str, int]:
         # TODO: use ProvenanceStorageInterface instead!
         with get_provenance(**self.storage_conf) as provenance:
             assert isinstance(provenance.storage, ProvenanceStoragePostgreSql)
             stats = {}
             for table in tables:
                 with provenance.storage.transaction(readonly=True) as cursor:
                     cursor.execute(f"SELECT COUNT(*) AS count FROM {table}")
                     stats[table] = cursor.fetchone()["count"]
-            with provenance.storage.transaction(readonly=True) as cursor:
-                cursor.execute(f"SELECT MAX(date) AS date FROM revision")
-                stats["maxdate"] = cursor.fetchone()["date"]
             return stats

     def init_stats(self, filename: str) -> List[str]:
         tables = [
-            "content",
-            "content_in_revision",
-            "content_in_directory",
-            "directory",
-            "directory_in_revision",
-            "location",
+            "origin",
             "revision",
+            "revision_in_origin",
+            "revision_before_revision",
         ]
         header = ["datetime"]
         for table in tables:
             header.append(f"{table} rows")
-        header.append("revision maxdate")
         with io.open(filename, "w") as outfile:
             outfile.write(",".join(header))
             outfile.write("\n")
         return tables

     def run(self) -> None:
         tables = self.init_stats(self.filename)
         start = time.monotonic()
         while True:
             now = time.monotonic()
             if now - start > self.timeout:
                 self.write_stats(self.filename, self.get_tables_stats(tables))
                 start = now
             try:
                 cmd = self.queue.get(timeout=1)
                 if cmd == Command.TERMINATE:
                     break
             except queue.Empty:
                 continue

     def stop(self) -> None:
         self.queue.put(Command.TERMINATE)
         self.join()

     def write_stats(self, filename: str, stats: Dict[str, int]) -> None:
         line = [str(datetime.now())]
         for _, stat in stats.items():
             line.append(str(stat))
         with io.open(filename, "a") as outfile:
             outfile.write(",".join(line))
             outfile.write("\n")


-class RevisionWorker(threading.Thread):
+class OriginWorker(threading.Thread):
     def __init__(
         self,
         filename: str,
         url: str,
         batch_size: int = DEFAULT_BATCH_SIZE,
         limit: Optional[int] = None,
         skip: int = DEFAULT_SKIP_VALUE,
         group: None = None,
         target: Optional[Callable[..., Any]] = ...,
         name: Optional[str] = ...,
     ) -> None:
         super().__init__(group=group, target=target, name=name)
         self.filename = filename
         self.batch_size = batch_size
         self.limit = limit
         self.queue = queue.Queue()
         self.skip = skip
         self.url = url

     def run(self) -> None:
         context = zmq.Context()
         socket: zmq.Socket = context.socket(zmq.REP)
         socket.bind(self.url)

         # TODO: improve this using a context manager
         file = (
             io.open(self.filename, "r")
             if os.path.splitext(self.filename)[1] == ".csv"
             else gzip.open(self.filename, "rt")
         )
-        provider = (line.strip().split(",") for line in file if line.strip())
+        provider = (
+            line.strip().rsplit(",", maxsplit=1) for line in file if line.strip()
+        )

         count = 0
         while True:
             if self.limit is not None and count > self.limit:
                 break

             response = []
-            for rev, date, root in provider:
+            for url, snapshot in provider:
                 count += 1
-                if count <= self.skip or iso8601.parse_date(date) <= UTCEPOCH:
+                if count <= self.skip:
                     continue
-                response.append({"rev": rev, "date": date, "root": root})
+                response.append({"url": url, "snapshot": snapshot})
                 if len(response) == self.batch_size:
                     break
             if not response:
                 break

             # Wait for next request from client
             # (TODO: make it non-blocking or add timeout)
             socket.recv()
             socket.send_json(response)

             try:
                 cmd = self.queue.get(block=False)
                 if cmd == Command.TERMINATE:
                     break
             except queue.Empty:
                 continue

         while True:  # TODO: improve shutdown logic
             socket.recv()
             socket.send_json(None)
         # context.term()

     def stop(self) -> None:
         self.queue.put(Command.TERMINATE)
         self.join()


 if __name__ == "__main__":
     # TODO: improve command line parsing
     if len(sys.argv) < 2:
         print("usage: server <filename>")
         print("where")
         print(
-            " filename : csv file containing the list of revisions to be iterated (one per"
-        )
-        print(
-            " line): revision sha1, date in ISO format, root directory sha1."
+            " filename : csv file containing the list of origins to be iterated (one per"
         )
+        print(" line): origin url, snapshot sha1.")
         exit(-1)

     config_file = None  # TODO: Add as a cli option
     if (
         config_file is None
         and DEFAULT_PATH is not None
         and config.config_exists(DEFAULT_PATH)
     ):
         config_file = DEFAULT_PATH

     if config_file is None or not os.path.exists(config_file):
         print("No configuration provided")
         exit(-1)

     conf = yaml.safe_load(open(config_file, "rb"))["provenance"]

     # Init stats
-    stats = conf["rev_server"].pop("stats", None)
+    stats = conf["org_server"].pop("stats", None)
     if stats is not None:
         storage_conf = (
             conf["storage"]["storage_config"]
             if conf["storage"]["cls"] == "rabbitmq"
             else conf["storage"]
         )
         statsfile = f"stats_{datetime.now()}_{stats.pop('suffix')}"
         statsworker = StatsWorker(statsfile, storage_conf, **stats)
         statsworker.start()

-    # Init revision provider
-    revsfile = sys.argv[1]
-    host = conf["rev_server"].pop("host", None)
-    url = f"tcp://*:{conf['rev_server'].pop('port', DEFAULT_PORT)}"
-    revsworker = RevisionWorker(revsfile, url, **conf["rev_server"])
-    revsworker.start()
+    # Init origin provider
+    orgsfile = sys.argv[1]
+    host = conf["org_server"].pop("host", None)
+    url = f"tcp://*:{conf['org_server'].pop('port', DEFAULT_PORT)}"
+    orgsworker = OriginWorker(orgsfile, url, **conf["org_server"])
+    orgsworker.start()

     # Wait for user commands
     while True:
         try:
             command = input("Enter EXIT to stop service: ")
             if command.lower() == "exit":
                 break
         except KeyboardInterrupt:
             pass

     # Release resources
-    revsworker.stop()
+    orgsworker.stop()
     if stats is not None:
         statsworker.stop()
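
One behavioural change worth noting in the copy: the revision server split each CSV line on every comma (three fields per line), whereas the origin server uses rsplit(",", maxsplit=1) so that only the last comma separates the snapshot id from the origin URL, presumably because origin URLs can themselves contain commas. A quick sketch with a made-up input line:

    # Hypothetical origins CSV line: the URL keeps its embedded comma.
    line = "https://example.com/repo,v2,0123456789abcdef0123456789abcdef01234567\n"
    url, snapshot = line.strip().rsplit(",", maxsplit=1)
    assert url == "https://example.com/repo,v2"
    assert snapshot == "0123456789abcdef0123456789abcdef01234567"
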
diff --git a/revisions/server.py b/revisions/server.py
index 9cf1a1a..9163b00 100755
--- a/revisions/server.py
+++ b/revisions/server.py
@@ -1,242 +1,242 @@
 #!/usr/bin/env python

 from datetime import datetime, timezone
 from enum import Enum
 import gzip
 import io
 import os
 import queue
 import sys
 import threading
 import time
 from typing import Any, Callable, Dict, List, Optional

 import iso8601
 from swh.core import config
 from swh.provenance import get_provenance
 from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql
 import yaml
 import zmq

 CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"

 DEFAULT_BATCH_SIZE = 1
 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
-DEFAULT_PORT = 5555
+DEFAULT_PORT = 5556
 DEFAULT_STATS_RATE = 300
 DEFAULT_SKIP_VALUE = 0

 UTCEPOCH = datetime.fromtimestamp(0, timezone.utc)


 class Command(Enum):
     TERMINATE = "terminate"


 class StatsWorker(threading.Thread):
     def __init__(
         self,
         filename: str,
         storage_conf: Dict[str, Any],
         timeout: float = DEFAULT_STATS_RATE,
         group: None = None,
         target: Optional[Callable[..., Any]] = ...,
         name: Optional[str] = ...,
     ) -> None:
         super().__init__(group=group, target=target, name=name)
         self.filename = filename
         self.queue = queue.Queue()
         self.storage_conf = storage_conf
         self.timeout = timeout

     def get_tables_stats(self, tables: List[str]) -> Dict[str, int]:
         # TODO: use ProvenanceStorageInterface instead!
         with get_provenance(**self.storage_conf) as provenance:
             assert isinstance(provenance.storage, ProvenanceStoragePostgreSql)
             stats = {}
             for table in tables:
                 with provenance.storage.transaction(readonly=True) as cursor:
                     cursor.execute(f"SELECT COUNT(*) AS count FROM {table}")
                     stats[table] = cursor.fetchone()["count"]
             with provenance.storage.transaction(readonly=True) as cursor:
                 cursor.execute(f"SELECT MAX(date) AS date FROM revision")
                 stats["maxdate"] = cursor.fetchone()["date"]
             return stats

     def init_stats(self, filename: str) -> List[str]:
         tables = [
             "content",
             "content_in_revision",
             "content_in_directory",
             "directory",
             "directory_in_revision",
             "location",
             "revision",
         ]
         header = ["datetime"]
         for table in tables:
             header.append(f"{table} rows")
         header.append("revision maxdate")
         with io.open(filename, "w") as outfile:
             outfile.write(",".join(header))
             outfile.write("\n")
         return tables

     def run(self) -> None:
         tables = self.init_stats(self.filename)
         start = time.monotonic()
         while True:
             now = time.monotonic()
             if now - start > self.timeout:
                 self.write_stats(self.filename, self.get_tables_stats(tables))
                 start = now
             try:
                 cmd = self.queue.get(timeout=1)
                 if cmd == Command.TERMINATE:
                     break
             except queue.Empty:
                 continue

     def stop(self) -> None:
         self.queue.put(Command.TERMINATE)
         self.join()

     def write_stats(self, filename: str, stats: Dict[str, int]) -> None:
         line = [str(datetime.now())]
         for _, stat in stats.items():
             line.append(str(stat))
         with io.open(filename, "a") as outfile:
             outfile.write(",".join(line))
             outfile.write("\n")


 class RevisionWorker(threading.Thread):
     def __init__(
         self,
         filename: str,
         url: str,
         batch_size: int = DEFAULT_BATCH_SIZE,
         limit: Optional[int] = None,
         skip: int = DEFAULT_SKIP_VALUE,
         group: None = None,
         target: Optional[Callable[..., Any]] = ...,
         name: Optional[str] = ...,
     ) -> None:
         super().__init__(group=group, target=target, name=name)
         self.filename = filename
         self.batch_size = batch_size
         self.limit = limit
         self.queue = queue.Queue()
         self.skip = skip
         self.url = url

     def run(self) -> None:
         context = zmq.Context()
         socket: zmq.Socket = context.socket(zmq.REP)
         socket.bind(self.url)

         # TODO: improve this using a context manager
         file = (
             io.open(self.filename, "r")
             if os.path.splitext(self.filename)[1] == ".csv"
             else gzip.open(self.filename, "rt")
         )
         provider = (line.strip().split(",") for line in file if line.strip())

         count = 0
         while True:
             if self.limit is not None and count > self.limit:
                 break

             response = []
             for rev, date, root in provider:
                 count += 1
                 if count <= self.skip or iso8601.parse_date(date) <= UTCEPOCH:
                     continue
                 response.append({"rev": rev, "date": date, "root": root})
                 if len(response) == self.batch_size:
                     break
             if not response:
                 break

             # Wait for next request from client
             # (TODO: make it non-blocking or add timeout)
             socket.recv()
             socket.send_json(response)

             try:
                 cmd = self.queue.get(block=False)
                 if cmd == Command.TERMINATE:
                     break
             except queue.Empty:
                 continue

         while True:  # TODO: improve shutdown logic
             socket.recv()
             socket.send_json(None)
         # context.term()

     def stop(self) -> None:
         self.queue.put(Command.TERMINATE)
         self.join()


 if __name__ == "__main__":
     # TODO: improve command line parsing
     if len(sys.argv) < 2:
         print("usage: server <filename>")
         print("where")
         print(
             " filename : csv file containing the list of revisions to be iterated (one per"
         )
         print(
             " line): revision sha1, date in ISO format, root directory sha1."
         )
         exit(-1)

     config_file = None  # TODO: Add as a cli option
     if (
         config_file is None
         and DEFAULT_PATH is not None
         and config.config_exists(DEFAULT_PATH)
     ):
         config_file = DEFAULT_PATH

     if config_file is None or not os.path.exists(config_file):
         print("No configuration provided")
         exit(-1)

     conf = yaml.safe_load(open(config_file, "rb"))["provenance"]

     # Init stats
     stats = conf["rev_server"].pop("stats", None)
     if stats is not None:
         storage_conf = (
             conf["storage"]["storage_config"]
             if conf["storage"]["cls"] == "rabbitmq"
             else conf["storage"]
         )
         statsfile = f"stats_{datetime.now()}_{stats.pop('suffix')}"
         statsworker = StatsWorker(statsfile, storage_conf, **stats)
         statsworker.start()

     # Init revision provider
     revsfile = sys.argv[1]
     host = conf["rev_server"].pop("host", None)
     url = f"tcp://*:{conf['rev_server'].pop('port', DEFAULT_PORT)}"
     revsworker = RevisionWorker(revsfile, url, **conf["rev_server"])
     revsworker.start()

     # Wait for user commands
     while True:
         try:
             command = input("Enter EXIT to stop service: ")
             if command.lower() == "exit":
                 break
         except KeyboardInterrupt:
             pass

     # Release resources
     revsworker.stop()
     if stats is not None:
         statsworker.stop()
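
Taken together, the scripts implement a small work-queue protocol over ZMQ REQ/REP: a client sends a request (the payload is ignored; client.py sends b"NEXT"), the server replies with a JSON batch of work items, and once the input file is exhausted it replies null forever so every client eventually exits its loop. A stripped-down sketch of the client side of the exchange, assuming an origins server bound on localhost:5555:

    import zmq

    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    while True:
        socket.send(b"NEXT")        # request the next batch; the payload is ignored
        batch = socket.recv_json()  # a list of {"url": ..., "snapshot": ...} dicts
        if batch is None:           # the server signals end of input with null
            break
        for origin in batch:
            print(origin["url"], origin["snapshot"])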
