Files
blockbook/server/websocket.go
2026-02-17 12:05:03 +01:00

1410 lines
45 KiB
Go

package server
import (
"encoding/json"
"math/big"
"net/http"
"os"
"runtime/debug"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/golang/glog"
"github.com/gorilla/websocket"
"github.com/juju/errors"
"github.com/trezor/blockbook/api"
"github.com/trezor/blockbook/bchain"
"github.com/trezor/blockbook/common"
"github.com/trezor/blockbook/db"
"github.com/trezor/blockbook/fiat"
)
const upgradeFailed = "Upgrade failed: "
const outChannelSize = 500
const defaultTimeout = 60 * time.Second
const unknownMethodLabel = "unknown"
// allRates is a special "currency" parameter that means all available currencies
const allFiatRates = "!ALL!"
var (
// ErrorMethodNotAllowed is returned when client tries to upgrade method other than GET
ErrorMethodNotAllowed = errors.New("Method not allowed")
connectionCounter uint64
)
type websocketChannel struct {
id uint64
conn *websocket.Conn
out chan *WsRes
ip string
requestHeader http.Header
alive bool
aliveLock sync.Mutex
closeReason string
addrDescs []string // subscribed address descriptors as strings
getAddressInfoDescriptorsMux sync.Mutex
getAddressInfoDescriptors map[string]struct{}
}
type addressDetails struct {
requestID string
// publishNewBlockTxs enables notifications for confirmed transactions
// detected while processing newly connected blocks.
publishNewBlockTxs bool
}
// WebsocketServer is a handle to websocket server
type WebsocketServer struct {
upgrader *websocket.Upgrader
db *db.RocksDB
txCache *db.TxCache
chain bchain.BlockChain
chainParser bchain.BlockChainParser
mempool bchain.Mempool
metrics *common.Metrics
is *common.InternalState
api *api.Worker
block0hash string
newBlockSubscriptions map[*websocketChannel]string
newBlockSubscriptionsLock sync.Mutex
newTransactionEnabled bool
newTransactionSubscriptions map[*websocketChannel]string
newTransactionSubscriptionsLock sync.Mutex
addressSubscriptions map[string]map[*websocketChannel]*addressDetails
addressSubscriptionsLock sync.Mutex
// newBlockTxsSubscriptionCount is a fast-path guard for OnNewBlock.
// It tracks how many address subscriptions requested newBlockTxs=true.
newBlockTxsSubscriptionCount int
fiatRatesSubscriptions map[string]map[*websocketChannel]string
fiatRatesTokenSubscriptions map[*websocketChannel][]string
fiatRatesSubscriptionsLock sync.Mutex
allowedRpcCallTo map[string]struct{}
}
// NewWebsocketServer creates new websocket interface to blockbook and returns its handle
func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState, fiatRates *fiat.FiatRates) (*WebsocketServer, error) {
api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is, fiatRates)
if err != nil {
return nil, err
}
b0, err := db.GetBlockHash(0)
if err != nil {
return nil, err
}
s := &WebsocketServer{
upgrader: &websocket.Upgrader{
ReadBufferSize: 1024 * 32,
WriteBufferSize: 1024 * 32,
CheckOrigin: checkOrigin,
EnableCompression: true,
},
db: db,
txCache: txCache,
chain: chain,
chainParser: chain.GetChainParser(),
mempool: mempool,
metrics: metrics,
is: is,
api: api,
block0hash: b0,
newBlockSubscriptions: make(map[*websocketChannel]string),
newTransactionEnabled: is.EnableSubNewTx,
newTransactionSubscriptions: make(map[*websocketChannel]string),
addressSubscriptions: make(map[string]map[*websocketChannel]*addressDetails),
fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string),
fiatRatesTokenSubscriptions: make(map[*websocketChannel][]string),
}
envRpcCall := os.Getenv(strings.ToUpper(is.GetNetwork()) + "_ALLOWED_RPC_CALL_TO")
if envRpcCall != "" {
s.allowedRpcCallTo = make(map[string]struct{})
for _, c := range strings.Split(envRpcCall, ",") {
s.allowedRpcCallTo[strings.ToLower(c)] = struct{}{}
}
glog.Info("Support of rpcCall for these contracts: ", envRpcCall)
}
if s.metrics != nil {
s.metrics.WebsocketNewBlockTxsSubscriptions.Set(0)
}
return s, nil
}
// allow all origins
func checkOrigin(r *http.Request) bool {
return true
}
func getIP(r *http.Request) string {
ip := r.Header.Get("cf-connecting-ip")
if ip != "" {
return ip
}
ip = r.Header.Get("X-Real-Ip")
if ip != "" {
return ip
}
return r.RemoteAddr
}
// ServeHTTP sets up handler of websocket channel
func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, upgradeFailed+ErrorMethodNotAllowed.Error(), http.StatusServiceUnavailable)
return
}
conn, err := s.upgrader.Upgrade(w, r, nil)
if err != nil {
http.Error(w, upgradeFailed+err.Error(), http.StatusServiceUnavailable)
return
}
c := &websocketChannel{
id: atomic.AddUint64(&connectionCounter, 1),
conn: conn,
out: make(chan *WsRes, outChannelSize),
ip: getIP(r),
requestHeader: r.Header,
alive: true,
}
if s.is.WsGetAccountInfoLimit > 0 {
c.getAddressInfoDescriptors = make(map[string]struct{})
}
go s.inputLoop(c)
go s.outputLoop(c)
s.onConnect(c)
}
// GetHandler returns http handler
func (s *WebsocketServer) GetHandler() http.Handler {
return s
}
func (s *WebsocketServer) closeChannel(c *websocketChannel, reason string) bool {
if closed, closeReason := c.CloseOut(reason); closed {
if s.metrics != nil {
s.metrics.WebsocketChannelCloses.With(common.Labels{"reason": closeReason}).Inc()
}
c.conn.Close()
s.onDisconnect(c)
return true
}
return false
}
func (c *websocketChannel) CloseOut(reason string) (bool, string) {
c.aliveLock.Lock()
defer c.aliveLock.Unlock()
if c.alive {
c.alive = false
if c.closeReason == "" {
c.closeReason = reason
}
closeReason := c.closeReason
//clean out
close(c.out)
for len(c.out) > 0 {
<-c.out
}
return true, closeReason
}
return false, ""
}
func (c *websocketChannel) DataOut(data *WsRes) {
c.aliveLock.Lock()
defer c.aliveLock.Unlock()
if c.alive {
if len(c.out) < outChannelSize-1 {
c.out <- data
} else {
glog.Warning("Channel ", c.id, " overflow, closing")
if c.closeReason == "" {
c.closeReason = "overflow"
}
// close the connection but do not call CloseOut - would call duplicate c.aliveLock.Lock
// CloseOut will be called because the closed connection will cause break in the inputLoop
c.conn.Close()
}
}
}
func (s *WebsocketServer) inputLoop(c *websocketChannel) {
defer func() {
if r := recover(); r != nil {
glog.Error("recovered from panic: ", r, ", ", c.id)
debug.PrintStack()
s.closeChannel(c, "panic")
}
}()
for {
t, d, err := c.conn.ReadMessage()
if err != nil {
s.closeChannel(c, "read_error")
return
}
switch t {
case websocket.TextMessage:
var req WsReq
err := json.Unmarshal(d, &req)
if err != nil {
glog.Error("Error parsing message from ", c.id, ", ", string(d), ", ", err)
s.closeChannel(c, "protocol_error")
return
}
go s.onRequest(c, &req)
case websocket.BinaryMessage:
glog.Error("Binary message received from ", c.id, ", ", c.ip)
s.closeChannel(c, "protocol_error")
return
case websocket.PingMessage:
c.conn.WriteControl(websocket.PongMessage, nil, time.Now().Add(defaultTimeout))
case websocket.CloseMessage:
s.closeChannel(c, "client_close")
return
case websocket.PongMessage:
// do nothing
}
}
}
func (s *WebsocketServer) outputLoop(c *websocketChannel) {
defer func() {
if r := recover(); r != nil {
glog.Error("recovered from panic: ", r, ", ", c.id)
s.closeChannel(c, "panic")
}
}()
for m := range c.out {
err := c.conn.WriteJSON(m)
if err != nil {
glog.Error("Error sending message to ", c.id, ", ", err)
s.closeChannel(c, "write_error")
return
}
}
}
func (s *WebsocketServer) onConnect(c *websocketChannel) {
glog.Info("Client connected ", c.id, ", ", c.ip)
s.metrics.WebsocketClients.Inc()
}
func (s *WebsocketServer) onDisconnect(c *websocketChannel) {
s.unsubscribeNewBlock(c)
s.unsubscribeNewTransaction(c)
s.unsubscribeAddresses(c)
s.unsubscribeFiatRates(c)
glog.Info("Client disconnected ", c.id, ", ", c.ip)
s.metrics.WebsocketClients.Dec()
}
var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsReq) (interface{}, error){
"getAccountInfo": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r, err := unmarshalGetAccountInfoRequest(req.Params)
if err == nil {
if s.is.WsGetAccountInfoLimit > 0 {
c.getAddressInfoDescriptorsMux.Lock()
c.getAddressInfoDescriptors[r.Descriptor] = struct{}{}
l := len(c.getAddressInfoDescriptors)
c.getAddressInfoDescriptorsMux.Unlock()
if l > s.is.WsGetAccountInfoLimit {
if s.closeChannel(c, "limit_exceeded") {
glog.Info("Client ", c.id, " exceeded getAddressInfo limit, ", c.ip)
s.is.AddWsLimitExceedingIP(c.ip)
}
return
}
}
rv, err = s.getAccountInfo(r)
}
return
},
"getInfo": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
return s.getInfo()
},
"getBlockHash": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsBlockHashReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getBlockHash(r.Height)
}
return
},
"getBlock": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
if !s.is.ExtendedIndex {
return nil, errors.New("Not supported")
}
r := WsBlockReq{}
err = json.Unmarshal(req.Params, &r)
if r.PageSize == 0 {
r.PageSize = 1000000
}
if err == nil {
rv, err = s.getBlock(r.Id, r.Page, r.PageSize)
}
return
},
"getAccountUtxo": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsAccountUtxoReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getAccountUtxo(r.Descriptor)
}
return
},
"getBalanceHistory": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsBalanceHistoryReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
if r.From <= 0 {
r.From = 0
}
if r.To <= 0 {
r.To = 0
}
if r.GroupBy <= 0 {
r.GroupBy = 3600
}
rv, err = s.api.GetXpubBalanceHistory(r.Descriptor, r.From, r.To, r.Currencies, r.Gap, r.GroupBy)
if err != nil {
rv, err = s.api.GetBalanceHistory(r.Descriptor, r.From, r.To, r.Currencies, r.GroupBy)
}
}
return
},
"getTransaction": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsTransactionReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getTransaction(r.Txid)
}
return
},
"getTransactionSpecific": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsTransactionSpecificReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getTransactionSpecific(r.Txid)
}
return
},
"estimateFee": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
return s.estimateFee(req.Params)
},
"longTermFeeRate": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
return s.longTermFeeRate()
},
"sendTransaction": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsSendTransactionReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.sendTransaction(r.Hex, r.DisableAlternativeRPC)
}
return
},
"getMempoolFilters": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsMempoolFiltersReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getMempoolFilters(&r)
}
return
},
"getBlockFilter": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsBlockFilterReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getBlockFilter(&r)
}
return
},
"getBlockFiltersBatch": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsBlockFiltersBatchReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getBlockFiltersBatch(&r)
}
return
},
"rpcCall": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsRpcCallReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.rpcCall(&r)
}
return
},
"subscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
return s.subscribeNewBlock(c, req)
},
"unsubscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
return s.unsubscribeNewBlock(c)
},
"subscribeNewTransaction": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
return s.subscribeNewTransaction(c, req)
},
"unsubscribeNewTransaction": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
return s.unsubscribeNewTransaction(c)
},
"subscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
ad, nbtxs, err := s.unmarshalAddresses(req.Params)
if err == nil {
rv, err = s.subscribeAddresses(c, ad, nbtxs, req)
}
return
},
"unsubscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
return s.unsubscribeAddresses(c)
},
"subscribeFiatRates": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
var r WsSubscribeFiatRatesReq
err = json.Unmarshal(req.Params, &r)
if err != nil {
return nil, err
}
r.Currency = strings.ToLower(r.Currency)
for i := range r.Tokens {
r.Tokens[i] = strings.ToLower(r.Tokens[i])
}
return s.subscribeFiatRates(c, &r, req)
},
"unsubscribeFiatRates": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
return s.unsubscribeFiatRates(c)
},
"ping": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := struct{}{}
return r, nil
},
"getCurrentFiatRates": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsCurrentFiatRatesReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getCurrentFiatRates(r.Currencies, r.Token)
}
return
},
"getFiatRatesForTimestamps": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsFiatRatesForTimestampsReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getFiatRatesForTimestamps(r.Timestamps, r.Currencies, r.Token)
}
return
},
"getFiatRatesTickersList": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
r := WsFiatRatesTickersListReq{}
err = json.Unmarshal(req.Params, &r)
if err == nil {
rv, err = s.getAvailableVsCurrencies(r.Timestamp, r.Token)
}
return
},
}
func (s *WebsocketServer) onRequest(c *websocketChannel, req *WsReq) {
var err error
var data interface{}
f, ok := requestHandlers[req.Method]
methodLabel := req.Method
if !ok {
methodLabel = unknownMethodLabel
}
defer func() {
if r := recover(); r != nil {
glog.Error("Client ", c.id, ", onRequest ", req.Method, " recovered from panic: ", r)
debug.PrintStack()
e := resultError{}
e.Error.Message = "Internal error"
data = e
}
// nil data means no response
if data != nil {
c.DataOut(&WsRes{
ID: req.ID,
Data: data,
})
}
s.metrics.WebsocketPendingRequests.With(common.Labels{"method": methodLabel}).Dec()
}()
t := time.Now()
s.metrics.WebsocketPendingRequests.With(common.Labels{"method": methodLabel}).Inc()
defer func() {
s.metrics.WebsocketReqDuration.With(common.Labels{"method": methodLabel}).Observe(float64(time.Since(t)) / 1e3) // in microseconds
}()
if ok {
data, err = f(s, c, req)
if err == nil {
glog.V(1).Info("Client ", c.id, " onRequest ", req.Method, " success")
s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "success"}).Inc()
} else {
if apiErr, ok := err.(*api.APIError); !ok || !apiErr.Public {
glog.Error("Client ", c.id, " onMessage ", req.Method, ": ", errors.ErrorStack(err), ", data ", string(req.Params))
}
s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "failure"}).Inc()
e := resultError{}
e.Error.Message = err.Error()
data = e
}
} else {
s.metrics.WebsocketUnknownMethods.With(common.Labels{"method": methodLabel}).Inc()
s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "failure"}).Inc()
glog.V(1).Info("Client ", c.id, " onMessage ", req.Method, ": unknown method, data ", string(req.Params))
}
}
func unmarshalGetAccountInfoRequest(params []byte) (*WsAccountInfoReq, error) {
var r WsAccountInfoReq
err := json.Unmarshal(params, &r)
if err != nil {
return nil, err
}
return &r, nil
}
func (s *WebsocketServer) getAccountInfo(req *WsAccountInfoReq) (res *api.Address, err error) {
var opt api.AccountDetails
switch req.Details {
case "tokens":
opt = api.AccountDetailsTokens
case "tokenBalances":
opt = api.AccountDetailsTokenBalances
case "txids":
opt = api.AccountDetailsTxidHistory
case "txslight":
opt = api.AccountDetailsTxHistoryLight
case "txs":
opt = api.AccountDetailsTxHistory
default:
opt = api.AccountDetailsBasic
}
var tokensToReturn api.TokensToReturn
switch req.Tokens {
case "used":
tokensToReturn = api.TokensToReturnUsed
case "nonzero":
tokensToReturn = api.TokensToReturnNonzeroBalance
default:
tokensToReturn = api.TokensToReturnDerived
}
filter := api.AddressFilter{
FromHeight: uint32(req.FromHeight),
ToHeight: uint32(req.ToHeight),
Contract: req.ContractFilter,
Vout: api.AddressFilterVoutOff,
TokensToReturn: tokensToReturn,
}
if req.PageSize == 0 {
req.PageSize = txsOnPage
}
a, err := s.api.GetXpubAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter, req.Gap, strings.ToLower(req.SecondaryCurrency))
if err != nil {
return s.api.GetAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter, strings.ToLower(req.SecondaryCurrency))
}
return a, nil
}
func (s *WebsocketServer) getAccountUtxo(descriptor string) (api.Utxos, error) {
utxo, err := s.api.GetXpubUtxo(descriptor, false, 0)
if err != nil {
return s.api.GetAddressUtxo(descriptor, false)
}
return utxo, nil
}
func (s *WebsocketServer) getTransaction(txid string) (*api.Tx, error) {
return s.api.GetTransaction(txid, false, false)
}
func (s *WebsocketServer) getTransactionSpecific(txid string) (interface{}, error) {
return s.chain.GetTransactionSpecific(&bchain.Tx{Txid: txid})
}
func (s *WebsocketServer) getInfo() (*WsInfoRes, error) {
vi := common.GetVersionInfo()
bi := s.is.GetBackendInfo()
height, hash, err := s.db.GetBestBlock()
if err != nil {
return nil, err
}
return &WsInfoRes{
Name: s.is.Coin,
Shortcut: s.is.CoinShortcut,
Network: s.is.GetNetwork(),
Decimals: s.chainParser.AmountDecimals(),
BestHeight: int(height),
BestHash: hash,
Version: vi.Version,
Block0Hash: s.block0hash,
Testnet: s.chain.IsTestnet(),
Backend: WsBackendInfo{
Version: bi.Version,
Subversion: bi.Subversion,
ConsensusVersion: bi.ConsensusVersion,
Consensus: bi.Consensus,
},
}, nil
}
func (s *WebsocketServer) getBlockHash(height int) (*WsBlockHashRes, error) {
h, err := s.db.GetBlockHash(uint32(height))
if err != nil {
return nil, err
}
return &WsBlockHashRes{
Hash: h,
}, nil
}
func (s *WebsocketServer) getBlock(id string, page, pageSize int) (interface{}, error) {
block, err := s.api.GetBlock(id, page, pageSize)
if err != nil {
return nil, err
}
return block, nil
}
func eip1559FeesToApi(fee *bchain.Eip1559Fee) *api.Eip1559Fee {
if fee == nil {
return nil
}
apiFee := api.Eip1559Fee{}
apiFee.MaxFeePerGas = (*api.Amount)(fee.MaxFeePerGas)
apiFee.MaxPriorityFeePerGas = (*api.Amount)(fee.MaxPriorityFeePerGas)
apiFee.MaxWaitTimeEstimate = fee.MaxWaitTimeEstimate
apiFee.MinWaitTimeEstimate = fee.MinWaitTimeEstimate
return &apiFee
}
func eip1559FeeRangeToApi(feeRange []*big.Int) []*api.Amount {
if feeRange == nil {
return nil
}
apiFeeRange := make([]*api.Amount, len(feeRange))
for i := range feeRange {
apiFeeRange[i] = (*api.Amount)(feeRange[i])
}
return apiFeeRange
}
func (s *WebsocketServer) estimateFee(params []byte) (interface{}, error) {
var r WsEstimateFeeReq
err := json.Unmarshal(params, &r)
if err != nil {
return nil, err
}
res := make([]WsEstimateFeeRes, len(r.Blocks))
if s.chainParser.GetChainType() == bchain.ChainEthereumType {
gas, err := s.chain.EthereumTypeEstimateGas(r.Specific)
if err != nil {
return nil, err
}
sg := strconv.FormatUint(gas, 10)
b := 1
if len(r.Blocks) > 0 {
b = r.Blocks[0]
}
fee, err := s.api.EstimateFee(b, true)
if err != nil {
return nil, err
}
feePerTx := new(big.Int)
feePerTx.Mul(&fee, new(big.Int).SetUint64(gas))
eip1559, err := s.chain.EthereumTypeGetEip1559Fees()
if err != nil {
return nil, err
}
var eip1559Api *api.Eip1559Fees
if eip1559 != nil {
eip1559Api = &api.Eip1559Fees{}
eip1559Api.BaseFeePerGas = (*api.Amount)(eip1559.BaseFeePerGas)
eip1559Api.Instant = eip1559FeesToApi(eip1559.Instant)
eip1559Api.High = eip1559FeesToApi(eip1559.High)
eip1559Api.Medium = eip1559FeesToApi(eip1559.Medium)
eip1559Api.Low = eip1559FeesToApi(eip1559.Low)
eip1559Api.NetworkCongestion = eip1559.NetworkCongestion
eip1559Api.BaseFeeTrend = eip1559.BaseFeeTrend
eip1559Api.PriorityFeeTrend = eip1559.PriorityFeeTrend
eip1559Api.LatestPriorityFeeRange = eip1559FeeRangeToApi(eip1559.LatestPriorityFeeRange)
eip1559Api.HistoricalBaseFeeRange = eip1559FeeRangeToApi(eip1559.HistoricalBaseFeeRange)
eip1559Api.HistoricalPriorityFeeRange = eip1559FeeRangeToApi(eip1559.HistoricalPriorityFeeRange)
}
for i := range r.Blocks {
res[i].FeePerUnit = fee.String()
res[i].FeeLimit = sg
res[i].FeePerTx = feePerTx.String()
res[i].Eip1559 = eip1559Api
}
} else {
conservative := true
v, ok := r.Specific["conservative"]
if ok {
vc, ok := v.(bool)
if ok {
conservative = vc
}
}
txSize := 0
v, ok = r.Specific["txsize"]
if ok {
f, ok := v.(float64)
if ok {
txSize = int(f)
}
}
for i, b := range r.Blocks {
fee, err := s.api.EstimateFee(b, conservative)
if err != nil {
return nil, err
}
res[i].FeePerUnit = fee.String()
if txSize > 0 {
fee.Mul(&fee, big.NewInt(int64(txSize)))
fee.Add(&fee, big.NewInt(500))
fee.Div(&fee, big.NewInt(1000))
res[i].FeePerTx = fee.String()
}
}
}
return res, nil
}
func (s *WebsocketServer) longTermFeeRate() (res interface{}, err error) {
feeRate, err := s.chain.LongTermFeeRate()
if err != nil {
return nil, err
}
return WsLongTermFeeRateRes{
FeePerUnit: feeRate.FeePerUnit.String(),
Blocks: feeRate.Blocks,
}, nil
}
func (s *WebsocketServer) sendTransaction(tx string, disableAlternativeRPC bool) (res resultSendTransaction, err error) {
txid, err := s.chain.SendRawTransaction(tx, disableAlternativeRPC)
if err != nil {
return res, err
}
res.Result = txid
return
}
func (s *WebsocketServer) getMempoolFilters(r *WsMempoolFiltersReq) (res interface{}, err error) {
type resMempoolFilters struct {
ParamP uint8 `json:"P"`
ParamM uint64 `json:"M"`
ZeroedKey bool `json:"zeroedKey"`
Entries map[string]string `json:"entries"`
}
filterEntries, err := s.mempool.GetTxidFilterEntries(r.ScriptType, r.FromTimestamp)
if err != nil {
return nil, err
}
return resMempoolFilters{
ParamP: s.is.BlockGolombFilterP,
ParamM: bchain.GetGolombParamM(s.is.BlockGolombFilterP),
ZeroedKey: filterEntries.UsedZeroedKey,
Entries: filterEntries.Entries,
}, nil
}
func (s *WebsocketServer) getBlockFilter(r *WsBlockFilterReq) (res interface{}, err error) {
type resBlockFilter struct {
ParamP uint8 `json:"P"`
ParamM uint64 `json:"M"`
ZeroedKey bool `json:"zeroedKey"`
BlockFilter string `json:"blockFilter"`
}
if s.is.BlockFilterScripts != r.ScriptType {
return nil, errors.Errorf("Unsupported script type %s", r.ScriptType)
}
blockFilter, err := s.db.GetBlockFilter(r.BlockHash)
if err != nil {
return nil, err
}
return resBlockFilter{
ParamP: s.is.BlockGolombFilterP,
ParamM: bchain.GetGolombParamM(s.is.BlockGolombFilterP),
ZeroedKey: s.is.BlockFilterUseZeroedKey,
BlockFilter: blockFilter,
}, nil
}
func (s *WebsocketServer) getBlockFiltersBatch(r *WsBlockFiltersBatchReq) (res interface{}, err error) {
type resBlockFiltersBatch struct {
ParamP uint8 `json:"P"`
ParamM uint64 `json:"M"`
ZeroedKey bool `json:"zeroedKey"`
BlockFiltersBatch []string `json:"blockFiltersBatch"`
}
if s.is.BlockFilterScripts != r.ScriptType {
return nil, errors.Errorf("Unsupported script type %s", r.ScriptType)
}
blockFiltersBatch, err := s.api.GetBlockFiltersBatch(r.BlockHash, r.PageSize)
if err != nil {
return nil, err
}
return resBlockFiltersBatch{
ParamP: s.is.BlockGolombFilterP,
ParamM: bchain.GetGolombParamM(s.is.BlockGolombFilterP),
ZeroedKey: s.is.BlockFilterUseZeroedKey,
BlockFiltersBatch: blockFiltersBatch,
}, nil
}
func (s *WebsocketServer) rpcCall(r *WsRpcCallReq) (*WsRpcCallRes, error) {
if s.allowedRpcCallTo != nil {
_, ok := s.allowedRpcCallTo[strings.ToLower(r.To)]
if !ok {
return nil, errors.New("Not supported")
}
}
data, err := s.chain.EthereumTypeRpcCall(r.Data, r.To, r.From)
if err != nil {
return nil, err
}
return &WsRpcCallRes{Data: data}, nil
}
type subscriptionResponse struct {
Subscribed bool `json:"subscribed"`
}
type subscriptionResponseMessage struct {
Subscribed bool `json:"subscribed"`
Message string `json:"message"`
}
func (s *WebsocketServer) subscribeNewBlock(c *websocketChannel, req *WsReq) (res interface{}, err error) {
s.newBlockSubscriptionsLock.Lock()
defer s.newBlockSubscriptionsLock.Unlock()
s.newBlockSubscriptions[c] = req.ID
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewBlock"}).Set(float64(len(s.newBlockSubscriptions)))
return &subscriptionResponse{true}, nil
}
func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interface{}, err error) {
s.newBlockSubscriptionsLock.Lock()
defer s.newBlockSubscriptionsLock.Unlock()
delete(s.newBlockSubscriptions, c)
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewBlock"}).Set(float64(len(s.newBlockSubscriptions)))
return &subscriptionResponse{false}, nil
}
func (s *WebsocketServer) subscribeNewTransaction(c *websocketChannel, req *WsReq) (res interface{}, err error) {
s.newTransactionSubscriptionsLock.Lock()
defer s.newTransactionSubscriptionsLock.Unlock()
if !s.newTransactionEnabled {
return &subscriptionResponseMessage{false, "subscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil
}
s.newTransactionSubscriptions[c] = req.ID
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewTransaction"}).Set(float64(len(s.newTransactionSubscriptions)))
return &subscriptionResponse{true}, nil
}
func (s *WebsocketServer) unsubscribeNewTransaction(c *websocketChannel) (res interface{}, err error) {
s.newTransactionSubscriptionsLock.Lock()
defer s.newTransactionSubscriptionsLock.Unlock()
if !s.newTransactionEnabled {
return &subscriptionResponseMessage{false, "unsubscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil
}
delete(s.newTransactionSubscriptions, c)
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewTransaction"}).Set(float64(len(s.newTransactionSubscriptions)))
return &subscriptionResponse{false}, nil
}
func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, bool, error) {
r := WsSubscribeAddressesReq{}
err := json.Unmarshal(params, &r)
if err != nil {
return nil, false, err
}
rv := make([]string, len(r.Addresses))
for i, a := range r.Addresses {
ad, err := s.chainParser.GetAddrDescFromAddress(a)
if err != nil {
return nil, false, err
}
rv[i] = string(ad)
}
return rv, r.NewBlockTxs, nil
}
// doUnsubscribeAddresses removes all address subscriptions for a channel.
// addressSubscriptionsLock must be held by the caller.
func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) {
for _, ads := range c.addrDescs {
sa, e := s.addressSubscriptions[ads]
if e {
for sc, details := range sa {
if sc == c {
if details.publishNewBlockTxs {
s.newBlockTxsSubscriptionCount--
}
delete(sa, c)
}
}
if len(sa) == 0 {
delete(s.addressSubscriptions, ads)
}
}
}
c.addrDescs = nil
}
// subscribeAddresses replaces previous address subscriptions for the channel.
// If newBlockTxs is enabled, the channel receives both mempool notifications and
// confirmed notifications detected from newly connected blocks.
func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []string, newBlockTxs bool, req *WsReq) (res interface{}, err error) {
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
// unsubscribe all previous subscriptions
s.doUnsubscribeAddresses(c)
for _, ads := range addrDesc {
as, ok := s.addressSubscriptions[ads]
if !ok {
as = make(map[*websocketChannel]*addressDetails)
s.addressSubscriptions[ads] = as
}
as[c] = &addressDetails{
requestID: req.ID,
publishNewBlockTxs: newBlockTxs,
}
if newBlockTxs {
s.newBlockTxsSubscriptionCount++
}
}
c.addrDescs = addrDesc
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeAddresses"}).Set(float64(len(s.addressSubscriptions)))
s.metrics.WebsocketNewBlockTxsSubscriptions.Set(float64(s.newBlockTxsSubscriptionCount))
return &subscriptionResponse{true}, nil
}
// unsubscribeAddresses unsubscribes all address subscriptions by this channel
func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interface{}, err error) {
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
s.doUnsubscribeAddresses(c)
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeAddresses"}).Set(float64(len(s.addressSubscriptions)))
s.metrics.WebsocketNewBlockTxsSubscriptions.Set(float64(s.newBlockTxsSubscriptionCount))
return &subscriptionResponse{false}, nil
}
// doUnsubscribeFiatRates fiat rates without fiatRatesSubscriptionsLock - can be called only from subscribeFiatRates and unsubscribeFiatRates
func (s *WebsocketServer) doUnsubscribeFiatRates(c *websocketChannel) {
for fr, sa := range s.fiatRatesSubscriptions {
for sc := range sa {
if sc == c {
delete(sa, c)
}
}
if len(sa) == 0 {
delete(s.fiatRatesSubscriptions, fr)
}
}
delete(s.fiatRatesTokenSubscriptions, c)
}
// subscribeFiatRates subscribes all FiatRates subscriptions by this channel
func (s *WebsocketServer) subscribeFiatRates(c *websocketChannel, d *WsSubscribeFiatRatesReq, req *WsReq) (res interface{}, err error) {
s.fiatRatesSubscriptionsLock.Lock()
defer s.fiatRatesSubscriptionsLock.Unlock()
// unsubscribe all previous subscriptions
s.doUnsubscribeFiatRates(c)
currency := d.Currency
if currency == "" {
currency = allFiatRates
} else {
currency = strings.ToLower(currency)
}
as, ok := s.fiatRatesSubscriptions[currency]
if !ok {
as = make(map[*websocketChannel]string)
s.fiatRatesSubscriptions[currency] = as
}
as[c] = req.ID
if len(d.Tokens) != 0 {
s.fiatRatesTokenSubscriptions[c] = d.Tokens
}
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeFiatRates"}).Set(float64(len(s.fiatRatesSubscriptions)))
return &subscriptionResponse{true}, nil
}
// unsubscribeFiatRates unsubscribes all FiatRates subscriptions by this channel
func (s *WebsocketServer) unsubscribeFiatRates(c *websocketChannel) (res interface{}, err error) {
s.fiatRatesSubscriptionsLock.Lock()
defer s.fiatRatesSubscriptionsLock.Unlock()
s.doUnsubscribeFiatRates(c)
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeFiatRates"}).Set(float64(len(s.fiatRatesSubscriptions)))
return &subscriptionResponse{false}, nil
}
func (s *WebsocketServer) onNewBlockAsync(hash string, height uint32) {
s.newBlockSubscriptionsLock.Lock()
defer s.newBlockSubscriptionsLock.Unlock()
data := struct {
Height uint32 `json:"height"`
Hash string `json:"hash"`
}{
Height: height,
Hash: hash,
}
for c, id := range s.newBlockSubscriptions {
c.DataOut(&WsRes{
ID: id,
Data: &data,
})
}
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
}
// setConfirmedBlockTxMetadata normalizes parsed block transactions.
// ParseBlock can return txs with zero confirmations; we force first-confirmed
// metadata so conversion does not take mempool-only branches.
func setConfirmedBlockTxMetadata(tx *bchain.Tx, blockTime int64) {
if tx.Confirmations == 0 {
tx.Confirmations = 1
tx.Blocktime = blockTime
tx.Time = blockTime
}
}
// getEthereumInternalTransfers safely extracts internal transfers from
// CoinSpecificData when present.
func getEthereumInternalTransfers(tx *bchain.Tx) []bchain.EthereumInternalTransfer {
esd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData)
if !ok || esd.InternalData == nil {
return nil
}
return esd.InternalData.Transfers
}
// setEthereumReceiptIfAvailable adds receipt data to Ethereum txs on a
// best-effort basis; failures are logged and notifications continue.
func setEthereumReceiptIfAvailable(tx *bchain.Tx, getReceipt func(string) (*bchain.RpcReceipt, error)) string {
csd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData)
if !ok {
return "skipped_non_eth"
}
receipt, err := getReceipt(tx.Txid)
if err != nil {
glog.Error("EthereumTypeGetTransactionReceipt error ", err, " for ", tx.Txid)
return "error"
}
csd.Receipt = receipt
tx.CoinSpecificData = csd
return "success"
}
func observeNewBlockTxDuration(metrics *common.Metrics, stage string, started time.Time) {
if metrics == nil {
return
}
metrics.WebsocketNewBlockTxsDuration.With(common.Labels{"stage": stage}).Observe(time.Since(started).Seconds())
}
func incNewBlockTxMetric(metrics *common.Metrics, stage, status string, value float64) {
if metrics == nil {
return
}
counter := metrics.WebsocketNewBlockTxs.With(common.Labels{"stage": stage, "status": status})
if value == 1 {
counter.Inc()
} else {
counter.Add(value)
}
}
// populateBitcoinVinAddrDescs fills missing vin address descriptors by loading
// previous outputs. This enables sender-side address subscription matching for
// Bitcoin transactions parsed from connected blocks.
func populateBitcoinVinAddrDescs(vins []bchain.MempoolVin, getAddrDesc func(string, uint32) (bchain.AddressDescriptor, error)) {
if getAddrDesc == nil {
return
}
for i := range vins {
if len(vins[i].AddrDesc) > 0 || vins[i].Txid == "" {
continue
}
addrDesc, err := getAddrDesc(vins[i].Txid, vins[i].Vout)
if err == nil && len(addrDesc) > 0 {
vins[i].AddrDesc = addrDesc
}
}
}
// getBitcoinVinAddrDesc resolves an input outpoint to an address descriptor
// using txCache. It is best-effort and can return chain-level not-found errors.
func (s *WebsocketServer) getBitcoinVinAddrDesc(txid string, vout uint32) (bchain.AddressDescriptor, error) {
if s.txCache == nil {
return nil, bchain.ErrTxNotFound
}
prevTx, _, err := s.txCache.GetTransaction(txid)
if err != nil {
return nil, err
}
if int(vout) >= len(prevTx.Vout) {
return nil, bchain.ErrAddressMissing
}
return s.chainParser.GetAddrDescFromVout(&prevTx.Vout[vout])
}
// publishNewBlockTxsByAddr emits confirmed transaction notifications only for
// subscribed addresses touched by transactions in the connected block.
func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) {
blockStart := time.Now()
defer observeNewBlockTxDuration(s.metrics, "per_block", blockStart)
chainType := s.chainParser.GetChainType()
for _, tx := range block.Txs {
incNewBlockTxMetric(s.metrics, "scanned", "success", 1)
setConfirmedBlockTxMetadata(&tx, block.Time)
var tokenTransfers bchain.TokenTransfers
var internalTransfers []bchain.EthereumInternalTransfer
if chainType == bchain.ChainEthereumType {
tokenTransfers, _ = s.chainParser.EthereumTypeGetTokenTransfersFromTx(&tx)
internalTransfers = getEthereumInternalTransfers(&tx)
}
vins := make([]bchain.MempoolVin, len(tx.Vin))
for i, vin := range tx.Vin {
vins[i] = bchain.MempoolVin{Vin: vin}
}
if chainType == bchain.ChainBitcoinType {
populateBitcoinVinAddrDescs(vins, s.getBitcoinVinAddrDesc)
}
matchStart := time.Now()
subscribed := s.getNewTxSubscriptions(vins, tx.Vout, tokenTransfers, internalTransfers)
observeNewBlockTxDuration(s.metrics, "match", matchStart)
if len(subscribed) > 0 {
incNewBlockTxMetric(s.metrics, "matched", "success", 1)
// Convert and publish asynchronously so heavy tx conversion does not
// block processing of other transactions in the same block.
go func(tx bchain.Tx, subscribed map[string]struct{}) {
if chainType == bchain.ChainEthereumType {
receiptStatus := setEthereumReceiptIfAvailable(&tx, s.chain.EthereumTypeGetTransactionReceipt)
if s.metrics != nil {
s.metrics.WebsocketEthReceipt.With(common.Labels{"status": receiptStatus}).Inc()
}
}
convertStart := time.Now()
atx, err := s.api.GetTransactionFromBchainTx(&tx, int(block.Height), false, false, nil)
observeNewBlockTxDuration(s.metrics, "convert", convertStart)
if err != nil {
incNewBlockTxMetric(s.metrics, "converted", "failure", 1)
glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid)
return
}
incNewBlockTxMetric(s.metrics, "converted", "success", 1)
for stringAddressDescriptor := range subscribed {
s.sendOnNewTxAddr(stringAddressDescriptor, atx, true)
}
incNewBlockTxMetric(s.metrics, "published", "success", float64(len(subscribed)))
}(tx, subscribed)
}
}
}
// OnNewBlock is a callback that broadcasts info about new block to subscribed clients
func (s *WebsocketServer) OnNewBlock(block *bchain.Block) {
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
go s.onNewBlockAsync(block.Hash, block.Height)
if s.newBlockTxsSubscriptionCount > 0 {
// Skip per-tx address matching when nobody opted into newBlockTxs.
go s.publishNewBlockTxsByAddr(block)
}
}
func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) {
s.newTransactionSubscriptionsLock.Lock()
defer s.newTransactionSubscriptionsLock.Unlock()
for c, id := range s.newTransactionSubscriptions {
c.DataOut(&WsRes{
ID: id,
Data: &tx,
})
}
glog.Info("broadcasting new tx ", tx.Txid, " to ", len(s.newTransactionSubscriptions), " channels")
}
func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx, newBlockTx bool) {
addrDesc := bchain.AddressDescriptor(stringAddressDescriptor)
addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
if err != nil {
glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc)
return
}
if len(addr) == 1 {
data := struct {
Address string `json:"address"`
Tx *api.Tx `json:"tx"`
}{
Address: addr[0],
Tx: tx,
}
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
as, ok := s.addressSubscriptions[stringAddressDescriptor]
if ok {
source := "mempool"
if newBlockTx {
source = "new_block"
}
for c, details := range as {
// Mempool notifications go to all address subscribers; confirmed
// block notifications only go to subscribers that requested them.
if newBlockTx && !details.publishNewBlockTxs {
continue
}
if s.metrics != nil {
s.metrics.WebsocketAddrNotifications.With(common.Labels{"source": source}).Inc()
}
c.DataOut(&WsRes{
ID: details.requestID,
Data: &data,
})
}
glog.Info("broadcasting new tx ", tx.Txid, ", addr ", addr[0], " to ", len(as), " channels")
}
}
}
func (s *WebsocketServer) getNewTxSubscriptions(vins []bchain.MempoolVin, vouts []bchain.Vout, tokenTransfers bchain.TokenTransfers, internalTransfers []bchain.EthereumInternalTransfer) map[string]struct{} {
// check if there is any subscription in inputs, outputs and transfers
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
subscribed := make(map[string]struct{})
processAddress := func(address string) {
if addrDesc, err := s.chainParser.GetAddrDescFromAddress(address); err == nil && len(addrDesc) > 0 {
sad := string(addrDesc)
if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 {
subscribed[sad] = struct{}{}
}
}
}
processVout := func(vout bchain.Vout) {
if addrDesc, err := s.chainParser.GetAddrDescFromVout(&vout); err == nil && len(addrDesc) > 0 {
sad := string(addrDesc)
if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 {
subscribed[sad] = struct{}{}
}
}
}
for i := range vins {
if sad := string(vins[i].AddrDesc); len(sad) > 0 {
if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 {
subscribed[sad] = struct{}{}
}
} else if s.chainParser.GetChainType() == bchain.ChainBitcoinType {
vout := int(vins[i].Vout)
if vout >= 0 && vout < len(vouts) {
processVout(vouts[vins[i].Vout])
}
} else if s.chainParser.GetChainType() == bchain.ChainEthereumType {
if len(vins[i].Addresses) > 0 {
processAddress(vins[i].Addresses[0])
}
}
}
for i := range vouts {
processVout(vouts[i])
}
for i := range tokenTransfers {
processAddress(tokenTransfers[i].From)
processAddress(tokenTransfers[i].To)
}
for i := range internalTransfers {
processAddress(internalTransfers[i].From)
processAddress(internalTransfers[i].To)
}
return subscribed
}
func (s *WebsocketServer) onNewTxAsync(tx *bchain.MempoolTx, subscribed map[string]struct{}) {
atx, err := s.api.GetTransactionFromMempoolTx(tx)
if err != nil {
glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid)
return
}
s.sendOnNewTx(atx)
for stringAddressDescriptor := range subscribed {
s.sendOnNewTxAddr(stringAddressDescriptor, atx, false)
}
}
// OnNewTx is a callback that broadcasts info about a tx affecting subscribed address
func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) {
subscribed := s.getNewTxSubscriptions(tx.Vin, tx.Vout, tx.TokenTransfers, nil)
if len(s.newTransactionSubscriptions) > 0 || len(subscribed) > 0 {
go s.onNewTxAsync(tx, subscribed)
}
}
func (s *WebsocketServer) broadcastTicker(currency string, rates map[string]float32, ticker *common.CurrencyRatesTicker) {
as, ok := s.fiatRatesSubscriptions[currency]
if ok && len(as) > 0 {
data := struct {
Rates interface{} `json:"rates"`
}{
Rates: rates,
}
for c, id := range as {
var tokens []string
if ticker != nil {
tokens = s.fiatRatesTokenSubscriptions[c]
}
if len(tokens) > 0 {
dataWithTokens := struct {
Rates interface{} `json:"rates"`
TokenRates map[string]float32 `json:"tokenRates,omitempty"`
}{
Rates: rates,
TokenRates: map[string]float32{},
}
for _, token := range tokens {
rate := ticker.TokenRateInCurrency(token, currency)
if rate > 0 {
dataWithTokens.TokenRates[token] = rate
}
}
c.DataOut(&WsRes{
ID: id,
Data: &dataWithTokens,
})
} else {
c.DataOut(&WsRes{
ID: id,
Data: &data,
})
}
}
glog.Info("broadcasting new rates for currency ", currency, " to ", len(as), " channels")
}
}
// OnNewFiatRatesTicker is a callback that broadcasts info about fiat rates affecting subscribed currency
func (s *WebsocketServer) OnNewFiatRatesTicker(ticker *common.CurrencyRatesTicker) {
s.fiatRatesSubscriptionsLock.Lock()
defer s.fiatRatesSubscriptionsLock.Unlock()
for currency, rate := range ticker.Rates {
s.broadcastTicker(currency, map[string]float32{currency: rate}, ticker)
}
s.broadcastTicker(allFiatRates, ticker.Rates, nil)
}
func (s *WebsocketServer) getCurrentFiatRates(currencies []string, token string) (*api.FiatTicker, error) {
ret, err := s.api.GetCurrentFiatRates(currencies, strings.ToLower(token))
return ret, err
}
func (s *WebsocketServer) getFiatRatesForTimestamps(timestamps []int64, currencies []string, token string) (*api.FiatTickers, error) {
ret, err := s.api.GetFiatRatesForTimestamps(timestamps, currencies, strings.ToLower(token))
return ret, err
}
func (s *WebsocketServer) getAvailableVsCurrencies(timestamp int64, token string) (*api.AvailableVsCurrencies, error) {
ret, err := s.api.GetAvailableVsCurrencies(timestamp, strings.ToLower(token))
return ret, err
}