fix(parsers.avro): Add mutex to cache access (#15921)
This commit is contained in:
parent
0febb7e588
commit
66245c41b9
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/linkedin/goavro/v2"
|
"github.com/linkedin/goavro/v2"
|
||||||
|
|
@ -25,6 +26,7 @@ type schemaRegistry struct {
|
||||||
password string
|
password string
|
||||||
cache map[int]*schemaAndCodec
|
cache map[int]*schemaAndCodec
|
||||||
client *http.Client
|
client *http.Client
|
||||||
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
const schemaByID = "%s/schemas/ids/%d"
|
const schemaByID = "%s/schemas/ids/%d"
|
||||||
|
|
@ -73,10 +75,22 @@ func newSchemaRegistry(addr, caCertPath string) (*schemaRegistry, error) {
|
||||||
return registry, nil
|
return registry, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
|
// Helper function to make managing lock easier
|
||||||
|
func (sr *schemaRegistry) getSchemaAndCodecFromCache(id int) (*schemaAndCodec, error) {
|
||||||
|
// Read-lock the cache map before access.
|
||||||
|
sr.mu.RLock()
|
||||||
|
defer sr.mu.RUnlock()
|
||||||
if v, ok := sr.cache[id]; ok {
|
if v, ok := sr.cache[id]; ok {
|
||||||
return v, nil
|
return v, nil
|
||||||
}
|
}
|
||||||
|
return nil, fmt.Errorf("schema %d not in cache", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
|
||||||
|
v, err := sr.getSchemaAndCodecFromCache(id)
|
||||||
|
if err == nil {
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf(schemaByID, sr.url, id), nil)
|
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf(schemaByID, sr.url, id), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -112,6 +126,9 @@ func (sr *schemaRegistry) getSchemaAndCodec(id int) (*schemaAndCodec, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
retval := &schemaAndCodec{Schema: schemaValue, Codec: codec}
|
retval := &schemaAndCodec{Schema: schemaValue, Codec: codec}
|
||||||
|
// Lock the cache map before update.
|
||||||
|
sr.mu.Lock()
|
||||||
|
defer sr.mu.Unlock()
|
||||||
sr.cache[id] = retval
|
sr.cache[id] = retval
|
||||||
return retval, nil
|
return retval, nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue