
Next generation archive counters
Open, Normal, Public

Description

The current approach to the archive counters involves a significant amount of interaction with the main database, and this may lead to critical issues, as we have seen in T2828, so we need to find a way to maintain accurate counters without overloading the database.

A first attempt to maintain the archive counters without database interaction was made by having each loader add to the global counters the number of new objects it encountered. Unfortunately, this approach led to the counters showing a value greater than the real one: indeed, if several workers happen to ingest forks of the same new project (and it does happen!), then the new objects are counted multiple times.

This task proposes a simple alternative approach to maintaining archive counters that both drastically reduces interaction with the main database and avoids counter overrun.

The ArchiveCounter component

The key to the approach is a new ArchiveCounter service that maintains one standard Bloom filter and a global counter for each kind of object that is counted (contents, directories, revisions, releases, snapshots, origins). The Bloom filter allows very fast answers to the question "is this object new?" with no false negatives and a controlled probability of false positives. The global counter keeps the running count of the new objects seen so far (up to the probability of false positives).

In the following, we detail the operation just for the case of contents.

Loaders send to the ArchiveCounter the list newcontents of identifiers of the (supposedly) new contents they encounter, and ArchiveCounter performs the following simple operation:

for id in newcontents:
    if not BloomFilter.add(id):    # BloomFilter.add returns True if id is considered known
        contentcounter += 1
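The loop above can be sketched as runnable Python. The BloomFilter class here is a deliberately tiny in-memory stand-in (the real filter would be tens of GB and likely memory-mapped), and all names are illustrative:

```python
import hashlib

class BloomFilter:
    """Minimal in-memory Bloom filter, for illustration only."""

    def __init__(self, num_bits=1 << 20, num_hashes=7):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits // 8)

    def _positions(self, item):
        # Derive num_hashes bit positions by hashing the item with a prefix.
        for i in range(self.num_hashes):
            h = hashlib.sha1(b"%d:" % i + item).digest()
            yield int.from_bytes(h[:8], "big") % self.num_bits

    def add(self, item):
        """Insert item; return True if it was (probably) already present."""
        known = True
        for pos in self._positions(item):
            byte, bit = divmod(pos, 8)
            if not (self.bits[byte] >> bit) & 1:
                known = False
                self.bits[byte] |= 1 << bit
        return known

# The counting loop from the description:
bloom = BloomFilter()
contentcounter = 0
newcontents = [b"c1", b"c2", b"c1"]  # a duplicate id is only counted once
for id in newcontents:
    if not bloom.add(id):  # add() returns True if id is considered known
        contentcounter += 1
print(contentcounter)  # 2
```

Note how the duplicate id is absorbed by the filter: this idempotence is exactly what makes the backfilling overlap during initialisation harmless.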

The operation BloomFilter.add will return a false positive (i.e. consider a content as already seen while it is actually new, hence resulting in counter underrun) with a probability that we can control. For example (see details), we can get a 1% probability of false positives for up to 20 billion objects with a 24GB Bloom filter and 7 hash functions, and we can go down to 0.1% with a 36GB Bloom filter and 10 hash functions.
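These figures can be checked against the textbook false-positive estimate (1 - e^(-kn/m))^k. A quick sketch, assuming the standard formula and taking 1 GB = 2^30 bytes:

```python
from math import exp

def false_positive_rate(n, m_bits, k):
    """Standard Bloom filter false-positive estimate: (1 - e^(-kn/m))^k."""
    return (1 - exp(-k * n / m_bits)) ** k

GB = 8 * 2**30                # bits per gigabyte
n = 20_000_000_000            # 20 billion objects

p1 = false_positive_rate(n, 24 * GB, k=7)
p2 = false_positive_rate(n, 36 * GB, k=10)
print(f"24 GB, 7 hashes:  {p1:.2%}")   # under 1%
print(f"36 GB, 10 hashes: {p2:.3%}")   # under 0.1%
```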

With these parameters, at any moment in time contentcounter contains an under-approximation of the real number of contents stored in the archive, which is totally acceptable for our use case.

Every so often, one can correct the counter drift by running an expensive query on the database and updating contentcounter accordingly.

Algorithmic remarks

Dillinger and Manolios show in their 2004 paper "Bloom Filters in Probabilistic Verification" that one can resort to using just two hash functions, which reduces the computation cost w.r.t. the standard Bloom filter construction.

But in our case, the "keys" are already randomly distributed, as they are SHA1 cryptographic hashes! So we could simply skip the Bloom hashing phase altogether, and perform the two-hash construction using (fragments of) the key to be inserted in the filter.
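As a sketch of this idea (the function name and the sample id are hypothetical), one could slice two 64-bit fragments out of the existing SHA1 digest and combine them with the classic double-hashing scheme g_i = h1 + i*h2:

```python
def positions_from_sha1(sha1_hex, num_bits, k):
    """Derive k Bloom filter bit positions from fragments of an existing
    SHA1 digest, via double hashing g_i = (h1 + i*h2) mod m.
    Since the ids are already cryptographic hashes, no extra hash
    computation is needed."""
    digest = bytes.fromhex(sha1_hex)
    h1 = int.from_bytes(digest[:8], "big")
    h2 = int.from_bytes(digest[8:16], "big") | 1  # force h2 odd, never 0
    return [(h1 + i * h2) % num_bits for i in range(k)]

# Example with an arbitrary content SHA1:
pos = positions_from_sha1("8624bcdae55baeef00cd11d5dfcfa60f68710a02", 1 << 20, 7)
```

Forcing h2 to be odd is one simple way to deal with the degenerate case where a fragment happens to be all zero bits.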

Initialisation

In order to get to nominal operation, one can progressively fill the Bloom filters as follows:

  1. turn ArchiveCounter on, with an empty Bloom filter
  2. reconfigure loaders (or swh-storage) to write to ArchiveCounter
  3. dump the list of hashes from swh-storage or swh-journal
  4. switch the archive counters to use ArchiveCounter

This is sound because all objects created after step 2 will be written directly, and all objects created before step 3 will be backfilled by step 3 (and the overlap is dealt with by ArchiveCounter).

Implementation

Implementing a Bloom filter is straightforward, and there are many libraries available, but with our space requirements we need to benchmark existing implementations. I came across this Python3 library, which has the added benefit of using a memory-mapped data structure that allows saving the filter to disk, and it could be a good candidate; but the ultimate choice needs to be made based on performance and stability, independently of the programming language used.
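To illustrate the memory-mapped property (this is a sketch of the idea, not the API of any particular library): a bit array backed by an mmap'd file survives process restarts, so a filter of this size never needs to be serialised separately.

```python
import mmap

class MmapBitArray:
    """Bit array backed by a memory-mapped file, sketching the
    'save the filter on disk' property mentioned above."""

    def __init__(self, path, num_bits):
        self.num_bits = num_bits
        size = num_bits // 8
        with open(path, "a+b") as f:   # create the backing file if needed
            f.truncate(size)
        self.f = open(path, "r+b")
        self.mm = mmap.mmap(self.f.fileno(), size)

    def set(self, pos):
        byte, bit = divmod(pos, 8)
        self.mm[byte] |= 1 << bit

    def get(self, pos):
        byte, bit = divmod(pos, 8)
        return (self.mm[byte] >> bit) & 1

    def close(self):
        self.mm.flush()
        self.mm.close()
        self.f.close()
```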

Resources
Dedicated page on Devopedia

Event Timeline

rdicosmo triaged this task as Normal priority. Tue, Dec 22, 12:57 PM
rdicosmo created this task.
rdicosmo updated the task description. (Show Details)

A Python library may be an issue, as it requires a central process with a global lock. Sharding by hash may fix the issue, though.

ArchiveCounter need to be filled up with all the known objects in the database: this requires stopping archival

Not necessarily. We can also do it like this:

  1. turn ArchiveCounter on, with an empty Bloom filter
  2. reconfigure loaders (or swh-storage) to write to ArchiveCounter
  3. dump the list of hashes from swh-storage or swh-journal
  4. switch the archive counters to use ArchiveCounter

This is sound because all objects created after step 2 will be written directly, and all objects created before step 3 will be backfilled by step 3 (and the overlap is dealt with by ArchiveCounter).

A Python library may be an issue, as it requires a central process with a global lock. Sharding by hash may fix the issue, though.

Sure, no obligation to stick to Python for this (the underlying implementation of the library pointed above is in C/C++)

ArchiveCounter need to be filled up with all the known objects in the database: this requires stopping archival

Not necessarily. We can also do it like this:

  1. turn ArchiveCounter on, with an empty Bloom filter
  2. reconfigure loaders (or swh-storage) to write to ArchiveCounter
  3. dump the list of hashes from swh-storage or swh-journal
  4. switch the archive counters to use ArchiveCounter

This is sound because all objects created after step 2 will be written directly, and all objects created before step 3 will be backfilled by step 3 (and the overlap is dealt with by ArchiveCounter).

Good point!

Updated the proposal with your suggestions, thanks!

Thanks for sketching out this proposal! It looks quite promising (and neat!).

The current PostgreSQL implementation of counters works, but we're walking a tightrope balancing resource usage, efficiency and usefulness, and there's a concern that Cassandra will completely blow this up. It's great to be exploring a more effective design that should be more durable than any given storage implementation.

I think the basic design, as well as the initialization plan, are sound.

The object types and amounts we want to count are:

| object type     | current count | relative to content (%) |
|-----------------|---------------|-------------------------|
| content         | 9,555,554,872 | 100                     |
| directory       | 8,040,838,713 | 84.14                   |
| revision        | 2,007,830,522 | 21.01                   |
| origin_visit    | 868,742,559   | 9.091                   |
| origin          | 150,465,324   | 1.574                   |
| snapshot        | 142,312,730   | 1.489                   |
| release         | 16,744,148    | 0.1752                  |
| skipped_content | 140,787       | 0.001473                |

Supposedly, the origin_visit table will grow faster than the snapshot table, which in turn will grow faster than the origin table itself. We should definitely tune the size of the Bloom filters accordingly, to avoid using tons of memory for little benefit.
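For that tuning, the standard sizing formula m = -n ln p / (ln 2)^2 (with k = (m/n) ln 2 hash functions) gives a feel for the memory needed per object type. A sketch using a few counts from the table above, at an assumed 1% false-positive target:

```python
from math import log

def bloom_size(n, p):
    """Optimal Bloom filter sizing for n items at false-positive rate p:
    m = -n ln p / (ln 2)^2 bits, with k = (m/n) ln 2 hash functions."""
    m = -n * log(p) / log(2) ** 2
    k = round(m / n * log(2))
    return m, k

# Hypothetical sizing at 1% false positives, using counts from the table:
for obj_type, count in [("content", 9_555_554_872),
                        ("origin", 150_465_324),
                        ("release", 16_744_148)]:
    m_bits, k = bloom_size(count, 0.01)
    print(f"{obj_type}: {m_bits / 8 / 2**30:.2f} GiB, {k} hashes")
```

The memory requirement is linear in the expected number of objects, so the filters for the smaller object types are cheap compared to the content filter.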

I think we should be able to decouple these counters completely from the loaders, and have them directly updated/handled by a client of the swh-journal. This would be a "centralized" component, but which we can parallelize quite heavily thanks to basic kafka design. We can also leverage the way kafka clients do parallelism to sidestep the locking issues arising in a potentially distributed filter.

There are a few pros and cons to the "full journal client" approach that I can think of:

  • cons:
    • the journal has very slightly more objects than the database, as some objects can be written to the journal and be eventually rejected by postgres
      • (this is a transient issue and a very overwhelming majority of objects should eventually make it into postgres; that overcount can be handled within the false positive margin of the bloom filters)
    • one more separate component to manage
      • (but we need to manage the component that would allow distributed access to the bloom filters for workers anyway)
    • centralized load in the counter management component instead of diffuse load across workers (it's also a pro!)
    • if we use sharded bloom filters/counters along the lines of partitioning for kafka, correcting the counters for false positives is harder than if we shard objects using their SWHID (the number of objects within a range of SWHIDs can be obtained with a postgres query, while we'd need to roll our own "get an exact count of unique swhids per partition" journal client).
  • pros:
    • no additional load or even code on the loaders
    • centralized monitoring and "load tuning" of the counter component via the monitoring of kafka consumer offsets
    • built-in parallelization design with guaranteed consistent sharding of objects (orthogonal to the swhids) removes the filter locking issue: we can map one bloom filter per kafka partition, which will only ever be consumed by a single client process at once. Considering the "natural kafka sharding" is orthogonal to the object SWHIDs, we should be able to use the SWHID directly as input to the bloom filter without fear of inefficiency caused by non-uniform SWHIDs in a given shard.
    • using a journal client removes the need for a specific filter initialization step; we initialize using the full journal, and we can switch over the "display" when the consumer offset is small enough
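A minimal sketch of this per-partition design, with the Kafka consumption itself elided and an exact set standing in for the real Bloom filter (all names are hypothetical):

```python
class SetFilter:
    """Exact stand-in for a Bloom filter, for illustration only."""
    def __init__(self):
        self.seen = set()
    def add(self, item):
        known = item in self.seen
        self.seen.add(item)
        return known

class PartitionedCounter:
    """One filter and one counter per Kafka partition. Each partition is
    only ever consumed by a single client process at once, so no locking
    is needed; the published count is the sum over all partitions."""

    def __init__(self, num_partitions, make_filter):
        self.filters = [make_filter() for _ in range(num_partitions)]
        self.counters = [0] * num_partitions

    def process(self, partition, object_id):
        # Called from the journal client's consume loop for that partition.
        if not self.filters[partition].add(object_id):
            self.counters[partition] += 1

    def total(self):
        return sum(self.counters)

# Simulated consumption: kafka assigns each message to a partition.
counter = PartitionedCounter(4, SetFilter)
for part, oid in [(0, "a"), (1, "b"), (0, "a"), (3, "c")]:
    counter.process(part, oid)
print(counter.total())  # 3
```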

I'm also having the "full journal" approach in mind after a quick reading of this neat proposal :-)

In T2912#55849, @olasd wrote:

Thanks for sketching out this proposal! It looks quite promising (and neat!).

Happy you like it!

I think we should be able to decouple these counters completely from the loaders, and have them directly updated/handled by a client of the swh-journal. This would be a "centralized" component, but which we can parallelize quite heavily thanks to basic kafka design. We can also leverage the way kafka clients do parallelism to sidestep the locking issues arising in a potentially distributed filter.

Maybe my writing was not all that clear: I also had in mind a single centralised component (the ArchiveCounter) per Bloom filter, receiving the lists newcontents of ids from the loaders.
Getting the feed of ids from swh-journal instead of from the loaders is really neat: we avoid touching the loader code, and we gain a better capability of monitoring the load on the ArchiveCounter, so I'm all for it :-)

A few comments about the cons below:

There are a few pros and cons to the "full journal client" approach that I can think of:

  • cons:
    • the journal has very slightly more objects than the database, as some objects can be written to the journal and be eventually rejected by postgres
      • (this is a transient issue and a very overwhelming majority of objects should eventually make it into postgres; that overcount can be handled within the false positive margin of the bloom filters)

Considering the Bloom filter's slight underrun, and the marginality of the phenomenon, this seems not to be an issue indeed.

  • one more separate component to manage
    • (but we need to manage the component that would allow distributed access to the bloom filters for workers anyway)

Agree, this is not really an issue.

  • centralized load in the counter management component instead of diffuse load across workers (it's also a pro!)

I see it also as a big pro: we can better control the load if we have only one source of inputs, instead of a gazillion loaders. We should expect much higher throughput in our case than with a standard Bloom filter, though, as we can bypass the costly hash computations by using (fragments of) the SWHIDs as the two hashes that are sufficient for the construction proposed by Dillinger and Manolios. Some care needs to be taken (when all bits happen to be 0), but the speedup should be significant compared to computing 10 hashes. A real test will tell :-)

  • if we use sharded bloom filters/counters along the lines of partitioning for kafka, correcting the counters for false positives is harder than if we shard objects using their SWHID (the number of objects within a range of SWHIDs can be obtained with a postgres query, while we'd need to roll our own "get an exact count of unique swhids per partition" journal client).

If the filters will be as fast as I hope (see point above), I believe we can avoid this extra complication, and just keep one filter per counter.

In T2912#55849, @olasd wrote:

I think we should be able to decouple these counters completely from the loaders, and have them directly updated/handled by a client of the swh-journal. This would be a "centralized" component, but which we can parallelize quite heavily thanks to basic kafka design. We can also leverage the way kafka clients do parallelism to sidestep the locking issues arising in a potentially distributed filter.

Maybe my writing was not all that clear: I also had in mind a single centralised component (the ArchiveCounter) per Bloom filter, receiving the lists newcontents of ids from the loaders.
Getting the feed of ids from swh-journal instead of from the loaders is really neat: we avoid touching the loader code, and we gain a better capability of monitoring the load on the ArchiveCounter, so I'm all for it :-)

It looks like you already agree, but FWIW I'd also like to have a dedicated (micro)service that keeps an up-to-date Bloom filter for the entire archive, with a REST API.
It might be useful for other use cases (swh-scanner comes to mind, but I'm sure we'll find others as time passes).

It looks like you already agree, but FWIW I'd also like to have a dedicated (micro)service that keeps an up-to-date Bloom filter for the entire archive, with a REST API.
It might be useful for other use cases (swh-scanner comes to mind, but I'm sure we'll find others as time passes).

Very good point! So we have an extra reason to move forward ;-)