
Declare new service worker to consume save code now queues
Closed, Public

Authored by ardumont on Apr 12 2021, 5:02 PM.

Details

Summary

The scheduler runner will route "save code now" requests to dedicated high-priority queues,
so we need to define a new worker whose loaders subscribe to those new queues.
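
For illustration, a minimal Celery sketch of what subscribing a worker's loaders to queues means: a worker only consumes from the queues it is explicitly bound to, mirroring the task_queues entries in the loader YAML below. The broker URL and queue list are placeholders, not the deployed values.

from celery import Celery
from kombu import Queue

# Placeholder broker URL; the real one comes from the loader's YAML config.
app = Celery('loaders', broker='amqp://guest:guest@localhost:5672/%2f')

# Bind the worker to explicitly named queues; messages routed to any
# other queue are invisible to this worker.
app.conf.task_queues = (
    Queue('swh.loader.git.tasks.UpdateGitRepository'),
    Queue('swh.loader.svn.tasks.LoadSvnRepository'),
)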

Related to T3084

Test Plan

octocatalog-diff on the workers (staging, prod):

$ bin/octocatalog-diff --octocatalog-diff-args --no-truncate-details --to staging worker0.internal.staging.swh.network
...
*** Running octocatalog-diff on host worker0.internal.staging.swh.network
I, [2021-04-12T16:58:04.687667 #11853]  INFO -- : Catalogs compiled for worker0.internal.staging.swh.network
I, [2021-04-12T16:58:05.891259 #11853]  INFO -- : Diffs computed for worker0.internal.staging.swh.network
diff origin/production/worker0.internal.staging.swh.network current/worker0.internal.staging.swh.network
*******************************************
+ Concat_fragment[profile::cron::swh-worker-loader_high_priority-autorestart] =>
   parameters =>
      "order": "10"
      "tag": "profile::cron::swh-worker"
      "target": "profile::cron::swh-worker"
      "content": >>>
# Cron snippet swh-worker-loader_high_priority-autorestart
13-58/15 * * * * root chronic /usr/local/sbin/swh-worker-ping-restart loader_high_priority@worker0.internal.staging.swh.network loader_high_priority
<<<
*******************************************
+ File[/etc/softwareheritage/loader_high_priority.yml] =>
   parameters =>
      "ensure": "present"
      "group": "swhworker"
      "mode": "0644"
      "owner": "swhworker"
      "content": >>>
---
storage:
  cls: pipeline
  steps:
  - cls: buffer
    min_batch_size:
      content: 1000
      content_bytes: 52428800
      directory: 1000
      revision: 1000
      release: 1000
  - cls: filter
  - cls: retry
  - cls: remote
    args:
      url: http://storage1.internal.staging.swh.network:5002/
max_content_size: 104857600
celery:
  task_broker: amqp://swhconsumer:swh-deploy-worker-task_broker-password@scheduler0.internal.staging.swh.network:5672/%2f
  task_queues:
  - swh.loader.git.tasks.UpdateGitRepository
  - swh.loader.git.tasks.LoadDiskGitRepository
  - swh.loader.git.tasks.UncompressAndLoadDiskGitRepository
  - swh.loader.mercurial.tasks.LoadMercurial
  - swh.loader.mercurial.tasks.LoadArchiveMercurial
  - swh.loader.svn.tasks.LoadSvnRepository
  - swh.loader.svn.tasks.MountAndLoadSvnRepository
  - swh.loader.svn.tasks.DumpMountAndLoadSvnRepository
<<<
*******************************************
+ File[/etc/systemd/system/swh-worker@loader_high_priority.service.d/parameters.conf] =>
   parameters =>
      "ensure": "file"
      "group": "root"
      "mode": "0444"
      "notify": ["Class[Systemd::Systemctl::Daemon_reload]"]
      "owner": "root"
      "show_diff": true
      "content": >>>
# Managed by puppet - modifications will be overwritten
# In defined class profile::swh::deploy::worker::instance

[Service]
Environment=CONCURRENCY=1
Environment=MAX_TASKS_PER_CHILD=100
Environment=LOGLEVEL=info
<<<
*******************************************
+ File[/etc/systemd/system/swh-worker@loader_high_priority.service.d] =>
   parameters =>
      "ensure": "directory"
      "group": "root"
      "owner": "root"
      "purge": true
      "recurse": true
*******************************************
+ Profile::Cron::D[swh-worker-loader_high_priority-autorestart] =>
   parameters =>
      "command": "chronic /usr/local/sbin/swh-worker-ping-restart loader_high_priority@worker0.internal.staging.swh.network loader_high_priority"
      "minute": "fqdn_rand/15"
      "target": "swh-worker"
      "unique_tag": "swh-worker-loader_high_priority-autorestart"
      "user": "root"
*******************************************
+ Profile::Swh::Deploy::Worker::Instance[loader_high_priority] =>
   parameters =>
      "ensure": "present"
      "instance_name": "loader_high_priority"
      "merge_policy": "deep"
      "sentry_name": "loader_high_priority"
*******************************************
+ Service[swh-worker@loader_high_priority] =>
   parameters =>
      "enable": true
*******************************************
+ Systemd::Dropin_file[swh-worker@loader_high_priority/parameters.conf] =>
   parameters =>
      "daemon_reload": "lazy"
      "ensure": "present"
      "filename": "parameters.conf"
      "group": "root"
      "mode": "0444"
      "owner": "root"
      "path": "/etc/systemd/system"
      "show_diff": true
      "unit": "swh-worker@loader_high_priority.service"
      "content": >>>
# Managed by puppet - modifications will be overwritten
# In defined class profile::swh::deploy::worker::instance

[Service]
Environment=CONCURRENCY=1
Environment=MAX_TASKS_PER_CHILD=100
Environment=LOGLEVEL=info
<<<
*******************************************
*** End octocatalog-diff on worker0.internal.staging.swh.network
$ bin/octocatalog-diff --octocatalog-diff-args --no-truncate-details --to staging worker01
...
*** Running octocatalog-diff on host worker01.softwareheritage.org
I, [2021-04-12T16:58:48.403207 #17232]  INFO -- : Catalogs compiled for worker01.softwareheritage.org
I, [2021-04-12T16:58:49.538970 #17232]  INFO -- : Diffs computed for worker01.softwareheritage.org
diff origin/production/worker01.softwareheritage.org current/worker01.softwareheritage.org
*******************************************
+ Concat_fragment[profile::cron::swh-worker-loader_high_priority-autorestart] =>
   parameters =>
      "order": "10"
      "tag": "profile::cron::swh-worker"
      "target": "profile::cron::swh-worker"
      "content": >>>
# Cron snippet swh-worker-loader_high_priority-autorestart
6-51/15 * * * * root chronic /usr/local/sbin/swh-worker-ping-restart loader_high_priority@worker01.internal.softwareheritage.org loader_high_priority
<<<
*******************************************
+ File[/etc/softwareheritage/loader_high_priority.yml] =>
   parameters =>
      "ensure": "present"
      "group": "swhworker"
      "mode": "0644"
      "owner": "swhworker"
      "content": >>>
---
storage:
  cls: pipeline
  steps:
  - cls: buffer
    min_batch_size:
      content: 1000
      content_bytes: 52428800
      directory: 1000
      revision: 1000
      release: 1000
  - cls: filter
  - cls: retry
  - cls: remote
    args:
      url: http://saam.internal.softwareheritage.org:5002/
max_content_size: 104857600
celery:
  task_broker: amqp://swhconsumer:swh-deploy-worker-task_broker-password@rabbitmq:5672/%2f
  task_queues:
  - swh.loader.git.tasks.UpdateGitRepository
  - swh.loader.git.tasks.LoadDiskGitRepository
  - swh.loader.git.tasks.UncompressAndLoadDiskGitRepository
  - swh.loader.mercurial.tasks.LoadMercurial
  - swh.loader.mercurial.tasks.LoadArchiveMercurial
  - swh.loader.svn.tasks.LoadSvnRepository
  - swh.loader.svn.tasks.MountAndLoadSvnRepository
  - swh.loader.svn.tasks.DumpMountAndLoadSvnRepository
<<<
*******************************************
+ File[/etc/systemd/system/swh-worker@loader_high_priority.service.d/parameters.conf] =>
   parameters =>
      "ensure": "file"
      "group": "root"
      "mode": "0444"
      "notify": ["Class[Systemd::Systemctl::Daemon_reload]"]
      "owner": "root"
      "show_diff": true
      "content": >>>
# Managed by puppet - modifications will be overwritten
# In defined class profile::swh::deploy::worker::instance

[Service]
Environment=CONCURRENCY=1
Environment=MAX_TASKS_PER_CHILD=100
Environment=LOGLEVEL=info
<<<
*******************************************
+ File[/etc/systemd/system/swh-worker@loader_high_priority.service.d] =>
   parameters =>
      "ensure": "directory"
      "group": "root"
      "owner": "root"
      "purge": true
      "recurse": true
*******************************************
+ Profile::Cron::D[swh-worker-loader_high_priority-autorestart] =>
   parameters =>
      "command": "chronic /usr/local/sbin/swh-worker-ping-restart loader_high_priority@worker01.internal.softwareheritage.org loader_high_priority"
      "minute": "fqdn_rand/15"
      "target": "swh-worker"
      "unique_tag": "swh-worker-loader_high_priority-autorestart"
      "user": "root"
*******************************************
+ Profile::Swh::Deploy::Worker::Instance[loader_high_priority] =>
   parameters =>
      "ensure": "present"
      "instance_name": "loader_high_priority"
      "merge_policy": "deep"
      "sentry_name": "loader_high_priority"
*******************************************
+ Service[swh-worker@loader_high_priority] =>
   parameters =>
      "enable": true
*******************************************
+ Systemd::Dropin_file[swh-worker@loader_high_priority/parameters.conf] =>
   parameters =>
      "daemon_reload": "lazy"
      "ensure": "present"
      "filename": "parameters.conf"
      "group": "root"
      "mode": "0444"
      "owner": "root"
      "path": "/etc/systemd/system"
      "show_diff": true
      "unit": "swh-worker@loader_high_priority.service"
      "content": >>>
# Managed by puppet - modifications will be overwritten
# In defined class profile::swh::deploy::worker::instance

[Service]
Environment=CONCURRENCY=1
Environment=MAX_TASKS_PER_CHILD=100
Environment=LOGLEVEL=info
<<<
*******************************************
*** End octocatalog-diff on worker01.softwareheritage.org

No change when diffing this on the azure workers (indexer)

Diff Detail

Repository
rSPSITE puppet-swh-site
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

ardumont created this revision.
data/common/common.yaml
2239

It's in another commit pushed in the same diff.
The rationale is explained in that commit: this hard-coded list is never left as is and is always overridden, so it does not reflect reality at all.
So my take is to make it empty.

I need to reflect a bit more on the queue subscription that needs to be configured here.

Subscribe new loaders to the future high-priority messages

Those messages will need to be defined accordingly in the loaders (and deployed first).
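
A hypothetical sketch of such a declaration on the loader side; update_git_repository_high and load_git are stand-ins for illustration, not the actual swh.loader.git code:

from celery import shared_task

def load_git(**kwargs):
    # Stand-in for the loader's actual loading logic (hypothetical).
    ...

# Registering a high-priority variant under its own task name lets the
# scheduler route save code now requests to a dedicated queue.
@shared_task(name='swh.loader.git.tasks.UpdateGitRepositoryHigh')
def update_git_repository_high(**kwargs):
    return load_git(**kwargs)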

olasd added inline comments.
data/common/common.yaml
2239

Sounds good to me!

2270–2276

These queue names are somewhat ambiguous.

I think queue names are arbitrary UTF-8, so I'd suggest something like save_code_now:<task FQN>?
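
For illustration, the sender would then pick the prefixed queue explicitly; in Celery the target queue is chosen per message, independently of the task name (the URL and kwargs here are illustrative):

# app: the Celery application instance (as in the earlier sketch).
app.send_task(
    'swh.loader.git.tasks.UpdateGitRepository',
    kwargs={'url': 'https://example.org/repo.git'},
    queue='save_code_now:swh.loader.git.tasks.UpdateGitRepository',
)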

site-modules/profile/manifests/swh/deploy/worker/loader_deposit.pp
10–12 (On Diff #19624)

I think this change is a mispaste?

data/common/common.yaml
2270–2276

Yes, they are. I'm not sure I get it.

Do you mean, for example:

...
- save_code_now:swh.loader.svn.tasks.LoadSvnRepositoryHigh ?
...

What will be the end result in rabbitmq?

One queue named save_code_now, subscribed to all those messages?

Or separate queues, each with a save_code_now: prefix?

(I will check ;)

site-modules/profile/manifests/swh/deploy/worker/loader_deposit.pp
10–12 (On Diff #19624)

quite!

data/common/common.yaml
2270–2276

Also what is the required change for D5488 (if any)?

data/common/common.yaml
2270–2276

A tryout suggests the latter:

[loaders]                 .> celery           exchange=celery(direct) key=celery
[loaders]                 .> save_code_now:swh.loader.git.tasks.UpdateGitRepositoryHigh exchange=save_code_now:swh.loader.git.tasks.UpdateGitRepositoryHigh(direct) key=save_code_now:swh.loader.git.tasks.UpdateGitRepositoryHigh

So the change required in the other diff is to prefix the task name with save_code_now: in the message as well; otherwise it's not consumed.
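
To check whether a worker actually consumes a prefixed queue, Celery's inspect API can list each worker's active queue bindings (a sketch; app is the Celery application from the worker configuration):

# Maps each worker to the queues it is currently bound to; the
# save_code_now:-prefixed names should appear once consumption works.
bindings = app.control.inspect().active_queues()
print(bindings)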
...

Although I cannot make it work so far, with no obvious error to show.

data/common/common.yaml
2270

Furthermore, note that I don't understand what setup is required for the new worker to
consume from a different queue than the one hard-coded in the
swh.loader.<type>.tasks module.

The task_queues list configured here might be sufficient to answer that, I think...
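
In Celery, the set of consumed queues is a worker-level setting, so listing a queue in the worker's task_queues is enough for it to consume tasks whose defining module defaults to another queue. A minimal sketch (queue name illustrative; the CLI equivalent is celery worker -Q <queue>):

from celery import Celery
from kombu import Queue

app = Celery('loaders', broker='amqp://guest:guest@localhost:5672/%2f')

# The worker consumes from every queue listed here, regardless of the
# default queue hard-coded in the task's defining module.
app.conf.task_queues = (
    Queue('save_code_now:swh.loader.git.tasks.UpdateGitRepositoryHigh'),
)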

Adapt according to suggestions

Rebase and rework commit message

ardumont retitled this revision from Add new loader_high_priority systemd unit to Declare new service worker to consume save code now queues. Apr 14 2021, 5:31 PM
ardumont edited the summary of this revision.
This revision is now accepted and ready to land. Apr 14 2021, 5:36 PM