From a04c538db5df71fb8effb971cc9f9e3cc77ce3af Mon Sep 17 00:00:00 2001 From: Marc Pervaz Boocha Date: Fri, 28 Feb 2025 18:09:36 +0530 Subject: Improved Concurency Part1 --- conn.go | 4 +- encoding.go | 6 +- encoding_test.go | 4 +- evict.go | 103 +++++++++----- evict_test.go | 427 +++++++++++++++++++++++++++++++++---------------------- store.go | 76 ++++++---- store_test.go | 82 +++++++++++ 7 files changed, 459 insertions(+), 243 deletions(-) diff --git a/conn.go b/conn.go index f34db81..ecd92aa 100644 --- a/conn.go +++ b/conn.go @@ -76,8 +76,8 @@ func (d *db) Start() { // SetConfig applies configuration options to the db. func (d *db) SetConfig(options ...Option) error { - d.Store.mu.Lock() - defer d.Store.mu.Unlock() + d.Store.Lock.Lock() + defer d.Store.Lock.Unlock() for _, opt := range options { if err := opt(d); err != nil { diff --git a/encoding.go b/encoding.go index 343e345..fe376b2 100644 --- a/encoding.go +++ b/encoding.go @@ -224,15 +224,15 @@ func (d *decoder) DecodeStore(s *store) error { v.EvictNext.EvictPrev = v v.EvictPrev.EvictNext = v - s.Cost = s.Cost + uint64(len(v.Key)) + uint64(len(v.Value)) + s.Cost = s.Cost + v.Cost() } return nil } func (s *store) Snapshot(w io.WriteSeeker) error { - s.mu.Lock() - defer s.mu.Unlock() + s.Lock.Lock() + defer s.Lock.Unlock() if _, err := w.Seek(0, io.SeekStart); err != nil { return err diff --git a/encoding_test.go b/encoding_test.go index 44104e4..dadde12 100644 --- a/encoding_test.go +++ b/encoding_test.go @@ -322,7 +322,7 @@ func TestEncodeDecodeStrorage(t *testing.T) { } } -func BenchmarkEncoder_EncodeStore(b *testing.B) { +func BenchmarkStoreLoadSnapshot(b *testing.B) { file, err := os.CreateTemp(b.TempDir(), "benchmark_test_") if err != nil { b.Fatal(err) @@ -363,7 +363,7 @@ func BenchmarkEncoder_EncodeStore(b *testing.B) { } } -func BenchmarkDecoder_DecodeStore(b *testing.B) { +func BenchmarkStoreLoadSnapsot(b *testing.B) { file, err := os.CreateTemp(b.TempDir(), "benchmark_test_") if err != nil { diff --git a/evict.go b/evict.go index 6b977b1..4ce577a 100644 --- a/evict.go +++ b/evict.go @@ -2,6 +2,7 @@ package cache import ( "errors" + "sync" ) // EvictionPolicyType defines the type of eviction policy. @@ -27,8 +28,9 @@ type evictionStrategies interface { // evictionPolicy struct holds the eviction strategy and its type. type evictionPolicy struct { evictionStrategies - Type EvictionPolicyType - evict *node + Type EvictionPolicyType + Sentinel *node + ListLock *sync.RWMutex } // pushEvict adds a node to the eviction list. @@ -43,19 +45,19 @@ func pushEvict(node *node, sentinnel *node) { func (e *evictionPolicy) SetPolicy(y EvictionPolicyType) error { store := map[EvictionPolicyType]func() evictionStrategies{ PolicyNone: func() evictionStrategies { - return fifoPolicy{evict: e.evict, shouldEvict: false} + return fifoPolicy{List: e.Sentinel, ShouldEvict: false, Lock: e.ListLock} }, PolicyFIFO: func() evictionStrategies { - return fifoPolicy{evict: e.evict, shouldEvict: true} + return fifoPolicy{List: e.Sentinel, Lock: e.ListLock} }, PolicyLRU: func() evictionStrategies { - return lruPolicy{evict: e.evict} + return lruPolicy{List: e.Sentinel, Lock: e.ListLock} }, PolicyLFU: func() evictionStrategies { - return lfuPolicy{evict: e.evict} + return lfuPolicy{List: e.Sentinel, Lock: e.ListLock} }, PolicyLTR: func() evictionStrategies { - return ltrPolicy{evict: e.evict} + return ltrPolicy{List: e.Sentinel, EvictZero: true, Lock: e.ListLock} }, } @@ -75,13 +77,17 @@ type evictOrderedPolicy interface { } type fifoPolicy struct { - evict *node - shouldEvict bool + List *node + Lock *sync.RWMutex + ShouldEvict bool } // OnInsert adds a node to the eviction list. func (s fifoPolicy) OnInsert(n *node) { - pushEvict(n, s.evict) + s.Lock.Lock() + defer s.Lock.Unlock() + + pushEvict(n, s.List) } // OnAccess is a no-op for fifoPolicy. @@ -96,25 +102,29 @@ func (fifoPolicy) OnUpdate(n *node) { // Evict returns the oldest node for fifoPolicy. func (s fifoPolicy) Evict() *node { - if s.shouldEvict && s.evict.EvictPrev != s.evict { - return s.evict.EvictPrev + if s.ShouldEvict && s.List.EvictPrev != s.List { + return s.List.EvictPrev } else { return nil } } func (s fifoPolicy) getEvict() *node { - return s.evict + return s.List } // lruPolicy struct represents the Least Recently Used eviction policy. type lruPolicy struct { - evict *node + List *node + Lock *sync.RWMutex } // OnInsert adds a node to the eviction list. func (s lruPolicy) OnInsert(n *node) { - pushEvict(n, s.evict) + s.Lock.Lock() + defer s.Lock.Unlock() + + pushEvict(n, s.List) } // OnUpdate moves the accessed node to the front of the eviction list. @@ -124,6 +134,9 @@ func (s lruPolicy) OnUpdate(n *node) { // OnAccess moves the accessed node to the front of the eviction list. func (s lruPolicy) OnAccess(n *node) { + s.Lock.Lock() + defer s.Lock.Unlock() + n.EvictNext.EvictPrev = n.EvictPrev n.EvictPrev.EvictNext = n.EvictNext s.OnInsert(n) @@ -131,25 +144,29 @@ func (s lruPolicy) OnAccess(n *node) { // Evict returns the least recently used node for lruPolicy. func (s lruPolicy) Evict() *node { - if s.evict.EvictPrev != s.evict { - return s.evict.EvictPrev + if s.List.EvictPrev != s.List { + return s.List.EvictPrev } else { return nil } } func (s lruPolicy) getEvict() *node { - return s.evict + return s.List } // lfuPolicy struct represents the Least Frequently Used eviction policy. type lfuPolicy struct { - evict *node + List *node + Lock *sync.RWMutex } // OnInsert adds a node to the eviction list and initializes its access count. func (s lfuPolicy) OnInsert(n *node) { - pushEvict(n, s.evict) + s.Lock.Lock() + defer s.Lock.Unlock() + + pushEvict(n, s.List) } // OnUpdate increments the access count of the node and reorders the list. @@ -159,9 +176,12 @@ func (s lfuPolicy) OnUpdate(n *node) { // OnAccess increments the access count of the node and reorders the list. func (s lfuPolicy) OnAccess(n *node) { + s.Lock.Lock() + defer s.Lock.Unlock() + n.Access++ - for v := n.EvictPrev; v.EvictPrev != s.evict; v = v.EvictPrev { + for v := n.EvictPrev; v.EvictPrev != s.List; v = v.EvictPrev { if v.Access <= n.Access { n.EvictNext.EvictPrev = n.EvictPrev n.EvictPrev.EvictNext = n.EvictNext @@ -178,7 +198,7 @@ func (s lfuPolicy) OnAccess(n *node) { n.EvictNext.EvictPrev = n.EvictPrev n.EvictPrev.EvictNext = n.EvictNext - n.EvictPrev = s.evict + n.EvictPrev = s.List n.EvictNext = n.EvictPrev.EvictNext n.EvictNext.EvictPrev = n n.EvictPrev.EvictNext = n @@ -186,29 +206,33 @@ func (s lfuPolicy) OnAccess(n *node) { // Evict returns the least frequently used node for LFU. func (s lfuPolicy) Evict() *node { - if s.evict.EvictPrev != s.evict { - return s.evict.EvictPrev + if s.List.EvictPrev != s.List { + return s.List.EvictPrev } else { return nil } } -func (s ltrPolicy) getEvict() *node { - return s.evict +func (s lfuPolicy) getEvict() *node { + return s.List } // ltrPolicy struct represents the Least Remaining Time eviction policy. type ltrPolicy struct { - evict *node - evictZero bool + List *node + Lock *sync.RWMutex + EvictZero bool } // OnInsert adds a node to the eviction list based on its TTL (Time To Live). // It places the node in the correct position in the list based on TTL. func (s ltrPolicy) OnInsert(n *node) { - pushEvict(n, s.evict) + s.Lock.Lock() + defer s.Lock.Unlock() - s.OnUpdate(n) + pushEvict(n, s.List) + + s.update(n) } // OnAccess is a no-op for ltrPolicy. @@ -220,11 +244,18 @@ func (s ltrPolicy) OnAccess(n *node) { // OnUpdate updates the position of the node in the eviction list based on its TTL. // It reorders the list to maintain the correct order based on TTL. func (s ltrPolicy) OnUpdate(n *node) { + s.Lock.Lock() + defer s.Lock.Unlock() + + s.update(n) +} + +func (s ltrPolicy) update(n *node) { if n.TTL() == 0 { return } - for v := n.EvictPrev; v.EvictPrev != s.evict; v = v.EvictPrev { + for v := n.EvictPrev; v.EvictPrev != s.List; v = v.EvictPrev { if v.TTL() == 0 { continue } @@ -242,7 +273,7 @@ func (s ltrPolicy) OnUpdate(n *node) { } } - for v := n.EvictNext; v.EvictNext != s.evict; v = v.EvictNext { + for v := n.EvictNext; v.EvictNext != s.List; v = v.EvictNext { if v.TTL() == 0 { continue } @@ -264,13 +295,13 @@ func (s ltrPolicy) OnUpdate(n *node) { // Evict returns the node with the least remaining time to live for ltrPolicy. // It returns the node at the end of the eviction list. func (s ltrPolicy) Evict() *node { - if s.evict.EvictPrev != s.evict && (s.evict.EvictPrev.TTL() != 0 || s.evictZero) { - return s.evict.EvictPrev + if s.List.EvictPrev != s.List && (s.List.EvictPrev.TTL() != 0 || s.EvictZero) { + return s.List.EvictPrev } return nil } -func (s lfuPolicy) getEvict() *node { - return s.evict +func (s ltrPolicy) getEvict() *node { + return s.List } diff --git a/evict_test.go b/evict_test.go index 5938b51..f597537 100644 --- a/evict_test.go +++ b/evict_test.go @@ -1,6 +1,8 @@ package cache import ( + "strconv" + "sync" "testing" "time" ) @@ -15,15 +17,30 @@ func createSentinel(tb testing.TB) *node { return n1 } +func createPolicy(tb testing.TB, policyType EvictionPolicyType, flag bool) evictOrderedPolicy { + tb.Helper() + + switch policyType { + case PolicyFIFO: + return &fifoPolicy{List: createSentinel(tb), ShouldEvict: flag, Lock: &sync.RWMutex{}} + case PolicyLTR: + return <rPolicy{List: createSentinel(tb), EvictZero: flag, Lock: &sync.RWMutex{}} + case PolicyLRU: + return &lruPolicy{List: createSentinel(tb), Lock: &sync.RWMutex{}} + case PolicyLFU: + return &lfuPolicy{List: createSentinel(tb), Lock: &sync.RWMutex{}} + } + tb.Fatalf("unknown policy type: %v", policyType) + return nil +} + func getListOrder(tb testing.TB, evict *node) []*node { tb.Helper() var order []*node - current := evict.EvictNext - for current != evict { + for current := evict.EvictNext; current != evict; current = current.EvictNext { order = append(order, current) - current = current.EvictNext } for _, n := range order { @@ -47,7 +64,7 @@ func checkOrder(tb testing.TB, policy evictOrderedPolicy, expected []*node) { for i, n := range expected { if order[i] != n { - tb.Errorf("element %v did not match: \nexpected: %#v\n got: %#v", i, n, order[i]) + tb.Errorf("element %v did not match: expected: %#v got: %#v", i, string(n.Key), string(order[i].Key)) } } } @@ -58,7 +75,7 @@ func TestFIFOPolicy(t *testing.T) { t.Run("OnInsert", func(t *testing.T) { t.Parallel() - policy := fifoPolicy{evict: createSentinel(t), shouldEvict: true} + policy := fifoPolicy{List: createSentinel(t), ShouldEvict: true, Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -75,7 +92,7 @@ func TestFIFOPolicy(t *testing.T) { t.Run("Evict", func(t *testing.T) { t.Parallel() - policy := fifoPolicy{evict: createSentinel(t), shouldEvict: true} + policy := fifoPolicy{List: createSentinel(t), ShouldEvict: true, Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -92,7 +109,7 @@ func TestFIFOPolicy(t *testing.T) { t.Run("Evict noEvict", func(t *testing.T) { t.Parallel() - policy := fifoPolicy{evict: createSentinel(t), shouldEvict: false} + policy := fifoPolicy{List: createSentinel(t), ShouldEvict: false, Lock: &sync.RWMutex{}} policy.OnInsert(&node{}) @@ -104,7 +121,7 @@ func TestFIFOPolicy(t *testing.T) { t.Run("Empty List", func(t *testing.T) { t.Parallel() - policy := fifoPolicy{evict: createSentinel(t)} + policy := fifoPolicy{List: createSentinel(t), ShouldEvict: true, Lock: &sync.RWMutex{}} if policy.Evict() != nil { t.Errorf("expected nil, got %#v", policy.Evict()) } @@ -118,7 +135,7 @@ func TestLRUPolicy(t *testing.T) { t.Run("OnInsert", func(t *testing.T) { t.Parallel() - policy := lruPolicy{evict: createSentinel(t)} + policy := lruPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -132,7 +149,7 @@ func TestLRUPolicy(t *testing.T) { t.Run("OnAccess", func(t *testing.T) { t.Parallel() - policy := lruPolicy{evict: createSentinel(t)} + policy := lruPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -151,7 +168,7 @@ func TestLRUPolicy(t *testing.T) { t.Run("Evict", func(t *testing.T) { t.Parallel() - policy := lruPolicy{evict: createSentinel(t)} + policy := lruPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -168,7 +185,7 @@ func TestLRUPolicy(t *testing.T) { t.Run("OnAccess End", func(t *testing.T) { t.Parallel() - policy := lruPolicy{evict: createSentinel(t)} + policy := lruPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -187,7 +204,7 @@ func TestLRUPolicy(t *testing.T) { t.Run("OnAccess Interleaved", func(t *testing.T) { t.Parallel() - policy := lruPolicy{evict: createSentinel(t)} + policy := lruPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -206,7 +223,7 @@ func TestLRUPolicy(t *testing.T) { t.Run("Empty", func(t *testing.T) { t.Parallel() - policy := lruPolicy{evict: createSentinel(t)} + policy := lruPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} if policy.Evict() != nil { t.Errorf("expected nil, got %#v", policy.Evict()) } @@ -220,7 +237,7 @@ func TestLFUPolicy(t *testing.T) { t.Run("OnInsert", func(t *testing.T) { t.Parallel() - policy := lfuPolicy{evict: createSentinel(t)} + policy := lfuPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -234,7 +251,7 @@ func TestLFUPolicy(t *testing.T) { t.Run("OnAccess", func(t *testing.T) { t.Parallel() - policy := lfuPolicy{evict: createSentinel(t)} + policy := lfuPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -253,7 +270,7 @@ func TestLFUPolicy(t *testing.T) { t.Run("Evict", func(t *testing.T) { t.Parallel() - policy := lfuPolicy{evict: createSentinel(t)} + policy := lfuPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -272,7 +289,7 @@ func TestLFUPolicy(t *testing.T) { t.Run("Evict After Multiple Accesses", func(t *testing.T) { t.Parallel() - policy := lfuPolicy{evict: createSentinel(t)} + policy := lfuPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} n0 := &node{Key: []byte("0")} n1 := &node{Key: []byte("1")} @@ -295,7 +312,7 @@ func TestLFUPolicy(t *testing.T) { t.Run("Empty List", func(t *testing.T) { t.Parallel() - policy := lfuPolicy{evict: createSentinel(t)} + policy := lfuPolicy{List: createSentinel(t), Lock: &sync.RWMutex{}} if policy.Evict() != nil { t.Errorf("expected nil, got %#v", policy.Evict()) } @@ -303,182 +320,246 @@ func TestLFUPolicy(t *testing.T) { }) } -func TestLTRPolicy(t *testing.T) { +func TestPolicyHooks(t *testing.T) { t.Parallel() - t.Run("OnInsert", func(t *testing.T) { - t.Parallel() - - t.Run("With TTL", func(t *testing.T) { - t.Parallel() - - policy := ltrPolicy{evict: createSentinel(t), evictZero: true} - - n0 := &node{Key: []byte("0"), Expiration: time.Now().Add(1 * time.Hour)} - n1 := &node{Key: []byte("1"), Expiration: time.Now().Add(2 * time.Hour)} - - policy.OnInsert(n0) - policy.OnInsert(n1) - - checkOrder(t, policy, []*node{n0, n1}) - }) - - t.Run("Without TTL", func(t *testing.T) { - t.Parallel() - - policy := ltrPolicy{evict: createSentinel(t), evictZero: true} - - n0 := &node{Key: []byte("0")} - n1 := &node{Key: []byte("1")} - - policy.OnInsert(n0) - policy.OnInsert(n1) - - checkOrder(t, policy, []*node{n1, n0}) - }) - }) - - t.Run("OnUpdate", func(t *testing.T) { - t.Parallel() - - t.Run("With TTL", func(t *testing.T) { - t.Parallel() - - policy := ltrPolicy{evict: createSentinel(t), evictZero: true} - - n0 := &node{Key: []byte("0"), Expiration: time.Now().Add(1 * time.Hour)} - n1 := &node{Key: []byte("1"), Expiration: time.Now().Add(2 * time.Hour)} - - policy.OnInsert(n0) - policy.OnInsert(n1) - - n0.Expiration = time.Now().Add(3 * time.Hour) - policy.OnUpdate(n0) - - checkOrder(t, policy, []*node{n1, n0}) - }) - - t.Run("With TTL Decrease", func(t *testing.T) { - t.Parallel() - - policy := ltrPolicy{evict: createSentinel(t), evictZero: true} - - n0 := &node{Key: []byte("0"), Expiration: time.Now().Add(1 * time.Hour)} - n1 := &node{Key: []byte("1"), Expiration: time.Now().Add(2 * time.Hour)} - - policy.OnInsert(n0) - policy.OnInsert(n1) - - n1.Expiration = time.Now().Add(30 * time.Minute) - policy.OnUpdate(n1) - - checkOrder(t, policy, []*node{n0, n1}) - }) - }) - - t.Run("Evict", func(t *testing.T) { - t.Parallel() - - t.Run("Evict", func(t *testing.T) { - t.Parallel() - - policy := ltrPolicy{evict: createSentinel(t), evictZero: true} - - n0 := &node{Key: []byte("0")} - n1 := &node{Key: []byte("1")} - - policy.OnInsert(n0) - policy.OnInsert(n1) - - evictedNode := policy.Evict() - if n0 != evictedNode { - t.Errorf("expected %#v, got %#v", n0, evictedNode) - } - }) - - t.Run("no evictZero", func(t *testing.T) { - t.Parallel() - - policy := ltrPolicy{evict: createSentinel(t), evictZero: false} - - n0 := &node{Key: []byte("0")} - n1 := &node{Key: []byte("1")} - - policy.OnInsert(n0) - policy.OnInsert(n1) + type test struct { + name string + flag bool + numOfNodes int + actions func(policy evictOrderedPolicy, nodes []*node) + expected func(nodes []*node) []*node + } - if policy.Evict() != nil { - t.Errorf("expected nil, got %#v", policy.Evict()) - } - }) + tests := []struct { + name string + policyType EvictionPolicyType + tests []test + }{ + { + name: "LTR", + policyType: PolicyLTR, + tests: []test{ + { + name: "OnInsert With TTL", + flag: true, + numOfNodes: 2, + actions: func(policy evictOrderedPolicy, nodes []*node) { + nodes[0].Expiration = time.Now().Add(1 * time.Hour) + nodes[1].Expiration = time.Now().Add(2 * time.Hour) + + policy.OnInsert(nodes[0]) + policy.OnInsert(nodes[1]) + }, + expected: func(nodes []*node) []*node { + return []*node{nodes[0], nodes[1]} + }, + }, + { + name: "OnInsert Without TTL", + flag: true, + numOfNodes: 2, + actions: func(policy evictOrderedPolicy, nodes []*node) { + policy.OnInsert(nodes[0]) + policy.OnInsert(nodes[1]) + }, + expected: func(nodes []*node) []*node { + return []*node{nodes[1], nodes[0]} + }, + }, + { + name: "OnUpdate With TTL", + flag: true, + numOfNodes: 2, + actions: func(policy evictOrderedPolicy, nodes []*node) { + nodes[0].Expiration = time.Now().Add(1 * time.Hour) + nodes[1].Expiration = time.Now().Add(2 * time.Hour) + policy.OnInsert(nodes[0]) + policy.OnInsert(nodes[1]) + + nodes[0].Expiration = time.Now().Add(3 * time.Hour) + policy.OnUpdate(nodes[0]) + }, + expected: func(nodes []*node) []*node { + return []*node{nodes[1], nodes[0]} + }, + }, + { + name: "OnUpdate With TTL Decrease", + flag: true, + numOfNodes: 2, + actions: func(policy evictOrderedPolicy, nodes []*node) { + nodes[0].Expiration = time.Now().Add(1 * time.Hour) + nodes[1].Expiration = time.Now().Add(2 * time.Hour) + policy.OnInsert(nodes[0]) + policy.OnInsert(nodes[1]) + + nodes[0].Expiration = time.Now().Add(20 * time.Minute) + policy.OnUpdate(nodes[0]) + }, + expected: func(nodes []*node) []*node { + return []*node{nodes[0], nodes[1]} + }, + }, + }, + }, + } - t.Run("Evict TTL", func(t *testing.T) { + for _, ts := range tests { + t.Run(ts.name, func(t *testing.T) { t.Parallel() - policy := ltrPolicy{evict: createSentinel(t), evictZero: true} + for _, tt := range ts.tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() - n0 := &node{Key: []byte("0"), Expiration: time.Now().Add(1 * time.Hour)} - n1 := &node{Key: []byte("1"), Expiration: time.Now().Add(2 * time.Hour)} + policy := createPolicy(t, ts.policyType, tt.flag) - policy.OnInsert(n0) - policy.OnInsert(n1) + nodes := make([]*node, tt.numOfNodes) + for i := range nodes { + nodes[i] = &node{Key: []byte(strconv.Itoa(i))} + } - evictedNode := policy.Evict() + tt.actions(policy, nodes) - if n1 != evictedNode { - t.Errorf("expected %#v, got %#v", n0, evictedNode) + checkOrder(t, policy, tt.expected(nodes)) + }) } }) + } +} - t.Run("Evict TTL Update", func(t *testing.T) { - t.Parallel() - - policy := ltrPolicy{evict: createSentinel(t), evictZero: true} - - n0 := &node{Key: []byte("0"), Expiration: time.Now().Add(1 * time.Hour)} - n1 := &node{Key: []byte("1"), Expiration: time.Now().Add(2 * time.Hour)} - - policy.OnInsert(n0) - policy.OnInsert(n1) - - n0.Expiration = time.Now().Add(3 * time.Hour) - policy.OnUpdate(n0) +func TestPolicyEvict(t *testing.T) { + t.Parallel() - evictedNode := policy.Evict() + type test struct { + name string + flag bool + numOfNodes int + actions func(policy evictOrderedPolicy, nodes []*node) + expected func(nodes []*node) *node + } - if n0 != evictedNode { - t.Errorf("expected %#v, got %#v", n0, evictedNode) - } - }) + tests := []struct { + name string + policyType EvictionPolicyType + tests []test + }{ + { + name: "LTR", + policyType: PolicyLTR, + tests: []test{ + { + name: "Evict", + flag: true, + numOfNodes: 2, + actions: func(policy evictOrderedPolicy, nodes []*node) { + policy.OnInsert(nodes[0]) + policy.OnInsert(nodes[1]) + }, + expected: func(nodes []*node) *node { + return nodes[0] + }, + }, + { + name: "no evictZero", + flag: false, + numOfNodes: 2, + actions: func(policy evictOrderedPolicy, nodes []*node) { + policy.OnInsert(nodes[0]) + policy.OnInsert(nodes[1]) + }, + expected: func(nodes []*node) *node { + return nil + }, + }, + { + name: "Evict TTL", + flag: true, + numOfNodes: 2, + actions: func(policy evictOrderedPolicy, nodes []*node) { + nodes[0].Expiration = time.Now().Add(1 * time.Hour) + nodes[1].Expiration = time.Now().Add(2 * time.Hour) + + policy.OnInsert(nodes[0]) + policy.OnInsert(nodes[1]) + }, + expected: func(nodes []*node) *node { + return nodes[1] + }, + }, + { + name: "Evict TTL Update", + flag: true, + numOfNodes: 2, + actions: func(policy evictOrderedPolicy, nodes []*node) { + nodes[0].Expiration = time.Now().Add(1 * time.Hour) + nodes[1].Expiration = time.Now().Add(2 * time.Hour) + + policy.OnInsert(nodes[0]) + policy.OnInsert(nodes[1]) + + nodes[0].Expiration = time.Now().Add(3 * time.Hour) + policy.OnUpdate(nodes[0]) + }, + expected: func(nodes []*node) *node { + return nodes[0] + }, + }, + { + name: "Evict TTL Update Down", + flag: true, + numOfNodes: 2, + actions: func(policy evictOrderedPolicy, nodes []*node) { + nodes[0].Expiration = time.Now().Add(1 * time.Hour) + nodes[1].Expiration = time.Now().Add(2 * time.Hour) + + policy.OnInsert(nodes[0]) + policy.OnInsert(nodes[1]) + + nodes[0].Expiration = time.Now().Add(20 * time.Minute) + policy.OnUpdate(nodes[0]) + }, + expected: func(nodes []*node) *node { + return nodes[1] + }, + }, + { + name: "Empty List", + flag: true, + numOfNodes: 0, + actions: func(policy evictOrderedPolicy, nodes []*node) {}, + expected: func(nodes []*node) *node { + return nil + }, + }, + }, + }, + } - t.Run("Evict TTL Update Down", func(t *testing.T) { + for _, ts := range tests { + t.Run(ts.name, func(t *testing.T) { t.Parallel() - policy := ltrPolicy{evict: createSentinel(t), evictZero: true} - - n0 := &node{Key: []byte("0"), Expiration: time.Now().Add(1 * time.Hour)} - n1 := &node{Key: []byte("1"), Expiration: time.Now().Add(2 * time.Hour)} + for _, tt := range ts.tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() - policy.OnInsert(n0) - policy.OnInsert(n1) + policy := createPolicy(t, ts.policyType, tt.flag) - n1.Expiration = time.Now().Add(20 * time.Minute) - policy.OnUpdate(n1) + nodes := make([]*node, tt.numOfNodes) + for i := range nodes { + nodes[i] = &node{Key: []byte(strconv.Itoa(i))} + } - evictedNode := policy.Evict() + tt.actions(policy, nodes) - if n1 != evictedNode { - t.Errorf("expected %#v, got %#v", n0, evictedNode) + evictedNode := policy.Evict() + if evictedNode != tt.expected(nodes) { + t.Errorf("expected %#v, got %#v", tt.expected(nodes), evictedNode) + } + }) } }) - - t.Run("Empty List", func(t *testing.T) { - t.Parallel() - - policy := ltrPolicy{evict: createSentinel(t), evictZero: true} - if policy.Evict() != nil { - t.Errorf("expected nil, got %#v", policy.Evict()) - } - }) - }) + } } diff --git a/store.go b/store.go index 5e110d9..366bbd3 100644 --- a/store.go +++ b/store.go @@ -70,13 +70,17 @@ type store struct { CleanupTicker *pausedtimer.PauseTimer Policy evictionPolicy - mu sync.Mutex + Lock sync.RWMutex + EvictLock sync.RWMutex } // Init initializes the store with default settings. func (s *store) Init() { s.Clear() - s.Policy.evict = &s.EvictList + s.Policy = evictionPolicy{ + ListLock: &s.EvictLock, + Sentinel: &s.EvictList, + } s.SnapshotTicker = pausedtimer.NewStopped(0) s.CleanupTicker = pausedtimer.NewStopped(10 * time.Second) @@ -87,8 +91,8 @@ func (s *store) Init() { // Clear removes all entries from the store. func (s *store) Clear() { - s.mu.Lock() - defer s.mu.Unlock() + s.Lock.Lock() + defer s.Lock.Unlock() s.Bucket = make([]node, initialBucketSize) s.Length = 0 @@ -132,13 +136,13 @@ func (s *store) lookup(key []byte) (*node, uint64, uint64) { // Get retrieves a value from the store by key with locking. func (s *store) Get(key []byte) ([]byte, time.Duration, bool) { - s.mu.Lock() - defer s.mu.Unlock() + s.Lock.Lock() + defer s.Lock.Unlock() v, _, _ := s.lookup(key) if v != nil { if !v.IsValid() { - deleteNode(s, v) + //deleteNode(s, v) return nil, 0, false } @@ -155,16 +159,28 @@ func (s *store) Get(key []byte) ([]byte, time.Duration, bool) { func (s *store) Resize() { bucket := make([]node, 2*len(s.Bucket)) - for v := s.EvictList.EvictNext; v != &s.EvictList; v = v.EvictNext { - idx := v.Hash % uint64(len(bucket)) + for i := range s.Bucket { + sentinel := &s.Bucket[i] + if sentinel.HashNext == nil { + continue + } + + var order []*node + for v := sentinel.HashNext; v != sentinel; v = v.HashNext { + order = append(order, v) + } - n := &bucket[idx] - lazyInitBucket(n) + for _, v := range order { + idx := v.Hash % uint64(len(bucket)) - v.HashPrev = n - v.HashNext = v.HashPrev.HashNext - v.HashNext.HashPrev = v - v.HashPrev.HashNext = v + n := &bucket[idx] + lazyInitBucket(n) + + v.HashPrev = n + v.HashNext = v.HashPrev.HashNext + v.HashNext.HashPrev = v + v.HashPrev.HashNext = v + } } s.Bucket = bucket @@ -172,8 +188,11 @@ func (s *store) Resize() { // cleanup removes expired entries from the store. func (s *store) Cleanup() { - s.mu.Lock() - defer s.mu.Unlock() + s.Lock.Lock() + defer s.Lock.Unlock() + + s.EvictLock.Lock() + defer s.EvictLock.Unlock() for v := s.EvictList.EvictNext; v != &s.EvictList; { n := v.EvictNext @@ -186,8 +205,11 @@ func (s *store) Cleanup() { // evict removes entries from the store based on the eviction policy. func (s *store) Evict() bool { - s.mu.Lock() - defer s.mu.Unlock() + s.Lock.Lock() + defer s.Lock.Unlock() + + s.EvictLock.Lock() + defer s.EvictLock.Unlock() if s.MaxCost == 0 { return true @@ -242,8 +264,8 @@ func (s *store) insert(key []byte, value []byte, ttl time.Duration) { // Set adds or updates a key-value pair in the store with locking. func (s *store) Set(key []byte, value []byte, ttl time.Duration) { - s.mu.Lock() - defer s.mu.Unlock() + s.Lock.Lock() + defer s.Lock.Unlock() v, _, _ := s.lookup(key) if v != nil { @@ -273,8 +295,8 @@ func deleteNode(s *store, v *node) { // Delete removes a key-value pair from the store with locking. func (s *store) Delete(key []byte) bool { - s.mu.Lock() - defer s.mu.Unlock() + s.Lock.Lock() + defer s.Lock.Unlock() v, _, _ := s.lookup(key) if v != nil { @@ -289,8 +311,8 @@ func (s *store) Delete(key []byte) bool { // UpdateInPlace retrieves a value from the store, processes it using the provided function, // and then sets the result back into the store with the same key. func (s *store) UpdateInPlace(key []byte, processFunc func([]byte) ([]byte, error), ttl time.Duration) error { - s.mu.Lock() - defer s.mu.Unlock() + s.Lock.Lock() + defer s.Lock.Unlock() v, _, _ := s.lookup(key) if v == nil { @@ -323,8 +345,8 @@ func (s *store) UpdateInPlace(key []byte, processFunc func([]byte) ([]byte, erro // Memorize attempts to retrieve a value from the store. If the retrieval fails, // it sets the result of the factory function into the store and returns that result. func (s *store) Memorize(key []byte, factory func() ([]byte, error), ttl time.Duration) ([]byte, error) { - s.mu.Lock() - defer s.mu.Unlock() + s.Lock.Lock() + defer s.Lock.Unlock() v, _, _ := s.lookup(key) if v != nil && v.IsValid() { diff --git a/store_test.go b/store_test.go index cf8e58e..01af0c3 100644 --- a/store_test.go +++ b/store_test.go @@ -448,7 +448,89 @@ func BenchmarkStoreGet(b *testing.B) { } } +func BenchmarkStoreGetParallel(b *testing.B) { + policy := map[string]EvictionPolicyType{ + "None": PolicyNone, + "FIFO": PolicyFIFO, + "LRU": PolicyLRU, + "LFU": PolicyLFU, + "LTR": PolicyLTR, + } + for k, v := range policy { + b.Run(k, func(b *testing.B) { + for n := 1; n <= 10000; n *= 10 { + b.Run(strconv.Itoa(n), func(b *testing.B) { + want := setupTestStore(b) + + if err := want.Policy.SetPolicy(v); err != nil { + b.Fatalf("unexpected error: %v", err) + } + + for i := range n - 1 { + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, uint64(i)) + want.Set(buf, buf, 0) + } + + key := []byte("Key") + want.Set(key, []byte("Store"), 0) + b.ReportAllocs() + + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + want.Get(key) + } + }) + }) + } + }) + } +} + func BenchmarkStoreSet(b *testing.B) { + policy := map[string]EvictionPolicyType{ + "None": PolicyNone, + "FIFO": PolicyFIFO, + "LRU": PolicyLRU, + "LFU": PolicyLFU, + "LTR": PolicyLTR, + } + for k, v := range policy { + b.Run(k, func(b *testing.B) { + for n := 1; n <= 10000; n *= 10 { + b.Run(strconv.Itoa(n), func(b *testing.B) { + want := setupTestStore(b) + + if err := want.Policy.SetPolicy(v); err != nil { + b.Fatalf("unexpected error: %v", err) + } + + for i := range n - 1 { + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, uint64(i)) + want.Set(buf, buf, 0) + } + + key := []byte("Key") + store := []byte("Store") + + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + want.Set(key, store, 0) + } + }) + }) + } + }) + } +} + +func BenchmarkStoreSetParallel(b *testing.B) { policy := map[string]EvictionPolicyType{ "None": PolicyNone, "FIFO": PolicyFIFO, -- cgit v1.2.3-70-g09d2