From 30f22ecef89ae0eec7939987ebf33ddf37ced637 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Fri, 20 Feb 2026 06:29:49 +0100 Subject: [PATCH] fiat: optimizing token db lookup Added batch token ticker lookup in DB using one iterator pass. Switched token path to use the batch lookup. Added tests --- db/fiat.go | 37 ++++++++ db/fiat_test.go | 36 ++++++++ fiat/fiat_rates.go | 33 ++----- fiat/fiat_rates_test.go | 184 +++++++++++++++++++++++++++++++++++++--- server/websocket.go | 1 + 5 files changed, 254 insertions(+), 37 deletions(-) diff --git a/db/fiat.go b/db/fiat.go index dee8aa94..a4f2ebb9 100644 --- a/db/fiat.go +++ b/db/fiat.go @@ -1,6 +1,7 @@ package db import ( + "bytes" "encoding/binary" "encoding/json" "math" @@ -148,6 +149,42 @@ func (d *RocksDB) FiatRatesFindTicker(tickerTime *time.Time, vsCurrency string, return nil, nil } +// FiatRatesFindTickers gets FiatRates data closest to each specified timestamp. +// The method is optimized for timestamps sorted in ascending order. +func (d *RocksDB) FiatRatesFindTickers(timestamps []int64, vsCurrency string, token string) ([]*common.CurrencyRatesTicker, error) { + tickers := make([]*common.CurrencyRatesTicker, len(timestamps)) + if len(timestamps) == 0 { + return tickers, nil + } + + it := d.db.NewIteratorCF(d.ro, d.cfh[cfFiatRates]) + defer it.Close() + + first := true + for i, ts := range timestamps { + seekKey := []byte(time.Unix(ts, 0).UTC().Format(FiatRatesTimeFormat)) + if first { + it.Seek(seekKey) + first = false + } else if it.Valid() && bytes.Compare(it.Key().Data(), seekKey) < 0 { + it.Seek(seekKey) + } + + for ; it.Valid(); it.Next() { + ticker, err := getTickerFromIterator(it, vsCurrency, token) + if err != nil { + glog.Error("FiatRatesFindTickers error: ", err) + return nil, err + } + if ticker != nil { + tickers[i] = ticker + break + } + } + } + return tickers, nil +} + // FiatRatesGetAllTickers gets FiatRates data closest to the specified timestamp, of the base currency, vsCurrency or the token if specified func (d *RocksDB) FiatRatesGetAllTickers(fn func(ticker *common.CurrencyRatesTicker) error) error { it := d.db.NewIteratorCF(d.ro, d.cfh[cfFiatRates]) diff --git a/db/fiat_test.go b/db/fiat_test.go index e9ce5b4e..b7e5dbc8 100644 --- a/db/fiat_test.go +++ b/db/fiat_test.go @@ -142,6 +142,42 @@ func TestRocksTickers(t *testing.T) { t.Errorf("Ticker %v found unexpectedly for aud vsCurrency", ticker) } + queries := []struct { + name string + vsCurrency string + token string + }{ + {name: "base", vsCurrency: "", token: ""}, + {name: "eur", vsCurrency: "eur", token: ""}, + {name: "token", vsCurrency: "", token: "0x6B175474E89094C44Da98b954EedeAC495271d0F"}, + } + timestamps := []int64{ + pastKey.Unix(), + ts1.Unix(), + ts1.Unix() + 3600, + ts2.Unix(), + futureKey.Unix(), + } + for _, q := range queries { + got, err := d.FiatRatesFindTickers(timestamps, q.vsCurrency, q.token) + if err != nil { + t.Fatalf("FiatRatesFindTickers(%s) returned error: %v", q.name, err) + } + if len(got) != len(timestamps) { + t.Fatalf("FiatRatesFindTickers(%s) returned %d items, want %d", q.name, len(got), len(timestamps)) + } + for i, ts := range timestamps { + tsTime := time.Unix(ts, 0).UTC() + want, err := d.FiatRatesFindTicker(&tsTime, q.vsCurrency, q.token) + if err != nil { + t.Fatalf("FiatRatesFindTicker(%s) returned error: %v", q.name, err) + } + if !reflect.DeepEqual(got[i], want) { + t.Fatalf("FiatRatesFindTickers(%s) mismatch at index %d: got %+v, want %+v", q.name, i, got[i], want) + } + } + } + } func Test_packUnpackCurrencyRatesTicker(t *testing.T) { diff --git a/fiat/fiat_rates.go b/fiat/fiat_rates.go index 6faeaa87..2aea70fb 100644 --- a/fiat/fiat_rates.go +++ b/fiat/fiat_rates.go @@ -62,8 +62,8 @@ type FiatRates struct { dailyTickersTo int64 } -var fiatRatesFindTicker = func(d *db.RocksDB, tickerTime *time.Time, vsCurrency string, token string) (*common.CurrencyRatesTicker, error) { - return d.FiatRatesFindTicker(tickerTime, vsCurrency, token) +var fiatRatesFindTickers = func(d *db.RocksDB, timestamps []int64, vsCurrency string, token string) ([]*common.CurrencyRatesTicker, error) { + return d.FiatRatesFindTickers(timestamps, vsCurrency, token) } // NewFiatRates initializes the FiatRates handler @@ -175,8 +175,7 @@ func (fr *FiatRates) getTokenTickersForTimestamps(timestamps []int64, vsCurrency return &tickers, nil } - // Query unique timestamps in ascending order so adjacent points can reuse the - // previously resolved ticker and avoid repeated DB scans. + // Query unique timestamps in ascending order to enable a single forward DB scan. uniqueMap := make(map[int64]struct{}, len(timestamps)) uniqueTimestamps := make([]int64, 0, len(timestamps)) for _, ts := range timestamps { @@ -190,30 +189,16 @@ func (fr *FiatRates) getTokenTickersForTimestamps(timestamps []int64, vsCurrency return uniqueTimestamps[i] < uniqueTimestamps[j] }) - var prevTicker *common.CurrencyRatesTicker - var prevTs int64 + foundTickers, err := fiatRatesFindTickers(fr.db, uniqueTimestamps, vsCurrency, token) + if err != nil { + return nil, err + } resolvedTickers := make(map[int64]*common.CurrencyRatesTicker, len(uniqueTimestamps)) - var err error - for _, t := range uniqueTimestamps { - var ticker *common.CurrencyRatesTicker - date := time.Unix(t, 0) - // if previously found ticker is newer than this one (token tickers may not be in DB for every day), skip search in DB - if prevTicker != nil && t >= prevTs && !date.After(prevTicker.Timestamp) { - ticker = prevTicker - prevTs = t - } else { - ticker, err = fiatRatesFindTicker(fr.db, &date, vsCurrency, token) - if err != nil { - return nil, err - } - prevTicker = ticker - prevTs = t - } + for i, t := range uniqueTimestamps { + ticker := foundTickers[i] // if ticker not found in DB, use current ticker if ticker == nil { resolvedTickers[t] = currentTicker - prevTicker = currentTicker - prevTs = t } else { resolvedTickers[t] = ticker } diff --git a/fiat/fiat_rates_test.go b/fiat/fiat_rates_test.go index d5596499..45c8ab0f 100644 --- a/fiat/fiat_rates_test.go +++ b/fiat/fiat_rates_test.go @@ -9,6 +9,7 @@ import ( "net/http/httptest" "os" "reflect" + "sync" "testing" "time" @@ -335,21 +336,175 @@ func TestGetTickersForTimestamps_UsesGranularityAndFallback(t *testing.T) { } } +func TestGetTickersForTimestamps_ConcurrentReadersAndWriters(t *testing.T) { + fr := &FiatRates{Enabled: true} + + const ( + writers = 2 + readers = 8 + testDuration = 1200 * time.Millisecond + waitTimeout = 3 * time.Second + ) + + stop := make(chan struct{}) + errCh := make(chan error, readers) + readerCalls := make([]int, readers) + var wg sync.WaitGroup + + setState := func(counter int64) { + currentTicker := &common.CurrencyRatesTicker{ + Timestamp: time.Unix(123456+counter, 0).UTC(), + Rates: map[string]float32{"usd": float32(100 + counter%100)}, + } + fr.mux.Lock() + fr.currentTicker = currentTicker + fr.fiveMinutesTickers = map[int64]*common.CurrencyRatesTicker{ + 600: { + Timestamp: time.Unix(600, 0).UTC(), + Rates: map[string]float32{"usd": float32(1 + counter%10)}, + }, + } + fr.fiveMinutesTickersFrom = 600 + fr.fiveMinutesTickersTo = 600 + fr.hourlyTickers = map[int64]*common.CurrencyRatesTicker{ + 3600: { + Timestamp: time.Unix(3600, 0).UTC(), + Rates: map[string]float32{"usd": float32(10 + counter%10)}, + }, + } + fr.hourlyTickersFrom = 3600 + fr.hourlyTickersTo = 3600 + fr.dailyTickers = map[int64]*common.CurrencyRatesTicker{ + 86400: { + Timestamp: time.Unix(86400, 0).UTC(), + Rates: map[string]float32{"usd": float32(20 + counter%10)}, + }, + } + fr.dailyTickersFrom = 86400 + fr.dailyTickersTo = 86400 + fr.mux.Unlock() + } + + // Seed cache state before readers start. + setState(0) + + for w := 0; w < writers; w++ { + wg.Add(1) + go func(seed int) { + defer wg.Done() + + counter := int64(seed) + for { + select { + case <-stop: + return + default: + } + + setState(counter) + + counter++ + time.Sleep(100 * time.Microsecond) + } + }(w) + } + + for r := 0; r < readers; r++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + + timestamps := []int64{600, 3600, 86400, 90000} + calls := 0 + for { + select { + case <-stop: + readerCalls[idx] = calls + return + default: + } + + tickers, err := fr.GetTickersForTimestamps(timestamps, "usd", "") + if err != nil { + errCh <- fmt.Errorf("reader %d returned error: %w", idx, err) + readerCalls[idx] = calls + return + } + if tickers == nil || len(*tickers) != len(timestamps) { + errCh <- fmt.Errorf("reader %d unexpected ticker shape: %+v", idx, tickers) + readerCalls[idx] = calls + return + } + for i, ticker := range *tickers { + if ticker == nil { + errCh <- fmt.Errorf("reader %d got nil ticker at index %d", idx, i) + readerCalls[idx] = calls + return + } + if _, found := ticker.Rates["usd"]; !found { + errCh <- fmt.Errorf("reader %d ticker at index %d missing usd rate", idx, i) + readerCalls[idx] = calls + return + } + } + calls++ + } + }(r) + } + + time.Sleep(testDuration) + close(stop) + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(waitTimeout): + t.Fatal("concurrent fiat readers/writers did not finish in time") + } + + close(errCh) + for err := range errCh { + if err != nil { + t.Fatal(err) + } + } + + totalCalls := 0 + for i, calls := range readerCalls { + if calls == 0 { + t.Fatalf("reader %d did not make any successful calls", i) + } + totalCalls += calls + } + if totalCalls < readers { + t.Fatalf("too few reader calls made: got %d", totalCalls) + } +} + func TestGetTokenTickersForTimestamps_QueriesUniqueSortedTimestamps(t *testing.T) { - originalFindTicker := fiatRatesFindTicker + originalFindTickers := fiatRatesFindTickers defer func() { - fiatRatesFindTicker = originalFindTicker + fiatRatesFindTickers = originalFindTickers }() lookupCalls := make([]int64, 0) - fiatRatesFindTicker = func(_ *db.RocksDB, tickerTime *time.Time, _, _ string) (*common.CurrencyRatesTicker, error) { - ts := tickerTime.UTC().Unix() - lookupCalls = append(lookupCalls, ts) - return &common.CurrencyRatesTicker{ - Timestamp: time.Unix(ts, 0).UTC(), - Rates: map[string]float32{"usd": float32(ts)}, - TokenRates: map[string]float32{"token": 1}, - }, nil + batchCalls := 0 + fiatRatesFindTickers = func(_ *db.RocksDB, timestamps []int64, _, _ string) ([]*common.CurrencyRatesTicker, error) { + batchCalls++ + lookupCalls = append(lookupCalls, timestamps...) + tickers := make([]*common.CurrencyRatesTicker, len(timestamps)) + for i, ts := range timestamps { + tickers[i] = &common.CurrencyRatesTicker{ + Timestamp: time.Unix(ts, 0).UTC(), + Rates: map[string]float32{"usd": float32(ts)}, + TokenRates: map[string]float32{"token": 1}, + } + } + return tickers, nil } fr := &FiatRates{ @@ -371,6 +526,9 @@ func TestGetTokenTickersForTimestamps_QueriesUniqueSortedTimestamps(t *testing.T if !reflect.DeepEqual(lookupCalls, []int64{100, 200, 250, 300}) { t.Fatalf("unexpected DB lookup order: got %v", lookupCalls) } + if batchCalls != 1 { + t.Fatalf("unexpected number of batch DB calls: got %d, want %d", batchCalls, 1) + } got := make([]float32, len(input)) for i := range input { @@ -386,13 +544,13 @@ func TestGetTokenTickersForTimestamps_QueriesUniqueSortedTimestamps(t *testing.T } func TestGetTokenTickersForTimestamps_SkipsDBLookupWhenCurrentTickerHasNoToken(t *testing.T) { - originalFindTicker := fiatRatesFindTicker + originalFindTickers := fiatRatesFindTickers defer func() { - fiatRatesFindTicker = originalFindTicker + fiatRatesFindTickers = originalFindTickers }() lookupCalls := 0 - fiatRatesFindTicker = func(_ *db.RocksDB, _ *time.Time, _, _ string) (*common.CurrencyRatesTicker, error) { + fiatRatesFindTickers = func(_ *db.RocksDB, _ []int64, _, _ string) ([]*common.CurrencyRatesTicker, error) { lookupCalls++ return nil, nil } diff --git a/server/websocket.go b/server/websocket.go index 963b9bcb..74d347fb 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -278,6 +278,7 @@ func (s *WebsocketServer) outputLoop(c *websocketChannel) { } }() for m := range c.out { + c.conn.SetWriteDeadline(time.Now().Add(defaultTimeout)) err := c.conn.WriteJSON(m) if err != nil { glog.Error("Error sending message to ", c.id, ", ", err)