diff options
Diffstat (limited to '')
-rw-r--r-- | conn.go | 66 | ||||
-rw-r--r-- | encoding.go | 14 | ||||
-rw-r--r-- | evict.go | 28 | ||||
-rw-r--r-- | map.go | 95 | ||||
-rw-r--r-- | map_test.go | 89 | ||||
-rw-r--r-- | utils.go | 53 |
6 files changed, 212 insertions, 133 deletions
@@ -2,6 +2,7 @@ package cache import ( "errors" + "io" "os" "sync" "time" @@ -10,14 +11,16 @@ import ( ) type DB[K any, V any] struct { - file *os.File - Store Store - stop chan struct{} - wg sync.WaitGroup + file io.WriteSeeker + Store Store + snapshotTicker *pauseTimer + cleanupTicker *pauseTimer + stop chan struct{} + wg sync.WaitGroup } func OpenFile[K any, V any](filename string) (*DB[K, V], error) { - ret := &DB[K, V]{} + ret := OpenMem[K, V]() file, err := os.OpenFile(filename, os.O_RDWR, 0) if errors.Is(err, os.ErrNotExist) { file, err := os.Create(filename) @@ -25,48 +28,60 @@ func OpenFile[K any, V any](filename string) (*DB[K, V], error) { return nil, err } ret.file = file - ret.SetStratergy(StrategyNone) - ret.Clear() ret.Flush() } else if err == nil { - ret.Clear() + ret.Store.LoadSnapshot(file) ret.file = file - ret.Store.LoadSnapshot(ret.file) } else { return nil, err } - ret.wg.Add(1) - go ret.backgroundWorker() + return ret, nil } func OpenMem[K any, V any]() *DB[K, V] { - ret := &DB[K, V]{} - ret.SetStratergy(StrategyNone) + ret := &DB[K, V]{ + snapshotTicker: newPauseTimer(0), + } + ret.snapshotTicker.Stop() ret.Clear() + ret.Store.strategy.evict = &ret.Store.evict + ret.SetStratergy(StrategyNone) return ret } -func (d *DB[K, V]) SetStratergy(e EvictionPolicy) error { - strategy, err := e.ToStratergy(&d.Store.evict) - if err != nil { - return err - } - d.Store.strategy = strategy - return nil +func (d *DB[K, V]) Start() { + d.stop = make(chan struct{}) + d.wg.Add(1) + go d.backgroundWorker() } -func (d *DB[K, V]) SetMaxCost(e EvictionPolicy) { - d.Store.max_cost = d.Store.max_cost +func (d *DB[K, V]) SetStratergy(e EvictionPolicyType) error { + return d.Store.strategy.SetPolicy(e) +} + +func (d *DB[K, V]) SetMaxCost(e EvictionPolicyType) { + d.Store.maxCost = d.Store.maxCost +} + +func (d *DB[K, V]) SetSnapshotTime(t time.Duration) { + d.snapshotTicker.Reset(t) } func (d *DB[K, V]) backgroundWorker() { defer d.wg.Done() + + d.snapshotTicker.Resume() + defer d.snapshotTicker.Stop() + for { select { case <-d.stop: return - // TODO: Do house keeping + case <-d.snapshotTicker.C: + d.Flush() + case <-d.snapshotTicker.C: + d.Flush() } } } @@ -77,7 +92,10 @@ func (d *DB[K, V]) Close() { d.Flush() d.Clear() if d.file != nil { - d.file.Close() + closer, ok := d.file.(io.Closer) + if ok { + closer.Close() + } } } diff --git a/encoding.go b/encoding.go index ff07d2d..bd2d4b7 100644 --- a/encoding.go +++ b/encoding.go @@ -140,6 +140,8 @@ func (s *Store) Snapshot(w io.WriteSeeker) error { w.Seek(0, io.SeekStart) wr := NewEncoder(w) + wr.EncodeUint64(s.maxCost) + wr.EncodeUint64(uint64(s.strategy.Type)) wr.EncodeUint64(s.lenght) for v := s.evict.EvictNext; v != &s.evict; v = v.EvictNext { @@ -155,6 +157,18 @@ func (s *Store) LoadSnapshot(r io.ReadSeeker) error { r.Seek(0, io.SeekStart) rr := NewDecoder(r) + maxCost, err := rr.DecodeUint64() + if err != nil { + return err + } + s.maxCost = maxCost + + policy, err := rr.DecodeUint64() + if err != nil { + return err + } + s.strategy.SetPolicy(EvictionPolicyType(policy)) + lenght, err := rr.DecodeUint64() if err != nil { return err @@ -2,10 +2,10 @@ package cache import "errors" -type EvictionPolicy int +type EvictionPolicyType int const ( - StrategyNone EvictionPolicy = iota + StrategyNone EvictionPolicyType = iota StrategyFIFO StrategyLRU StrategyLFU @@ -17,23 +17,30 @@ type EvictionStrategies interface { Evict() *Node } -func (e EvictionPolicy) ToStratergy(evict *Node) (EvictionStrategies, error) { - store := map[EvictionPolicy]func() EvictionStrategies{ +type EvictionPolicy struct { + EvictionStrategies + Type EvictionPolicyType + evict *Node +} + +func (e *EvictionPolicy) SetPolicy(y EvictionPolicyType) error { + store := map[EvictionPolicyType]func() EvictionStrategies{ StrategyNone: func() EvictionStrategies { - return NoneStrategies{evict: evict} + return NoneStrategies{evict: e.evict} }, StrategyFIFO: func() EvictionStrategies { - return FIFOStrategies{evict: evict} + return FIFOStrategies{evict: e.evict} }, StrategyLRU: func() EvictionStrategies { - return LRUStrategies{evict: evict} + return LRUStrategies{evict: e.evict} }, } - factory, ok := store[e] + factory, ok := store[y] if !ok { - return nil, errors.New("Invalid Policy") + return errors.New("Invalid Policy") } - return factory(), nil + e.EvictionStrategies = factory() + return nil } type NoneStrategies struct { @@ -102,7 +109,6 @@ func (s LFUStrategies) OnInsert(node *Node) { node.EvictNext = node.EvictPrev.EvictNext node.EvictNext.EvictPrev = node node.EvictPrev.EvictNext = node - } func (s LFUStrategies) OnAccess(node *Node) { @@ -2,17 +2,11 @@ package cache import ( "bytes" - "hash/fnv" "iter" "sync" "time" ) -func zero[T any]() T { - var ret T - return ret -} - type Map[K any, V any] interface { Set(key K, value V, ttl time.Duration) error Get(key K) (V, time.Duration, error) @@ -32,16 +26,29 @@ type Node struct { EvictPrev *Node } +func (n *Node) IsValid() bool { + return n.Expiration.IsZero() || n.Expiration.After(time.Now()) +} + +func (n *Node) Detach() { +} + type Store struct { bucket []Node lenght uint64 cost uint64 evict Node - max_cost uint64 - strategy EvictionStrategies + maxCost uint64 + strategy EvictionPolicy mu sync.RWMutex } +func (s *Store) Init() { + s.Clear() + s.strategy.evict = &s.evict + s.strategy.SetPolicy(StrategyNone) +} + func (s *Store) Clear() { s.mu.Lock() defer s.mu.Unlock() @@ -54,14 +61,6 @@ func (s *Store) Clear() { s.evict.EvictPrev = &s.evict } -func hash(data []byte) uint64 { - hasher := fnv.New64() - if _, err := hasher.Write(data); err != nil { - panic(err) - } - return hasher.Sum64() -} - func lookup(s *Store, key []byte) (uint64, uint64) { hash := hash(key) return hash % uint64(len(s.bucket)), hash @@ -86,8 +85,11 @@ func (s *Store) Get(key []byte) ([]byte, time.Duration, bool) { for v := bucket.HashNext; v != bucket; v = v.HashNext { if bytes.Equal(key, v.Key) { + if !v.IsValid() { + return nil, 0, false + } s.strategy.OnAccess(v) - return v.Value, 0, true + return v.Value, time.Until(v.Expiration), true } } @@ -98,6 +100,9 @@ func resize(s *Store) { bucket := make([]Node, 2*len(s.bucket)) for v := s.evict.EvictNext; v != &s.evict; v = v.EvictNext { + if !v.IsValid() { + continue + } idx := v.Hash % uint64(len(bucket)) n := &bucket[idx] @@ -112,6 +117,23 @@ func resize(s *Store) { s.bucket = bucket } +func cleanup(s *Store) { + for v := s.evict.EvictNext; v != &s.evict; v = v.EvictNext { + if !v.IsValid() { + deleteNode(s, v) + } + } +} + +func evict(s *Store) bool { + n := s.strategy.Evict() + if n == nil { + return false + } + deleteNode(s, n) + return true +} + func (s *Store) Set(key []byte, value []byte, ttl time.Duration) { s.mu.Lock() defer s.mu.Unlock() @@ -138,10 +160,13 @@ func (s *Store) Set(key []byte, value []byte, ttl time.Duration) { } node := &Node{ - Hash: hash, - Key: key, - Value: value, - Expiration: time.Now().Add(ttl), + Hash: hash, + Key: key, + Value: value, + } + + if ttl != 0 { + node.Expiration = time.Now().Add(ttl) } node.HashPrev = bucket @@ -156,6 +181,21 @@ func (s *Store) Set(key []byte, value []byte, ttl time.Duration) { s.lenght = s.lenght + 1 } +func deleteNode(s *Store, v *Node) { + v.HashNext.HashPrev = v.HashPrev + v.HashPrev.HashNext = v.HashNext + v.HashNext = nil + v.HashPrev = nil + + v.EvictNext.EvictPrev = v.EvictPrev + v.EvictPrev.EvictNext = v.EvictNext + v.EvictNext = nil + v.EvictPrev = nil + + s.cost = s.cost - (uint64(len(v.Key)) + uint64(len(v.Value))) + s.lenght = s.lenght - 1 +} + func (s *Store) Delete(key []byte) bool { s.mu.Lock() defer s.mu.Unlock() @@ -168,18 +208,7 @@ func (s *Store) Delete(key []byte) bool { for v := bucket.HashNext; v != bucket; v = v.HashNext { if bytes.Equal(key, v.Key) { - v.HashNext.HashPrev = v.HashPrev - v.HashPrev.HashNext = v.HashNext - v.HashNext = nil - v.HashPrev = nil - - v.EvictNext.EvictPrev = v.EvictPrev - v.EvictPrev.EvictNext = v.EvictNext - v.EvictNext = nil - v.EvictPrev = nil - - s.cost = s.cost - (uint64(len(v.Key)) + uint64(len(v.Value))) - s.lenght = s.lenght - 1 + deleteNode(s, v) return true } } diff --git a/map_test.go b/map_test.go index b829fac..0a758d0 100644 --- a/map_test.go +++ b/map_test.go @@ -6,13 +6,22 @@ import ( "github.com/stretchr/testify/assert" ) +func setupTestStore(t testing.TB) *Store { + t.Helper() + + store := &Store{} + store.Init() + return store +} + func TestStoreGetSet(t *testing.T) { t.Parallel() t.Run("Exists", func(t *testing.T) { t.Parallel() - store := &New[any, any]().Store + store := setupTestStore(t) + want := []byte("Value") store.Set([]byte("Key"), want, 0) got, _, ok := store.Get([]byte("Key")) @@ -23,7 +32,8 @@ func TestStoreGetSet(t *testing.T) { t.Run("Not Exists", func(t *testing.T) { t.Parallel() - store := &New[any, any]().Store + store := setupTestStore(t) + _, _, ok := store.Get([]byte("Key")) assert.False(t, ok) }) @@ -31,7 +41,8 @@ func TestStoreGetSet(t *testing.T) { t.Run("Update", func(t *testing.T) { t.Parallel() - store := &New[any, any]().Store + store := setupTestStore(t) + store.Set([]byte("Key"), []byte("Other"), 0) want := []byte("Value") store.Set([]byte("Key"), want, 0) @@ -47,7 +58,8 @@ func TestStoreDelete(t *testing.T) { t.Run("Exists", func(t *testing.T) { t.Parallel() - store := &New[any, any]().Store + store := setupTestStore(t) + want := []byte("Value") store.Set([]byte("Key"), want, 0) ok := store.Delete([]byte("Key")) @@ -59,7 +71,8 @@ func TestStoreDelete(t *testing.T) { t.Run("Not Exists", func(t *testing.T) { t.Parallel() - store := &New[any, any]().Store + store := setupTestStore(t) + ok := store.Delete([]byte("Key")) assert.False(t, ok) }) @@ -68,7 +81,8 @@ func TestStoreDelete(t *testing.T) { func TestStoreClear(t *testing.T) { t.Parallel() - store := &New[any, any]().Store + store := setupTestStore(t) + want := []byte("Value") store.Set([]byte("Key"), want, 0) store.Clear() @@ -76,65 +90,9 @@ func TestStoreClear(t *testing.T) { assert.False(t, ok) } -func TestHashMapGetSet(t *testing.T) { - t.Parallel() - - t.Run("Exists", func(t *testing.T) { - t.Parallel() - - store := New[string, string]() - want := "Value" - store.Set("Key", want, 0) - got, _, err := store.Get("Key") - assert.Equal(t, want, got) - assert.NoError(t, err) - }) - - t.Run("Not Exists", func(t *testing.T) { - t.Parallel() - - store := New[string, string]() - _, _, err := store.Get("Key") - assert.ErrorIs(t, err, ErrKeyNotFound) - }) - - t.Run("Update", func(t *testing.T) { - t.Parallel() - - store := New[string, string]() - store.Set("Key", "Other", 0) - want := "Value" - store.Set("Key", want, 0) - got, _, err := store.Get("Key") - assert.Equal(t, want, got) - assert.NoError(t, err) - }) -} - -func TestHashMapDelete(t *testing.T) { - t.Parallel() - - t.Run("Exists", func(t *testing.T) { - t.Parallel() - - store := New[string, string]() - want := "Value" - store.Set("Key", want, 0) - err := store.Delete("Key") - assert.NoError(t, err) - }) - - t.Run("Not Exists", func(t *testing.T) { - t.Parallel() - - store := New[string, string]() - err := store.Delete("Key") - assert.ErrorIs(t, err, ErrKeyNotFound) - }) -} - func BenchmarkStoreGet(b *testing.B) { - store := &New[any, any]().Store + store := setupTestStore(b) + key := []byte("Key") store.Set(key, []byte("Store"), 0) b.SetBytes(1) @@ -146,7 +104,8 @@ func BenchmarkStoreGet(b *testing.B) { } func BenchmarkStoreSet(b *testing.B) { - store := &New[any, any]().Store + store := setupTestStore(b) + key := []byte("Key") b.SetBytes(1) b.RunParallel(func(pb *testing.PB) { diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..9d1408e --- /dev/null +++ b/utils.go @@ -0,0 +1,53 @@ +package cache + +import ( + "hash/fnv" + "math" + "time" +) + +type pauseTimer struct { + *time.Ticker + duration time.Duration +} + +func newPauseTimer(d time.Duration) *pauseTimer { + ret := &pauseTimer{duration: d} + if d != 0 { + ret.Ticker = time.NewTicker(d) + } else { + ret.Ticker = time.NewTicker(math.MaxInt64) + ret.Reset(0) + } + return ret +} + +func (t *pauseTimer) Reset(d time.Duration) { + t.duration = d + if t.duration == 0 { + t.Stop() + } else { + t.Ticker.Reset(d) + } +} + +func (t *pauseTimer) Resume() { + t.Reset(t.GetDuration()) +} + +func (t *pauseTimer) GetDuration() time.Duration { + return t.duration +} + +func zero[T any]() T { + var ret T + return ret +} + +func hash(data []byte) uint64 { + hasher := fnv.New64() + if _, err := hasher.Write(data); err != nil { + panic(err) + } + return hasher.Sum64() +} |