refactor(python): streamline record_screen functionality

The function as written didn't really belong into debuglink, so it was
moved to trezorctl, and emu.py can import it from there. Also improved
type annotations, simplified implementation, and made sure that
recording is properly stopped even on an error.

[no changelog]
This commit is contained in:
matejcik
2026-01-21 14:30:08 +01:00
committed by Roman Zeyde
parent 8ebd829152
commit 776355e003
5 changed files with 94 additions and 93 deletions

View File

@@ -9,12 +9,12 @@ import subprocess
import sys import sys
import tempfile import tempfile
from pathlib import Path from pathlib import Path
from typing import Optional, TextIO from typing import TextIO
import click import click
import trezorlib.debuglink import trezorlib.debuglink
import trezorlib.device from trezorlib.cli.debug import record_screen
from trezorlib._internal.emulator import CoreEmulator from trezorlib._internal.emulator import CoreEmulator
try: try:
@@ -133,7 +133,7 @@ def _from_env(name: str) -> bool:
@click.option("-p", "--profile", metavar="NAME", help="Profile name or path") @click.option("-p", "--profile", metavar="NAME", help="Profile name or path")
@click.option("-P", "--port", metavar="PORT", type=int, default=int(os.environ.get("TREZOR_UDP_PORT", 0)) or None, help="UDP port number") @click.option("-P", "--port", metavar="PORT", type=int, default=int(os.environ.get("TREZOR_UDP_PORT", 0)) or None, help="UDP port number")
@click.option("-q", "--quiet", is_flag=True, help="Silence emulator output") @click.option("-q", "--quiet", is_flag=True, help="Silence emulator output")
@click.option("-r", "--record-dir", help="Directory where to record screen changes") @click.option("-r", "--record-dir", help="Directory where to record screen changes", type=click.Path(file_okay=False, dir_okay=True, path_type=Path))
@click.option("-s", "--slip0014", is_flag=True, help="Initialize device with SLIP-14 seed (all all all...)") @click.option("-s", "--slip0014", is_flag=True, help="Initialize device with SLIP-14 seed (all all all...)")
@click.option("-S", "--script-gdb-file", type=click.Path(exists=True, dir_okay=False), help="Run gdb with an init file") @click.option("-S", "--script-gdb-file", type=click.Path(exists=True, dir_okay=False), help="Run gdb with an init file")
@click.option("-V", "--valgrind", is_flag=True, help="Use valgrind instead of debugger (-D)") @click.option("-V", "--valgrind", is_flag=True, help="Use valgrind instead of debugger (-D)")
@@ -160,7 +160,7 @@ def cli(
port: int, port: int,
output: TextIO | None, output: TextIO | None,
quiet: bool, quiet: bool,
record_dir: Optional[str], record_dir: Path | None,
slip0014: bool, slip0014: bool,
script_gdb_file: str | Path | None, script_gdb_file: str | Path | None,
valgrind: bool, valgrind: bool,
@@ -312,10 +312,7 @@ def cli(
) )
if record_dir: if record_dir:
assert emulator.client is not None record_screen(emulator.transport, record_dir)
trezorlib.debuglink.record_screen(
emulator.client, record_dir, report_func=print
)
if run_command: if run_command:
ret = run_command_with_emulator(emulator, command) ret = run_command_with_emulator(emulator, command)

View File

@@ -24,6 +24,7 @@ import sys
import typing as t import typing as t
from contextlib import contextmanager from contextlib import contextmanager
from enum import Enum from enum import Enum
from pathlib import Path
import click import click
@@ -151,11 +152,13 @@ class TrezorConnection:
script: bool, script: bool,
*, *,
app_name: str = "trezorctl", app_name: str = "trezorctl",
record_dir: Path | None = None,
) -> None: ) -> None:
self.path = path self.path = path
self.session_id = session_id self.session_id = session_id
self.passphrase_source = passphrase_source self.passphrase_source = passphrase_source
self.script = script self.script = script
self.record_dir = record_dir
self.credentials = credentials.CredentialStore(app_name) self.credentials = credentials.CredentialStore(app_name)
self.app = AppManifest(app_name=app_name, credentials=self.credentials.list) self.app = AppManifest(app_name=app_name, credentials=self.credentials.list)
if self.script: if self.script:
@@ -202,12 +205,24 @@ class TrezorConnection:
assert self._transport is not None assert self._transport is not None
return self._transport return self._transport
def _record_screen(self, start: bool) -> None:
"""Helper wrapping `debug.record_screen()` to avoid circular import."""
from .debug import record_screen
if self.record_dir is None:
return
assert self._transport is not None
record_screen(self._transport, self.record_dir if start else None)
def open(self) -> None: def open(self) -> None:
if self._transport is None: if self._transport is None:
self._transport = self._get_transport() self._transport = self._get_transport()
self._transport.open() self._transport.open()
self._record_screen(True)
def close(self) -> None: def close(self) -> None:
self._record_screen(False)
if self._transport is not None: if self._transport is not None:
self._transport.close() self._transport.close()
self._transport = None self._transport = None

