Add an archiver whose purpose is to replicate the object storage to one or more other locations
Details
- Reviewers
- ardumont
- Group Reviewers
- Reviewers
- Maniphest Tasks
- T240: content archiver
- Commits
- rDSTOC7afda2189e74: Make the worker asynchronous optional
- rDSTOCe32124dd75bf: Change database model and update swh.storage.db api in order to follow those…
- rDSTOC0e84344d3a57: Add asynchronous mode
- rDSTOCf0388f124e22: Add synchronous first implementation of the archiver
- R65:f0388f124e22: Add synchronous first implementation of the archiver
- R65:e32124dd75bf: Change database model and update swh.storage.db api in order to follow those…
- R65:7afda2189e74: Make the worker asynchronous optional
- R65:0e84344d3a57: Add asynchronous mode
- rDSTO7afda2189e74: Make the worker asynchronous optional
- rDSTO0e84344d3a57: Add asynchronous mode
- rDSTOf0388f124e22: Add synchronous first implementation of the archiver
- rDSTOe32124dd75bf: Change database model and update swh.storage.db api in order to follow those…
- Archive objects only when needed
- Fewer copies than expected
- The content is missing (not present)
- An archive with status 'ongoing' will be rescheduled once its delay has elapsed.
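The conditions listed above can be read as a single predicate. Here is a minimal sketch of that reading, under assumptions: the function name, its parameters and the one-hour delay are hypothetical and do not come from the diff; only the status values 'missing', 'ongoing' and 'present' appear in this review.

```python
from datetime import datetime, timedelta

# Hypothetical value; the real maximum delay is a configuration choice.
ONGOING_MAX_DELAY = timedelta(hours=1)


def needs_archival(status, copies, required_copies, last_update, now,
                   max_delay=ONGOING_MAX_DELAY):
    """Return True if a content should be (re)scheduled for archival."""
    if status == 'missing':
        # The content is not present in the archive at all.
        return True
    if status == 'ongoing':
        # A copy was started; retry only once its delay has elapsed.
        return now - last_update > max_delay
    # Otherwise ('present'): archive again only if copies are lacking.
    return copies < required_copies
```

With this reading, a content marked 'ongoing' five minutes ago is left alone, while one marked 'ongoing' two hours ago is picked up again.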
Diff Detail
- Repository
- rDSTO Storage manager
- Branch
- T240
- Lint
- No Linters Available
- Unit
- No Unit Test Coverage
- Build Status
- Buildable 93
- Build 131: Software Heritage Python tests
- Build 130: arc lint + arc unit
Event Timeline
The current diff is a Work in Progress waiting for some feedback.
ATM, there is no task queue involved and everything would run on the same thread. If everything is correct, it will be the next step.
swh/storage/archiver/copier.py | ||
---|---|---|
53–66 | If there is an error during the process, some files may already have been copied. What would be the best option? Mark them as 'present', or just ignore them and reschedule the whole batch after the 'ongoing' maximum delay has elapsed? | |
swh/storage/archiver/director.py | ||
158–212 | Please ignore that, as it is only used for debugging purposes. However, it does bring up a question. The table content_archive contains a field status that may be 'missing'. Does that mean that everything in the content table will be in the content_data table as 'missing' at the start? | |
swh/storage/tests/test_archiver.py | ||
59–72 | I had to do some db commits because the archiver directly uses the local storage's db. As the test class uses its own cursor, the archiver would not have been aware of these changes otherwise. |
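The commit issue described in the last comment can be reproduced in isolation. The sketch below is only an illustration: it uses sqlite3 from the standard library as a stand-in for the real database, and the table name and helper functions are hypothetical. It shows that a row inserted in an open transaction is not seen by another connection until it is committed.

```python
import os
import sqlite3
import tempfile


def visible_rows(path):
    """Count rows as seen from a fresh, independent connection."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute('SELECT count(*) FROM content').fetchone()[0]
    finally:
        conn.close()


def demo():
    fd, path = tempfile.mkstemp(suffix='.db')
    os.close(fd)
    test_conn = sqlite3.connect(path)
    try:
        test_conn.execute('CREATE TABLE content (sha1 TEXT)')
        test_conn.commit()
        # The test inserts a row inside its own open transaction.
        test_conn.execute("INSERT INTO content VALUES ('abc')")
        before = visible_rows(path)  # seen by another connection, pre-commit
        test_conn.commit()
        after = visible_rows(path)   # seen by another connection, post-commit
        return before, after
    finally:
        test_conn.close()
        os.remove(path)
```

Here the second connection plays the role of the archiver, which reads through the local storage's db rather than through the test's cursor.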
swh/storage/archiver/__init__.py | ||
---|---|---|
1 | why the # noqa? | |
swh/storage/archiver/copier.py | ||
16 | a list of sha1`s` | |
50 | I think it might work without needing to consume the map. Can you refresh my memory, why do you need to strip the first 2 characters (is it '\\x')? | |
53–66 | As a first approximation, considering nothing is done is reasonable. | |
56 | self.server.content_add(map(lambda c: c['data'], contents)) ? | |
58 | You may want to avoid writing your doubts in the code and use Differential for that (as you also did below). I like to use TODO (you'll need to think more and improve the code), FIXME (you saw something ugly but this is not the time to fix it), or HACK (you had to do something horribly ugly but did not see any other way around it), followed by a concise description of what this is all about. | |
64 | Well, it depends on how you see the asynchronous part working... If archive-director spawns asynchronous archive-workers which also spawn asynchronous archive-copiers, it could be simpler to do what you propose (database access here)... But if the async activity stops at archive-workers, meaning only archive-director spawns async archive-workers. | |
swh/storage/archiver/director.py | ||
29 | which one is needed as backup? | |
39 | c`o`py | |
84 | content`s` | |
94 | content`s` | |
99 | content`s` | |
115 | storage slaves | |
145 | archiva`l` | |
158–212 |
Ok.
Possibly. I see 2 problems:
But that is for later.
To have an approximate size (because it's updated daily), check https://archive.softwareheritage.org/api/1/stat/counters/.
I think that bootstrapping this and dealing with prior data are 2 separate tasks. | |
swh/storage/archiver/worker.py | ||
13 | Indeed but we have abstracted it. | |
14 | should probably extend``. | |
20 | ha`s` | |
37 | content's location | |
49 | archiva`l` | |
52 | amou`n`t | |
82 | If it's an update, call it update ^^ | |
87 | change its status | |
90 | content id | |
94 | change`s` | |
111 | We usually hide the SQL queries in db; it's clearer to continue that way. | |
swh/storage/tests/test_archiver.py | ||
59–72 | Using self.conn? |
> ATM, there is no task queue involved and everything would run on the same thread.
Ok.
Were you able to run your code aside from the tests?
> If everything is correct, it will be the next step.
Everything seems correct ^^ (aside from the itsy-bitsy, teeny-tiny remarks :D)
swh/storage/archiver/__init__.py | ||
---|---|---|
1 | Ok, after discussing with Quentin: it's due to the pre-commit hook we have installed, which refuses to let the user commit if there are any pep8 violations. As the module is imported but apparently not used, flake8 reports a failure. We need to find a way to improve on this (or not). I know it's possible to have a setup file for flake8 that ignores some errors (this one is F401). Also, I found this related question: http://stackoverflow.com/questions/31079047/python-pep8-class-in-init-imported-but-not-used. Following the proposed solution, this would give something like: `from .director import ArchiverDirector`, `from .worker import ArchiverWorker`, `from .copier import ArchiverCopier`, `__all__ = ['ArchiverDirector', 'ArchiverWorker', 'ArchiverCopier']`. I don't know if this is reasonable or not, but it works. | |
swh/storage/archiver/copier.py | ||
53–66 |
What I meant was: as long as you can write and overwrite existing contents again, I think it's ok. |
> Were you able to run your code aside from the tests?
I've done some tests with a local backup and very small samples, but that was on an early version.
I have no idea how the task queue works. Aside from inheriting from swh.scheduler.task.Task, what should I do to the worker to make it asynchronous?
swh/storage/archiver/copier.py | ||
---|---|---|
53–66 | Shouldn't create any problem (ATM), as the object storage just writes files without checking whether they already exist. | |
56 | Just checked: RemoteObjStorage::content_add is linked to ObjStorage::add_bytes, which only takes a single content. | |
64 | Now that I think about it, the specification is kind of blurry about what precisely is asynchronous. But I understood it as the Director spawning async workers that execute the copier's code. |
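Since content_add only takes a single content, the copier would call it once per content rather than pass it a map. A minimal sketch of that loop follows, assuming each content is a dict with a 'data' key as in the line 56 suggestion; FakeRemoteObjStorage and copy_contents are hypothetical stand-ins and not the code under review.

```python
class FakeRemoteObjStorage:
    """Hypothetical stand-in for the remote object storage client."""

    def __init__(self):
        self.added = []

    def content_add(self, data):
        # Accepts one content (bytes) per call, as noted in the review.
        if not isinstance(data, bytes):
            raise TypeError('content_add expects a single bytes object')
        self.added.append(data)


def copy_contents(server, contents):
    """Send each content to the remote storage, one call per content."""
    count = 0
    for content in contents:
        server.content_add(content['data'])
        count += 1
    return count
```

Passing the map object itself to this stand-in raises a TypeError, which is the mismatch pointed out above.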
Ok. Can you please check with the current version?
> I have no idea how the task queue works. Aside from inheriting from swh.scheduler.task.Task, what should I do to the worker to make it asynchronous?
We usually create a task.py module (here that would be swh.storage.archiver.task) in which we declare a class that inherits from Task.
(https://forge.softwareheritage.org/diffusion/DSCH/browse/master/swh/scheduler/task.py)
This class will contain the async code to execute (here, following the current code, we'd delegate the job to ArchiverCopier inside the new task's run function, passing all the needed state as function parameters, for example).
This task declares the name of the queue it receives messages from (for example swh_storage_archive_copier) and a run method.
The queue is what is used to communicate the job to do (the producer sends a message to a queue, the worker picks up the message and starts working).
The code in the run function is what a Celery worker actually executes.
Some task examples: https://forge.softwareheritage.org/diffusion/61/browse/master/swh/fetcher/googlecode/tasks.py.
Then you'd need to adapt the current ArchiverDirector to produce a message (one that says, here, do some work with that batch of contents).
So instead of instantiating ArchiverWorker directly (swh/storage/archiver/director.py line 88), you'd need to retrieve the task you want from the app and delay it (this is the message-sending part).
Here is an example: https://forge.softwareheritage.org/diffusion/61/browse/master/swh/fetcher/googlecode/checker_producer.py;9a428e0f2b908db390286a96fe4e559b592edb9b$29
Then you'd also need to adapt how you trigger everything.
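The producer/queue/worker flow described above can be sketched with the standard library alone. This is only an illustration of the pattern and not the swh.scheduler or Celery API: the ArchiverTask class, its queue name and the helper functions are all hypothetical, and a thread stands in for the Celery worker.

```python
import queue
import threading


class ArchiverTask:
    """Hypothetical task: declares a queue name and a run method."""
    task_queue = 'swh_storage_archive_worker'  # assumed name

    def run(self, batch):
        # In a real task this would delegate to the worker/copier code.
        return sorted(batch)


def worker_loop(task, messages, results):
    """Pick up messages until a None sentinel arrives."""
    while True:
        batch = messages.get()
        if batch is None:
            break
        results.append(task.run(batch))


def produce(messages, batches):
    """The director's side: send one message per batch of contents."""
    for batch in batches:
        messages.put(batch)
    messages.put(None)  # tell the worker to stop


def demo():
    messages, results = queue.Queue(), []
    thread = threading.Thread(
        target=worker_loop, args=(ArchiverTask(), messages, results))
    thread.start()
    produce(messages, [['b', 'a'], ['c']])
    thread.join()
    return results
```

The point of the design is that the director never instantiates the worker itself; it only sends a message describing the batch, and whichever worker listens on the queue does the job.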
Add asynchronous code for the archiver
- Change database model and update swh.storage.db api in order to follow those changes
- Add synchronous first implementation of the archiver
- Add tests for the archiver
- Add asynchronous mode
- Fix storage instantiation problem
- Align ArchiverWorker parameter orders
- Extract imports from the method to top-level
- Fix docstring
swh/storage/objstorage/api/server.py | ||
---|---|---|
68–69 ↗ | (On Diff #67) | That's not clean, and it will go in its own commit (its own diff maybe?). |
- Add asynchronous mode
- Fix storage instantiation problem
- Align ArchiverWorker parameter orders
- Extract imports from the method to top-level
- Fix docstring
- Change server mains in order to change default port
- Make the worker asynchronous optional