# app-ethereum/tests/ragger/app/client.py
# (web-viewer header: 253 lines, 9.8 KiB, Python)

from enum import IntEnum, auto
from typing import Optional
from ragger.backend import BackendInterface
from ragger.utils import RAPDU
from ragger.navigator import NavInsID, NavIns, NanoNavigator
from .command_builder import EthereumCmdBuilder
from .setting import SettingType, SettingImpl
from .eip712 import EIP712FieldType
from .response_parser import EthereumRespParser
from .tlv import format_tlv
import signal
# (web-viewer blame timestamp: 2022-11-09 14:47:19 +01:00)
import time
from pathlib import Path
import keychain
import rlp
# Directory two levels above this file; handed to the navigator as the root
# path when send_fund() is asked to compare screenshots.
ROOT_SCREENSHOT_PATH = Path(__file__).parent.parent

# Multiplier used to convert an amount expressed in ETH into wei.
# Kept as a float on purpose: callers multiply a float amount by it and then
# truncate the product with int().
WEI_IN_ETH = 1e18
# (web-viewer blame timestamp: 2023-03-03 15:00:52 +01:00)
class StatusWord(IntEnum):
    """Named constants for 16-bit APDU status words.

    Being an ``IntEnum``, every member compares equal to its raw integer
    value, so ``rapdu.status == StatusWord.OK`` works on plain ints.
    """

    OK = 0x9000
    ERROR_NO_INFO = 0x6A00
    INVALID_DATA = 0x6A80
    INSUFFICIENT_MEMORY = 0x6A84
    INVALID_INS = 0x6D00
    INVALID_P1_P2 = 0x6B00
    CONDITION_NOT_SATISFIED = 0x6985
    REF_DATA_NOT_FOUND = 0x6A88
class DOMAIN_NAME_TAG(IntEnum):
    """Tag identifiers for the TLV fields of a domain-name payload.

    These are the tag values passed to ``format_tlv`` when
    ``EthereumClient.provide_domain_name`` assembles its payload.
    """

    # Structure header
    STRUCTURE_TYPE = 0x01
    STRUCTURE_VERSION = 0x02

    # Challenge and signature-related fields
    CHALLENGE = 0x12
    SIGNER_KEY_ID = 0x13
    SIGNER_ALGO = 0x14
    SIGNATURE = 0x15

    # Domain-name specific fields
    DOMAIN_NAME = 0x20
    COIN_TYPE = 0x21
    ADDRESS = 0x22
