Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,40 @@ defer bq.Close()
Bigqueue also allows setting up the maximum possible memory that it
can use. By default, the maximum memory is set to [3 x Arena Size].
```go
bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetArenaSize(4*1024),
bigqueue.SetMaxInMemArenas(10))
bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024),
bigqueue.SetMaxInMemArenas(10), bigqueue.SetMaxArenasToKeep(20))
defer bq.Close()
```
In this case, bigqueue will never allocate more memory than `4KB*10=40KB`. This
memory is above and beyond the memory used in buffers for copying data.
The `SetMaxArenasToKeep(20)` option tells bigqueue to retain at most 20 old
arena files on disk after they have been consumed; any older ones will
be automatically deleted.

Bigqueue allows setting a periodic flush based on either elapsed time or the number
of mutate (enqueue/dequeue) operations. A flush syncs the in-memory changes of all
memory-mapped files with disk. *This is a best effort flush*.

This is how we can set these options:
```go
bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2))
bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2))
```
In this case, a flush is done after every two mutate operations.

```go
bq, err := bigqueue.NewQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute))
bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute))
```
In this case, a flush is done after one minute elapses and an Enqueue/Dequeue is called.

### Garbage Collection
When `SetMaxArenasToKeep` is set to a positive value, bigqueue automatically
cleans up old arena files that have been consumed by **all** consumers.
With the default value of 0, no arena files are deleted.
Comment thread
eeliu marked this conversation as resolved.
You can also manually trigger a garbage collection:
```go
bq.GC()
```


