diff --git a/README.md b/README.md index 77e2810..2a37eaf 100644 --- a/README.md +++ b/README.md @@ -37,12 +37,15 @@ defer bq.Close() Bigqueue also allows setting up the maximum possible memory that it can use. By default, the maximum memory is set to [3 x Arena Size]. ```go -bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetArenaSize(4*1024), - bigqueue.SetMaxInMemArenas(10)) +bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024), + bigqueue.SetMaxInMemArenas(10), bigqueue.SetMaxArenasToKeep(20)) defer bq.Close() ``` In this case, bigqueue will never allocate more memory than `4KB*10=40KB`. This memory is above and beyond the memory used in buffers for copying data. +The `SetMaxArenasToKeep(20)` option tells bigqueue to retain at most 20 old +arena files on disk after they have been consumed; any older ones will +be automatically deleted. Bigqueue allows to set periodic flush based on either elapsed time or number of mutate (enqueue/dequeue) operations. Flush syncs the in memory changes of all @@ -50,15 +53,24 @@ memory mapped files with disk. *This is a best effort flush*. This is how we can set these options: ```go -bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2)) +bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2)) ``` In this case, a flush is done after every two mutate operations. ```go -bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute)) +bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute)) ``` In this case, a flush is done after one minute elapses and an Enqueue/Dequeue is called. +### Garbage Collection +By default, bigqueue automatically cleans up old arena files that have been +consumed by **all** consumers (based on the `SetMaxArenasToKeep` setting). +You can also manually trigger a garbage collection: +```go +bq.GC() +``` + + Write to bigqueue: ```go err := bq.Enqueue([]byte("elem")) diff --git a/arenamanager.go b/arenamanager.go index 2d863c5..4edf013 100644 --- a/arenamanager.go +++ b/arenamanager.go @@ -2,8 +2,8 @@ package bigqueue import ( "fmt" + "os" "path" - "strconv" "syscall" "github.com/grandecola/mmap" @@ -15,28 +15,22 @@ const ( // arenaManager manages all the arenas for a bigqueue type arenaManager struct { - dir string - conf *bqConfig - md *metadata - baseAid int - arenas []*mmap.File - inMem int - fullPath []byte + dir string + conf *bqConfig + md *metadata + arenas map[int]*mmap.File + inMem int } // newArenaManager returns a pointer to new arenaManager. func newArenaManager(dir string, conf *bqConfig, md *metadata) (*arenaManager, error) { - headAid, _ := md.getHead() tailAid, _ := md.getTail() - numArenas := tailAid + 1 - headAid - arenas := make([]*mmap.File, numArenas) am := &arenaManager{ - dir: path.Clean(dir), - conf: conf, - md: md, - baseAid: headAid, - arenas: arenas, + dir: path.Clean(dir), + conf: conf, + md: md, + arenas: make(map[int]*mmap.File), } // we load the tail arena into memory @@ -44,20 +38,78 @@ func newArenaManager(dir string, conf *bqConfig, md *metadata) (*arenaManager, e return nil, err } + am.gc() + return am, nil } -// getArena returns arena for a given arena ID -func (m *arenaManager) getArena(aid int) (*mmap.File, error) { - relAid := aid - m.baseAid - if relAid == len(m.arenas) { - m.arenas = append(m.arenas, nil) +// gc deletes the arena files that are no longer needed. +func (m *arenaManager) gc() { + if m.conf.maxArenasToKeep <= 0 { + return + } + + // find the minimum consumer head aid + minHeadAid, minHeadOff := -1, -1 + for _, base := range m.md.co { + aid, off := m.md.getConsumerHead(base) + if minHeadAid == -1 || aid < minHeadAid || (aid == minHeadAid && off < minHeadOff) { + minHeadAid = aid + minHeadOff = off + } + } + + if minHeadAid == -1 { + return + } + + // update global head to the minimum among all consumers. + // this is to ensure new consumers start from the earliest available data. + m.md.putHead(minHeadAid, minHeadOff) + _ = m.md.flush() + + // we keep maxArenasToKeep arenas before the minHeadAid + // everything before (minHeadAid - maxArenasToKeep) can be deleted. + limitAid := minHeadAid - m.conf.maxArenasToKeep + if limitAid <= 0 { + return + } + + // startAid - we can delete from 0 up to limitAid-1. + for aid := 0; aid < limitAid; aid++ { + // remove from memory if loaded + if aa, ok := m.arenas[aid]; ok && aa != nil { + _ = m.unloadArena(aid) + } + delete(m.arenas, aid) + + arenaPath := m.getArenaPath(aid) + if _, err := os.Stat(arenaPath); err == nil { + if err := os.Remove(arenaPath); err != nil { + } else { + } + } } - aa := m.arenas[relAid] - if aa != nil { +} + +// getArenaPath returns the full path for a given arena ID. +func (m *arenaManager) getArenaPath(aid int) string { + fileName := fmt.Sprintf("%d%s", aid, cArenaFileSuffix) + return path.Join(m.dir, fileName) +} + +// loadOrGetArena returns arena for a given arena ID +func (m *arenaManager) getArena(aid int) (*mmap.File, error) { + if aa, ok := m.arenas[aid]; ok && aa != nil { return aa, nil } + // if this is a new arena being requested (tail expansion) + // getTail doesn't help here because writer might be calling it before metadata update + // but basically if it's not in the map, we try to load or create it. + // In the original slice logic, relAid == len(m.arenas) triggered GC. + // Here we can check if it's beyond a certain aid or just trigger GC occasionally. + // before we get a new arena into memory, we need to ensure that after fetching // a new arena into memory, we do not cross the provided memory limit. if err := m.ensureEnoughMem(); err != nil { @@ -69,6 +121,8 @@ func (m *arenaManager) getArena(aid int) (*mmap.File, error) { return nil, err } + m.gc() + return m.arenas[aid], nil } @@ -90,19 +144,16 @@ func (m *arenaManager) ensureEnoughMem() error { // Simply iterate from the last arena until enough memory is // available for a new arena to be loaded into memory tailAid, _ := m.md.getTail() - curAid := m.baseAid + len(m.arenas) - for m.conf.maxInMemArenas-m.inMem <= 0 { - curAid-- - - if curAid < 0 { - panic("not enough memory to hold arenas in memory") + headAid, _ := m.md.getHead() + for aid, aa := range m.arenas { + if m.inMem < m.conf.maxInMemArenas { + break } - if curAid == tailAid { + if aid == tailAid || aid == headAid || aa == nil { continue } - - if err := m.unloadArena(curAid); err != nil { + if err := m.unloadArena(aid); err != nil { return err } } @@ -112,36 +163,33 @@ func (m *arenaManager) ensureEnoughMem() error { // loadArena will fetch the arena into memory. func (m *arenaManager) loadArena(aid int) error { - if m.arenas[aid-m.baseAid] != nil { + if aa, ok := m.arenas[aid]; ok && aa != nil { return nil } - m.fullPath = append(m.fullPath[:0], m.dir...) - m.fullPath = append(m.fullPath, '/') - m.fullPath = strconv.AppendInt(m.fullPath, int64(aid), 10) - m.fullPath = append(m.fullPath, cArenaFileSuffix...) - aa, err := newArena(string(m.fullPath), m.conf.arenaSize) + arenaPath := m.getArenaPath(aid) + aa, err := newArena(arenaPath, m.conf.arenaSize) if err != nil { return err } m.inMem++ - m.arenas[aid-m.baseAid] = aa + m.arenas[aid] = aa return nil } // unloadArena will remove the arena from memory. func (m *arenaManager) unloadArena(aid int) error { - if m.arenas[aid-m.baseAid] == nil { + aa, ok := m.arenas[aid] + if !ok || aa == nil { return nil } - - if err := m.arenas[aid-m.baseAid].Unmap(); err != nil { + if err := aa.Unmap(); err != nil { return fmt.Errorf("error in unmap :: %w", err) } m.inMem-- - m.arenas[aid-m.baseAid] = nil + delete(m.arenas, aid) return nil } @@ -166,7 +214,7 @@ func (m *arenaManager) flush() error { // close unmaps all the arenas managed by arenaManager. func (m *arenaManager) close() error { var retErr error - for _, aa := range m.arenas { + for id, aa := range m.arenas { if aa == nil { continue } @@ -174,6 +222,7 @@ func (m *arenaManager) close() error { if err := aa.Unmap(); err != nil { retErr = err } + delete(m.arenas, id) } if retErr != nil { diff --git a/bigqueue.go b/bigqueue.go index 76143bd..a2c023f 100644 --- a/bigqueue.go +++ b/bigqueue.go @@ -232,3 +232,11 @@ func (q *MmapQueue) periodicFlush() { } } } + +// GC will trigger the garbage collection of the arena files. +func (q *MmapQueue) GC() { + q.lock.Lock() + defer q.lock.Unlock() + + q.am.gc() +} diff --git a/bigqueue_test.go b/bigqueue_test.go index 262d82d..118e9e4 100644 --- a/bigqueue_test.go +++ b/bigqueue_test.go @@ -1246,3 +1246,14 @@ func TestParallel(t *testing.T) { t.FailNow() } } + +func TestSetMaxArenasToKeepNegativeErr(t *testing.T) { + t.Parallel() + + testDir := t.TempDir() + arenaSize := os.Getpagesize() * 2 + bq, err := NewMmapQueue(testDir, SetArenaSize(arenaSize), SetMaxArenasToKeep(-1)) + if err != ErrNegativeMaxArenasToKeep || bq != nil { + t.Fatalf("expected max arenas to keep negative error, returned: %v", err) + } +} diff --git a/config.go b/config.go index 0c94987..4b8ff25 100644 --- a/config.go +++ b/config.go @@ -22,14 +22,17 @@ var ( ErrTooSmallArenaSize = errors.New("too small arena size") // ErrTooFewInMemArenas is returned when number of arenas allowed in memory < 3. ErrTooFewInMemArenas = errors.New("too few in memory arenas") + // ErrNegativeMaxArenasToKeep is returned when max arenas to keep is negative. + ErrNegativeMaxArenasToKeep = errors.New("max arenas to keep cannot be negative") ) // bqConfig stores all the configuration related to bigqueue. type bqConfig struct { - arenaSize int - maxInMemArenas int - flushMutOps int64 - flushPeriod time.Duration + arenaSize int + maxInMemArenas int + maxArenasToKeep int + flushMutOps int64 + flushPeriod time.Duration } // Option is function type that takes a bqConfig object @@ -39,10 +42,11 @@ type Option func(*bqConfig) error // newConfig creates an object of bqConfig with default parameter values. func newConfig() *bqConfig { return &bqConfig{ - arenaSize: cDefaultArenaSize, - maxInMemArenas: cMinMaxInMemArenas, - flushMutOps: cDefaultMutOps, - flushPeriod: cDefaultflushPeriod, + arenaSize: cDefaultArenaSize, + maxInMemArenas: cMinMaxInMemArenas, + maxArenasToKeep: 0, + flushMutOps: cDefaultMutOps, + flushPeriod: cDefaultflushPeriod, } } @@ -105,3 +109,18 @@ func SetPeriodicFlushDuration(flushPeriod time.Duration) Option { return nil } } + +// SetMaxArenasToKeep returns an Option that sets the maximum number of arena +// files to keep on disk. When a new arena is created, or when the queue is +// opened, any arena files with an ID less than (min(consumer heads) - maxArenasToKeep) +// will be deleted. If maxArenasToKeep is 0 (default), no arena files are deleted. +func SetMaxArenasToKeep(maxArenasToKeep int) Option { + return func(c *bqConfig) error { + if maxArenasToKeep < 0 { + return ErrNegativeMaxArenasToKeep + } + c.maxArenasToKeep = maxArenasToKeep + return nil + } +} + diff --git a/crash_recovery_test.go b/crash_recovery_test.go new file mode 100644 index 0000000..59dc39f --- /dev/null +++ b/crash_recovery_test.go @@ -0,0 +1,233 @@ +package bigqueue + +import ( + "bytes" + "fmt" + "os" + "os/exec" + "path/filepath" + "testing" + "time" +) + +// TestHelperCrashEnqueue is a helper process that constantly enqueues and then lets itself be killed. +func TestHelperCrashEnqueue(_ *testing.T) { + if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { + return + } + testDir := os.Getenv("CRASH_TEST_DIR") + bq, err := NewMmapQueue(testDir, SetArenaSize(4096)) + if err != nil { + os.Exit(2) + } + msg := []byte("enqueue crash testing message data block") + for { + if err := bq.Enqueue(msg); err != nil { + os.Exit(3) + } + } +} + +// TestCrashRecovery_Enqueue uses a multi-process approach. +// A child process executes a real Enqueue operation continuously and is forcefully killed. +// The parent process then checks if the queue file can be recovered without corruption. +func TestCrashRecovery_Enqueue(t *testing.T) { + testDir := filepath.Join(t.TempDir(), "test_crash_enqueue_exec") + os.MkdirAll(testDir, 0755) + + cmd := exec.Command(os.Args[0], "-test.run=TestHelperCrashEnqueue") + cmd.Env = append(os.Environ(), "GO_WANT_HELPER_PROCESS=1", "CRASH_TEST_DIR="+testDir) + if err := cmd.Start(); err != nil { + t.Fatalf("failed to start helper process: %v", err) + } + + // Wait briefly to allow some data to be written, then aggressively kill the process. + time.Sleep(100 * time.Millisecond) + _ = cmd.Process.Kill() + _ = cmd.Wait() // wait for it to actually die + + // Now we reopen the queue to ensure it hasn't been corrupted by the torn write. + bq, err := NewMmapQueue(testDir, SetArenaSize(4096)) + if err != nil { + t.Fatalf("failed to reopen queue after crash: %v", err) + } + defer bq.Close() + + // Verify the queue is readable without any internal panic or metadata inconsistency. + count := 0 + expectedMsg := []byte("enqueue crash testing message data block") + for !bq.IsEmpty() { + msg, err := bq.Dequeue() + if err != nil { + t.Fatalf("failed to dequeue recovered message at index %d: %v", count, err) + } + if !bytes.Equal(msg, expectedMsg) { + t.Fatalf("message logic corrupted, got %s", string(msg)) + } + count++ + } + + if count == 0 { + t.Logf("Queue was clean but empty. Try increasing sleep to verify writes if needed.") + } else { + t.Logf("Successfully read %d messages recovered cleanly after enqueue crash", count) + } +} + +// TestHelperCrashDequeue is a helper process that constantly dequeues. +func TestHelperCrashDequeue(_ *testing.T) { + if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { + return + } + testDir := os.Getenv("CRASH_TEST_DIR") + bq, err := NewMmapQueue(testDir, SetArenaSize(4096)) + if err != nil { + os.Exit(2) + } + for { + if bq.IsEmpty() { + time.Sleep(10 * time.Millisecond) + continue + } + _, err := bq.Dequeue() + if err != nil { + os.Exit(3) + } + // Slow down so the parent process can reliably kill it mid-flight + time.Sleep(1 * time.Millisecond) + } +} + +// TestCrashRecovery_Dequeue uses a multi-process approach. +// A child process dequeues messages continuously and gets forcefully killed. +// The parent process then checks if the unconsumed (or partially consumed but uncommitted) +// messages can still be safely dequeued without loss. +func TestCrashRecovery_Dequeue(t *testing.T) { + testDir := filepath.Join(t.TempDir(), "test_crash_dequeue_exec") + os.MkdirAll(testDir, 0755) + + // Pre-fill the queue + bq, err := NewMmapQueue(testDir, SetArenaSize(4096)) + if err != nil { + t.Fatalf("failed to create queue: %v", err) + } + msg := []byte("dequeue crash testing message data") + total := 1000 + for i := 0; i < total; i++ { + if err := bq.Enqueue(msg); err != nil { + t.Fatalf("failed to enqueue: %v", err) + } + } + bq.Close() // Ensure all data is on disk + + cmd := exec.Command(os.Args[0], "-test.run=TestHelperCrashDequeue") + cmd.Env = append(os.Environ(), "GO_WANT_HELPER_PROCESS=1", "CRASH_TEST_DIR="+testDir) + if err := cmd.Start(); err != nil { + t.Fatalf("failed to start helper process: %v", err) + } + + // Wait briefly to allow some reading to begin, then kill it. + time.Sleep(50 * time.Millisecond) + _ = cmd.Process.Kill() + _ = cmd.Wait() // Wait for it to actually die + + // Reopen and check consistency + bq2, err := NewMmapQueue(testDir, SetArenaSize(4096)) + if err != nil { + t.Fatalf("failed to reopen queue after crash: %v", err) + } + defer bq2.Close() + + remaining := 0 + for !bq2.IsEmpty() { + recoveredMsg, err := bq2.Dequeue() + if err != nil { + t.Fatalf("failed to dequeue recovered message at offset %d: %v", remaining, err) + } + if !bytes.Equal(recoveredMsg, msg) { + t.Fatalf("corrupted message found") + } + remaining++ + } + + t.Logf("Recovered %d messages from original %d after dequeue crash", remaining, total) + if remaining > total { + t.Fatalf("somehow got more messages than we inserted! %d > %d", remaining, total) + } +} + +// TestCrashRecovery_GC simulates a torn/interrupted GC state. +// We manually construct an inconsistent state (half-deleted arenas) and ensure the +// queue can still open, be correctly cleaned up, and gracefully recover. +func TestCrashRecovery_GC(t *testing.T) { + testDir := filepath.Join(t.TempDir(), "test_crash_gc_torn") + os.MkdirAll(testDir, 0755) + + arenaSize := 4096 + maxKeep := 0 + + // 1. Create a queue and fill it to create multiple arenas. + bq, err := NewMmapQueue(testDir, SetArenaSize(arenaSize), SetMaxArenasToKeep(maxKeep)) + if err != nil { + t.Fatalf("failed to create queue: %v", err) + } + + msg := make([]byte, 1000) + for i := 0; i < 20; i++ { // Creates ~5 arenas + if err := bq.Enqueue(msg); err != nil { + t.Fatalf("failed to enqueue: %v", err) + } + } + + // 2. Consume enough to advance head significantly. + // Since 1000 * 20 = 20k bytes, and arena is 4096, + // this should span about 5 arenas. + for i := 0; i < 18; i++ { + if _, err := bq.Dequeue(); err != nil { + t.Fatalf("failed to dequeue: %v", err) + } + } + + bq.GC() // Let it compute the min consumer head properly while still open. + + // Update global head since it's lazily updated in GC. + // Actually we should just read the consumer head because global head only advances + // when GC happens and there aren't many extra arenas. + consumerHeadAid, _ := bq.md.getConsumerHead(bq.dc) + t.Logf("Consumer head advanced to arena #%d", consumerHeadAid) + + if consumerHeadAid < 4 { + t.Logf("Warning: Expected consumer head to have advanced more") + } + + bq.Flush() + bq.Close() + + // 3. Simulate a crash during GC cleanup where only SOME files got deleted. + // Assume arenas 0 and 1 got deleted before crash, but 2 and 3 did not. + _ = os.Remove(filepath.Join(testDir, "0_arena.dat")) + _ = os.Remove(filepath.Join(testDir, "1_arena.dat")) + + // 4. Verification: Reopen the queue with torn GC state. + // Provide 0 for MaxArenasToKeep so that it aggressive cleans up. + bq2, err := NewMmapQueue(testDir, SetArenaSize(arenaSize), SetMaxArenasToKeep(0)) + if err != nil { + t.Fatalf("failed to reopen queue with torn GC state: %v", err) + } + + // Run GC explicitly to see if it cleans up the remainder (like 2_arena.dat and 3_arena.dat) + // We need to fetch global head to see what the system thinks is obsolete. + bq2.GC() + globalHeadAid, _ := bq2.md.getHead() + t.Logf("Computed Global head by GC: %d", globalHeadAid) + bq2.Close() + + // 5. Verify the remaining obsolete files are cleaned up gracefully. + // Note: We used maxKeep=0 for the recovery, so everything before globalHeadAid should be deleted. + for i := 0; i < globalHeadAid; i++ { + path := filepath.Join(testDir, fmt.Sprintf("%d_arena.dat", i)) + if _, err := os.Stat(path); err == nil { + t.Errorf("arena %d should have been deleted by recovery GC! (globalHeadAid=%d)", i, globalHeadAid) + } + } +} diff --git a/doc.go b/doc.go index 0e38ea9..83d8e91 100644 --- a/doc.go +++ b/doc.go @@ -5,21 +5,29 @@ // // Create or open a bigqueue: // -// bq, err := bigqueue.NewQueue("path/to/queue") +// bq, err := bigqueue.NewMmapQueue("path/to/queue") // defer bq.Close() // // bigqueue persists the data of the queue in multiple Arenas. // Each Arena is a file on disk that is mapped into memory (RAM) // using mmap syscall. Default size of each Arena is set to 128MB. +// Garbage Collection (GC) can be configured to automatically delete +// arena files that have been fully consumed: +// +// bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetMaxArenasToKeep(10)) +// +// Or triggered manually: +// +// bq.GC() // It is possible to create a bigqueue with custom Arena size: // -// bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetArenaSize(4*1024)) +// bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024)) // defer bq.Close() // // bigqueue also allows setting up the maximum possible memory that it // can use. By default, the maximum memory is set to [3 x Arena Size]. // -// bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetArenaSize(4*1024), +// bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024), // bigqueue.SetMaxInMemArenas(10)) // defer bq.Close() // @@ -33,11 +41,11 @@ // // This is how we can set these options: // -// bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2)) +// bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2)) // // In this case, a flush is done after every two mutate operations. // -// bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute)) +// bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute)) // // In this case, a flush is done after one minute elapses and an Enqueue/Dequeue is called. // diff --git a/gc_concurrency_test.go b/gc_concurrency_test.go new file mode 100644 index 0000000..3cefd02 --- /dev/null +++ b/gc_concurrency_test.go @@ -0,0 +1,141 @@ +package bigqueue + +import ( + "fmt" + "os" + "path/filepath" + "sync" + "testing" + "time" +) + +func TestArenaGC_Concurrency(t *testing.T) { + testDir := filepath.Join(os.TempDir(), "test_bigqueue_gc_concurrency") + os.RemoveAll(testDir) + os.MkdirAll(testDir, 0755) + defer os.RemoveAll(testDir) + + // Configuration: Small arena to trigger expansion and GC frequently + arenaSize := 4096 + maxKeep := 2 + bq, err := NewMmapQueue(testDir, SetArenaSize(arenaSize), SetMaxArenasToKeep(maxKeep)) + if err != nil { + t.Fatalf("failed to create queue: %v", err) + } + + const ( + numProducers = 3 + numConsumers = 2 + msgsPerProducer = 200 + msgSize = 512 // Approx 8 messages per arena + ) + + var wg sync.WaitGroup + startSignal := make(chan struct{}) + + // 1. Start Producers + for i := 0; i < numProducers; i++ { + wg.Add(1) + go func(pid int) { + defer wg.Done() + <-startSignal + for j := 0; j < msgsPerProducer; j++ { + msg := []byte(fmt.Sprintf("p%d-m%d", pid, j)) + payload := make([]byte, msgSize) + copy(payload, msg) + if err := bq.Enqueue(payload); err != nil { + t.Errorf("Producer %d failed: %v", pid, err) + return + } + } + }(i) + } + + // 2. Start Consumers + counts := make([]int, numConsumers) + var countMu sync.Mutex + for i := 0; i < numConsumers; i++ { + wg.Add(1) + go func(cid int) { + defer wg.Done() + <-startSignal + c, err := bq.NewConsumer(fmt.Sprintf("c%d", cid)) + if err != nil { + t.Errorf("Consumer %d failed to init: %v", cid, err) + return + } + + localCount := 0 + totalExpected := numProducers * msgsPerProducer + + // Use a deadline to prevent infinite hanging + deadline := time.Now().Add(10 * time.Second) + for time.Now().Before(deadline) { + _, err := c.Dequeue() + if err == nil { + localCount++ + } else if err == ErrEmptyQueue { + if localCount >= totalExpected { + break + } + time.Sleep(10 * time.Millisecond) + } else { + // Other errors might happen during concurrent GC + time.Sleep(1 * time.Millisecond) + } + + if localCount >= totalExpected { + break + } + } + + countMu.Lock() + counts[cid] = localCount + countMu.Unlock() + }(i) + } + + // 3. Periodic GC and New Consumer injection + stopGC := make(chan struct{}) + doneGC := make(chan struct{}) + go func() { + defer close(doneGC) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + newConsumerID := 100 + for { + select { + case <-stopGC: + return + case <-ticker.C: + bq.GC() + // Inject a new consumer middle of the process + name := fmt.Sprintf("late-c%d", newConsumerID) + nc, err := bq.NewConsumer(name) + if err == nil { + // Just try to read whatever is available + _, _ = nc.Dequeue() + } + newConsumerID++ + } + } + }() + + // Ignite! + close(startSignal) + + // Wait for producers and consumers to finish work + wg.Wait() + close(stopGC) + <-doneGC + + // Final Verification + for i, c := range counts { + if c < numProducers*msgsPerProducer { + t.Errorf("Consumer %d only processed %d/%d messages", i, c, numProducers*msgsPerProducer) + } + } + + bq.Close() +} diff --git a/gc_test.go b/gc_test.go new file mode 100644 index 0000000..37314a8 --- /dev/null +++ b/gc_test.go @@ -0,0 +1,357 @@ +package bigqueue + +import ( + "fmt" + "os" + "path/filepath" + "testing" +) + +func TestArenaGC(t *testing.T) { + testDir := filepath.Join(os.TempDir(), "test_bigqueue_gc") + os.RemoveAll(testDir) + if err := os.MkdirAll(testDir, 0755); err != nil { + t.Fatalf("failed to create test dir: %v", err) + } + defer os.RemoveAll(testDir) + + arenaSize := 4096 + maxKeep := 1 + + bq, err := NewMmapQueue(testDir, SetArenaSize(arenaSize), SetMaxArenasToKeep(maxKeep)) + if err != nil { + t.Fatalf("failed to create queue: %v", err) + } + + // 1. Fill several arenas + msg := make([]byte, 1024) + for i := 0; i < 10; i++ { + if err := bq.Enqueue(msg); err != nil { + t.Fatalf("failed to enqueue: %v", i) + } + } + + tailAid, _ := bq.md.getTail() + if tailAid < 2 { + t.Fatalf("tailAid too small: %v", tailAid) + } + + // 2. Consume to advance head + consumer, _ := bq.NewConsumer("__default__") + for i := 0; i < 8; i++ { + _, err := consumer.Dequeue() + if err != nil { + t.Fatalf("failed to dequeue: %v", err) + } + } + + if err := bq.Flush(); err != nil { + t.Fatalf("failed to flush: %v", err) + } + + headAid, _ := bq.md.getConsumerHead(bq.dc) + bq.Close() + + // 3. Reopen to trigger GC + bq, err = NewMmapQueue(testDir, SetArenaSize(arenaSize), SetMaxArenasToKeep(maxKeep)) + if err != nil { + t.Fatalf("failed to reopen queue: %v", err) + } + bq.Close() + + limitAid := headAid - maxKeep + + // Arenas < limitAid should be deleted. + for i := 0; i < limitAid; i++ { + path := filepath.Join(testDir, fmt.Sprintf("%d_arena.dat", i)) + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Errorf("arena %d should have been deleted (headAid=%d, limitAid=%d)", i, headAid, limitAid) + } + } + + // Arenas >= limitAid should still exist. + for i := limitAid; i <= tailAid; i++ { + path := filepath.Join(testDir, fmt.Sprintf("%d_arena.dat", i)) + if _, err := os.Stat(path); os.IsNotExist(err) { + t.Errorf("arena %d should exist", i) + } + } +} + +func TestArenaGC_MultipleArenasAndGC(t *testing.T) { + testDir := filepath.Join(os.TempDir(), "test_bigqueue_gc_multi_arenas") + os.RemoveAll(testDir) + os.MkdirAll(testDir, 0755) + defer os.RemoveAll(testDir) + + arenaSize := 4096 // OS Page size (minimum allowed) + maxKeep := 1 // Keep only 1 arena before the current head + + bq, err := NewMmapQueue(testDir, SetArenaSize(arenaSize), SetMaxArenasToKeep(maxKeep)) + if err != nil { + t.Fatalf("failed to create queue: %v", err) + } + + // 1. Enqueue enough messages to create multiple arena files. + // 4096 byte arena. Message size will be half-ish to ensure split. + msg := make([]byte, 2000) + // 2000 + 8 = 2008 bytes. 2 messages per arena. + for i := 0; i < 20; i++ { + if err := bq.Enqueue(msg); err != nil { + t.Fatalf("failed to enqueue msg %d: %v", i, err) + } + } + + // Verify we have multiple arenas (approx 10 arenas) + tailAid, _ := bq.md.getTail() + if tailAid < 9 { + t.Fatalf("expected tailAid to be at least 9, got %d", tailAid) + } + + // 2. Consume some messages to move the head of the default consumer. + // Move it to arena 6. + c, _ := bq.NewConsumer("__default__") + for i := 0; i < 14; i++ { // 14 msgs = 7 arenas approx + if _, err := c.Dequeue(); err != nil { + t.Fatalf("failed to dequeue msg %d: %v", i, err) + } + } + bq.Flush() + + headAid, _ := bq.md.getConsumerHead(c.base) + if headAid < 6 { + t.Fatalf("expected headAid to be at least 6, got %d", headAid) + } + + // 3. Trigger GC. + // Since maxKeep is 1, it should keep arena 'headAid' and 'headAid-1'. + bq.GC() + + // 4. Verify deletions + limitAid := headAid - maxKeep + for i := 0; i < limitAid; i++ { + path := filepath.Join(testDir, fmt.Sprintf("%d_arena.dat", i)) + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Errorf("arena %d should have been deleted", i) + } + } + + // 5. Verify survivors + for i := limitAid; i <= int(tailAid); i++ { + path := filepath.Join(testDir, fmt.Sprintf("%d_arena.dat", i)) + if _, err := os.Stat(path); os.IsNotExist(err) { + t.Errorf("arena %d should still exist", i) + } + } + + // 6. Test: New consumer created AFTER GC. + // It should NOT be able to see or read deleted data (arenas 0-4). + // Its head AID should be global head (which was updated to 6). + cNew, err := bq.NewConsumer("after_gc_consumer") + if err != nil { + t.Fatalf("failed to create consumer after GC: %v", err) + } + + newAid, _ := bq.md.getConsumerHead(cNew.base) + if newAid != headAid { + t.Errorf("new consumer should start at global head %d, but got %d", headAid, newAid) + } + + // 7. Test: Existing consumer (the default one) after GC. + // We want to see if bq.am.baseAid not being updated causes issues. + // c is already at arena 6. Let's try to dequeue more. + // Enqueue one more message to be sure there's something to read. + if err := bq.Enqueue(msg); err != nil { + t.Fatalf("failed to enqueue extra msg: %v", err) + } + + data, err := c.Dequeue() + if err != nil { + t.Fatalf("existing consumer failed to dequeue after GC: %v", err) + } + if len(data) != 2000 { + t.Errorf("expected 2000 bytes, got %d", len(data)) + } + + // 8. Test: New consumer dequeue. + data, err = cNew.Dequeue() + if err != nil { + t.Fatalf("new consumer failed to dequeue: %v", err) + } + if len(data) != 2000 { + t.Errorf("expected 2000 bytes, got %d", len(data)) + } + + bq.Close() +} + +func TestArenaGC_MultipleConsumers(t *testing.T) { + testDir := filepath.Join(os.TempDir(), "test_bigqueue_gc_multi") + os.RemoveAll(testDir) + os.MkdirAll(testDir, 0755) + defer os.RemoveAll(testDir) + + arenaSize := 4096 + maxKeep := 1 + + bq, err := NewMmapQueue(testDir, SetArenaSize(arenaSize), SetMaxArenasToKeep(maxKeep)) + if err != nil { + t.Fatalf("failed to create queue: %v", err) + } + + msg := make([]byte, 1024) + for i := 0; i < 20; i++ { + bq.Enqueue(msg) + } + + c1, _ := bq.NewConsumer("c1") + c2, _ := bq.NewConsumer("c2") + + for i := 0; i < 10; i++ { + c1.Dequeue() + } + for i := 0; i < 4; i++ { + c2.Dequeue() + } + + bq.Flush() + + head1, _ := bq.md.getConsumerHead(c1.base) + head2, _ := bq.md.getConsumerHead(c2.base) + + minHead := head1 + if head2 < minHead { + minHead = head2 + } + + // Trigger GC via new arena + for i := 0; i < 10; i++ { + bq.Enqueue(msg) + } + + limitAid := minHead - maxKeep + for i := 0; i < limitAid; i++ { + path := filepath.Join(testDir, fmt.Sprintf("%d_arena.dat", i)) + if _, err := os.Stat(path); !os.IsNotExist(err) { + t.Errorf("arena %d should have been deleted (head1=%d, head2=%d)", i, head1, head2) + } + } + + path := filepath.Join(testDir, fmt.Sprintf("%d_arena.dat", limitAid)) + if _, err := os.Stat(path); os.IsNotExist(err) { + t.Errorf("arena %d should exist", limitAid) + } + + bq.Close() +} +func TestArenaGC_NewConsumerAfterGC(t *testing.T) { + testDir := filepath.Join(os.TempDir(), "test_bigqueue_gc_new_consumer") + os.RemoveAll(testDir) + os.MkdirAll(testDir, 0755) + defer os.RemoveAll(testDir) + + arenaSize := 4096 + maxKeep := 1 // Keep 1 arena before minHead + + bq, err := NewMmapQueue(testDir, SetArenaSize(arenaSize), SetMaxArenasToKeep(maxKeep)) + if err != nil { + t.Fatalf("failed to create queue: %v", err) + } + + // 1. Fill 5 arenas + msg := make([]byte, 1024) + for i := 0; i < 15; i++ { + bq.Enqueue(msg) + } + + // 2. Advance default consumer to arena 2 or more + c, _ := bq.NewConsumer("__default__") + // 4096 / (1024 + 8) = 3.96 -> 3 messages per arena. + // To reach arena 3, we need 3 * 3 = 9 messages. + // But let's just check what we get. + for i := 0; i < 12; i++ { + c.Dequeue() + } + bq.Flush() + + headAid, headOff := bq.md.getConsumerHead(c.base) + if headAid < 2 { + t.Fatalf("headAid should be at least 2, got %d", headAid) + } + + // 3. Trigger GC + bq.GC() + + // 4. Create new consumer. + cNew, err := bq.NewConsumer("newbie") + if err != nil { + t.Fatalf("failed to create newbie: %v", err) + } + newAid, newOff := bq.md.getConsumerHead(cNew.base) + if newAid != headAid || newOff != headOff { + t.Errorf("new consumer should start at global head (%d, %d), but got (%d, %d)", headAid, headOff, newAid, newOff) + } + + bq.Close() +} + +func TestArenaGC_OffsetPrecision(t *testing.T) { + testDir := filepath.Join(os.TempDir(), "test_bigqueue_gc_offset") + os.RemoveAll(testDir) + os.MkdirAll(testDir, 0755) + defer os.RemoveAll(testDir) + + arenaSize := 4096 + bq, err := NewMmapQueue(testDir, SetArenaSize(arenaSize), SetMaxArenasToKeep(1)) + if err != nil { + t.Fatalf("failed to create queue: %v", err) + } + + msg := make([]byte, 100) + for i := 0; i < 5; i++ { + bq.Enqueue(msg) + } + + c1, _ := bq.NewConsumer("c1") + c2, _ := bq.NewConsumer("c2") + + // c1 and c2 consume, but we MUST also advance the default consumer + // because GC considers all consumers. + dc, _ := bq.NewConsumer("__default__") + dc.Dequeue() // moves it to same as c1 + + // c1 consumes 1 msg + c1.Dequeue() + // c2 consumes 2 msgs + c2.Dequeue() + c2.Dequeue() + + // Both are in arena 0 + aid1, off1 := bq.md.getConsumerHead(c1.base) + aid2, off2 := bq.md.getConsumerHead(c2.base) + + if aid1 != aid2 { + t.Fatalf("both consumers should be in same arena for this test, got %d and %d", aid1, aid2) + } + if off2 <= off1 { + t.Fatalf("c2 offset should be greater than c1, got %d and %d", off2, off1) + } + + // Trigger GC + bq.GC() + + // Global head should be equal to c1 (the laggard) + globalAid, globalOff := bq.md.getHead() + if globalAid != aid1 || globalOff != off1 { + t.Errorf("global head should match c1 (min offset), expected (%d,%d), got (%d,%d)", aid1, off1, globalAid, globalOff) + } + + // Now create a new consumer and ensure it picks up global head + c3, _ := bq.NewConsumer("c3") + aid3, off3 := bq.md.getConsumerHead(c3.base) + if aid3 != aid1 || off3 != off1 { + t.Errorf("new consumer should match min office, expected (%d,%d), got (%d,%d)", aid1, off1, aid3, off3) + } + + bq.Close() +} diff --git a/metadata.go b/metadata.go index bd2e8d9..b603ecc 100644 --- a/metadata.go +++ b/metadata.go @@ -123,10 +123,10 @@ func (m *metadata) getHead() (int, int) { } // putHead stores the value of head in the metadata. -// func (m *metadata) putHead(aid, pos int) { -// m.aa.WriteUint64At(uint64(aid), 8) -// m.aa.WriteUint64At(uint64(pos), 16) -// } +func (m *metadata) putHead(aid, pos int) { + m.aa.WriteUint64At(uint64(aid), 8) + m.aa.WriteUint64At(uint64(pos), 16) +} // getTail reads the values of tail of the queue from the metadata arena. // Tail of a bigqueue, similar to head, can be identified using: