fiat: optimizing token db lookup

Added batch token ticker lookup in DB using one iterator pass. Switched token path to use the batch lookup. Added tests
This commit is contained in:
pragmaxim
2026-02-20 06:29:49 +01:00
parent df2afbef62
commit 30f22ecef8
5 changed files with 254 additions and 37 deletions

View File

@@ -1,6 +1,7 @@
package db
import (
"bytes"
"encoding/binary"
"encoding/json"
"math"
@@ -148,6 +149,42 @@ func (d *RocksDB) FiatRatesFindTicker(tickerTime *time.Time, vsCurrency string,
return nil, nil
}
// FiatRatesFindTickers gets FiatRates data closest to each specified timestamp.
// The method is optimized for timestamps sorted in ascending order.
func (d *RocksDB) FiatRatesFindTickers(timestamps []int64, vsCurrency string, token string) ([]*common.CurrencyRatesTicker, error) {
tickers := make([]*common.CurrencyRatesTicker, len(timestamps))
if len(timestamps) == 0 {
return tickers, nil
}
it := d.db.NewIteratorCF(d.ro, d.cfh[cfFiatRates])
defer it.Close()
first := true
for i, ts := range timestamps {
seekKey := []byte(time.Unix(ts, 0).UTC().Format(FiatRatesTimeFormat))
if first {
it.Seek(seekKey)
first = false
} else if it.Valid() && bytes.Compare(it.Key().Data(), seekKey) < 0 {
it.Seek(seekKey)
}
for ; it.Valid(); it.Next() {
ticker, err := getTickerFromIterator(it, vsCurrency, token)
if err != nil {
glog.Error("FiatRatesFindTickers error: ", err)
return nil, err
}
if ticker != nil {
tickers[i] = ticker
break
}
}
}
return tickers, nil
}
// FiatRatesGetAllTickers gets FiatRates data closest to the specified timestamp, of the base currency, vsCurrency or the token if specified
func (d *RocksDB) FiatRatesGetAllTickers(fn func(ticker *common.CurrencyRatesTicker) error) error {
it := d.db.NewIteratorCF(d.ro, d.cfh[cfFiatRates])

View File

@@ -142,6 +142,42 @@ func TestRocksTickers(t *testing.T) {
t.Errorf("Ticker %v found unexpectedly for aud vsCurrency", ticker)
}
queries := []struct {
name string
vsCurrency string
token string
}{
{name: "base", vsCurrency: "", token: ""},
{name: "eur", vsCurrency: "eur", token: ""},
{name: "token", vsCurrency: "", token: "0x6B175474E89094C44Da98b954EedeAC495271d0F"},
}
timestamps := []int64{
pastKey.Unix(),
ts1.Unix(),
ts1.Unix() + 3600,
ts2.Unix(),
futureKey.Unix(),
}
for _, q := range queries {
got, err := d.FiatRatesFindTickers(timestamps, q.vsCurrency, q.token)
if err != nil {
t.Fatalf("FiatRatesFindTickers(%s) returned error: %v", q.name, err)
}
if len(got) != len(timestamps) {
t.Fatalf("FiatRatesFindTickers(%s) returned %d items, want %d", q.name, len(got), len(timestamps))
}
for i, ts := range timestamps {
tsTime := time.Unix(ts, 0).UTC()
want, err := d.FiatRatesFindTicker(&tsTime, q.vsCurrency, q.token)
if err != nil {
t.Fatalf("FiatRatesFindTicker(%s) returned error: %v", q.name, err)
}
if !reflect.DeepEqual(got[i], want) {
t.Fatalf("FiatRatesFindTickers(%s) mismatch at index %d: got %+v, want %+v", q.name, i, got[i], want)
}
}
}
}
func Test_packUnpackCurrencyRatesTicker(t *testing.T) {

View File

@@ -62,8 +62,8 @@ type FiatRates struct {
dailyTickersTo int64
}
var fiatRatesFindTicker = func(d *db.RocksDB, tickerTime *time.Time, vsCurrency string, token string) (*common.CurrencyRatesTicker, error) {
return d.FiatRatesFindTicker(tickerTime, vsCurrency, token)
var fiatRatesFindTickers = func(d *db.RocksDB, timestamps []int64, vsCurrency string, token string) ([]*common.CurrencyRatesTicker, error) {
return d.FiatRatesFindTickers(timestamps, vsCurrency, token)
}
// NewFiatRates initializes the FiatRates handler
@@ -175,8 +175,7 @@ func (fr *FiatRates) getTokenTickersForTimestamps(timestamps []int64, vsCurrency
return &tickers, nil
}
// Query unique timestamps in ascending order so adjacent points can reuse the
// previously resolved ticker and avoid repeated DB scans.
// Query unique timestamps in ascending order to enable a single forward DB scan.
uniqueMap := make(map[int64]struct{}, len(timestamps))
uniqueTimestamps := make([]int64, 0, len(timestamps))
for _, ts := range timestamps {
@@ -190,30 +189,16 @@ func (fr *FiatRates) getTokenTickersForTimestamps(timestamps []int64, vsCurrency
return uniqueTimestamps[i] < uniqueTimestamps[j]
})
var prevTicker *common.CurrencyRatesTicker
var prevTs int64
foundTickers, err := fiatRatesFindTickers(fr.db, uniqueTimestamps, vsCurrency, token)
if err != nil {
return nil, err
}
resolvedTickers := make(map[int64]*common.CurrencyRatesTicker, len(uniqueTimestamps))
var err error
for _, t := range uniqueTimestamps {
var ticker *common.CurrencyRatesTicker
date := time.Unix(t, 0)
// if previously found ticker is newer than this one (token tickers may not be in DB for every day), skip search in DB
if prevTicker != nil && t >= prevTs && !date.After(prevTicker.Timestamp) {
ticker = prevTicker
prevTs = t
} else {
ticker, err = fiatRatesFindTicker(fr.db, &date, vsCurrency, token)
if err != nil {
return nil, err
}
prevTicker = ticker
prevTs = t
}
for i, t := range uniqueTimestamps {
ticker := foundTickers[i]
// if ticker not found in DB, use current ticker
if ticker == nil {
resolvedTickers[t] = currentTicker
prevTicker = currentTicker
prevTs = t
} else {
resolvedTickers[t] = ticker
}

View File

@@ -9,6 +9,7 @@ import (
"net/http/httptest"
"os"
"reflect"
"sync"
"testing"
"time"
@@ -335,21 +336,175 @@ func TestGetTickersForTimestamps_UsesGranularityAndFallback(t *testing.T) {
}
}
func TestGetTickersForTimestamps_ConcurrentReadersAndWriters(t *testing.T) {
fr := &FiatRates{Enabled: true}
const (
writers = 2
readers = 8
testDuration = 1200 * time.Millisecond
waitTimeout = 3 * time.Second
)
stop := make(chan struct{})
errCh := make(chan error, readers)
readerCalls := make([]int, readers)
var wg sync.WaitGroup
setState := func(counter int64) {
currentTicker := &common.CurrencyRatesTicker{
Timestamp: time.Unix(123456+counter, 0).UTC(),
Rates: map[string]float32{"usd": float32(100 + counter%100)},
}
fr.mux.Lock()
fr.currentTicker = currentTicker
fr.fiveMinutesTickers = map[int64]*common.CurrencyRatesTicker{
600: {
Timestamp: time.Unix(600, 0).UTC(),
Rates: map[string]float32{"usd": float32(1 + counter%10)},
},
}
fr.fiveMinutesTickersFrom = 600
fr.fiveMinutesTickersTo = 600
fr.hourlyTickers = map[int64]*common.CurrencyRatesTicker{
3600: {
Timestamp: time.Unix(3600, 0).UTC(),
Rates: map[string]float32{"usd": float32(10 + counter%10)},
},
}
fr.hourlyTickersFrom = 3600
fr.hourlyTickersTo = 3600
fr.dailyTickers = map[int64]*common.CurrencyRatesTicker{
86400: {
Timestamp: time.Unix(86400, 0).UTC(),
Rates: map[string]float32{"usd": float32(20 + counter%10)},
},
}
fr.dailyTickersFrom = 86400
fr.dailyTickersTo = 86400
fr.mux.Unlock()
}
// Seed cache state before readers start.
setState(0)
for w := 0; w < writers; w++ {
wg.Add(1)
go func(seed int) {
defer wg.Done()
counter := int64(seed)
for {
select {
case <-stop:
return
default:
}
setState(counter)
counter++
time.Sleep(100 * time.Microsecond)
}
}(w)
}
for r := 0; r < readers; r++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
timestamps := []int64{600, 3600, 86400, 90000}
calls := 0
for {
select {
case <-stop:
readerCalls[idx] = calls
return
default:
}
tickers, err := fr.GetTickersForTimestamps(timestamps, "usd", "")
if err != nil {
errCh <- fmt.Errorf("reader %d returned error: %w", idx, err)
readerCalls[idx] = calls
return
}
if tickers == nil || len(*tickers) != len(timestamps) {
errCh <- fmt.Errorf("reader %d unexpected ticker shape: %+v", idx, tickers)
readerCalls[idx] = calls
return
}
for i, ticker := range *tickers {
if ticker == nil {
errCh <- fmt.Errorf("reader %d got nil ticker at index %d", idx, i)
readerCalls[idx] = calls
return
}
if _, found := ticker.Rates["usd"]; !found {
errCh <- fmt.Errorf("reader %d ticker at index %d missing usd rate", idx, i)
readerCalls[idx] = calls
return
}
}
calls++
}
}(r)
}
time.Sleep(testDuration)
close(stop)
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(waitTimeout):
t.Fatal("concurrent fiat readers/writers did not finish in time")
}
close(errCh)
for err := range errCh {
if err != nil {
t.Fatal(err)
}
}
totalCalls := 0
for i, calls := range readerCalls {
if calls == 0 {
t.Fatalf("reader %d did not make any successful calls", i)
}
totalCalls += calls
}
if totalCalls < readers {
t.Fatalf("too few reader calls made: got %d", totalCalls)
}
}
func TestGetTokenTickersForTimestamps_QueriesUniqueSortedTimestamps(t *testing.T) {
originalFindTicker := fiatRatesFindTicker
originalFindTickers := fiatRatesFindTickers
defer func() {
fiatRatesFindTicker = originalFindTicker
fiatRatesFindTickers = originalFindTickers
}()
lookupCalls := make([]int64, 0)
fiatRatesFindTicker = func(_ *db.RocksDB, tickerTime *time.Time, _, _ string) (*common.CurrencyRatesTicker, error) {
ts := tickerTime.UTC().Unix()
lookupCalls = append(lookupCalls, ts)
return &common.CurrencyRatesTicker{
Timestamp: time.Unix(ts, 0).UTC(),
Rates: map[string]float32{"usd": float32(ts)},
TokenRates: map[string]float32{"token": 1},
}, nil
batchCalls := 0
fiatRatesFindTickers = func(_ *db.RocksDB, timestamps []int64, _, _ string) ([]*common.CurrencyRatesTicker, error) {
batchCalls++
lookupCalls = append(lookupCalls, timestamps...)
tickers := make([]*common.CurrencyRatesTicker, len(timestamps))
for i, ts := range timestamps {
tickers[i] = &common.CurrencyRatesTicker{
Timestamp: time.Unix(ts, 0).UTC(),
Rates: map[string]float32{"usd": float32(ts)},
TokenRates: map[string]float32{"token": 1},
}
}
return tickers, nil
}
fr := &FiatRates{
@@ -371,6 +526,9 @@ func TestGetTokenTickersForTimestamps_QueriesUniqueSortedTimestamps(t *testing.T
if !reflect.DeepEqual(lookupCalls, []int64{100, 200, 250, 300}) {
t.Fatalf("unexpected DB lookup order: got %v", lookupCalls)
}
if batchCalls != 1 {
t.Fatalf("unexpected number of batch DB calls: got %d, want %d", batchCalls, 1)
}
got := make([]float32, len(input))
for i := range input {
@@ -386,13 +544,13 @@ func TestGetTokenTickersForTimestamps_QueriesUniqueSortedTimestamps(t *testing.T
}
func TestGetTokenTickersForTimestamps_SkipsDBLookupWhenCurrentTickerHasNoToken(t *testing.T) {
originalFindTicker := fiatRatesFindTicker
originalFindTickers := fiatRatesFindTickers
defer func() {
fiatRatesFindTicker = originalFindTicker
fiatRatesFindTickers = originalFindTickers
}()
lookupCalls := 0
fiatRatesFindTicker = func(_ *db.RocksDB, _ *time.Time, _, _ string) (*common.CurrencyRatesTicker, error) {
fiatRatesFindTickers = func(_ *db.RocksDB, _ []int64, _, _ string) ([]*common.CurrencyRatesTicker, error) {
lookupCalls++
return nil, nil
}

View File

@@ -278,6 +278,7 @@ func (s *WebsocketServer) outputLoop(c *websocketChannel) {
}
}()
for m := range c.out {
c.conn.SetWriteDeadline(time.Now().Add(defaultTimeout))
err := c.conn.WriteJSON(m)
if err != nil {
glog.Error("Error sending message to ", c.id, ", ", err)