View File

@@ -14,45 +14,94 @@
# You should have received a copy of the License along with this library. # You should have received a copy of the License along with this library.
# If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>. # If not, see <https://www.gnu.org/licenses/lgpl-3.0.html>.
from typing import TYPE_CHECKING, Union from __future__ import annotations
import typing as t
from datetime import datetime
from pathlib import Path
import click import click
from ..client import Session from ..client import Session
from ..debuglink import DebugLink, TrezorTestContext from ..debuglink import DebugLink
from ..debuglink import optiga_set_sec_max as debuglink_optiga_set_sec_max from ..debuglink import optiga_set_sec_max as debuglink_optiga_set_sec_max
from ..debuglink import prodtest_t1 as debuglink_prodtest_t1 from ..debuglink import prodtest_t1 as debuglink_prodtest_t1
from ..debuglink import record_screen
from ..debuglink import set_log_filter as debuglink_set_log_filter from ..debuglink import set_log_filter as debuglink_set_log_filter
from ..transport import Timeout
from ..transport.udp import UdpTransport
from . import with_session from . import with_session
if TYPE_CHECKING: if t.TYPE_CHECKING:
from ..transport import Transport
from . import TrezorConnection from . import TrezorConnection
def _get_session_screenshot_dir(base_dir: Path) -> Path:
"""Create and return screenshot dir for the current session, according to datetime."""
timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
session_dir = base_dir / timestamp_str
ctr = 1
while session_dir.exists() and any(session_dir.iterdir()):
session_dir = base_dir / f"{timestamp_str}_{ctr}"
ctr += 1
session_dir.mkdir(parents=True, exist_ok=True)
return session_dir
def record_screen(transport: Transport, base_dir: Path | None) -> None:
"""Record screen changes into a specified directory.
Passing `None` as `directory` stops the recording.
Creates subdirectories inside a specified directory, one for each session
(for each new call of this function).
(So that older screenshots are not overwritten by new ones.)
Is available only for emulators, hardware devices are not capable of that.
"""
if not isinstance(transport, UdpTransport):
raise click.ClickException("Recording is only supported on emulator.")
debug_transport = transport.find_debug()
with debug_transport:
try:
debug_transport.wait_until_ready(timeout=1)
except Timeout:
raise click.ClickException("Debuglink is not responding.") from None
debug = DebugLink(transport=debug_transport)
if base_dir is None:
debug.stop_recording()
click.echo("Recording stopped.")
else:
current_session_dir = _get_session_screenshot_dir(base_dir)
debug.start_recording(str(current_session_dir.resolve()))
click.echo(f"Recording started into {current_session_dir}.")
@click.group(name="debug") @click.group(name="debug")
def cli() -> None: def cli() -> None:
"""Miscellaneous debug features.""" """Miscellaneous debug features."""
@cli.command() @cli.command()
@click.argument("directory", required=False) @click.argument(
"directory",
required=False,
type=click.Path(file_okay=False, dir_okay=True, path_type=Path),
)
@click.option("-s", "--stop", is_flag=True, help="Stop the recording") @click.option("-s", "--stop", is_flag=True, help="Stop the recording")
@click.pass_obj @click.pass_obj
def record(obj: "TrezorConnection", directory: Union[str, None], stop: bool) -> None: def record(obj: "TrezorConnection", directory: Path | None, stop: bool) -> None:
"""Record screen changes into a specified directory. """Record screen changes into a specified directory.
Recording can be stopped with `-s / --stop` option. Recording can be stopped with `-s / --stop` option.
""" """
record_screen_from_connection(obj, None if stop else directory) if not stop and directory is None:
raise click.ClickException("Specify either a directory path or --stop.")
record_screen(obj.transport, None if stop else directory)
def record_screen_from_connection(
obj: "TrezorConnection", directory: Union[str, None]
) -> None:
"""Record screen helper to transform TrezorConnection into TrezorClientDebugLink."""
debug_client = TrezorTestContext(transport=obj.transport, auto_interact=False)
record_screen(debug_client, directory, report_func=click.echo)
@cli.command() @cli.command()

View File