class EthereumClient:
    """Test helper that drives the Ethereum app through a Ragger backend.

    Each public method builds one or more APDUs with ``EthereumCmdBuilder``,
    sends them asynchronously through the backend and, where the command
    requires user interaction, presses the device buttons while the exchange
    is pending.

    NOTE: ``__init__`` installs a process-wide ``SIGALRM`` handler, so the
    class can only be instantiated from the main thread on a platform that
    provides ``signal.SIGALRM`` / ``signal.setitimer``.
    """

    # State of every app setting, keyed by setting type. Each SettingImpl is
    # built from the list of device names the setting applies to (read back
    # as ``.devices`` in settings_set()).
    # NOTE: this dict lives on the class and is therefore shared by all
    # instances; __init__ resets every ``.value`` to False.
    _settings: dict[SettingType, SettingImpl] = {
        SettingType.BLIND_SIGNING: SettingImpl(
            ["nanos", "nanox", "nanosp"]
        ),
        SettingType.DEBUG_DATA: SettingImpl(
            ["nanos", "nanox", "nanosp"]
        ),
        SettingType.NONCE: SettingImpl(
            ["nanos", "nanox", "nanosp"]
        ),
        SettingType.VERBOSE_EIP712: SettingImpl(
            ["nanox", "nanosp"]
        ),
        SettingType.VERBOSE_ENS: SettingImpl(
            ["nanox", "nanosp"]
        ),
    }
    # Initial delay and period, in seconds, of the automatic right-click timer.
    _click_delay = 1/4
    # Becomes True (per instance) once eip712_filtering_activate() was sent.
    _eip712_filtering = False

    def __init__(self, client: BackendInterface, golden_run: bool):
        """Wrap *client* and reset the tracked settings.

        :param client: Ragger backend used to exchange APDUs and press buttons.
        :param golden_run: forwarded as-is to ``NanoNavigator``.
        """
        self._client = client
        self._chain_id = 1
        self._cmd_builder = EthereumCmdBuilder()
        self._resp_parser = EthereumRespParser()
        self._nav = NanoNavigator(client, client.firmware, golden_run)
        # Every SIGALRM tick (see _enable_click_until_response) presses right.
        signal.signal(signal.SIGALRM, self._click_signal_timeout)
        for setting in self._settings.values():
            setting.value = False

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    def _send(self, payload: bytearray):
        """Start an asynchronous exchange of *payload*.

        Returns whatever ``exchange_async_raw`` returns; every caller in this
        class uses it as a context manager.
        """
        return self._client.exchange_async_raw(payload)

    def _recv(self) -> RAPDU:
        """Return the response stored by the last asynchronous exchange."""
        return self._client._last_async_response

    def _click_signal_timeout(self, _signum: int, _frame):
        """``SIGALRM`` handler: press the right button once."""
        self._client.right_click()

    def _enable_click_until_response(self):
        """Arm the interval timer so right-clicks happen periodically."""
        signal.setitimer(signal.ITIMER_REAL,
                         self._click_delay,
                         self._click_delay)

    def _disable_click_until_response(self):
        """Disarm the interval timer started by the method above."""
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    def _send_with_auto_click(self, payload: bytearray):
        """Exchange *payload* while right-clicking until the exchange ends.

        The timer is disarmed in a ``finally`` clause so that a failing
        exchange cannot leave ``SIGALRM`` firing (and clicking) during
        whatever runs next.
        """
        try:
            with self._send(payload):
                self._enable_click_until_response()
        finally:
            self._disable_click_until_response()

    # ------------------------------------------------------------------
    # EIP-712: struct definitions
    # ------------------------------------------------------------------

    def eip712_send_struct_def_struct_name(self, name: str) -> bool:
        """Send a struct-definition name.

        :return: True when the response status is 0x9000.
        """
        with self._send(self._cmd_builder.eip712_send_struct_def_struct_name(name)):
            pass
        return self._recv().status == 0x9000

    def eip712_send_struct_def_struct_field(self,
                                            field_type: EIP712FieldType,
                                            type_name: str,
                                            type_size: int,
                                            array_levels: list,
                                            key_name: str) -> RAPDU:
        """Send one struct-definition field.

        All arguments are forwarded unchanged to the command builder.

        :return: the raw response; its status is not checked here.
        """
        with self._send(self._cmd_builder.eip712_send_struct_def_struct_field(
                field_type,
                type_name,
                type_size,
                array_levels,
                key_name)):
            pass
        return self._recv()

    # ------------------------------------------------------------------
    # EIP-712: struct implementations
    # ------------------------------------------------------------------

    def eip712_send_struct_impl_root_struct(self, name: str) -> RAPDU:
        """Send the root struct name, right-clicking until the app answers.

        :return: the raw response; its status is not checked here.
        """
        self._send_with_auto_click(
            self._cmd_builder.eip712_send_struct_impl_root_struct(name))
        return self._recv()

    def eip712_send_struct_impl_array(self, size: int) -> RAPDU:
        """Send an array size.

        :return: the raw response; its status is not checked here.
        """
        with self._send(self._cmd_builder.eip712_send_struct_impl_array(size)):
            pass
        return self._recv()

    def eip712_send_struct_impl_struct_field(self, raw_value: bytes):
        """Send a field value, one APDU per chunk produced by the builder.

        Right-clicks are issued while each chunk is pending.

        :raises AssertionError: if any chunk is not answered with 0x9000.
        """
        for apdu in self._cmd_builder.eip712_send_struct_impl_struct_field(raw_value):
            self._send_with_auto_click(apdu)
            assert self._recv().status == 0x9000

    # ------------------------------------------------------------------
    # EIP-712: signing
    # ------------------------------------------------------------------

    def eip712_sign_new(self, bip32_path: str):
        """Request the signature of the previously sent message and approve it.

        :return: the response data parsed by ``EthereumRespParser.sign``.
        :raises AssertionError: if the response status is not 0x9000.
        """
        with self._send(self._cmd_builder.eip712_sign_new(bip32_path)):
            time.sleep(0.5)  # tight on timing, needed by the CI otherwise might fail sometimes
            verbose = self._settings[SettingType.VERBOSE_EIP712].value
            if not verbose and not self._eip712_filtering:
                # need to skip the message hash
                self._client.right_click()
                self._client.right_click()
            self._client.both_click()  # approve signature
        resp = self._recv()
        assert resp.status == 0x9000
        return self._resp_parser.sign(resp.data)

    def eip712_sign_legacy(self,
                           bip32_path: str,
                           domain_hash: bytes,
                           message_hash: bytes):
        """Sign from a domain hash and a message hash, approving on device.

        :return: the response data parsed by ``EthereumRespParser.sign``.
        :raises AssertionError: if the response status is not 0x9000.
        """
        # Number of screens used to display one hash; does not change while
        # the review is ongoing, so compute it once.
        if self._client.firmware.device == "nanos":
            screens_per_hash = 4
        else:
            screens_per_hash = 2

        with self._send(self._cmd_builder.eip712_sign_legacy(bip32_path,
                                                             domain_hash,
                                                             message_hash)):
            self._client.right_click()  # sign typed message screen
            for _ in range(2):  # two hashes (domain + message)
                for _ in range(screens_per_hash):
                    self._client.right_click()
            self._client.both_click()  # approve signature
        resp = self._recv()
        assert resp.status == 0x9000
        return self._resp_parser.sign(resp.data)

    # ------------------------------------------------------------------
    # Settings
    # ------------------------------------------------------------------

    def settings_set(self, new_values: dict[SettingType, bool]):
        """Walk through the settings menu and toggle what needs to change.

        Only settings listed for the current device are visited. A setting is
        toggled when it appears in *new_values* with a value different from
        the one tracked in ``_settings``; the tracked value is then updated.
        """
        # Go to settings
        for _ in range(2):
            self._client.right_click()
        self._client.both_click()

        device = self._client.firmware.device
        for setting_type, setting in self._settings.items():
            if device not in setting.devices:
                continue
            if setting_type in new_values and new_values[setting_type] != setting.value:
                self._client.both_click()  # toggle
                setting.value = new_values[setting_type]
            self._client.right_click()  # move on to the next entry
        self._client.both_click()

    # ------------------------------------------------------------------
    # EIP-712: filtering
    # ------------------------------------------------------------------

    def eip712_filtering_activate(self):
        """Send the filtering activation command and remember it was sent.

        :raises AssertionError: if the response status is not 0x9000.
        """
        with self._send(self._cmd_builder.eip712_filtering_activate()):
            pass
        # Recorded before the status check, exactly like the flag is later
        # read by eip712_sign_new().
        self._eip712_filtering = True
        assert self._recv().status == 0x9000

    def eip712_filtering_message_info(self, name: str, filters_count: int, sig: bytes):
        """Send the filtering message info, right-clicking until answered.

        :raises AssertionError: if the response status is not 0x9000.
        """
        self._send_with_auto_click(
            self._cmd_builder.eip712_filtering_message_info(name, filters_count, sig))
        assert self._recv().status == 0x9000

    def eip712_filtering_show_field(self, name: str, sig: bytes):
        """Send a show-field filter.

        :raises AssertionError: if the response status is not 0x9000.
        """
        with self._send(self._cmd_builder.eip712_filtering_show_field(name, sig)):
            pass
        assert self._recv().status == 0x9000

    # ------------------------------------------------------------------
    # Transactions
    # ------------------------------------------------------------------

    def send_fund(self,
                  bip32_path: str,
                  nonce: int,
                  gas_price: int,
                  gas_limit: int,
                  to: bytes,
                  amount: float,
                  chain_id: int,
                  screenshot_collection: Optional[str] = None):
        """RLP-encode a transfer, send it for signing and approve it on device.

        :param amount: amount in ETH; converted to wei with ``WEI_IN_ETH``.
        :param screenshot_collection: when truthy, the navigation also
            compares screenshots, using ``ROOT_SCREENSHOT_PATH`` and this
            value; otherwise it only navigates.

        The response is neither checked nor returned.
        """
        # Field order is what gets RLP-encoded; the three trailing entries
        # (chain_id and two empty byte strings) look like the placeholder
        # v/r/s of an unsigned EIP-155 transaction -- TODO confirm.
        tx_fields = [
            nonce,
            gas_price,
            gas_limit,
            to,
            int(amount * WEI_IN_ETH),
            bytes(),
            chain_id,
            bytes(),
            bytes(),
        ]
        for chunk in self._cmd_builder.sign(bip32_path, rlp.encode(tx_fields)):
            with self._send(chunk):
                nav_ins = NavIns(NavInsID.RIGHT_CLICK)
                final_ins = [NavIns(NavInsID.BOTH_CLICK)]
                target_text = "and send"
                if screenshot_collection:
                    self._nav.navigate_until_text_and_compare(nav_ins,
                                                              final_ins,
                                                              target_text,
                                                              ROOT_SCREENSHOT_PATH,
                                                              screenshot_collection)
                else:
                    self._nav.navigate_until_text(nav_ins,
                                                  final_ins,
                                                  target_text)

    # ------------------------------------------------------------------
    # Domain names
    # ------------------------------------------------------------------

    def get_challenge(self) -> int:
        """Request a challenge and return it as parsed by the response parser.

        The response status is not checked.
        """
        with self._send(self._cmd_builder.get_challenge()):
            pass
        resp = self._recv()
        return self._resp_parser.challenge(resp.data)

    def provide_domain_name(self, challenge: int, name: str, addr: bytes):
        """Build, sign and send a domain-name TLV payload.

        The signature covers every TLV field appended before it and is itself
        appended as the last field. Responses are neither checked nor
        returned.
        """
        payload = format_tlv(DOMAIN_NAME_TAG.STRUCTURE_TYPE, 3)  # TrustedDomainName
        payload += format_tlv(DOMAIN_NAME_TAG.STRUCTURE_VERSION, 1)
        payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_KEY_ID, 0)  # test key
        payload += format_tlv(DOMAIN_NAME_TAG.SIGNER_ALGO, 1)  # secp256k1
        payload += format_tlv(DOMAIN_NAME_TAG.CHALLENGE, challenge)
        payload += format_tlv(DOMAIN_NAME_TAG.COIN_TYPE, 0x3c)  # ETH in slip-44
        payload += format_tlv(DOMAIN_NAME_TAG.DOMAIN_NAME, name)
        payload += format_tlv(DOMAIN_NAME_TAG.ADDRESS, addr)
        # Must stay last: the signed data is the payload built so far.
        payload += format_tlv(DOMAIN_NAME_TAG.SIGNATURE,
                              keychain.sign_data(keychain.Key.DOMAIN_NAME, payload))

        for chunk in self._cmd_builder.provide_domain_name(payload):
            with self._send(chunk):
                pass