Conversation
📝 WalkthroughWalkthroughIntroduces a generic in-memory, write-through cache for etcd-backed Value/StoredValue entries with Start/Stop lifecycle, prefix watch synchronization, thread-safe Get/GetPrefix read paths, write-through Put/Create/Update/Delete operations, watch-error propagation, and transaction-aware cache updates. Changes
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
a7db4be to
f3edc39
Compare
6fa02cb to
8739ec8
Compare
f3edc39 to
0905973
Compare
8739ec8 to
798404c
Compare
0905973 to
fa49e10
Compare
798404c to
31df024
Compare
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
server/internal/storage/cache.go (1)
52-53: Use a package sentinel for the double-start case.Returning a fresh
errors.New(...)here makes this condition hard to match witherrors.Isand breaks the package’s error taxonomy. Please promote it to a package-level sentinel such asErrCacheAlreadyStarted.As per coding guidelines, "Domain-specific errors should be defined in each package; API errors should be mapped to HTTP status codes via Goa".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/storage/cache.go` around lines 52 - 53, Define a package-level sentinel error named ErrCacheAlreadyStarted (e.g., var ErrCacheAlreadyStarted = errors.New("cache already started")) and replace the inline errors.New call in the method that checks c.op (the block that currently returns errors.New("cache has already been started")) to return this sentinel instead; update any callers/tests to use errors.Is(..., ErrCacheAlreadyStarted) as needed and add a short comment on ErrCacheAlreadyStarted describing its purpose.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@server/internal/storage/cache.go`:
- Around line 176-194: The write() path can reintroduce stale data because
deletes remove entries and lose revision/tombstone info; update the cache to
track per-key last-seen revision/tombstone (extend cachedValue with a revision
and tombstone flag) and make all cache mutations (write(), UpdateCache(),
DeleteByKey, DeleteValue, DeletePrefix and watch handlers) accept and compare a
commit/delete revision before applying so only monotonically newer revisions
win; plumbing: propagate the etcd commit/delete revision into the cache-update
calls, never blindly delete the map entry on a watched/local delete (mark as
tombstone with its revision), and ensure write() compares itemVersion and the
stored cachedValue.revision to avoid resurrecting older state.
- Around line 159-173: The PropagateErrors goroutine can block if it receives
from c.errCh and then a cancellation happens before sending to ch; update
cache[V].PropagateErrors to mirror watchOp.PropagateErrors by replacing the
direct send with a nested select: after receiving err := <-c.errCh, do a select
that either sends err to ch or observes ctx.Done() (and skips/returns) so the
goroutine never blocks on a full/unconsumed ch after cancellation; reference
c.errCh, ch, ctx.Done() and the PropagateErrors method when making the change.
---
Nitpick comments:
In `@server/internal/storage/cache.go`:
- Around line 52-53: Define a package-level sentinel error named
ErrCacheAlreadyStarted (e.g., var ErrCacheAlreadyStarted = errors.New("cache
already started")) and replace the inline errors.New call in the method that
checks c.op (the block that currently returns errors.New("cache has already been
started")) to return this sentinel instead; update any callers/tests to use
errors.Is(..., ErrCacheAlreadyStarted) as needed and add a short comment on
ErrCacheAlreadyStarted describing its purpose.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 34f23d93-7b12-4b05-b647-f79080112650
📒 Files selected for processing (4)
server/internal/storage/cache.goserver/internal/storage/cache_test.goserver/internal/storage/interface.goserver/internal/storage/txn.go
c86bdfe to
7a8a522
Compare
rshoemaker
left a comment
There was a problem hiding this comment.
Looks good.
Not necessary now, but I wonder if we should consider adding something like Size() int on the Cache interface so we can periodically monitor how the cache is growing.
| if err != nil { | ||
| return fmt.Errorf("failed transaction: %w", err) | ||
| } | ||
| for _, o := range t.ops { |
There was a problem hiding this comment.
I'm not sure if this could cause a problem or not, but it seems like you shouldn't bother updating the rev unless the txn succeed. Maybe move this block below the !resp.Succeeded check?
There was a problem hiding this comment.
This was intentional, and I did the same in the other operations where I could. The server always returns the current revision, regardless of whether the transaction was successful, so I carried that over to these types.
| if err != nil { | ||
| // This should never happen, but if it does it's due to a programmer | ||
| // error. This needs to crash during development and testing. | ||
| panic(fmt.Errorf("failed to marshal cached value: %w", err)) |
There was a problem hiding this comment.
You want this panic committed for now, but eventually removed before the next release? If so, how are we tracking that this is removed?
There was a problem hiding this comment.
Sorry, maybe I can word this better. This message is just meant to justify why crashing is the right behavior. I want this to fail loudly and clearly so we can identify issues during development or testing. The alternative is to propagate this error everywhere, but in this case, I feel that that would have limited value and just create a lot of dead code.
The reason I say it shouldn't ever happen is that JSON marshal/unmarshal has a fairly limited set of failure conditions based on the value type, and we already depend on JSON serialization for our storage layer. There shouldn't be any conditions where those would succeed, but these would fail.
| if c.op != nil { | ||
| c.op.Close() | ||
| c.op = nil | ||
| } |
There was a problem hiding this comment.
Stop method closes the watch but doesn't reset lastWatchRevision, items, or tombstones.
fa49e10 to
f7cad02
Compare
7a8a522 to
ceead79
Compare
Up to standards ✅🟢 Issues
|
| Metric | Results |
|---|---|
| Complexity | 94 |
| Duplication | 0 |
TIP This summary will be updated as you push new changes. Give us feedback
f7cad02 to
8fbc079
Compare
ceead79 to
7dcf042
Compare
Adds a write-through cache implementation to the `storage` package. The cache and its operations serve reads from an in-memory map, while writes are served from Etcd and, if successful, persisted to the in-memory map. This cache is useful for areas where we need to perform frequent range queries over small to moderate numbers of small values types. This implementation is designed to be easy and transparent to add to existing stores.
7dcf042 to
4e6e1e5
Compare
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (3)
server/internal/storage/delete.go (1)
121-136: PropagateClientOpfailures here too.This is the only
Execpath that still discards the newClientOp(ctx)error. Even if it cannot fail today, the interface now allows it, so this would fall through into the txn with a zero-value op instead of failing fast.♻️ Proposed fix
func (o *deleteValueOp[V]) Exec(ctx context.Context) error { - op, _ := o.ClientOp(ctx) + op, err := o.ClientOp(ctx) + if err != nil { + return err + } resp, err := o.client.Txn(ctx). If(o.Cmps()...). Then(op).🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/storage/delete.go` around lines 121 - 136, The Exec method on deleteValueOp currently discards the error returned by ClientOp by calling op, _ := o.ClientOp(ctx); change this to capture and propagate the error (e.g., op, err := o.ClientOp(ctx); if err != nil { return err }) before using op in the txn so Exec fails fast instead of passing a zero-value op into o.client.Txn(...). If ClientOp cannot fail yet, still keep the error check to honor the updated signature of deleteValueOp.ClientOp(ctx).server/internal/storage/interface.go (1)
67-100: Update theRevision()docs to mention transaction commits.
Txn.Commitnow callsUpdateRevision(...)on each operation, so saying these values are populated only afterExecis inaccurate for ops that are used exclusively inside a transaction.📝 Suggested wording
- // The revision returned by the server. Only populated after Exec is called. + // The revision returned by the server. Populated after Exec, or after a + // Txn.Commit that includes this operation.Apply the same wording adjustment to
PutOp,DeleteOp, andDeleteValueOp.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/storage/interface.go` around lines 67 - 100, Update the Revision() documentation on PutOp, DeleteOp and DeleteValueOp to state that the revision is populated after Exec(ctx) or, if the operation is executed as part of a transaction, after the transaction is committed (e.g., Txn.Commit calls UpdateRevision on each operation); reference the Revision() method on PutOp, DeleteOp, and DeleteValueOp and mention Txn.Commit/UpdateRevision to clarify when the value is set.server/internal/storage/cache.go (1)
61-63: Promote the started-state failure to a package error.Returning an inline
errors.Newmakes restart misuse hard to detect without string matching. Prefer a package-level sentinel alongside the other storage errors.As per coding guidelines: "Domain-specific errors should be defined in each package; API errors should be mapped to HTTP status codes via Goa".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/internal/storage/cache.go` around lines 61 - 63, Define a package-level sentinel error (e.g., ErrAlreadyStarted or ErrCacheAlreadyStarted) alongside the other storage errors and replace the inline errors.New call in the start path that checks c.op (the block using c.op != nil) to return that sentinel instead; update any callers/tests that compare the error to use errors.Is (or direct equality if unwrapped) against the new sentinel so restart misuse can be reliably detected.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@server/internal/storage/cache.go`:
- Around line 197-225: The >= revision check in unlockedWrite and
unlockedWriteTombstone lets later ops from the same txn (same revision) be
dropped; add an intra-revision tie-breaker: extend cachedValue with a seq (or
opIndex) and change unlockedWrite/unlockedWriteTombstone to compare
(existing.revision > revision) || (existing.revision == revision && existing.seq
>= seq) before returning, and update callers (the UpdateCache path / txn code
that stamps resp.Header.Revision) to supply a monotonically increasing seq
per-op within the txn so later ops at the same revision win.
- Around line 331-343: The deletePrefix function only tombstones in-memory keys
and misses unseen puts; add prefix-level tombstone state (e.g., a map like
prefixTombstones[string]int64 protected by c.mu) and in deletePrefix (and its
helper unlockedWriteTombstone) record the prefix->revision before iterating
existing c.items, then iterate c.items to tombstone existing keys as now;
finally update the watch/put handling path (the function that processes incoming
PUT events / unlockedWrite or handlePut) to consult prefixTombstones and
ignore/suppress any PUT whose revision <= the recorded tombstone revision for
any matching prefix. Ensure all accesses use c.mu to avoid races and clean up
prefix tombstones when appropriate.
- Around line 163-173: Stop() currently resets c.lastWatchRevision, c.items, and
c.tombstones while only holding c.opMu, causing races with readers/writers (Get,
GetPrefix, UpdateCache, watch callbacks) that use c.mu; change Stop() to close
and nil out c.op while holding c.opMu as now, but perform the state reset
(assign lastWatchRevision=0, reassign items, set tombstones=nil) while holding
c.mu to serialize with Get/GetPrefix/UpdateCache and watch handlers (i.e.,
acquire c.mu before mutating those fields, then release it).
In `@server/internal/storage/put.go`:
- Around line 294-317: clientPutOp currently truncates sub-second TTLs by
calling int64(ttl.Seconds()) before client.Grant; replace that conversion with a
safe rounding that ensures any non-zero duration yields at least 1 second (e.g.,
compute seconds using ceiling of ttl in seconds or via nanosecond arithmetic)
and pass that value to client.Grant so 500ms becomes 1s instead of 0; update the
conversion at the client.Grant(...) call site in clientPutOp (replace
int64(ttl.Seconds()) logic) and keep the rest of the lease handling and error
wrapping unchanged.
---
Nitpick comments:
In `@server/internal/storage/cache.go`:
- Around line 61-63: Define a package-level sentinel error (e.g.,
ErrAlreadyStarted or ErrCacheAlreadyStarted) alongside the other storage errors
and replace the inline errors.New call in the start path that checks c.op (the
block using c.op != nil) to return that sentinel instead; update any
callers/tests that compare the error to use errors.Is (or direct equality if
unwrapped) against the new sentinel so restart misuse can be reliably detected.
In `@server/internal/storage/delete.go`:
- Around line 121-136: The Exec method on deleteValueOp currently discards the
error returned by ClientOp by calling op, _ := o.ClientOp(ctx); change this to
capture and propagate the error (e.g., op, err := o.ClientOp(ctx); if err != nil
{ return err }) before using op in the txn so Exec fails fast instead of passing
a zero-value op into o.client.Txn(...). If ClientOp cannot fail yet, still keep
the error check to honor the updated signature of deleteValueOp.ClientOp(ctx).
In `@server/internal/storage/interface.go`:
- Around line 67-100: Update the Revision() documentation on PutOp, DeleteOp and
DeleteValueOp to state that the revision is populated after Exec(ctx) or, if the
operation is executed as part of a transaction, after the transaction is
committed (e.g., Txn.Commit calls UpdateRevision on each operation); reference
the Revision() method on PutOp, DeleteOp, and DeleteValueOp and mention
Txn.Commit/UpdateRevision to clarify when the value is set.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 23c8b7e8-1699-4cf9-baf9-2eb816650963
📒 Files selected for processing (11)
server/internal/storage/cache.goserver/internal/storage/cache_ordering_test.goserver/internal/storage/cache_test.goserver/internal/storage/delete.goserver/internal/storage/delete_test.goserver/internal/storage/interface.goserver/internal/storage/put.goserver/internal/storage/put_test.goserver/internal/storage/txn.goserver/internal/storage/txn_test.goserver/internal/storage/watch.go
✅ Files skipped from review due to trivial changes (1)
- server/internal/storage/cache_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- server/internal/storage/txn.go
Summary
Adds a write-through cache implementation to the
storagepackage. The cache and its operations serve reads from an in-memory map, while writes are served from Etcd and, if successful, persisted to the in-memory map. This cache is useful for areas where we need to perform frequent range queries over small to moderate numbers of small values types. This implementation is designed to be easy and transparent to add to existing stores.Testing
This cache is used by the following PR, so there are no functional changes from this PR.