modelRT/real-time-data/cache.go

144 lines
3.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package realtimedata defines functions for operating on real-time data.
package realtimedata
import (
"container/heap"
"fmt"
"sync"
"time"
)
// DataItem is a single cache entry: the stored value together with its
// insertion time, its last access time and its position in the priority queue.
type DataItem struct {
// Data is the cached value; TimeCache.Add also uses it as the map key.
Data interface{}
// InsertTime is set by TimeCache.Add when the entry is created.
InsertTime time.Time
// LastAccess is the timestamp whose UnixNano value serves as the queue priority.
LastAccess time.Time
// Index is the entry's current slot in the priority queue. The queue's Push
// and Swap methods keep it up to date, and Pop sets it to -1.
Index int
}
// priorityQueueItem is one element of priorityQueue: it pairs a cached
// DataItem with the priority value used to order the heap.
type priorityQueueItem struct {
// item is the cache entry this element refers to.
item *DataItem
// prio is the ordering key; Less sorts smaller values first.
prio int64
}
// priorityQueue is a slice of queue elements. Together with the Len, Less,
// Swap, Push and Pop methods declared on it, it implements heap.Interface.
type priorityQueue []*priorityQueueItem
// Len reports how many elements are currently in the queue.
func (pq priorityQueue) Len() int {
	count := len(pq)
	return count
}
// Less reports whether element i sorts before element j. The element with the
// smaller prio wins; since prio holds a LastAccess time in Unix nanoseconds,
// the entry that was accessed earliest ends up at the root of the heap.
func (pq priorityQueue) Less(i, j int) bool {
	left, right := pq[i], pq[j]
	return left.prio < right.prio
}
// Swap exchanges the elements in slots i and j and then rewrites the Index
// stored on each element's DataItem so it matches the slot it now occupies.
func (pq priorityQueue) Swap(i, j int) {
	first, second := pq[i], pq[j]
	pq[i], pq[j] = second, first
	second.item.Index = i
	first.item.Index = j
}
// Push appends x to the end of the queue and records that slot in the
// element's DataItem.Index. x must be a *priorityQueueItem; any other type
// makes the type assertion panic.
func (pq *priorityQueue) Push(x interface{}) {
	queue := *pq
	entry := x.(*priorityQueueItem)
	entry.item.Index = len(queue)
	*pq = append(queue, entry)
}
// Pop removes the last element of the queue and returns it as a
// *priorityQueueItem. The element's DataItem.Index is set to -1 to mark it as
// no longer queued.
//
// This is the heap.Interface hook used by the container/heap functions, which
// move the element to be removed into the last slot before calling it.
// The queue must not be empty.
func (pq *priorityQueue) Pop() interface{} {
	old := *pq
	last := len(old) - 1
	queueItem := old[last]
	// Clear the vacated slot: the backing array outlives the reslice below and
	// would otherwise keep the popped element (and its data) reachable.
	old[last] = nil
	queueItem.item.Index = -1
	*pq = old[:last]
	return queueItem
}
// update assigns a new priority to item and restores heap order.
//
// newPrio is interpreted as nanoseconds since the Unix epoch: it is written
// back to item.LastAccess and stored as the prio of the queue element found
// at item.Index. The item must currently be in the queue, because item.Index
// is used directly as the slot to modify.
func (pq *priorityQueue) update(item *DataItem, newPrio int64) {
	item.LastAccess = time.Unix(0, newPrio)
	slot := item.Index
	(*pq)[slot].prio = newPrio
	heap.Fix(pq, slot)
}
// TimeCache is a bounded cache of DataItem entries. Entries are looked up by
// their Data value and ordered in a priority queue by last access time; Add
// evicts the entry at the front of that queue once capacity is exceeded.
type TimeCache struct {
// mu is held by Add and PrintCache for the duration of each call.
mu sync.Mutex
// capacity is the number of entries Add allows before it evicts one.
capacity int
// items maps each stored Data value to its entry.
items map[interface{}]*DataItem
// pq orders the entries by priority; the smallest prio is popped first.
pq priorityQueue
}
// NewTimeCache creates an empty TimeCache that keeps at most capacity entries.
// The priority queue is pre-allocated with room for capacity elements.
//
// A negative capacity is treated as 0 instead of panicking when the queue is
// allocated. A cache with capacity 0 retains nothing: Add evicts each entry
// immediately after inserting it.
func NewTimeCache(capacity int) *TimeCache {
	if capacity < 0 {
		capacity = 0
	}
	return &TimeCache{
		capacity: capacity,
		items:    make(map[interface{}]*DataItem),
		pq:       make(priorityQueue, 0, capacity),
	}
}
// Add inserts data into the cache, or refreshes it if it is already present.
//
// A fresh DataItem is created with InsertTime and LastAccess both set to the
// current time. If an entry for data already exists it is taken out of the
// queue and the map first, so re-adding a value resets both timestamps.
// When the queue then holds more than capacity entries, the entry with the
// smallest priority (the earliest LastAccess) is evicted.
//
// data is used as a map key, so its dynamic type must be comparable; a slice,
// map or func value makes the map lookup panic.
func (tc *TimeCache) Add(data interface{}) {
	tc.mu.Lock()
	defer tc.mu.Unlock()

	now := time.Now()

	if existingItem, ok := tc.items[data]; ok {
		// Take the old element out through container/heap so that heap order
		// and the Index of every remaining entry stay valid. Index normally
		// points straight at the element; fall back to a scan if it does not.
		idx := existingItem.Index
		if idx < 0 || idx >= len(tc.pq) || tc.pq[idx].item != existingItem {
			idx = tc.pq.search(existingItem)
		}
		if idx >= 0 {
			heap.Remove(&tc.pq, idx)
		}
		existingItem.Index = -1
		delete(tc.items, data)
	}

	newItem := &DataItem{
		Data:       data,
		InsertTime: now,
		LastAccess: now,
		Index:      -1, // not queued yet; heap.Push assigns the real slot
	}
	heap.Push(&tc.pq, &priorityQueueItem{
		item: newItem,
		prio: now.UnixNano(),
	})
	tc.items[data] = newItem

	// Over capacity: evict the entry at the root of the heap.
	if tc.pq.Len() > tc.capacity {
		oldestItem := heap.Pop(&tc.pq).(*priorityQueueItem).item
		delete(tc.items, oldestItem.Data)
	}
}
// search scans the queue from the front for the element that wraps item
// (compared by pointer) and returns its slot, or -1 when no element matches.
func (pq *priorityQueue) search(item *DataItem) int {
	queue := *pq
	for slot := 0; slot < len(queue); slot++ {
		if queue[slot].item == item {
			return slot
		}
	}
	return -1
}
// Remove takes the element in slot index out of the queue and marks its
// DataItem as unqueued by setting Index to -1. It is a convenience helper and
// not part of heap.Interface.
//
// The removal goes through heap.Remove, which keeps the heap ordered and, via
// Swap, keeps the Index of every remaining entry in step with its slot.
// An index outside the queue, such as the -1 that search returns for a
// missing item, is ignored.
func (pq *priorityQueue) Remove(index int) {
	if index < 0 || index >= len(*pq) {
		return
	}
	removed := heap.Remove(pq, index).(*priorityQueueItem)
	removed.item.Index = -1
}
// PrintCache writes one line per cached entry to standard output, showing the
// entry's data, insertion time and last access time. The cache lock is held
// while printing. Entries come out in map iteration order, which is not
// stable between calls.
func (tc *TimeCache) PrintCache() {
	tc.mu.Lock()
	defer tc.mu.Unlock()

	const layout = "Data: %v, InsertTime: %s, LastAccess: %s\n"
	for _, entry := range tc.items {
		fmt.Printf(layout, entry.Data, entry.InsertTime, entry.LastAccess)
	}
}