@@ -24,6 +24,7 @@ import json
import logging import logging
import os import os
import time import time
from pathlib import Path
from typing import Any, Callable, Optional, TypeVar, cast from typing import Any, Callable, Optional, TypeVar, cast
import click import click
@@ -204,6 +205,7 @@ def configure_logging(verbose: int) -> None:
@click.option( @click.option(
"-r", "-r",
"--record", "--record",
type=click.Path(file_okay=False, dir_okay=True, path_type=Path),
help="Record screen changes into a specified directory.", help="Record screen changes into a specified directory.",
) )
@click.version_option(package_name="trezor") @click.version_option(package_name="trezor")
@@ -217,7 +219,7 @@ def cli_main(
passphrase_on_host: bool, passphrase_on_host: bool,
script: bool, script: bool,
session_id: Optional[str], session_id: Optional[str],
record: Optional[str], record: Path | None,
) -> None: ) -> None:
configure_logging(verbose) configure_logging(verbose)
@@ -232,14 +234,16 @@ def cli_main(
else: else:
passphrase_source = PassphraseSource.AUTO passphrase_source = PassphraseSource.AUTO
ctx.obj = TrezorConnection(path, session_id, passphrase_source, script) ctx.obj = TrezorConnection(
path=path,
session_id=session_id,
passphrase_source=passphrase_source,
script=script,
record_dir=record,
)
ctx.obj.open() ctx.obj.open()
atexit.register(ctx.obj.close) atexit.register(ctx.obj.close)
# Optionally record the screen into a specified directory.
if record:
debug.record_screen_from_connection(ctx.obj, record)
# Creating a cli function that has the right types for future usage # Creating a cli function that has the right types for future usage
cli = cast(TrezorctlGroup, cli_main) cli = cast(TrezorctlGroup, cli_main)
@@ -273,19 +277,6 @@ def print_result(res: Any, is_json: bool, script: bool, **kwargs: Any) -> None:
click.echo(res) click.echo(res)
@cli.set_result_callback()
@click.pass_obj
def stop_recording_action(obj: TrezorConnection, *args: Any, **kwargs: Any) -> None:
"""Stop recording screen changes when the recording was started by `cli_main`.
(When user used the `-r / --record` option of `trezorctl` command.)
It allows for isolating screen directories only for specific actions/commands.
"""
if kwargs.get("record"):
debug.record_screen_from_connection(obj, None)
def format_device_name(features: messages.Features) -> str: def format_device_name(features: messages.Features) -> str:
model = features.model or "1" model = features.model or "1"
if features.bootloader_mode: if features.bootloader_mode:

View File

@@ -25,7 +25,6 @@ import typing as t
import warnings import warnings
from contextlib import contextmanager from contextlib import contextmanager
from copy import deepcopy from copy import deepcopy
from datetime import datetime
from enum import Enum, IntEnum, auto from enum import Enum, IntEnum, auto
from itertools import zip_longest from itertools import zip_longest
from pathlib import Path from pathlib import Path
@@ -1793,56 +1792,6 @@ def prodtest_t1(session: client.Session) -> None:
) )
def record_screen(
debug_client: "TrezorTestContext",
directory: str | None,
report_func: t.Callable[[str], None] | None = None,
) -> None:
"""Record screen changes into a specified directory.
Passing `None` as `directory` stops the recording.
Creates subdirectories inside a specified directory, one for each session
(for each new call of this function).
(So that older screenshots are not overwritten by new ones.)
Is available only for emulators, hardware devices are not capable of that.
"""
def get_session_screenshot_dir(directory: Path) -> Path:
"""Create and return screenshot dir for the current session, according to datetime."""
session_dir = directory / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
session_dir.mkdir(parents=True, exist_ok=True)
return session_dir
if not _is_emulator(debug_client):
raise RuntimeError("Recording is only supported on emulator.")
if directory is None:
with debug_client.debug.transport:
debug_client.debug.stop_recording()
if report_func is not None:
report_func("Recording stopped.")
else:
# Transforming the directory into an absolute path,
# because emulator demands it
abs_directory = Path(directory).resolve()
# Creating the dir when it does not exist yet
if not abs_directory.exists():
abs_directory.mkdir(parents=True, exist_ok=True)
# Getting a new screenshot dir for the current session
current_session_dir = get_session_screenshot_dir(abs_directory)
with debug_client.debug.transport:
debug_client.debug.start_recording(str(current_session_dir))
if report_func is not None:
report_func(f"Recording started into {current_session_dir}.")
def _is_emulator(debug_client: "TrezorTestContext") -> bool:
"""Check if we are connected to emulator, in contrast to hardware device."""
return debug_client.features.fw_vendor == "EMULATOR"
def optiga_set_sec_max(debug: DebugLink) -> None: def optiga_set_sec_max(debug: DebugLink) -> None:
debug._call(messages.DebugLinkOptigaSetSecMax()) debug._call(messages.DebugLinkOptigaSetSecMax())