Write to bigqueue:
```go
err := bq.Enqueue([]byte("elem"))
Expand Down
137 changes: 93 additions & 44 deletions arenamanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package bigqueue

import (
"fmt"
"os"
"path"
Comment thread
eeliu marked this conversation as resolved.
"strconv"
"syscall"

"github.com/grandecola/mmap"
Expand All @@ -15,49 +15,101 @@ const (

// arenaManager manages all the arenas for a bigqueue
type arenaManager struct {
dir string
conf *bqConfig
md *metadata
baseAid int
arenas []*mmap.File
inMem int
fullPath []byte
dir string
conf *bqConfig
md *metadata
arenas map[int]*mmap.File
inMem int
}

// newArenaManager returns a pointer to new arenaManager.
func newArenaManager(dir string, conf *bqConfig, md *metadata) (*arenaManager, error) {
headAid, _ := md.getHead()
tailAid, _ := md.getTail()

numArenas := tailAid + 1 - headAid
arenas := make([]*mmap.File, numArenas)
am := &arenaManager{
dir: path.Clean(dir),
conf: conf,
md: md,
baseAid: headAid,
arenas: arenas,
dir: path.Clean(dir),
conf: conf,
md: md,
arenas: make(map[int]*mmap.File),
}

// we load the tail arena into memory
if err := am.loadArena(tailAid); err != nil {
return nil, err
}

am.gc()

return am, nil
}

// getArena returns arena for a given arena ID
func (m *arenaManager) getArena(aid int) (*mmap.File, error) {
relAid := aid - m.baseAid
if relAid == len(m.arenas) {
m.arenas = append(m.arenas, nil)
// gc deletes the arena files that are no longer needed.
func (m *arenaManager) gc() {
if m.conf.maxArenasToKeep <= 0 {
return
}
Comment thread
eeliu marked this conversation as resolved.

// find the minimum consumer head aid
minHeadAid, minHeadOff := -1, -1
for _, base := range m.md.co {
aid, off := m.md.getConsumerHead(base)
if minHeadAid == -1 || aid < minHeadAid || (aid == minHeadAid && off < minHeadOff) {
minHeadAid = aid
minHeadOff = off
}
}

if minHeadAid == -1 {
return
}

// update global head to the minimum among all consumers.
// this is to ensure new consumers start from the earliest available data.
m.md.putHead(minHeadAid, minHeadOff)
_ = m.md.flush()
Comment thread
eeliu marked this conversation as resolved.

// we keep maxArenasToKeep arenas before the minHeadAid
// everything before (minHeadAid - maxArenasToKeep) can be deleted.
limitAid := minHeadAid - m.conf.maxArenasToKeep
if limitAid <= 0 {
return
}

// startAid - we can delete from 0 up to limitAid-1.
for aid := 0; aid < limitAid; aid++ {
// remove from memory if loaded
if aa, ok := m.arenas[aid]; ok && aa != nil {
_ = m.unloadArena(aid)
}
delete(m.arenas, aid)

arenaPath := m.getArenaPath(aid)
if _, err := os.Stat(arenaPath); err == nil {
if err := os.Remove(arenaPath); err != nil {
} else {
}
}
Comment thread
eeliu marked this conversation as resolved.
}
aa := m.arenas[relAid]
if aa != nil {
}

// getArenaPath builds the location of the arena file for the given arena ID:
// the decimal ID followed by cArenaFileSuffix, joined onto the manager's dir.
func (m *arenaManager) getArenaPath(aid int) string {
	return path.Join(m.dir, fmt.Sprintf("%d%s", aid, cArenaFileSuffix))
}
Comment thread
eeliu marked this conversation as resolved.

// getArena returns the arena for a given arena ID, loading it into memory if needed
func (m *arenaManager) getArena(aid int) (*mmap.File, error) {
if aa, ok := m.arenas[aid]; ok && aa != nil {
return aa, nil
}

// if this is a new arena being requested (tail expansion)
// getTail doesn't help here because writer might be calling it before metadata update
// but basically if it's not in the map, we try to load or create it.
// In the original slice logic, relAid == len(m.arenas) triggered GC.
// Here we can check if it's beyond a certain aid or just trigger GC occasionally.

// before we get a new arena into memory, we need to ensure that after fetching
// a new arena into memory, we do not cross the provided memory limit.
if err := m.ensureEnoughMem(); err != nil {
Expand All @@ -69,6 +121,8 @@ func (m *arenaManager) getArena(aid int) (*mmap.File, error) {
return nil, err
}

m.gc()

return m.arenas[aid], nil
}

Expand All @@ -90,19 +144,16 @@ func (m *arenaManager) ensureEnoughMem() error {
// Simply iterate from the last arena until enough memory is
// available for a new arena to be loaded into memory
tailAid, _ := m.md.getTail()
Comment thread
eeliu marked this conversation as resolved.
curAid := m.baseAid + len(m.arenas)
for m.conf.maxInMemArenas-m.inMem <= 0 {
curAid--

if curAid < 0 {
panic("not enough memory to hold arenas in memory")
headAid, _ := m.md.getHead()
for aid, aa := range m.arenas {
if m.inMem < m.conf.maxInMemArenas {
break
}

if curAid == tailAid {
if aid == tailAid || aid == headAid || aa == nil {
continue
}

if err := m.unloadArena(curAid); err != nil {
if err := m.unloadArena(aid); err != nil {
return err
}
}
Comment on lines +147 to 159
Expand All @@ -112,36 +163,33 @@ func (m *arenaManager) ensureEnoughMem() error {

// loadArena will fetch the arena into memory.
func (m *arenaManager) loadArena(aid int) error {
if m.arenas[aid-m.baseAid] != nil {
if aa, ok := m.arenas[aid]; ok && aa != nil {
return nil
}

m.fullPath = append(m.fullPath[:0], m.dir...)
m.fullPath = append(m.fullPath, '/')
m.fullPath = strconv.AppendInt(m.fullPath, int64(aid), 10)
m.fullPath = append(m.fullPath, cArenaFileSuffix...)
aa, err := newArena(string(m.fullPath), m.conf.arenaSize)
arenaPath := m.getArenaPath(aid)
aa, err := newArena(arenaPath, m.conf.arenaSize)
if err != nil {
return err
}

m.inMem++
m.arenas[aid-m.baseAid] = aa
m.arenas[aid] = aa
return nil
}

// unloadArena will remove the arena from memory.
func (m *arenaManager) unloadArena(aid int) error {
if m.arenas[aid-m.baseAid] == nil {
aa, ok := m.arenas[aid]
if !ok || aa == nil {
return nil
}

if err := m.arenas[aid-m.baseAid].Unmap(); err != nil {
if err := aa.Unmap(); err != nil {
return fmt.Errorf("error in unmap :: %w", err)
}

m.inMem--
m.arenas[aid-m.baseAid] = nil
delete(m.arenas, aid)
return nil
}

Expand All @@ -166,14 +214,15 @@ func (m *arenaManager) flush() error {
// close unmaps all the arenas managed by arenaManager.
func (m *arenaManager) close() error {
var retErr error
for _, aa := range m.arenas {
for id, aa := range m.arenas {
if aa == nil {
continue
}

if err := aa.Unmap(); err != nil {
retErr = err
}
delete(m.arenas, id)
}

if retErr != nil {
Expand Down
8 changes: 8 additions & 0 deletions bigqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,11 @@ func (q *MmapQueue) periodicFlush() {
}
}
}

// GC will trigger the garbage collection of the arena files.
//
// It holds the queue lock for the whole run and delegates to the arena
// manager's gc. GC does not return an error, so callers get no signal about
// whether any file was actually removed.
func (q *MmapQueue) GC() {
q.lock.Lock()
defer q.lock.Unlock()

// the arena manager decides which arena files, if any, can be deleted
q.am.gc()
}
11 changes: 11 additions & 0 deletions bigqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,3 +1246,14 @@ func TestParallel(t *testing.T) {
t.FailNow()
}
}

// TestSetMaxArenasToKeepNegativeErr checks that a negative value passed to
// SetMaxArenasToKeep makes NewMmapQueue fail with ErrNegativeMaxArenasToKeep
// and return no queue.
func TestSetMaxArenasToKeepNegativeErr(t *testing.T) {
	t.Parallel()

	dir := t.TempDir()
	size := 2 * os.Getpagesize()

	queue, err := NewMmapQueue(dir, SetArenaSize(size), SetMaxArenasToKeep(-1))
	if queue == nil && err == ErrNegativeMaxArenasToKeep {
		return
	}

	t.Fatalf("expected max arenas to keep negative error, returned: %v", err)
}
35 changes: 27 additions & 8 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ var (
ErrTooSmallArenaSize = errors.New("too small arena size")
// ErrTooFewInMemArenas is returned when number of arenas allowed in memory < 3.
ErrTooFewInMemArenas = errors.New("too few in memory arenas")
// ErrNegativeMaxArenasToKeep is returned when max arenas to keep is negative.
ErrNegativeMaxArenasToKeep = errors.New("max arenas to keep cannot be negative")
)

// bqConfig stores all the configuration related to bigqueue.
type bqConfig struct {
arenaSize int
maxInMemArenas int
flushMutOps int64
flushPeriod time.Duration
arenaSize int
maxInMemArenas int
maxArenasToKeep int
flushMutOps int64
flushPeriod time.Duration
}

// Option is function type that takes a bqConfig object
Expand All @@ -39,10 +42,11 @@ type Option func(*bqConfig) error
// newConfig creates an object of bqConfig with default parameter values.
func newConfig() *bqConfig {
return &bqConfig{
arenaSize: cDefaultArenaSize,
maxInMemArenas: cMinMaxInMemArenas,
flushMutOps: cDefaultMutOps,
flushPeriod: cDefaultflushPeriod,
arenaSize: cDefaultArenaSize,
maxInMemArenas: cMinMaxInMemArenas,
maxArenasToKeep: 0,
flushMutOps: cDefaultMutOps,
flushPeriod: cDefaultflushPeriod,
}
}

Expand Down Expand Up @@ -105,3 +109,18 @@ func SetPeriodicFlushDuration(flushPeriod time.Duration) Option {
return nil
}
}

// SetMaxArenasToKeep returns an Option that controls how many already
// consumed arena files are retained on disk. With a positive value, arena
// files with an ID less than (min(consumer heads) - maxArenasToKeep) become
// eligible for deletion. With 0, which is the default, no arena files are
// deleted. A negative value makes the Option fail with
// ErrNegativeMaxArenasToKeep.
func SetMaxArenasToKeep(maxArenasToKeep int) Option {
	// the argument is fixed at construction time, so validate it once here
	// and hand back an Option that only reports the failure
	if maxArenasToKeep < 0 {
		return func(*bqConfig) error {
			return ErrNegativeMaxArenasToKeep
		}
	}

	return func(c *bqConfig) error {
		c.maxArenasToKeep = maxArenasToKeep
		return nil
	}
}

Loading