
Content replayer may try to copy objects before they are available from an objstorage
Closed, Migrated

Description

There is going to be an issue with the content replayer in steady state: content objects (without the data) are written to Kafka before the data is written to the objstorage.
So if the content replayers are fast enough (and they probably will be), they will try to access the data in the objstorage before it is there.

(And if we wrote to Kafka after writing to the objstorage, then there would be a risk of Kafka missing some content in case of failure, which is worse.)

A possible solution is to have the content replayer retry reading objects, until they are available.

There is however the issue of missing objects (T1817), so it can't retry forever for all objects or it will get stuck. We see two possible solutions:

  • a retry timeout, but it means that some objects might be skipped when they shouldn't be (e.g. if an object takes a long time to become available in the objstorage); see the sketch after this list
  • "hardcoding" a list of missing objects in the configuration, but it could possibly grow large with time (hopefully it won't)
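To make the retry-timeout option above concrete, here is a minimal sketch; the src_objstorage/dst_objstorage objects, their get/add methods and the error handling are assumptions standing in for the real replayer code, not the actual swh.objstorage API.

```python
import time


def copy_with_retry(src_objstorage, dst_objstorage, obj_id,
                    timeout=600, delay=5):
    """Keep trying to copy obj_id until it shows up in src_objstorage,
    giving up after `timeout` seconds (hypothetical sketch)."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            data = src_objstorage.get(obj_id)
        except Exception:  # stand-in for an "object not found" error
            if time.monotonic() >= deadline:
                # The object may be legitimately missing (T1817) or just slow
                # to arrive; with a plain timeout we cannot tell which.
                return False
            time.sleep(delay)
            continue
        dst_objstorage.add(data, obj_id=obj_id)
        return True
```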

Event Timeline

vlorentz triaged this task as Normal priority. Sep 17 2019, 11:58 AM
vlorentz created this task.

My guts on this task tell me that what we need (what we really really need) is a 3rd solution:

Have a client-specific FIFO queue in which failed content ids are pushed. Then a garbage-collection process is in charge of pulling these content ids from the queue and attempting to copy them again a number of times (and giving up after that if they still fail).

This would prevent the main 'steady-state' content replayers from being curbed by temporary failures of some sort.
This needs to be worked out carefully to ensure we do not overflow this queue (i.e. if a majority of object copies are failing, we do not want to fill this queue as fast as Kafka messages are pushed on the content topic).

A simple solution would be to limit the capacity of this queue and make the content replayer stop/crash if it needs to push an object id into this garbage-collector queue but cannot do so for some reason (like the queue being full).

This queue could be stored as a Kafka topic, but that is an implementation detail: it could be done by any FIFO-capable provider. In fact, since it is purely client-side, we do not want to provide a 'service' that would allow a journal consumer (client) to create new topics in the main Kafka.
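A minimal sketch of the bounded-queue behaviour described above, using an in-process queue.Queue as a stand-in for whatever FIFO provider is chosen (a dedicated Kafka topic, Redis, ...); all names are illustrative:

```python
import queue

RETRY_QUEUE_MAX = 10_000
retry_queue = queue.Queue(maxsize=RETRY_QUEUE_MAX)


class RetryQueueFull(Exception):
    """Raised so the replayer stops/crashes instead of silently dropping failures."""


def record_failed_copy(obj_id):
    try:
        # Never block: if the queue is full, something is very wrong (e.g.
        # most copies are failing) and the replayer should stop rather than
        # keep consuming the content topic.
        retry_queue.put_nowait(obj_id)
    except queue.Full:
        raise RetryQueueFull(f"retry queue full, refusing to drop {obj_id.hex()}")
```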

I agree with @douardda that the "failed content" queue + separate processor approach would be the most sensible.

I also agree that the depth of this queue must be limited, and that reaching the queue depth should prevent further reads from being committed to kafka.

We can't really know the size of a Kafka topic, so I don't think that would be an appropriate solution. The queue needs to be resilient across restarts of the client infrastructure, so memcached is out; it also needs to be shared across workers (ideally), so we should probably look at something RabbitMQ-based (ugh) or Redis-based.

douardda raised the priority of this task from Normal to High. Jan 23 2020, 1:53 PM

Since T1914 is high priority, this one is too.

So, now that T1914 is stuck, I'm giving this a harder think, and I'm wondering whether we shouldn't have a generic buffering/filtering component in the journal instead:

  1. journal writing and "archive integrity" checking are currently concurrent processes, and the main archive storage transaction can fail while the messages are still written to the journal (for any and all object types). We don't have a (reasonably performant) way of interlocking the main storage and journal transactions.
  2. We would like our mirrors to be as close to the main archive as possible (for instance, in terms of object counts, it'd be weird for a mirror to have more objects and be "ahead" of the original)...
  3. distributing the "retry getting this object because maybe it hasn't arrived yet" logic among clients feels pretty brittle to me
  4. we're already running a copy of the kafka topics from one cluster (the rocquencourt "buffer") to another (the azure "full copy")

I'm therefore tempted to just write a new swh.journal.filter (swh.journal.mirror_maker to replicate the kafka terminology?) component, which would read from a set of raw journal topics, check that the objects exist in a given storage + objstorage backend (or even on the readonly mirror), and if so push them to "cleaned" kafka topics.

This component would centralize the "has this object already appeared?" logic, as well as the queueing+retry logic, and would replace the current kafka mirror component.

How does that sound?
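A rough sketch of what such a filter's main loop could look like; the topic naming convention, the `messages` iterable and the existence checks are assumptions for illustration, and failed messages are handed off to the retry bookkeeping detailed in the next comment:

```python
RAW_PREFIX = "swh.journal.objects_raw."
CLEAN_PREFIX = "swh.journal.objects."


def object_exists(storage, objstorage, object_type, key):
    """Return True once the object has fully 'arrived' in the archive."""
    if object_type == "content":
        # contents need both their metadata and their data to be present
        missing = list(storage.content_missing_per_sha1([key]))
        return not missing and key in objstorage
    missing = list(getattr(storage, f"{object_type}_missing")([key]))
    return not missing


def filter_loop(messages, producer, storage, objstorage, record_failure):
    # `messages` is assumed to yield (topic, key, value) tuples read from the
    # raw journal topics; `producer` is a Kafka producer for the clean topics.
    for topic, key, value in messages:
        object_type = topic[len(RAW_PREFIX):]
        if object_exists(storage, objstorage, object_type, key):
            producer.produce(CLEAN_PREFIX + object_type, key=key, value=value)
        else:
            # hand off to the queue/retry machinery sketched further down
            record_failure(topic, key, value)
```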

As for implementing the queue / retry behavior in the filter component:

The filter has a database of "failed to process" messages with the following schema:

  • (topic text, key bytea) primary key
  • value bytea
  • first_attempt timestamptz default now()
  • num_attempts short default 1
  • in_flight bool default false
  • given_up bool default false
  • latest_attempt timestamptz default now() (indexed where given_up = false and in_flight = false)
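Spelled out as DDL, this could look like the following (table and index names are made up here, and the exact types are approximate):

```python
import psycopg2

# Illustrative DDL for the "failed to process" table described above;
# table/index names and exact types are assumptions, not an existing schema.
DDL = """
CREATE TABLE IF NOT EXISTS failed_message (
    topic           text        NOT NULL,
    key             bytea       NOT NULL,
    value           bytea,
    first_attempt   timestamptz NOT NULL DEFAULT now(),
    num_attempts    smallint    NOT NULL DEFAULT 1,
    in_flight       boolean     NOT NULL DEFAULT false,
    given_up        boolean     NOT NULL DEFAULT false,
    latest_attempt  timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (topic, key)
);
CREATE INDEX IF NOT EXISTS failed_message_retry_idx
    ON failed_message (latest_attempt)
    WHERE NOT given_up AND NOT in_flight;
"""


def create_schema(dsn):
    with psycopg2.connect(dsn) as db, db.cursor() as cur:
        cur.execute(DDL)
```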

The filter journal client does the following:

  • When a message fails to process:
    • upsert it into the database; if the (topic, key) exists, set in_flight = false
  • When a message processes successfully:
    • if it exists in the database, clear it
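The two cases above could translate to something like this (a sketch against the `failed_message` table from the previous snippet; `db` is assumed to be an open psycopg2 connection):

```python
def record_failure(db, topic, key, value):
    with db, db.cursor() as cur:
        cur.execute(
            """
            INSERT INTO failed_message (topic, key, value)
            VALUES (%s, %s, %s)
            ON CONFLICT (topic, key) DO UPDATE SET in_flight = false
            """,
            (topic, key, value),
        )


def record_success(db, topic, key):
    with db, db.cursor() as cur:
        cur.execute(
            "DELETE FROM failed_message WHERE topic = %s AND key = %s",
            (topic, key),
        )
```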

The filter has a separate re-queueing process which polls the database every $timeframe:

  • reads messages whose latest_attempt is older than the retry threshold, with given_up = false and in_flight = false
  • sends them again to the buffer topic
  • increments num_attempts, sets latest_attempt to now(), sets in_flight to true
  • if the retry policy (num_attempts or retry delay) is exceeded, sets given_up = true

A housekeeping routine would clear old enough given_up entries.
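A sketch of the re-queueing and housekeeping side, under the same assumptions (the `producer` is a Kafka producer for the buffer topics; the retry policy values are made up):

```python
RETRY_DELAY = 15 * 60   # seconds to wait before retrying a given message
MAX_ATTEMPTS = 10       # give up after this many attempts


def requeue_once(db, producer):
    with db, db.cursor() as cur:
        cur.execute(
            """
            SELECT topic, key, value, num_attempts FROM failed_message
            WHERE NOT given_up AND NOT in_flight
              AND latest_attempt < now() - %s * interval '1 second'
            """,
            (RETRY_DELAY,),
        )
        for topic, key, value, num_attempts in cur.fetchall():
            if num_attempts >= MAX_ATTEMPTS:
                cur.execute(
                    "UPDATE failed_message SET given_up = true"
                    " WHERE topic = %s AND key = %s",
                    (topic, key),
                )
                continue
            # send the message back to the buffer topic for another pass
            producer.produce(topic, key=bytes(key), value=bytes(value))
            cur.execute(
                """
                UPDATE failed_message
                SET num_attempts = num_attempts + 1,
                    latest_attempt = now(), in_flight = true
                WHERE topic = %s AND key = %s
                """,
                (topic, key),
            )
    producer.flush()


def housekeeping(db, max_age_days=30):
    # clear old enough given_up entries
    with db, db.cursor() as cur:
        cur.execute(
            "DELETE FROM failed_message WHERE given_up"
            " AND latest_attempt < now() - %s * interval '1 day'",
            (max_age_days,),
        )
```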

Key metrics for the filter component:

  • kafka consumer offset
  • min(latest_attempt) where in_flight = true (time it takes for a message from submission in the buffer to (re-)processing by the filter; should stay close to the current time)
  • count(*) where given_up = false group by topic (number of objects pending a retry, should be small)
  • count(*) where in_flight = true group by topic (number of objects buffered for reprocessing, should be small)
  • max(latest_attempt) (last processing time by the requeuing process)
  • count(*) where given_up = true (checks whether the housekeeping process is keeping up)
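For what it's worth, the database-side metrics above boil down to a handful of aggregate queries, e.g. as follows (how they get exported, statsd or otherwise, is left out of this sketch):

```python
def collect_metrics(db):
    metrics = {}
    with db, db.cursor() as cur:
        cur.execute("SELECT topic, count(*) FROM failed_message"
                    " WHERE NOT given_up GROUP BY topic")
        metrics["pending_retry_by_topic"] = dict(cur.fetchall())
        cur.execute("SELECT topic, count(*) FROM failed_message"
                    " WHERE in_flight GROUP BY topic")
        metrics["in_flight_by_topic"] = dict(cur.fetchall())
        cur.execute("SELECT min(latest_attempt) FROM failed_message WHERE in_flight")
        metrics["oldest_in_flight_attempt"] = cur.fetchone()[0]
        cur.execute("SELECT max(latest_attempt) FROM failed_message")
        metrics["last_requeue_activity"] = cur.fetchone()[0]
        cur.execute("SELECT count(*) FROM failed_message WHERE given_up")
        metrics["given_up_total"] = cur.fetchone()[0]
    return metrics
```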

Note: I haven't read the other comments below, I'm just reacting to this one as I am reading it.

In T2003#41428, @olasd wrote:

So, now that T1914 is stuck, I'm giving this a harder think, and I'm wondering whether we shouldn't have a generic buffering/filtering component in the journal instead:

  1. journal writing and "archive integrity" checking are currently concurrent processes, and the main archive storage transaction can fail while the messages are still written to the journal (for any and all object types). We don't have a (reasonably performant) way of interlocking the main storage and journal transactions.
  2. We would like our mirrors to be as close to the main archive as possible (for instance, in terms of object counts, it'd be weird for a mirror to have more objects and be "ahead" of the original)...
  3. distributing the "retry getting this object because maybe it hasn't arrived yet" logic among clients feels pretty brittle to me
  4. we're already running a copy of the kafka topics from one cluster (the rocquencourt "buffer") to another (the azure "full copy")

I'm therefore tempted to just write a new swh.journal.filter (swh.journal.mirror_maker to replicate the kafka terminology?) component, which would read from a set of raw journal topics, check that the objects exist in a given storage + objstorage backend (or even on the readonly mirror), and if so push them to "cleaned" kafka topics.

This component would centralize the "has this object already appeared?" logic, as well as the queueing+retry logic, and would replace the current kafka mirror component.

How does that sound?

Question: do we (expect to) have this problem (temporary inconsistent state of the archive emitting messages in the Kafka topics) for other object types than content?
I expect content objects to be the most prone to this failure mode, due to the "distributed" nature of storing them (storage + objstorage). If we focus on content objects, maybe a simple solution would be to have the objstorage publish a topic on Kafka as well, with a message for each content added to the objstorage. A content replayer would then be dedicated to objstorage replication.
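A bare-bones sketch of that suggestion: a wrapper that only announces a content on a dedicated topic once the blob is durably in the objstorage (the wrapper class, producer and topic name are made up for illustration):

```python
OBJSTORAGE_TOPIC = "swh.journal.objects.content_added"


class PublishingObjStorage:
    """Wraps an objstorage and publishes a Kafka message per added content."""

    def __init__(self, objstorage, producer):
        self.objstorage = objstorage
        self.producer = producer

    def add(self, content, obj_id):
        # write the blob first, then announce it: a consumer dedicated to
        # objstorage replication can rely on this ordering
        self.objstorage.add(content, obj_id=obj_id)
        self.producer.produce(OBJSTORAGE_TOPIC, key=obj_id, value=obj_id)
        return obj_id
```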

I believe this solution can be seen as a simplified version of your swh.journal.filter proposal.

In T2003#41429, @olasd wrote:

Key metrics for the filter component:

  • kafka consumer offset
  • min(latest_attempt) where in_flight = true (time it takes for a message from submission in the buffer to (re-)processing by the filter; should stay close to the current time)
  • count(*) where given_up = false group by topic (number of objects pending a retry, should be small)
  • count(*) where in_flight = true group by topic (number of objects buffered for reprocessing, should be small)
  • max(latest_attempt) (last processing time by the requeuing process)
  • count(*) where given_up = true (checks whether the housekeeping process is keeping up)

excellent idea to add this section in your description!

In T2003#41428, @olasd wrote:

This component would centralize the "has this object already appeared?" logic, as well as the queueing+retry logic, and would replace the current kafka mirror component.

How does that sound?

I'm a bit worried this would be a first step of "let's implement a transaction layer on our nosql distributed database". Maybe I'm a bit pessimistic here...

I mean, if we want the messages in kafka to be published only after the objects have been inserted for sure, why don't we just write these messages in kafka after we insert them in the DB in the first place?

@olasd I'm worried that implementing your idea would result in some complex piece of code. It also adds a new postgresql database and new kafka topics, that will need extra resources and management. And if at some point that queue database becomes too large, the retrier will become slower, causing the queue to grow even more.

Alternative idea, which I think is much simpler:

  • Writes to Kafka are handled by a specific swh-storage "backend" (which also copies to the objstorage), after input validation (using swh-model)
  • A Kafka consumer writes data to postgresql. If the data is refused by postgresql, then it's either a temporary failure or a bug in the input validation.

There's however the issue of dealing with input validation being too lax if there's already a message in Kafka that shouldn't be there.
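A very rough sketch of that alternative, to make its shape concrete: validation relies on swh.model's Content class, while the topic name, the serialization and the consumer side are assumptions rather than existing swh code.

```python
from swh.model.model import Content


class KafkaFirstStorage:
    """Validates incoming objects, copies content data to the objstorage,
    and emits everything to Kafka; postgresql is only written later, by a
    journal consumer (sketch, not an existing backend)."""

    def __init__(self, producer, objstorage):
        self.producer = producer
        self.objstorage = objstorage

    def content_add(self, contents):
        for d in contents:
            content = Content.from_dict(d)  # raises if the input is invalid
            self.objstorage.add(content.data, obj_id=content.sha1)
            self.producer.produce(
                "swh.journal.objects.content",
                key=content.sha1,
                value=content.to_dict(),  # serialization to bytes elided here
            )
        self.producer.flush()


def replay_into_postgresql(journal_batches, db_storage):
    # consumer side: if postgresql refuses a batch, it is either a temporary
    # failure (retry it) or a bug in the validation above (investigate it)
    for object_type, objects in journal_batches:
        getattr(db_storage, f"{object_type}_add")(objects)
```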

Question: do we (expect to) have this problem (temporary inconsistent state of the archive emitting messages in the Kafka topics) for other object types than content?

No. postgresql may fail or time out on any object insertion. There's nothing special about contents.

I'm a bit worried this would be a first step of "let's implement a transaction layer on our nosql distributed database". Maybe I'm a bit pessimistic here...

I mean, if we want the messages in kafka to be published only after the objects have been inserted for sure, why don't we just write these messages in kafka after we insert them in the DB in the first place?

We already discussed this at the time we replaced the journal-publisher with journal-writer. Adding to Kafka after inserting to the DB means that Kafka will be missing some messages, and we would need to run a backfiller on a regular basis to fix it.

@olasd I'm worried that implementing your idea would result in some complex piece of code.

We're already using Kafka's built-in MirrorMaker. It's also a complex piece of code, which sometimes mysteriously stops working (it's currently running in a for loop...).

It also adds a new postgresql database and new kafka topics, that will need extra resources and management.

Technically, we already have two sets of kafka topics. This idea would just repurpose them.

And if at some point that queue database becomes too large, the retrier will become slower, causing the queue to grow even more.

The retrier would only add entries for the objects which fail the existence check (which should be just a few), and will keep them around for just a few retries. We don't even /have/ to enable it for objects other than contents.

Alternative idea, which I think is much simpler:

  • Writes to Kafka are handled by a specific swh-storage "backend" (which also copies to the objstorage), after input validation (using swh-model)
  • A Kafka consumer writes data to postgresql. If the data is refused by postgresql, then it's either a temporary failure or a bug in the input validation.

There's however the issue of dealing with input validation being too lax if there's already a message in Kafka that shouldn't be there.

I really like how your idea pushes us forward with several of our goals:

  • distributing the ingestion load to a, well, distributed system
  • centralizing the load on additions to the main archive to a single, more manageable component
  • adding input validation everywhere
  • disentangling the loading process away from the main storage technology towards our generic data model

The journal consumer component already exists, it's the replayer. My main concern here is how battle-tested it is, but I guess the currently running cassandra replay, as well as @douardda's earlier tests of the mirroring infra, can give us a good idea there.

As for the caveat of input validation being too lax, or messages in Kafka that shouldn't be there, we already have a handler for object format quirks in the replayer.

I guess the next step to move this forward would be implementing the kafka-first storage, and kicking its tires in the docker environment?

Now that I think of it, we can decompose this in stages in the storage pipeline:

  • add an input validating proxy high up the stack
  • replace the journal writer calls sprinkled in all methods with a journal writing proxy
  • add a "don't insert objects" filter low down the stack

so we'd end up with the following pipeline for workers:

  • input validation proxy
  • object bundling proxy
  • object deduplication against read-only proxy
  • journal writer proxy
  • addition-blocking filter
  • underlying read-only storage

and the following pipeline for the "main storage replayer":

  • underlying read-write storage

(it's a very short pipeline... a pipedash?)
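For illustration, here is how those two pipelines might be spelled out in a cls/steps-style configuration, similar to how swh.storage proxies are already composed; the names for the new components (validate, journal_writer, block_additions) and the DSN/URL values are placeholders, not existing classes or endpoints:

```python
# Hypothetical worker-side pipeline configuration (names are placeholders).
worker_storage_config = {
    "cls": "pipeline",
    "steps": [
        {"cls": "validate"},          # input validation proxy
        {"cls": "buffer"},            # object bundling proxy
        {"cls": "filter"},            # deduplication against read-only storage
        {"cls": "journal_writer"},    # journal writing proxy
        {"cls": "block_additions"},   # addition-blocking filter
        {"cls": "remote", "url": "https://storage-ro.example.org/"},  # read-only storage
    ],
}

# The "main storage replayer" pipeline: just the read-write storage
# (exact cls name and DSN are illustrative).
replayer_storage_config = {
    "cls": "local",
    "db": "service=swh-storage",
}
```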

We already discussed this at the time we replaced the journal-publisher with journal-writer. Adding to Kafka after inserting to the DB means that Kafka will be missing some messages, and we would need to run a backfiller on a regular basis to fix it.

I know this discussion already took place, but I could not remember why we chose to serialize write operations the way we did. One question could be 'what is the definitive source of truth in our stack?' Because if we use pg as such a trustworthy truth keeper (which is what we do for now), I don't see why we should run any backfiller process if an object could not reach it.

Indeed we need better data validation all along the path, especially at the very beginning of the ingestion pipeline of "the storage" (whatever that means in the context of this discussion), as @olasd (and I believe everyone) already mentioned; but since sh*t happens, we must make sure we remain reasonably consistent and efficient.

Maybe we need to make sure we have a clear causality/dependency graph for data ingestion in the graph, including all the bells and whistles we have / plan to have (aka consumers of the graph: replication, mirrors, indexers, etc.), so we can estimate the consequences of any misbehavior of one node or edge of this graph...

The pipeline(s) would be then just an implementation of this model I guess...

In T2003#41456, @olasd wrote:

Now that I think of it, we can decompose this in stages in the storage pipeline:

  • add an input validating proxy high up the stack
  • replace the journal writer calls sprinkled in all methods with a journal writing proxy
  • add a "don't insert objects" filter low down the stack

so we'd end up with the following pipeline for workers:

  • input validation proxy
  • object bundling proxy
  • object deduplication against read-only proxy
  • journal writer proxy
  • addition-blocking filter
  • underlying read-only storage

and the following pipeline for the "main storage replayer":

  • underlying read-write storage

(it's a very short pipeline... a pipedash?)

Ok I am not sure I get how this (interesting) idea solves our current problem of space-time causality violation.

If an ingestion worker only writes to the journal (if I understand your idea correctly), who is responsible for pushing actual content in the (master) objstorage? Is it a missing step of the worker's pipeline?

One question could be 'what is the definitive source of truth in our stack?'

I assumed we wanted to aim for Kafka to be the source of truth

One question could be 'what is the definitive source of truth in our stack?'

I assumed we wanted to aim for Kafka to be the source of truth

I guess so, but since it is not a queryable (sic) Source of Truth, it introduces asynchronicity and latency at the very heart of the infrastructure. And since we do not store file content in Kafka, the question becomes: who is the consistency keeper in this model?

Let's assume we really use Kafka as the source of truth; we still have the race condition where an actual blob object is not yet available in an objstorage when the corresponding Kafka message (add_content) is handled by a consumer.
So now I am beginning to rally behind @olasd's idea of a swh.journal.mirror_maker to manage this re-synchronization of our two sources of temporal glitches (storage & objstorage). I see no proper way of handling this race condition without a buffering/resync component like this mirror_maker idea.

I am not very excited by the idea of adding a new source of "statefulness" to the machinery, but since I agree 100% with @olasd's "distributing the 'retry getting this object because maybe it hasn't arrived yet' logic among clients feels pretty brittle to me", I guess it's worth the experiment.

I think it would be ok to write in the journal after adding to the objstorage.

zack renamed this task from Content replayer may try to copy objects before they are available in an objstorage to Content replayer may try to copy objects before they are available from an objstorage. Jul 30 2020, 8:18 AM
olasd assigned this task to vlorentz.

So D5246 landed a while ago. The S3 object copy process has now caught up on some partitions, and I can confirm that the copy of the most recently added objects happens without any race condition.