python: add wait-for-emulator command

This commit is contained in:
matejcik
2020-01-14 11:30:41 +01:00
parent 29e883ab59
commit c151fdeefd
3 changed files with 49 additions and 1 deletions

View File

@@ -19,12 +19,14 @@
import json
import os
import sys
import time
import click
from .. import coins, log, messages, protobuf, ui
from ..client import TrezorClient
from ..transport import enumerate_devices, get_transport
from ..transport.udp import UdpTransport
from . import (
binance,
btc,
@@ -250,6 +252,28 @@ def usb_reset():
WebUsbTransport.enumerate(usb_reset=True)
@cli.command()
@click.option("-t", "--timeout", type=float, default=10, help="Timeout in seconds")
@click.pass_context
def wait_for_emulator(ctx, timeout):
"""Wait until Trezor Emulator comes up.
Tries to connect to emulator and returns when it succeeds.
"""
path = ctx.parent.params.get("path")
if path:
if not path.startswith("udp:"):
raise click.ClickException("You must use UDP path, not {}".format(path))
path = path.replace("udp:", "")
start = time.monotonic()
UdpTransport(path).wait_until_ready(timeout)
end = time.monotonic()
if ctx.parent.params.get("verbose"):
click.echo("Waited for {:.3f} seconds".format(end - start))
#
# Basic coin functions
#

View File

@@ -15,6 +15,7 @@
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
import socket
import time
from typing import Iterable, Optional, cast
from . import TransportException
@@ -60,7 +61,7 @@ class UdpTransport(ProtocolBasedTransport):
return d
else:
raise TransportException(
"No Trezor device found at address {}".format(path)
"No Trezor device found at address {}".format(d.get_path())
)
finally:
d.close()
@@ -84,6 +85,22 @@ class UdpTransport(ProtocolBasedTransport):
path = path.replace("{}:".format(cls.PATH_PREFIX), "")
return cls._try_path(path)
def wait_until_ready(self, timeout: float = 10) -> None:
try:
self.open()
self.socket.settimeout(0)
start = time.monotonic()
while True:
if self._ping():
break
elapsed = time.monotonic() - start
if elapsed >= timeout:
raise TransportException("Timed out waiting for connection.")
time.sleep(0.05)
finally:
self.close()
def open(self) -> None:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.connect(self.device)