144 lines
3.7 KiB
Go
144 lines
3.7 KiB
Go
// Package realtimedata define real time data operation functions
|
||
package realtimedata
|
||
|
||
import (
|
||
"container/heap"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// DataItem define structure for storing data, insertion time, and last access time
|
||
type DataItem struct {
|
||
Data interface{}
|
||
InsertTime time.Time
|
||
LastAccess time.Time
|
||
Index int
|
||
}
|
||
|
||
// priorityQueueItem implement heap.Interface
|
||
type priorityQueueItem struct {
|
||
item *DataItem
|
||
prio int64
|
||
}
|
||
|
||
// priorityQueue implement heap.Interface
|
||
type priorityQueue []*priorityQueueItem
|
||
|
||
func (pq priorityQueue) Len() int { return len(pq) }
|
||
|
||
func (pq priorityQueue) Less(i, j int) bool {
|
||
// Note: use the UnixNano value of LastAccess time, with smaller values indicating earlier. Therefore, we need to perform reverse comparison to implement LRU like behavior
|
||
return pq[i].prio < pq[j].prio
|
||
}
|
||
|
||
func (pq priorityQueue) Swap(i, j int) {
|
||
pq[i], pq[j] = pq[j], pq[i]
|
||
pq[i].item.Index = i
|
||
pq[j].item.Index = j
|
||
}
|
||
|
||
func (pq *priorityQueue) Push(x interface{}) {
|
||
n := len(*pq)
|
||
queueItem := x.(*priorityQueueItem)
|
||
queueItem.item.Index = n
|
||
*pq = append(*pq, queueItem)
|
||
}
|
||
|
||
func (pq *priorityQueue) Pop() interface{} {
|
||
old := *pq
|
||
n := len(old)
|
||
queueItem := old[n-1]
|
||
queueItem.item.Index = -1
|
||
*pq = old[0 : n-1]
|
||
return queueItem
|
||
}
|
||
|
||
// update updates the priority of an item in the priority queue.
|
||
func (pq *priorityQueue) update(item *DataItem, newPrio int64) {
|
||
item.LastAccess = time.Unix(0, newPrio)
|
||
pqItem := (*pq)[item.Index]
|
||
pqItem.prio = newPrio
|
||
heap.Fix(pq, item.Index)
|
||
}
|
||
|
||
type TimeCache struct {
|
||
mu sync.Mutex
|
||
capacity int
|
||
items map[interface{}]*DataItem
|
||
pq priorityQueue
|
||
}
|
||
|
||
// NewTimeCache 创建一个新的 TimeCache 实例
|
||
func NewTimeCache(capacity int) *TimeCache {
|
||
return &TimeCache{
|
||
capacity: capacity,
|
||
items: make(map[interface{}]*DataItem),
|
||
pq: make(priorityQueue, 0, capacity),
|
||
}
|
||
}
|
||
|
||
// Add 添加一个新项到缓存中
|
||
func (tc *TimeCache) Add(data interface{}) {
|
||
tc.mu.Lock()
|
||
defer tc.mu.Unlock()
|
||
|
||
now := time.Now()
|
||
|
||
// 如果数据已经存在,则更新它的最后访问时间
|
||
if existingItem, ok := tc.items[data]; ok {
|
||
prio := existingItem.LastAccess.UnixNano()
|
||
tc.pq.update(existingItem, prio+1) // 实际上我们不需要 +1,但这里为了演示如何更新优先级
|
||
// 注意:上面的更新操作是多余的,因为我们总是可以重新插入一个新项并删除旧项
|
||
// 因此,下面的代码将替换上面的更新操作
|
||
tc.pq.Remove(tc.pq.search(existingItem))
|
||
delete(tc.items, data)
|
||
}
|
||
|
||
newItem := &DataItem{
|
||
Data: data,
|
||
InsertTime: now,
|
||
LastAccess: now,
|
||
Index: -1, // 初始时索引无效
|
||
}
|
||
pqItem := &priorityQueueItem{
|
||
item: newItem,
|
||
prio: newItem.LastAccess.UnixNano(),
|
||
}
|
||
heap.Push(&tc.pq, pqItem)
|
||
newItem.Index = pqItem.item.Index
|
||
tc.items[data] = newItem
|
||
|
||
// 如果缓存超过了容量,移除最后访问时间最早的项
|
||
if tc.pq.Len() > tc.capacity {
|
||
oldestItem := heap.Pop(&tc.pq).(*priorityQueueItem).item
|
||
delete(tc.items, oldestItem.Data)
|
||
}
|
||
}
|
||
|
||
// search 辅助函数,用于在优先级队列中搜索一个项
|
||
func (pq *priorityQueue) search(item *DataItem) int {
|
||
for i, pqItem := range *pq {
|
||
if pqItem.item == item {
|
||
return i
|
||
}
|
||
}
|
||
return -1 // 未找到
|
||
}
|
||
|
||
// Remove 辅助函数,用于从优先级队列中移除一个项(注意:这不是 heap.Interface 的一部分)
|
||
func (pq *priorityQueue) Remove(index int) {
|
||
pqItem := (*pq)[index]
|
||
pqItem.item.Index = -1 // 将索引设为无效值
|
||
*pq = append((*pq)[:index], (*pq)[index+1:]...)
|
||
heap.Fix(pq, index)
|
||
}
|
||
|
||
func (tc *TimeCache) PrintCache() {
|
||
tc.mu.Lock()
|
||
defer tc.mu.Unlock()
|
||
for _, item := range tc.items {
|
||
fmt.Printf("Data: %v, InsertTime: %s, LastAccess: %s\n", item.Data, item.InsertTime, item.LastAccess)
|
||
}
|
||
}
|