
Next generation archive counters
Closed, Resolved · Public

Description

The current approach to the archive counters involves a significant amount of interaction with the main database, which can lead to critical issues, as we have seen in T2828. We need to find a way to maintain accurate counters without overloading the database.

A first attempt to maintain the archive counters without database interaction was made by having each loader add the number of new objects it encountered to the global counters. Unfortunately, this approach led to the counters showing a value greater than the real one: if several workers happen to ingest forks of the same new project (and it does happen!), the new objects are counted multiple times.

This task proposes a simple alternative approach to maintaining the archive counters that both drastically reduces interaction with the main database and avoids counter overrun.

The ArchiveCounter component

The key to the approach is a new ArchiveCounter service that maintains one standard Bloom filter and a global counter for each of the different kinds of objects that are counted (contents, directories, revisions, releases, snapshots, origins). The Bloom filter allows very fast answers to the question "is this object new?", with no false negatives and a controlled probability of false positives. The global counter keeps the running count of new objects seen so far (up to the probability of false positives).

In the following, we detail the operation just for the case of contents.

Loaders send to the ArchiveCounter the list newcontents of identifiers of the (supposedly) new contents they encounter, and ArchiveCounter performs the following simple operation:

for id in newcontents:
    if not BloomFilter.add(id):  # BloomFilter.add returns True if id is already (probably) known
        contentcounter += 1

The operation BloomFilter.add will return a false positive (i.e. consider a content as already seen while it is actually new, hence resulting in counter underrun) with a probability that we can control. For example (see details), we can get a 1% probability of false positives for up to 20 billion objects with a 24GB Bloom filter and 7 hash functions, and we can go down to 0.1% with a 36GB Bloom filter and 10 hash functions.
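
As a back-of-the-envelope check of these parameters (a sketch using the standard approximation p ≈ (1 - e^(-kn/m))^k; the exact figures depend on whether "GB" means 10^9 or 2^30 bytes):

from math import exp

def false_positive_rate(m_bits, n_items, k_hashes):
    # approximate false positive probability of a standard Bloom filter
    return (1 - exp(-k_hashes * n_items / m_bits)) ** k_hashes

GB_BITS = 8 * 2 ** 30
print(false_positive_rate(24 * GB_BITS, 20e9, 7))   # ~0.007  (roughly the 1% figure)
print(false_positive_rate(36 * GB_BITS, 20e9, 10))  # ~0.0006 (roughly the 0.1% figure)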

With these parameters, at any moment in time contentcounter contains an under-approximation of the real number of contents stored in the archive, which is totally acceptable for our use case.

Every so often, one can correct the counter drift by running an expensive query on the database and updating contentcounter accordingly.

Algorithmic remarks

Dillinger and Manolios show in their 2004 paper "Bloom Filters in Probabilistic Verification" that one can resort to using just two hash functions, which reduces the computation cost w.r.t. the standard Bloom filter construction.

But in our case, the "keys" are already randomly distributed, as they are SHA1 cryptographic hashes! So we could simply skip the Bloom hashing phase altogether, and perform the two-hash construction using (fragments of) the key to be inserted in the filter.
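
A minimal sketch of that idea, assuming we take two 64-bit fragments of the digest as the two hashes of the double-hashing construction (the fragment sizes and the exact combination are open choices, not a settled design):

def bloom_indexes(sha1, k, m):
    # h1 and h2 are just two fragments of the (already uniform) 160-bit digest;
    # the k probe positions are h1 + i*h2 mod m. h2 is forced to be odd so the
    # positions cannot all collapse onto h1 when the fragment happens to be 0.
    h1 = int.from_bytes(sha1[:8], "big")
    h2 = int.from_bytes(sha1[8:16], "big") | 1
    return [(h1 + i * h2) % m for i in range(k)]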

Initialisation

In order to get to nominal operation, one can progressively fill the Bloom filters as follows:

  1. turn ArchiveCounter on, with an empty Bloom filter
  2. reconfigure loaders (or swh-storage) to write to ArchiveCounter
  3. dump the list of hashes from swh-storage or swh-journal
  4. switch the archive counters to use ArchiveCounter

This is sound because all objects created after step 2 will be written directly, and all objects created before step 3 will be backfilled by step 3 (the overlap is dealt with by ArchiveCounter).

Implementation

Implementing a Bloom filter is straightforward, and there are many libraries available, but given our space requirements, we need to benchmark existing implementations. I came across this Python3 library, which has the added benefit of using a memory-mapped data structure that allows saving the filter on disk; it could be a good candidate, but the ultimate choice needs to be made based on performance and stability, independently of the programming language used.

Resources
Dedicated page on Devopedia

Event Timeline


A Python library may be an issue, as it requires a central process with a global lock. Sharding by hash may fix the issue, though.

ArchiveCounter needs to be filled up with all the known objects in the database: this requires stopping archival

Not necessarily. We can also do it like this:

  1. turn ArchiveCounter on, with an empty Bloom filter
  2. reconfigure loaders (or swh-storage) to write to ArchiveCounter
  3. dump the list of hashes from swh-storage or swh-journal
  4. switch the archive counters to use ArchiveCounter

This is sound because all objects created after step 2 will be written directly, and all objects created before step 3 will be backfilled by step 3 (the overlap is dealt with by ArchiveCounter).

A Python library may be an issue, as it requires a central process with a global lock. Sharding by hash may fix the issue, though.

Sure, no obligation to stick to Python for this (the underlying implementation of the library pointed above is in C/C++)

ArchiveCounter needs to be filled up with all the known objects in the database: this requires stopping archival

Not necessarily. We can also do it like this:

  1. turn ArchiveCounter on, with an empty Bloom filter
  2. reconfigure loaders (or swh-storage) to write to ArchiveCounter
  3. dump the list of hashes from swh-storage or swh-journal
  4. switch the archive counters to use ArchiveCounter

This is sound because all objects created after step 2 will be written directly, and all objects created before step 3 will be backfilled by step 3 (the overlap is dealt with by ArchiveCounter).

Good point!

Updated the proposal with your suggestions, thanks!

Thanks for sketching out this proposal! It looks quite promising (and neat!).

The current PostgreSQL implementation of counters works, but we're walking a tightrope balancing resource usage/efficiency/usefulness, and there's a concern that Cassandra will completely blow this up. It's great to be exploring a more effective design that should be more durable than any given storage implementation.

I think the basic design, as well as the initialization plan, are sound.

The object types and amounts we want to count are:

object type     | current count | relative to content (%)
content         | 9 555 554 872 | 100
directory       | 8 040 838 713 | 84.14
revision        | 2 007 830 522 | 21.01
origin_visit    |   868 742 559 | 9.091
origin          |   150 465 324 | 1.574
snapshot        |   142 312 730 | 1.489
release         |    16 744 148 | 0.1752
skipped_content |       140 787 | 0.001473

Presumably, the origin_visit table will grow faster than the snapshot table, which will in turn grow faster than the origin table itself. We should definitely tune the size of the Bloom filters accordingly, to avoid using tons of memory for little benefit.

I think we should be able to decouple these counters completely from the loaders, and have them directly updated/handled by a client of the swh-journal. This would be a "centralized" component, but which we can parallelize quite heavily thanks to basic kafka design. We can also leverage the way kafka clients do parallelism to sidestep the locking issues arising in a potentially distributed filter.

There are a few pros and cons to the "full journal client" approach that I can think of:

  • cons:
    • the journal has very slightly more objects than the database, as some objects can be written to the journal and be eventually rejected by postgres
      • (this is a transient issue and an overwhelming majority of objects should eventually make it into postgres; that overcount can be handled within the false positive margin of the bloom filters)
    • one more separate component to manage
      • (but we need to manage the component that would allow distributed access to the bloom filters for workers anyway)
    • centralized load in the counter management component instead of diffuse load across workers (it's also a pro!)
    • if we use sharded bloom filters/counters along the lines of kafka partitioning, correcting the counters for false positives is harder than if we shard objects using their SWHID (the number of objects within a range of SWHIDs can be obtained with a postgres query, while we'd need to roll our own "get an exact count of unique swhids per partition" journal client).
  • pros:
    • no additional load or even code on the loaders
    • centralized monitoring and "load tuning" of the counter component via the monitoring of kafka consumer offsets
    • built-in parallelization design with guaranteed consistent sharding of objects (orthogonal to the swhids) removes the filter locking issue: we can map one bloom filter per kafka partition, which will only ever be consumed by a single client process at once (see the sketch after this list). Considering the "natural kafka sharding" is orthogonal to the object SWHIDs, we should be able to use the SWHID directly as input to the bloom filter without fear of inefficiency caused by non-uniform SWHIDs in a given shard.
    • using a journal client removes the need for a specific filter initialization step; we initialize using the full journal, and we can switch over the "display" when the consumer offset is small enough
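
A minimal sketch of the "one bloom filter per kafka partition" idea mentioned above, assuming a plain confluent-kafka consumer; the broker address, the topic name and the use of a Python set as a stand-in for the real Bloom filter are illustrative assumptions, not the actual swh-journal client machinery:

from collections import defaultdict
from confluent_kafka import Consumer

filters = defaultdict(set)   # partition -> "Bloom filter" (a plain set() stands in here)
counters = defaultdict(int)  # partition -> number of new objects seen

consumer = Consumer({
    "bootstrap.servers": "journal.example.org:9092",  # placeholder address
    "group.id": "archive-counter-content",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["swh.journal.objects.content"])  # exact topic name may differ

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    part = msg.partition()
    key = msg.key()  # the object id, already uniformly distributed
    if key not in filters[part]:
        filters[part].add(key)
        counters[part] += 1
    # the displayed counter is sum(counters.values())

Since each partition is only ever consumed by a single process, no locking is needed on the per-partition filter.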

I also have the "full journal" approach in mind after a quick read of this neat proposal :-)

In T2912#55849, @olasd wrote:

Thanks for sketching out this proposal! It looks quite promising (and neat!).

Happy you like it!

I think we should be able to decouple these counters completely from the loaders, and have them directly updated/handled by a client of the swh-journal. This would be a "centralized" component, but which we can parallelize quite heavily thanks to basic kafka design. We can also leverage the way kafka clients do parallelism to sidestep the locking issues arising in a potentially distributed filter.

Maybe my writing was not all that clear: I also had in mind a single centralised component (the ArchiveCounter) per Bloom filter, receiving the lists newcontents of ids from the loaders.
Getting the feed of ids from swh-journal instead of from the loaders is really neat: we avoid touching the loader code, and we gain a better capability of monitoring the load on the ArchiveCounter, so I'm all for it :-)

A few comments about the cons below:

There are a few pros and cons to the "full journal client" approach that I can think of:

  • cons:
    • the journal has very slightly more objects than the database, as some objects can be written to the journal and be eventually rejected by postgres
      • (this is a transient issue and an overwhelming majority of objects should eventually make it into postgres; that overcount can be handled within the false positive margin of the bloom filters)

Considering the Bloom filter's slight undercount, and how marginal the phenomenon is, this does not seem to be an issue indeed.

  • one more separate component to manage
    • (but we need to manage the component that would allow distributed access to the bloom filters for workers anyway)

Agree, this is not really an issue.

  • centralized load in the counter management component instead of diffuse load across workers (it's also a pro!)

I see it also as a big pro: we can control the load better if we have only one source of inputs, instead of a gazillion loaders. We should expect much higher throughput in our case than with a standard Bloom filter, though, as we can bypass the costly hash computations by using (fragments of) the SWHIDs as the two hashes that are sufficient for the construction proposed by Dillinger and Manolios. Some care needs to be taken (when all bits happen to be 0), but the speedup should be significant compared to computing 10 hashes. A real test will tell :-)

  • if we use sharded bloom filters/counters along the lines of kafka partitioning, correcting the counters for false positives is harder than if we shard objects using their SWHID (the number of objects within a range of SWHIDs can be obtained with a postgres query, while we'd need to roll our own "get an exact count of unique swhids per partition" journal client).

If the filters are as fast as I hope (see point above), I believe we can avoid this extra complication and just keep one filter per counter.

In T2912#55849, @olasd wrote:

I think we should be able to decouple these counters completely from the loaders, and have them directly updated/handled by a client of the swh-journal. This would be a "centralized" component, but which we can parallelize quite heavily thanks to basic kafka design. We can also leverage the way kafka clients do parallelism to sidestep the locking issues arising in a potentially distributed filter.

Maybe my writing was not all that clear: I also had in mind a single centralised component (the ArchiveCounter) per Bloom filter, receiving the lists newcontents of ids from the loaders.
Getting the feed of ids from swh-journal instead of from the loaders is really neat: we avoid touching the loader code, and we gain a better capability of monitoring the load on the ArchiveCounter, so I'm all for it :-)

It looks like you already agree, but FWIW I'd also like to have a dedicated (micro)service that keeps an up-to-date bloom filter for the entire archive, with a REST API.
It might be useful for other use cases (swh-scanner comes to mind, but I'm sure we'll find others as time passes).

It looks like you already agree, but FWIW I'd also like to have a dedicated (micro)service that keeps an up-to-date bloom filter for the entire archive, with a REST API.
It might be useful for other use cases (swh-scanner comes to mind, but I'm sure we'll find others as time passes).

Very good point! So we have an extra reason to move forward ;-)

It seems redis has a HyperLogLog functionality[1] that can match the requirements (bloom filter / limited deviation / small memory footprint / efficiency).

I made a small poc[2] to count the number of content objects from the content topic, and it seems to work quite well, with less than 1% deviation over 130 000 000 content objects in staging (it took ~2 hours on my laptop).

The usage is quite simple, with basically two redis instructions:

To add an element (the key can be used as-is, without deserializing it):

redis_client.pfadd(collection, message.key)

To retrieve the estimated number of elements:

current_count = redis_client.pfcount(collection)
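
For instance, with the redis-py client (the host, the redis key name and the example values are placeholders; in the poc the values are the raw kafka message keys):

import redis

redis_client = redis.Redis(host="localhost", port=6379)

# message keys straight from the journal (raw sha1 bytes), no deserialization needed
batch_of_keys = [bytes.fromhex("da39a3ee5e6b4b0d3255bfef95601890afd80709"),
                 bytes.fromhex("34973274ccef6ab4dfaaf86599792fa9c3fe4689")]

redis_client.pfadd("counters:content", *batch_of_keys)   # add to the HyperLogLog
print(redis_client.pfcount("counters:content"))          # estimated number of distinct keys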

Perhaps it can help to have a first simple version focused on the plumbing.

[1] https://redis.io/commands/pfcount / http://antirez.com/news/75
[2] https://forge.softwareheritage.org/source/snippets/browse/master/vsellier/counters/counter.py

Thanks @vsellier

Bloom filters are still on the table for other use cases, like testing super quickly for contents that we do not have, but if nobody has strong objections, this seems the way to go for the counters (very small footprint, small under/over counting errors, thanks to Philippe Flajolet's magic :-))

For information, the poc was launched on the content topic of production; the results seem acceptable, with a slightly higher count on the redis counter, probably due to some messages sent to kafka but not persisted in the database.

Database count | redis count   | Ratio
9 737 467 015  | 9 774 001 572 | 1.003751957

thanks to Philippe Flajolet's magic :-))

I agree, it really feels like magic to have this working with only a 12 kB collection

❯ time psql service=mirror-swh -P pager=off -c "select now(), count(*) from content"
              now              |   count    
-------------------------------+------------
 2021-01-29 08:47:32.527878+00 | 9737467015
(1 row)

Redis count :

01-29-2021 09:47:30 redis_counter=9774001572 (0m/s redis: 0.000000s)                                                                                                                      
01-29-2021 09:47:35 redis_counter=9774001572 (0m/s redis: 0.000000s)

I don't think this solves the issue of overestimating the number of objects, when two threads insert the same objects at the same time.

With PostgreSQL you can probably get the number of inserted objects after committing the transaction and increment the counter with that number; but that is not true with Cassandra, because it is only eventually consistent and will silently merge identical rows.

I'm not sure I understand: the hyperloglog function is precisely used to deduplicate the messages based on their keys (at least in the poc).

The HyperLogLog data structure can be used in order to count unique elements in a set using just a small constant amount of memory, specifically 12k bytes for every HyperLogLog (plus a few bytes for the key itself).

(from https://redis.io/commands/pfcount)

One situation I see that is not covered is when entries are inserted in the journal but the database insert is rolled back.

I don't think this solves the issue of overestimating the number of objects, when two threads insert the same objects at the same time.

In T2912#57655, @vsellier wrote:

I'm not sure I understand,

Just to clarify: both the Bloom filter approach and HyperLogLog are probabilistic, so one gets an estimate and not the real value.
The difference is that with a Bloom filter we are guaranteed to always underestimate the real value, while HyperLogLog may underestimate or overestimate.
Underestimating seems preferable, as one can always bump up the shown counter to the real value every now and then by updating a drift counter.
With an overestimation, updating to the real value every now and then may show up as a negative bump in the graph (as has happened before), which is counterintuitive for our users.

But if it turns out we can keep the error below 1%, then it becomes quite attractive to use only 12 kB instead of several GBs, and one can find a way to smooth the graph when a catch-up is needed.

The question is whether the error figure will stay below 1% even for contents, revisions and directories, which are in the range of 10^10.
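
For reference, the theoretical standard error of HyperLogLog is 1.04/sqrt(m), where m is the number of registers, and it is roughly independent of the cardinality being estimated; Redis's dense encoding uses 2^14 registers (the 12 kB mentioned above), which gives about 0.81%:

from math import sqrt

m = 2 ** 14                 # registers in Redis's dense HyperLogLog (~12 kB)
std_error = 1.04 / sqrt(m)  # ~0.0081
print(f"{std_error:.2%}")   # 0.81%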

As said before, there are other uses for the Bloom filter approach so we'll need to look into it in any case, but if we have a quick and cheap satisfactory solution for the counters, why not ... :-)

Oh indeed, it sounds good then. :)

These are the results for the count of directories and revisions (the content count is still running, so these are fresh statistics):

Object type | Database count | Counter       | Ratio
Content     | 9 738 530 517  | 9 774 472 095 | 1.0037
Directory   | 8 194 371 079  | 8 188 615 221 | 0.9993
Revision    | 2 044 980 813  | 2 059 724 031 | 1.0072

It seems to remain in the <1% range.

Thanks @vsellier, that seems quite ok indeed. The only question left is whether the estimator implemented is monotonic (i.e. we will never have negative bumps in the graph :-))

Thanks @vsellier, that seems quite ok indeed. The only question left is whether the estimator implemented is monotonic (i.e. we will never have negative bumps in the graph :-))

The question is interesting per se, but may I suggest (for reasons discussed in the past) to just remove the graphs from the main archive.s.o page (we can keep them in grafana or elsewhere), instead of adding this as a requirement for what otherwise already looks like a good solution to the efficient counter problem?

In T2912#58063, @zack wrote:

Thanks @vsellier, that seems quite ok indeed. The only question left is whether the estimator implemented is monotonic (i.e. we will never have negative bumps in the graph :-))

may I suggest (for reasons discussed in the past) to just remove the graphs from the main archive.s.o page

We decided to keep the counters.

The question is not an abstract one: there are implementations of HyperLogLog that are monotonic; maybe the Redis one already is, we just need to know.

Another bonus point of this approach is that we could also unstick the indexer
counters (the graphs for those have been stuck since November 2020) [1] [2]

They share the same difficulties when it comes to counting data there.

[1] https://grafana.softwareheritage.org/goto/oMRgRsLGk

[2] the indexer storage is able to write to its own topics now; I don't immediately
recall the blocker for why that's not deployed yet (there is one, but it's
unrelated anyhow)

The question is not an abstract one: there are implementations of HyperLogLog that are monotonic; maybe the Redis one already is, we just need to know.

Yes, sure, it's important; I will record the counter values over the next few days to make sure the progression is indeed monotonic.

I asked one of the authors of the original HyperLogLog paper (not Philippe, who unfortunately passed away years ago :-().
The original HyperLogLog has three different behaviours: one for small cardinalities, another for medium cardinalities, and a third for very large cardinalities.
There is indeed a risk of breaking monotonicity at the boundaries between segments, but within each segment it is monotonic.
Our counters are already in the "very large cardinality" zone, so we should be safe with any implementation.

So, at this point, I would say it's a definite go for this solution.
Thanks @vsellier :-)

Nice, thanks for confirming this at the source.

This is the behavior of the counters since my last comment (~3 days)
They seem to increase step by step but look monotonic over this short period.

Directories, Contents, Revisions: [counter graphs attached to the task]

@vsellier nice. Note that if we draw these with a y-axis starting from 0, the step shape will be really negligible, so IMHO it's really not a problem.

I wrote a proposal for the next steps [1] so we could start the work on these counters. All comments/contributions are welcome.

[1] https://hedgedoc.softwareheritage.org/s/hIqe-DAOL

vsellier changed the task status from Open to Work in Progress. Mar 5 2021, 11:07 AM
vsellier claimed this task.

Let's start the subject ;)

Staging webapp[1] and webapp1 on production [2] are now configured to use swh-counters to display the historical values and the live object counts.

As planned, the authors field is no longer present, as the information is not directly available from the journal.
If the field needs to be reactivated, it will be possible to compute the information from the revisions and the releases, but the journal client will need to be improved.

If the final result is considered correct, the configuration can be deployed on the main webapp.

The old counters could be left in place for some time in case a rollback is needed.

[1] https://webapp.staging.swh.network
[2] https://webapp1.internal.softwareheritage.org

Staging webapp[1] and webapp1 on production [2] are now configured to use swh-counters to display the historical values and the live object counts.

Great to see this getting to the finish line, congrats!

The authors field is no longer present, as the information is not directly available from the journal.
If the field needs to be reactivated, it will be possible to compute the information from the revisions and the releases, but the journal client will need to be improved.

I would really like to keep the author counter: how complex is it to add it?

I would really like to keep the author counter: how complex is it to add it?

There is no real complexity: it's just a matter of implementing a dedicated journal client that deserializes the messages to extract the information, instead of just sending the message's key to redis.
This kind of journal client will be necessary in any case if we want to extend the usage of the counters to other scopes (metadata count, origins per forge, ...)
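
A minimal sketch of what such a deserializing client could look like, assuming the journal client hands over already-decoded revision/release dicts carrying an author fullname, and using a hypothetical "counters:person" redis key (all names here are illustrative, not the deployed swh-counters code):

import redis

redis_client = redis.Redis(host="localhost", port=6379)  # placeholder

def process_objects(object_type, objects):
    # Count distinct authors by feeding their fullnames to a HyperLogLog,
    # instead of the kafka message keys used for the other counters.
    if object_type in ("revision", "release"):
        fullnames = [obj["author"]["fullname"]
                     for obj in objects
                     if obj.get("author")]  # releases may have no author
        if fullnames:
            redis_client.pfadd("counters:person", *fullnames)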

This kind of journal client will be necessary in any case if we want to extend the usage of the counters to other scopes (metadata count, origins per forge, ...)

Let's go for it, then. Could you take this over?

Let's go for it, then. Could you take this over?

Yes, @vsellier opened T3251 for that part ;)

Last bits deployed on archive.s.o (including the author counters).

Last bits deployed on archive.s.o (including the author counters).

Great, works like a charm.

What about the old counter pipeline? Has it been decommissioned already?

What about the old counter pipeline? Has it been decommissioned already?

I don't think so as I do not recall seeing diffs about clean up.

In any case, it's not part of what's currently deployed (so there's no risk of
data mangling, if that's part of the concern).

What about the old counter pipeline? Has it been decommissioned already?

I don't think so as I do not recall seeing diffs about clean up.

In any case, it's not part of what's currently deployed (so there's no risk of
data mangling, if that's part of the concern).

I also recall now that vincent added a graph [1] recently enough.
This is to try and compare the counter approaches a bit.

So that's still using the old plumbing at least for that part.

[1] https://grafana.softwareheritage.org/goto/BlkwHorMz

I also recall now that vincent added a graph [1] recently enough.

This is to try and compare the counter approaches a bit.

So that's still using the old plumbing at least for that part.

[1] https://grafana.softwareheritage.org/goto/BlkwHorMz

Ok thanks, we can keep this for a while to be fully confident (a couple of months).
Then it will be nice to just disable the old pipeline, to reduce the load on the DB (which was our initial objective :-))

vsellier moved this task from Backlog to done on the System administration board.

The cleanup of the old counters is done, so this task can be closed.