package server import ( "encoding/json" "math/big" "net/http" "os" "runtime/debug" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/golang/glog" "github.com/gorilla/websocket" "github.com/juju/errors" "github.com/trezor/blockbook/api" "github.com/trezor/blockbook/bchain" "github.com/trezor/blockbook/common" "github.com/trezor/blockbook/db" "github.com/trezor/blockbook/fiat" ) const upgradeFailed = "Upgrade failed: " const outChannelSize = 500 const defaultTimeout = 60 * time.Second const unknownMethodLabel = "unknown" // allRates is a special "currency" parameter that means all available currencies const allFiatRates = "!ALL!" var ( // ErrorMethodNotAllowed is returned when client tries to upgrade method other than GET ErrorMethodNotAllowed = errors.New("Method not allowed") connectionCounter uint64 ) type websocketChannel struct { id uint64 conn *websocket.Conn out chan *WsRes ip string requestHeader http.Header alive bool aliveLock sync.Mutex closeReason string addrDescs []string // subscribed address descriptors as strings getAddressInfoDescriptorsMux sync.Mutex getAddressInfoDescriptors map[string]struct{} } type addressDetails struct { requestID string // publishNewBlockTxs enables notifications for confirmed transactions // detected while processing newly connected blocks. publishNewBlockTxs bool } // WebsocketServer is a handle to websocket server type WebsocketServer struct { upgrader *websocket.Upgrader db *db.RocksDB txCache *db.TxCache chain bchain.BlockChain chainParser bchain.BlockChainParser mempool bchain.Mempool metrics *common.Metrics is *common.InternalState api *api.Worker block0hash string newBlockSubscriptions map[*websocketChannel]string newBlockSubscriptionsLock sync.Mutex newTransactionEnabled bool newTransactionSubscriptions map[*websocketChannel]string newTransactionSubscriptionsLock sync.Mutex addressSubscriptions map[string]map[*websocketChannel]*addressDetails addressSubscriptionsLock sync.Mutex // newBlockTxsSubscriptionCount is a fast-path guard for OnNewBlock. // It tracks how many address subscriptions requested newBlockTxs=true. newBlockTxsSubscriptionCount int fiatRatesSubscriptions map[string]map[*websocketChannel]string fiatRatesTokenSubscriptions map[*websocketChannel][]string fiatRatesSubscriptionsLock sync.Mutex allowedRpcCallTo map[string]struct{} } // NewWebsocketServer creates new websocket interface to blockbook and returns its handle func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.Mempool, txCache *db.TxCache, metrics *common.Metrics, is *common.InternalState, fiatRates *fiat.FiatRates) (*WebsocketServer, error) { api, err := api.NewWorker(db, chain, mempool, txCache, metrics, is, fiatRates) if err != nil { return nil, err } b0, err := db.GetBlockHash(0) if err != nil { return nil, err } s := &WebsocketServer{ upgrader: &websocket.Upgrader{ ReadBufferSize: 1024 * 32, WriteBufferSize: 1024 * 32, CheckOrigin: checkOrigin, EnableCompression: true, }, db: db, txCache: txCache, chain: chain, chainParser: chain.GetChainParser(), mempool: mempool, metrics: metrics, is: is, api: api, block0hash: b0, newBlockSubscriptions: make(map[*websocketChannel]string), newTransactionEnabled: is.EnableSubNewTx, newTransactionSubscriptions: make(map[*websocketChannel]string), addressSubscriptions: make(map[string]map[*websocketChannel]*addressDetails), fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string), fiatRatesTokenSubscriptions: make(map[*websocketChannel][]string), } envRpcCall := os.Getenv(strings.ToUpper(is.GetNetwork()) + "_ALLOWED_RPC_CALL_TO") if envRpcCall != "" { s.allowedRpcCallTo = make(map[string]struct{}) for _, c := range strings.Split(envRpcCall, ",") { s.allowedRpcCallTo[strings.ToLower(c)] = struct{}{} } glog.Info("Support of rpcCall for these contracts: ", envRpcCall) } if s.metrics != nil { s.metrics.WebsocketNewBlockTxsSubscriptions.Set(0) } return s, nil } // allow all origins func checkOrigin(r *http.Request) bool { return true } func getIP(r *http.Request) string { ip := r.Header.Get("cf-connecting-ip") if ip != "" { return ip } ip = r.Header.Get("X-Real-Ip") if ip != "" { return ip } return r.RemoteAddr } // ServeHTTP sets up handler of websocket channel func (s *WebsocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { http.Error(w, upgradeFailed+ErrorMethodNotAllowed.Error(), http.StatusServiceUnavailable) return } conn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { http.Error(w, upgradeFailed+err.Error(), http.StatusServiceUnavailable) return } c := &websocketChannel{ id: atomic.AddUint64(&connectionCounter, 1), conn: conn, out: make(chan *WsRes, outChannelSize), ip: getIP(r), requestHeader: r.Header, alive: true, } if s.is.WsGetAccountInfoLimit > 0 { c.getAddressInfoDescriptors = make(map[string]struct{}) } go s.inputLoop(c) go s.outputLoop(c) s.onConnect(c) } // GetHandler returns http handler func (s *WebsocketServer) GetHandler() http.Handler { return s } func (s *WebsocketServer) closeChannel(c *websocketChannel, reason string) bool { if closed, closeReason := c.CloseOut(reason); closed { if s.metrics != nil { s.metrics.WebsocketChannelCloses.With(common.Labels{"reason": closeReason}).Inc() } c.conn.Close() s.onDisconnect(c) return true } return false } func (c *websocketChannel) CloseOut(reason string) (bool, string) { c.aliveLock.Lock() defer c.aliveLock.Unlock() if c.alive { c.alive = false if c.closeReason == "" { c.closeReason = reason } closeReason := c.closeReason //clean out close(c.out) for len(c.out) > 0 { <-c.out } return true, closeReason } return false, "" } func (c *websocketChannel) DataOut(data *WsRes) { c.aliveLock.Lock() defer c.aliveLock.Unlock() if c.alive { if len(c.out) < outChannelSize-1 { c.out <- data } else { glog.Warning("Channel ", c.id, " overflow, closing") if c.closeReason == "" { c.closeReason = "overflow" } // close the connection but do not call CloseOut - would call duplicate c.aliveLock.Lock // CloseOut will be called because the closed connection will cause break in the inputLoop c.conn.Close() } } } func (s *WebsocketServer) inputLoop(c *websocketChannel) { defer func() { if r := recover(); r != nil { glog.Error("recovered from panic: ", r, ", ", c.id) debug.PrintStack() s.closeChannel(c, "panic") } }() for { t, d, err := c.conn.ReadMessage() if err != nil { s.closeChannel(c, "read_error") return } switch t { case websocket.TextMessage: var req WsReq err := json.Unmarshal(d, &req) if err != nil { glog.Error("Error parsing message from ", c.id, ", ", string(d), ", ", err) s.closeChannel(c, "protocol_error") return } go s.onRequest(c, &req) case websocket.BinaryMessage: glog.Error("Binary message received from ", c.id, ", ", c.ip) s.closeChannel(c, "protocol_error") return case websocket.PingMessage: c.conn.WriteControl(websocket.PongMessage, nil, time.Now().Add(defaultTimeout)) case websocket.CloseMessage: s.closeChannel(c, "client_close") return case websocket.PongMessage: // do nothing } } } func (s *WebsocketServer) outputLoop(c *websocketChannel) { defer func() { if r := recover(); r != nil { glog.Error("recovered from panic: ", r, ", ", c.id) s.closeChannel(c, "panic") } }() for m := range c.out { err := c.conn.WriteJSON(m) if err != nil { glog.Error("Error sending message to ", c.id, ", ", err) s.closeChannel(c, "write_error") return } } } func (s *WebsocketServer) onConnect(c *websocketChannel) { glog.Info("Client connected ", c.id, ", ", c.ip) s.metrics.WebsocketClients.Inc() } func (s *WebsocketServer) onDisconnect(c *websocketChannel) { s.unsubscribeNewBlock(c) s.unsubscribeNewTransaction(c) s.unsubscribeAddresses(c) s.unsubscribeFiatRates(c) glog.Info("Client disconnected ", c.id, ", ", c.ip) s.metrics.WebsocketClients.Dec() } var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsReq) (interface{}, error){ "getAccountInfo": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r, err := unmarshalGetAccountInfoRequest(req.Params) if err == nil { if s.is.WsGetAccountInfoLimit > 0 { c.getAddressInfoDescriptorsMux.Lock() c.getAddressInfoDescriptors[r.Descriptor] = struct{}{} l := len(c.getAddressInfoDescriptors) c.getAddressInfoDescriptorsMux.Unlock() if l > s.is.WsGetAccountInfoLimit { if s.closeChannel(c, "limit_exceeded") { glog.Info("Client ", c.id, " exceeded getAddressInfo limit, ", c.ip) s.is.AddWsLimitExceedingIP(c.ip) } return } } rv, err = s.getAccountInfo(r) } return }, "getInfo": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { return s.getInfo() }, "getBlockHash": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsBlockHashReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getBlockHash(r.Height) } return }, "getBlock": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { if !s.is.ExtendedIndex { return nil, errors.New("Not supported") } r := WsBlockReq{} err = json.Unmarshal(req.Params, &r) if r.PageSize == 0 { r.PageSize = 1000000 } if err == nil { rv, err = s.getBlock(r.Id, r.Page, r.PageSize) } return }, "getAccountUtxo": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsAccountUtxoReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getAccountUtxo(r.Descriptor) } return }, "getBalanceHistory": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsBalanceHistoryReq{} err = json.Unmarshal(req.Params, &r) if err == nil { if r.From <= 0 { r.From = 0 } if r.To <= 0 { r.To = 0 } if r.GroupBy <= 0 { r.GroupBy = 3600 } rv, err = s.api.GetXpubBalanceHistory(r.Descriptor, r.From, r.To, r.Currencies, r.Gap, r.GroupBy) if err != nil { rv, err = s.api.GetBalanceHistory(r.Descriptor, r.From, r.To, r.Currencies, r.GroupBy) } } return }, "getTransaction": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsTransactionReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getTransaction(r.Txid) } return }, "getTransactionSpecific": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsTransactionSpecificReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getTransactionSpecific(r.Txid) } return }, "estimateFee": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { return s.estimateFee(req.Params) }, "longTermFeeRate": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { return s.longTermFeeRate() }, "sendTransaction": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsSendTransactionReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.sendTransaction(r.Hex, r.DisableAlternativeRPC) } return }, "getMempoolFilters": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsMempoolFiltersReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getMempoolFilters(&r) } return }, "getBlockFilter": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsBlockFilterReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getBlockFilter(&r) } return }, "getBlockFiltersBatch": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsBlockFiltersBatchReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getBlockFiltersBatch(&r) } return }, "rpcCall": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsRpcCallReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.rpcCall(&r) } return }, "subscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { return s.subscribeNewBlock(c, req) }, "unsubscribeNewBlock": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { return s.unsubscribeNewBlock(c) }, "subscribeNewTransaction": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { return s.subscribeNewTransaction(c, req) }, "unsubscribeNewTransaction": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { return s.unsubscribeNewTransaction(c) }, "subscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { ad, nbtxs, err := s.unmarshalAddresses(req.Params) if err == nil { rv, err = s.subscribeAddresses(c, ad, nbtxs, req) } return }, "unsubscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { return s.unsubscribeAddresses(c) }, "subscribeFiatRates": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { var r WsSubscribeFiatRatesReq err = json.Unmarshal(req.Params, &r) if err != nil { return nil, err } r.Currency = strings.ToLower(r.Currency) for i := range r.Tokens { r.Tokens[i] = strings.ToLower(r.Tokens[i]) } return s.subscribeFiatRates(c, &r, req) }, "unsubscribeFiatRates": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { return s.unsubscribeFiatRates(c) }, "ping": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := struct{}{} return r, nil }, "getCurrentFiatRates": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsCurrentFiatRatesReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getCurrentFiatRates(r.Currencies, r.Token) } return }, "getFiatRatesForTimestamps": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsFiatRatesForTimestampsReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getFiatRatesForTimestamps(r.Timestamps, r.Currencies, r.Token) } return }, "getFiatRatesTickersList": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) { r := WsFiatRatesTickersListReq{} err = json.Unmarshal(req.Params, &r) if err == nil { rv, err = s.getAvailableVsCurrencies(r.Timestamp, r.Token) } return }, } func (s *WebsocketServer) onRequest(c *websocketChannel, req *WsReq) { var err error var data interface{} f, ok := requestHandlers[req.Method] methodLabel := req.Method if !ok { methodLabel = unknownMethodLabel } defer func() { if r := recover(); r != nil { glog.Error("Client ", c.id, ", onRequest ", req.Method, " recovered from panic: ", r) debug.PrintStack() e := resultError{} e.Error.Message = "Internal error" data = e } // nil data means no response if data != nil { c.DataOut(&WsRes{ ID: req.ID, Data: data, }) } s.metrics.WebsocketPendingRequests.With(common.Labels{"method": methodLabel}).Dec() }() t := time.Now() s.metrics.WebsocketPendingRequests.With(common.Labels{"method": methodLabel}).Inc() defer func() { s.metrics.WebsocketReqDuration.With(common.Labels{"method": methodLabel}).Observe(float64(time.Since(t)) / 1e3) // in microseconds }() if ok { data, err = f(s, c, req) if err == nil { glog.V(1).Info("Client ", c.id, " onRequest ", req.Method, " success") s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "success"}).Inc() } else { if apiErr, ok := err.(*api.APIError); !ok || !apiErr.Public { glog.Error("Client ", c.id, " onMessage ", req.Method, ": ", errors.ErrorStack(err), ", data ", string(req.Params)) } s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "failure"}).Inc() e := resultError{} e.Error.Message = err.Error() data = e } } else { s.metrics.WebsocketUnknownMethods.With(common.Labels{"method": methodLabel}).Inc() s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "failure"}).Inc() glog.V(1).Info("Client ", c.id, " onMessage ", req.Method, ": unknown method, data ", string(req.Params)) } } func unmarshalGetAccountInfoRequest(params []byte) (*WsAccountInfoReq, error) { var r WsAccountInfoReq err := json.Unmarshal(params, &r) if err != nil { return nil, err } return &r, nil } func (s *WebsocketServer) getAccountInfo(req *WsAccountInfoReq) (res *api.Address, err error) { var opt api.AccountDetails switch req.Details { case "tokens": opt = api.AccountDetailsTokens case "tokenBalances": opt = api.AccountDetailsTokenBalances case "txids": opt = api.AccountDetailsTxidHistory case "txslight": opt = api.AccountDetailsTxHistoryLight case "txs": opt = api.AccountDetailsTxHistory default: opt = api.AccountDetailsBasic } var tokensToReturn api.TokensToReturn switch req.Tokens { case "used": tokensToReturn = api.TokensToReturnUsed case "nonzero": tokensToReturn = api.TokensToReturnNonzeroBalance default: tokensToReturn = api.TokensToReturnDerived } filter := api.AddressFilter{ FromHeight: uint32(req.FromHeight), ToHeight: uint32(req.ToHeight), Contract: req.ContractFilter, Vout: api.AddressFilterVoutOff, TokensToReturn: tokensToReturn, } if req.PageSize == 0 { req.PageSize = txsOnPage } a, err := s.api.GetXpubAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter, req.Gap, strings.ToLower(req.SecondaryCurrency)) if err != nil { return s.api.GetAddress(req.Descriptor, req.Page, req.PageSize, opt, &filter, strings.ToLower(req.SecondaryCurrency)) } return a, nil } func (s *WebsocketServer) getAccountUtxo(descriptor string) (api.Utxos, error) { utxo, err := s.api.GetXpubUtxo(descriptor, false, 0) if err != nil { return s.api.GetAddressUtxo(descriptor, false) } return utxo, nil } func (s *WebsocketServer) getTransaction(txid string) (*api.Tx, error) { return s.api.GetTransaction(txid, false, false) } func (s *WebsocketServer) getTransactionSpecific(txid string) (interface{}, error) { return s.chain.GetTransactionSpecific(&bchain.Tx{Txid: txid}) } func (s *WebsocketServer) getInfo() (*WsInfoRes, error) { vi := common.GetVersionInfo() bi := s.is.GetBackendInfo() height, hash, err := s.db.GetBestBlock() if err != nil { return nil, err } return &WsInfoRes{ Name: s.is.Coin, Shortcut: s.is.CoinShortcut, Network: s.is.GetNetwork(), Decimals: s.chainParser.AmountDecimals(), BestHeight: int(height), BestHash: hash, Version: vi.Version, Block0Hash: s.block0hash, Testnet: s.chain.IsTestnet(), Backend: WsBackendInfo{ Version: bi.Version, Subversion: bi.Subversion, ConsensusVersion: bi.ConsensusVersion, Consensus: bi.Consensus, }, }, nil } func (s *WebsocketServer) getBlockHash(height int) (*WsBlockHashRes, error) { h, err := s.db.GetBlockHash(uint32(height)) if err != nil { return nil, err } return &WsBlockHashRes{ Hash: h, }, nil } func (s *WebsocketServer) getBlock(id string, page, pageSize int) (interface{}, error) { block, err := s.api.GetBlock(id, page, pageSize) if err != nil { return nil, err } return block, nil } func eip1559FeesToApi(fee *bchain.Eip1559Fee) *api.Eip1559Fee { if fee == nil { return nil } apiFee := api.Eip1559Fee{} apiFee.MaxFeePerGas = (*api.Amount)(fee.MaxFeePerGas) apiFee.MaxPriorityFeePerGas = (*api.Amount)(fee.MaxPriorityFeePerGas) apiFee.MaxWaitTimeEstimate = fee.MaxWaitTimeEstimate apiFee.MinWaitTimeEstimate = fee.MinWaitTimeEstimate return &apiFee } func eip1559FeeRangeToApi(feeRange []*big.Int) []*api.Amount { if feeRange == nil { return nil } apiFeeRange := make([]*api.Amount, len(feeRange)) for i := range feeRange { apiFeeRange[i] = (*api.Amount)(feeRange[i]) } return apiFeeRange } func (s *WebsocketServer) estimateFee(params []byte) (interface{}, error) { var r WsEstimateFeeReq err := json.Unmarshal(params, &r) if err != nil { return nil, err } res := make([]WsEstimateFeeRes, len(r.Blocks)) if s.chainParser.GetChainType() == bchain.ChainEthereumType { gas, err := s.chain.EthereumTypeEstimateGas(r.Specific) if err != nil { return nil, err } sg := strconv.FormatUint(gas, 10) b := 1 if len(r.Blocks) > 0 { b = r.Blocks[0] } fee, err := s.api.EstimateFee(b, true) if err != nil { return nil, err } feePerTx := new(big.Int) feePerTx.Mul(&fee, new(big.Int).SetUint64(gas)) eip1559, err := s.chain.EthereumTypeGetEip1559Fees() if err != nil { return nil, err } var eip1559Api *api.Eip1559Fees if eip1559 != nil { eip1559Api = &api.Eip1559Fees{} eip1559Api.BaseFeePerGas = (*api.Amount)(eip1559.BaseFeePerGas) eip1559Api.Instant = eip1559FeesToApi(eip1559.Instant) eip1559Api.High = eip1559FeesToApi(eip1559.High) eip1559Api.Medium = eip1559FeesToApi(eip1559.Medium) eip1559Api.Low = eip1559FeesToApi(eip1559.Low) eip1559Api.NetworkCongestion = eip1559.NetworkCongestion eip1559Api.BaseFeeTrend = eip1559.BaseFeeTrend eip1559Api.PriorityFeeTrend = eip1559.PriorityFeeTrend eip1559Api.LatestPriorityFeeRange = eip1559FeeRangeToApi(eip1559.LatestPriorityFeeRange) eip1559Api.HistoricalBaseFeeRange = eip1559FeeRangeToApi(eip1559.HistoricalBaseFeeRange) eip1559Api.HistoricalPriorityFeeRange = eip1559FeeRangeToApi(eip1559.HistoricalPriorityFeeRange) } for i := range r.Blocks { res[i].FeePerUnit = fee.String() res[i].FeeLimit = sg res[i].FeePerTx = feePerTx.String() res[i].Eip1559 = eip1559Api } } else { conservative := true v, ok := r.Specific["conservative"] if ok { vc, ok := v.(bool) if ok { conservative = vc } } txSize := 0 v, ok = r.Specific["txsize"] if ok { f, ok := v.(float64) if ok { txSize = int(f) } } for i, b := range r.Blocks { fee, err := s.api.EstimateFee(b, conservative) if err != nil { return nil, err } res[i].FeePerUnit = fee.String() if txSize > 0 { fee.Mul(&fee, big.NewInt(int64(txSize))) fee.Add(&fee, big.NewInt(500)) fee.Div(&fee, big.NewInt(1000)) res[i].FeePerTx = fee.String() } } } return res, nil } func (s *WebsocketServer) longTermFeeRate() (res interface{}, err error) { feeRate, err := s.chain.LongTermFeeRate() if err != nil { return nil, err } return WsLongTermFeeRateRes{ FeePerUnit: feeRate.FeePerUnit.String(), Blocks: feeRate.Blocks, }, nil } func (s *WebsocketServer) sendTransaction(tx string, disableAlternativeRPC bool) (res resultSendTransaction, err error) { txid, err := s.chain.SendRawTransaction(tx, disableAlternativeRPC) if err != nil { return res, err } res.Result = txid return } func (s *WebsocketServer) getMempoolFilters(r *WsMempoolFiltersReq) (res interface{}, err error) { type resMempoolFilters struct { ParamP uint8 `json:"P"` ParamM uint64 `json:"M"` ZeroedKey bool `json:"zeroedKey"` Entries map[string]string `json:"entries"` } filterEntries, err := s.mempool.GetTxidFilterEntries(r.ScriptType, r.FromTimestamp) if err != nil { return nil, err } return resMempoolFilters{ ParamP: s.is.BlockGolombFilterP, ParamM: bchain.GetGolombParamM(s.is.BlockGolombFilterP), ZeroedKey: filterEntries.UsedZeroedKey, Entries: filterEntries.Entries, }, nil } func (s *WebsocketServer) getBlockFilter(r *WsBlockFilterReq) (res interface{}, err error) { type resBlockFilter struct { ParamP uint8 `json:"P"` ParamM uint64 `json:"M"` ZeroedKey bool `json:"zeroedKey"` BlockFilter string `json:"blockFilter"` } if s.is.BlockFilterScripts != r.ScriptType { return nil, errors.Errorf("Unsupported script type %s", r.ScriptType) } blockFilter, err := s.db.GetBlockFilter(r.BlockHash) if err != nil { return nil, err } return resBlockFilter{ ParamP: s.is.BlockGolombFilterP, ParamM: bchain.GetGolombParamM(s.is.BlockGolombFilterP), ZeroedKey: s.is.BlockFilterUseZeroedKey, BlockFilter: blockFilter, }, nil } func (s *WebsocketServer) getBlockFiltersBatch(r *WsBlockFiltersBatchReq) (res interface{}, err error) { type resBlockFiltersBatch struct { ParamP uint8 `json:"P"` ParamM uint64 `json:"M"` ZeroedKey bool `json:"zeroedKey"` BlockFiltersBatch []string `json:"blockFiltersBatch"` } if s.is.BlockFilterScripts != r.ScriptType { return nil, errors.Errorf("Unsupported script type %s", r.ScriptType) } blockFiltersBatch, err := s.api.GetBlockFiltersBatch(r.BlockHash, r.PageSize) if err != nil { return nil, err } return resBlockFiltersBatch{ ParamP: s.is.BlockGolombFilterP, ParamM: bchain.GetGolombParamM(s.is.BlockGolombFilterP), ZeroedKey: s.is.BlockFilterUseZeroedKey, BlockFiltersBatch: blockFiltersBatch, }, nil } func (s *WebsocketServer) rpcCall(r *WsRpcCallReq) (*WsRpcCallRes, error) { if s.allowedRpcCallTo != nil { _, ok := s.allowedRpcCallTo[strings.ToLower(r.To)] if !ok { return nil, errors.New("Not supported") } } data, err := s.chain.EthereumTypeRpcCall(r.Data, r.To, r.From) if err != nil { return nil, err } return &WsRpcCallRes{Data: data}, nil } type subscriptionResponse struct { Subscribed bool `json:"subscribed"` } type subscriptionResponseMessage struct { Subscribed bool `json:"subscribed"` Message string `json:"message"` } func (s *WebsocketServer) subscribeNewBlock(c *websocketChannel, req *WsReq) (res interface{}, err error) { s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() s.newBlockSubscriptions[c] = req.ID s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewBlock"}).Set(float64(len(s.newBlockSubscriptions))) return &subscriptionResponse{true}, nil } func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interface{}, err error) { s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() delete(s.newBlockSubscriptions, c) s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewBlock"}).Set(float64(len(s.newBlockSubscriptions))) return &subscriptionResponse{false}, nil } func (s *WebsocketServer) subscribeNewTransaction(c *websocketChannel, req *WsReq) (res interface{}, err error) { s.newTransactionSubscriptionsLock.Lock() defer s.newTransactionSubscriptionsLock.Unlock() if !s.newTransactionEnabled { return &subscriptionResponseMessage{false, "subscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil } s.newTransactionSubscriptions[c] = req.ID s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewTransaction"}).Set(float64(len(s.newTransactionSubscriptions))) return &subscriptionResponse{true}, nil } func (s *WebsocketServer) unsubscribeNewTransaction(c *websocketChannel) (res interface{}, err error) { s.newTransactionSubscriptionsLock.Lock() defer s.newTransactionSubscriptionsLock.Unlock() if !s.newTransactionEnabled { return &subscriptionResponseMessage{false, "unsubscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil } delete(s.newTransactionSubscriptions, c) s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewTransaction"}).Set(float64(len(s.newTransactionSubscriptions))) return &subscriptionResponse{false}, nil } func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, bool, error) { r := WsSubscribeAddressesReq{} err := json.Unmarshal(params, &r) if err != nil { return nil, false, err } rv := make([]string, len(r.Addresses)) for i, a := range r.Addresses { ad, err := s.chainParser.GetAddrDescFromAddress(a) if err != nil { return nil, false, err } rv[i] = string(ad) } return rv, r.NewBlockTxs, nil } // doUnsubscribeAddresses removes all address subscriptions for a channel. // addressSubscriptionsLock must be held by the caller. func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) { for _, ads := range c.addrDescs { sa, e := s.addressSubscriptions[ads] if e { for sc, details := range sa { if sc == c { if details.publishNewBlockTxs { s.newBlockTxsSubscriptionCount-- } delete(sa, c) } } if len(sa) == 0 { delete(s.addressSubscriptions, ads) } } } c.addrDescs = nil } // subscribeAddresses replaces previous address subscriptions for the channel. // If newBlockTxs is enabled, the channel receives both mempool notifications and // confirmed notifications detected from newly connected blocks. func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []string, newBlockTxs bool, req *WsReq) (res interface{}, err error) { s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() // unsubscribe all previous subscriptions s.doUnsubscribeAddresses(c) for _, ads := range addrDesc { as, ok := s.addressSubscriptions[ads] if !ok { as = make(map[*websocketChannel]*addressDetails) s.addressSubscriptions[ads] = as } as[c] = &addressDetails{ requestID: req.ID, publishNewBlockTxs: newBlockTxs, } if newBlockTxs { s.newBlockTxsSubscriptionCount++ } } c.addrDescs = addrDesc s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeAddresses"}).Set(float64(len(s.addressSubscriptions))) s.metrics.WebsocketNewBlockTxsSubscriptions.Set(float64(s.newBlockTxsSubscriptionCount)) return &subscriptionResponse{true}, nil } // unsubscribeAddresses unsubscribes all address subscriptions by this channel func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interface{}, err error) { s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() s.doUnsubscribeAddresses(c) s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeAddresses"}).Set(float64(len(s.addressSubscriptions))) s.metrics.WebsocketNewBlockTxsSubscriptions.Set(float64(s.newBlockTxsSubscriptionCount)) return &subscriptionResponse{false}, nil } // doUnsubscribeFiatRates fiat rates without fiatRatesSubscriptionsLock - can be called only from subscribeFiatRates and unsubscribeFiatRates func (s *WebsocketServer) doUnsubscribeFiatRates(c *websocketChannel) { for fr, sa := range s.fiatRatesSubscriptions { for sc := range sa { if sc == c { delete(sa, c) } } if len(sa) == 0 { delete(s.fiatRatesSubscriptions, fr) } } delete(s.fiatRatesTokenSubscriptions, c) } // subscribeFiatRates subscribes all FiatRates subscriptions by this channel func (s *WebsocketServer) subscribeFiatRates(c *websocketChannel, d *WsSubscribeFiatRatesReq, req *WsReq) (res interface{}, err error) { s.fiatRatesSubscriptionsLock.Lock() defer s.fiatRatesSubscriptionsLock.Unlock() // unsubscribe all previous subscriptions s.doUnsubscribeFiatRates(c) currency := d.Currency if currency == "" { currency = allFiatRates } else { currency = strings.ToLower(currency) } as, ok := s.fiatRatesSubscriptions[currency] if !ok { as = make(map[*websocketChannel]string) s.fiatRatesSubscriptions[currency] = as } as[c] = req.ID if len(d.Tokens) != 0 { s.fiatRatesTokenSubscriptions[c] = d.Tokens } s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeFiatRates"}).Set(float64(len(s.fiatRatesSubscriptions))) return &subscriptionResponse{true}, nil } // unsubscribeFiatRates unsubscribes all FiatRates subscriptions by this channel func (s *WebsocketServer) unsubscribeFiatRates(c *websocketChannel) (res interface{}, err error) { s.fiatRatesSubscriptionsLock.Lock() defer s.fiatRatesSubscriptionsLock.Unlock() s.doUnsubscribeFiatRates(c) s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeFiatRates"}).Set(float64(len(s.fiatRatesSubscriptions))) return &subscriptionResponse{false}, nil } func (s *WebsocketServer) onNewBlockAsync(hash string, height uint32) { s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() data := struct { Height uint32 `json:"height"` Hash string `json:"hash"` }{ Height: height, Hash: hash, } for c, id := range s.newBlockSubscriptions { c.DataOut(&WsRes{ ID: id, Data: &data, }) } glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") } // setConfirmedBlockTxMetadata normalizes parsed block transactions. // ParseBlock can return txs with zero confirmations; we force first-confirmed // metadata so conversion does not take mempool-only branches. func setConfirmedBlockTxMetadata(tx *bchain.Tx, blockTime int64) { if tx.Confirmations == 0 { tx.Confirmations = 1 tx.Blocktime = blockTime tx.Time = blockTime } } // getEthereumInternalTransfers safely extracts internal transfers from // CoinSpecificData when present. func getEthereumInternalTransfers(tx *bchain.Tx) []bchain.EthereumInternalTransfer { esd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData) if !ok || esd.InternalData == nil { return nil } return esd.InternalData.Transfers } // setEthereumReceiptIfAvailable adds receipt data to Ethereum txs on a // best-effort basis; failures are logged and notifications continue. func setEthereumReceiptIfAvailable(tx *bchain.Tx, getReceipt func(string) (*bchain.RpcReceipt, error)) string { csd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData) if !ok { return "skipped_non_eth" } receipt, err := getReceipt(tx.Txid) if err != nil { glog.Error("EthereumTypeGetTransactionReceipt error ", err, " for ", tx.Txid) return "error" } csd.Receipt = receipt tx.CoinSpecificData = csd return "success" } func observeNewBlockTxDuration(metrics *common.Metrics, stage string, started time.Time) { if metrics == nil { return } metrics.WebsocketNewBlockTxsDuration.With(common.Labels{"stage": stage}).Observe(time.Since(started).Seconds()) } func incNewBlockTxMetric(metrics *common.Metrics, stage, status string, value float64) { if metrics == nil { return } counter := metrics.WebsocketNewBlockTxs.With(common.Labels{"stage": stage, "status": status}) if value == 1 { counter.Inc() } else { counter.Add(value) } } // populateBitcoinVinAddrDescs fills missing vin address descriptors by loading // previous outputs. This enables sender-side address subscription matching for // Bitcoin transactions parsed from connected blocks. func populateBitcoinVinAddrDescs(vins []bchain.MempoolVin, getAddrDesc func(string, uint32) (bchain.AddressDescriptor, error)) { if getAddrDesc == nil { return } for i := range vins { if len(vins[i].AddrDesc) > 0 || vins[i].Txid == "" { continue } addrDesc, err := getAddrDesc(vins[i].Txid, vins[i].Vout) if err == nil && len(addrDesc) > 0 { vins[i].AddrDesc = addrDesc } } } // getBitcoinVinAddrDesc resolves an input outpoint to an address descriptor // using txCache. It is best-effort and can return chain-level not-found errors. func (s *WebsocketServer) getBitcoinVinAddrDesc(txid string, vout uint32) (bchain.AddressDescriptor, error) { if s.txCache == nil { return nil, bchain.ErrTxNotFound } prevTx, _, err := s.txCache.GetTransaction(txid) if err != nil { return nil, err } if int(vout) >= len(prevTx.Vout) { return nil, bchain.ErrAddressMissing } return s.chainParser.GetAddrDescFromVout(&prevTx.Vout[vout]) } // publishNewBlockTxsByAddr emits confirmed transaction notifications only for // subscribed addresses touched by transactions in the connected block. func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) { blockStart := time.Now() defer observeNewBlockTxDuration(s.metrics, "per_block", blockStart) chainType := s.chainParser.GetChainType() for _, tx := range block.Txs { incNewBlockTxMetric(s.metrics, "scanned", "success", 1) setConfirmedBlockTxMetadata(&tx, block.Time) var tokenTransfers bchain.TokenTransfers var internalTransfers []bchain.EthereumInternalTransfer if chainType == bchain.ChainEthereumType { tokenTransfers, _ = s.chainParser.EthereumTypeGetTokenTransfersFromTx(&tx) internalTransfers = getEthereumInternalTransfers(&tx) } vins := make([]bchain.MempoolVin, len(tx.Vin)) for i, vin := range tx.Vin { vins[i] = bchain.MempoolVin{Vin: vin} } if chainType == bchain.ChainBitcoinType { populateBitcoinVinAddrDescs(vins, s.getBitcoinVinAddrDesc) } matchStart := time.Now() subscribed := s.getNewTxSubscriptions(vins, tx.Vout, tokenTransfers, internalTransfers) observeNewBlockTxDuration(s.metrics, "match", matchStart) if len(subscribed) > 0 { incNewBlockTxMetric(s.metrics, "matched", "success", 1) // Convert and publish asynchronously so heavy tx conversion does not // block processing of other transactions in the same block. go func(tx bchain.Tx, subscribed map[string]struct{}) { if chainType == bchain.ChainEthereumType { receiptStatus := setEthereumReceiptIfAvailable(&tx, s.chain.EthereumTypeGetTransactionReceipt) if s.metrics != nil { s.metrics.WebsocketEthReceipt.With(common.Labels{"status": receiptStatus}).Inc() } } convertStart := time.Now() atx, err := s.api.GetTransactionFromBchainTx(&tx, int(block.Height), false, false, nil) observeNewBlockTxDuration(s.metrics, "convert", convertStart) if err != nil { incNewBlockTxMetric(s.metrics, "converted", "failure", 1) glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid) return } incNewBlockTxMetric(s.metrics, "converted", "success", 1) for stringAddressDescriptor := range subscribed { s.sendOnNewTxAddr(stringAddressDescriptor, atx, true) } incNewBlockTxMetric(s.metrics, "published", "success", float64(len(subscribed))) }(tx, subscribed) } } } // OnNewBlock is a callback that broadcasts info about new block to subscribed clients func (s *WebsocketServer) OnNewBlock(block *bchain.Block) { s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() go s.onNewBlockAsync(block.Hash, block.Height) if s.newBlockTxsSubscriptionCount > 0 { // Skip per-tx address matching when nobody opted into newBlockTxs. go s.publishNewBlockTxsByAddr(block) } } func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) { s.newTransactionSubscriptionsLock.Lock() defer s.newTransactionSubscriptionsLock.Unlock() for c, id := range s.newTransactionSubscriptions { c.DataOut(&WsRes{ ID: id, Data: &tx, }) } glog.Info("broadcasting new tx ", tx.Txid, " to ", len(s.newTransactionSubscriptions), " channels") } func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx, newBlockTx bool) { addrDesc := bchain.AddressDescriptor(stringAddressDescriptor) addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc) if err != nil { glog.Error("GetAddressesFromAddrDesc error ", err, " for ", addrDesc) return } if len(addr) == 1 { data := struct { Address string `json:"address"` Tx *api.Tx `json:"tx"` }{ Address: addr[0], Tx: tx, } s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() as, ok := s.addressSubscriptions[stringAddressDescriptor] if ok { source := "mempool" if newBlockTx { source = "new_block" } for c, details := range as { // Mempool notifications go to all address subscribers; confirmed // block notifications only go to subscribers that requested them. if newBlockTx && !details.publishNewBlockTxs { continue } if s.metrics != nil { s.metrics.WebsocketAddrNotifications.With(common.Labels{"source": source}).Inc() } c.DataOut(&WsRes{ ID: details.requestID, Data: &data, }) } glog.Info("broadcasting new tx ", tx.Txid, ", addr ", addr[0], " to ", len(as), " channels") } } } func (s *WebsocketServer) getNewTxSubscriptions(vins []bchain.MempoolVin, vouts []bchain.Vout, tokenTransfers bchain.TokenTransfers, internalTransfers []bchain.EthereumInternalTransfer) map[string]struct{} { // check if there is any subscription in inputs, outputs and transfers s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() subscribed := make(map[string]struct{}) processAddress := func(address string) { if addrDesc, err := s.chainParser.GetAddrDescFromAddress(address); err == nil && len(addrDesc) > 0 { sad := string(addrDesc) if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 { subscribed[sad] = struct{}{} } } } processVout := func(vout bchain.Vout) { if addrDesc, err := s.chainParser.GetAddrDescFromVout(&vout); err == nil && len(addrDesc) > 0 { sad := string(addrDesc) if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 { subscribed[sad] = struct{}{} } } } for i := range vins { if sad := string(vins[i].AddrDesc); len(sad) > 0 { if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 { subscribed[sad] = struct{}{} } } else if s.chainParser.GetChainType() == bchain.ChainBitcoinType { vout := int(vins[i].Vout) if vout >= 0 && vout < len(vouts) { processVout(vouts[vins[i].Vout]) } } else if s.chainParser.GetChainType() == bchain.ChainEthereumType { if len(vins[i].Addresses) > 0 { processAddress(vins[i].Addresses[0]) } } } for i := range vouts { processVout(vouts[i]) } for i := range tokenTransfers { processAddress(tokenTransfers[i].From) processAddress(tokenTransfers[i].To) } for i := range internalTransfers { processAddress(internalTransfers[i].From) processAddress(internalTransfers[i].To) } return subscribed } func (s *WebsocketServer) onNewTxAsync(tx *bchain.MempoolTx, subscribed map[string]struct{}) { atx, err := s.api.GetTransactionFromMempoolTx(tx) if err != nil { glog.Error("GetTransactionFromMempoolTx error ", err, " for ", tx.Txid) return } s.sendOnNewTx(atx) for stringAddressDescriptor := range subscribed { s.sendOnNewTxAddr(stringAddressDescriptor, atx, false) } } // OnNewTx is a callback that broadcasts info about a tx affecting subscribed address func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) { subscribed := s.getNewTxSubscriptions(tx.Vin, tx.Vout, tx.TokenTransfers, nil) if len(s.newTransactionSubscriptions) > 0 || len(subscribed) > 0 { go s.onNewTxAsync(tx, subscribed) } } func (s *WebsocketServer) broadcastTicker(currency string, rates map[string]float32, ticker *common.CurrencyRatesTicker) { as, ok := s.fiatRatesSubscriptions[currency] if ok && len(as) > 0 { data := struct { Rates interface{} `json:"rates"` }{ Rates: rates, } for c, id := range as { var tokens []string if ticker != nil { tokens = s.fiatRatesTokenSubscriptions[c] } if len(tokens) > 0 { dataWithTokens := struct { Rates interface{} `json:"rates"` TokenRates map[string]float32 `json:"tokenRates,omitempty"` }{ Rates: rates, TokenRates: map[string]float32{}, } for _, token := range tokens { rate := ticker.TokenRateInCurrency(token, currency) if rate > 0 { dataWithTokens.TokenRates[token] = rate } } c.DataOut(&WsRes{ ID: id, Data: &dataWithTokens, }) } else { c.DataOut(&WsRes{ ID: id, Data: &data, }) } } glog.Info("broadcasting new rates for currency ", currency, " to ", len(as), " channels") } } // OnNewFiatRatesTicker is a callback that broadcasts info about fiat rates affecting subscribed currency func (s *WebsocketServer) OnNewFiatRatesTicker(ticker *common.CurrencyRatesTicker) { s.fiatRatesSubscriptionsLock.Lock() defer s.fiatRatesSubscriptionsLock.Unlock() for currency, rate := range ticker.Rates { s.broadcastTicker(currency, map[string]float32{currency: rate}, ticker) } s.broadcastTicker(allFiatRates, ticker.Rates, nil) } func (s *WebsocketServer) getCurrentFiatRates(currencies []string, token string) (*api.FiatTicker, error) { ret, err := s.api.GetCurrentFiatRates(currencies, strings.ToLower(token)) return ret, err } func (s *WebsocketServer) getFiatRatesForTimestamps(timestamps []int64, currencies []string, token string) (*api.FiatTickers, error) { ret, err := s.api.GetFiatRatesForTimestamps(timestamps, currencies, strings.ToLower(token)) return ret, err } func (s *WebsocketServer) getAvailableVsCurrencies(timestamp int64, token string) (*api.AvailableVsCurrencies, error) { ret, err := s.api.GetAvailableVsCurrencies(timestamp, strings.ToLower(token)) return ret, err }