From 24b6ee3f647d5e1f527c1400617d1cd9afa7482b Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Tue, 23 Jul 2024 10:23:23 -0500 Subject: [PATCH 01/11] Update GUI files Python 3.10 --- src/qt/dlg_configureRPCservers.py | 101 ++++++++++++++---------------- src/qt/dlg_pinMatrix.py | 12 ++-- src/qt/dlg_signmessage.py | 56 ++++++++--------- src/qt/gui_tabRewards.py | 23 +++++-- 4 files changed, 96 insertions(+), 96 deletions(-) diff --git a/src/qt/dlg_configureRPCservers.py b/src/qt/dlg_configureRPCservers.py index 94e7ede..81a76eb 100644 --- a/src/qt/dlg_configureRPCservers.py +++ b/src/qt/dlg_configureRPCservers.py @@ -4,17 +4,17 @@ # Distributed under the MIT software license, see the accompanying # file LICENSE.txt or http://www.opensource.org/licenses/mit-license.php. -from PyQt5.QtWidgets import QDialog, QHBoxLayout, QVBoxLayout, QLabel, \ - QListWidget, QFrame, QFormLayout, QComboBox, QLineEdit, QListWidgetItem, \ - QWidget, QPushButton, QMessageBox - +from PyQt5.QtWidgets import ( + QDialog, QHBoxLayout, QVBoxLayout, QLabel, QListWidget, QFrame, QFormLayout, + QComboBox, QLineEdit, QListWidgetItem, QWidget, QPushButton, QMessageBox +) from misc import myPopUp, checkRPCstring from threads import ThreadFuns -class ConfigureRPCservers_dlg(QDialog): +class ConfigureRPCserversDlg(QDialog): def __init__(self, main_wnd): - QDialog.__init__(self, parent=main_wnd) + super().__init__(parent=main_wnd) self.main_wnd = main_wnd self.setWindowTitle('RPC Servers Configuration') self.changing_index = None @@ -29,49 +29,48 @@ def clearEditFrame(self): self.ui.host_edt.clear() def initUI(self): - self.ui = Ui_ConfigureRPCserversDlg() + self.ui = UiConfigureRPCserversDlg() self.ui.setupUi(self) def insert_server_list(self, server): - id = server['id'] + server_id = server['id'] index = self.main_wnd.mainWindow.getServerListIndex(server) server_line = QWidget() server_row = QHBoxLayout() - server_text = "%s://%s" % (server['protocol'], server['host']) - if server['id'] == 0 and server['isCustom']: - # Local Wallet - server_text = server_text + "  Local Wallet" + server_text = f"{server['protocol']}://{server['host']}" + if server_id == 0 and server['isCustom']: + server_text += "  Local Wallet" elif not server['isCustom']: - server_text = "%s" % server_text + server_text = f"{server_text}" server_row.addWidget(QLabel(server_text)) server_row.addStretch(1) - # -- Edit button - editBtn = QPushButton() - editBtn.setIcon(self.main_wnd.mainWindow.editMN_icon) - editBtn.setToolTip("Edit server configuration") + + edit_btn = QPushButton() + edit_btn.setIcon(self.main_wnd.mainWindow.editMN_icon) + edit_btn.setToolTip("Edit server configuration") if not server['isCustom']: - editBtn.setDisabled(True) - editBtn.setToolTip('Default servers are not editable') - editBtn.clicked.connect(lambda: self.onAddServer(index)) - server_row.addWidget(editBtn) - # -- Remove button - removeBtn = QPushButton() - removeBtn.setIcon(self.main_wnd.mainWindow.removeMN_icon) - removeBtn.setToolTip("Remove server configuration") - if id == 0: - removeBtn.setDisabled(True) - removeBtn.setToolTip('Cannot remove local wallet') + edit_btn.setDisabled(True) + edit_btn.setToolTip('Default servers are not editable') + edit_btn.clicked.connect(lambda: self.onAddServer(index)) + server_row.addWidget(edit_btn) + + remove_btn = QPushButton() + remove_btn.setIcon(self.main_wnd.mainWindow.removeMN_icon) + remove_btn.setToolTip("Remove server configuration") + if server_id == 0: + remove_btn.setDisabled(True) + remove_btn.setToolTip('Cannot remove local wallet') if not server['isCustom']: - removeBtn.setDisabled(True) - removeBtn.setToolTip('Cannot remove default servers') - removeBtn.clicked.connect(lambda: self.onRemoveServer(index)) - server_row.addWidget(removeBtn) - # -- + remove_btn.setDisabled(True) + remove_btn.setToolTip('Cannot remove default servers') + remove_btn.clicked.connect(lambda: self.onRemoveServer(index)) + server_row.addWidget(remove_btn) + server_line.setLayout(server_row) - self.serverItems[id] = QListWidgetItem() - self.serverItems[id].setSizeHint(server_line.sizeHint()) - self.ui.serversBox.addItem(self.serverItems[id]) - self.ui.serversBox.setItemWidget(self.serverItems[id], server_line) + self.serverItems[server_id] = QListWidgetItem() + self.serverItems[server_id].setSizeHint(server_line.sizeHint()) + self.ui.serversBox.addItem(self.serverItems[server_id]) + self.ui.serversBox.setItemWidget(self.serverItems[server_id], server_line) def loadServers(self): # Clear serversBox @@ -85,10 +84,7 @@ def loadEditFrame(self, index): server = self.main_wnd.mainWindow.rpcServersList[index] self.ui.user_edt.setText(server['user']) self.ui.passwd_edt.setText(server['password']) - if server['protocol'] == 'https': - self.ui.protocol_select.setCurrentIndex(1) - else: - self.ui.protocol_select.setCurrentIndex(0) + self.ui.protocol_select.setCurrentIndex(1 if server['protocol'] == 'https' else 0) self.ui.host_edt.setText(server['host']) def onAddServer(self, index=None): @@ -122,13 +118,11 @@ def onClose(self): self.close() def onRemoveServer(self, index): - mess = "Are you sure you want to remove server with index %d (%s) from list?" % ( - index, self.main_wnd.mainWindow.rpcServersList[index].get('host')) + mess = f"Are you sure you want to remove server with index {index} ({self.main_wnd.mainWindow.rpcServersList[index].get('host')}) from list?" ans = myPopUp(self, QMessageBox.Question, 'PET4L - remove server', mess) if ans == QMessageBox.Yes: - # Remove entry from database - id = self.main_wnd.mainWindow.rpcServersList[index].get('id') - self.main_wnd.db.removeRPCServer(id) + server_id = self.main_wnd.mainWindow.rpcServersList[index].get('id') + self.main_wnd.db.removeRPCServer(server_id) def onSave(self): # Get new config data @@ -136,27 +130,24 @@ def onSave(self): host = self.ui.host_edt.text() user = self.ui.user_edt.text() passwd = self.ui.passwd_edt.text() - # Check malformed URL - url_string = "%s://%s:%s@%s" % (protocol, user, passwd, host) + url_string = f"{protocol}://{user}:{passwd}@{host}" if checkRPCstring(url_string): if self.changing_index is None: # Save new entry in DB. self.main_wnd.db.addRPCServer(protocol, host, user, passwd) else: - # Edit existing entry to DB. - id = self.main_wnd.mainWindow.rpcServersList[self.changing_index].get('id') - self.main_wnd.db.editRPCServer(protocol, host, user, passwd, id) - # If this was previously selected in mainWindow, update status + server_id = self.main_wnd.mainWindow.rpcServersList[self.changing_index].get('id') + self.main_wnd.db.editRPCServer(protocol, host, user, passwd, server_id) clients = self.main_wnd.mainWindow.header.rpcClientsBox data = clients.itemData(clients.currentIndex()) - if data.get('id') == id and data.get('isCustom'): - ThreadFuns.runInThread(self.main_wnd.mainWindow.updateRPCstatus, (True,), ) + if data.get('id') == server_id and data.get('isCustom'): + ThreadFuns.runInThread(self.main_wnd.mainWindow.updateRPCstatus, (True,)) # call onCancel self.onCancel() -class Ui_ConfigureRPCserversDlg(object): +class UiConfigureRPCserversDlg: def setupUi(self, ConfigureRPCserversDlg): ConfigureRPCserversDlg.setModal(True) # -- Layout diff --git a/src/qt/dlg_pinMatrix.py b/src/qt/dlg_pinMatrix.py index fbb9656..c5670b4 100644 --- a/src/qt/dlg_pinMatrix.py +++ b/src/qt/dlg_pinMatrix.py @@ -10,10 +10,10 @@ from misc import myPopUp_sb -class PinMatrix_dlg(QDialog): +class PinMatrixDlg(QDialog): def __init__(self, text='', title='Enter PIN', fHideBtns=True): - QDialog.__init__(self) - self.text = text if text != '' else "Check device and enter PIN" + super().__init__() + self.text = text if text else "Check device and enter PIN" self.hideBtns = fHideBtns self.pin = '' self.setWindowTitle(title) @@ -46,7 +46,7 @@ def onOK(self): myPopUp_sb(self, "warn", 'Wrong PIN!', text) def setupUI(self): - Ui_pinMatrixDlg.setupUi(self, self) + UiPinMatrixDlg.setupUi(self, self) # Connect buttons matrix for i in range(9): self.btn[i].clicked.connect(lambda _, b=i + 1: self.btn_clicked(str(b))) @@ -56,14 +56,14 @@ def setupUI(self): self.btn_cancel.clicked.connect(self.onCancel) -class Ui_pinMatrixDlg(object): +class UiPinMatrixDlg: def setupUi(self, PinMatrixDlg): PinMatrixDlg.setModal(True) layout = QVBoxLayout(PinMatrixDlg) layout.setContentsMargins(10, 8, 10, 10) # Header - title = QLabel("%s" % PinMatrixDlg.text) + title = QLabel(f"{PinMatrixDlg.text}") title.setAlignment(Qt.AlignCenter) layout.addWidget(title) diff --git a/src/qt/dlg_signmessage.py b/src/qt/dlg_signmessage.py index 2a9b55d..8dbf7bb 100644 --- a/src/qt/dlg_signmessage.py +++ b/src/qt/dlg_signmessage.py @@ -32,18 +32,18 @@ from utils import checkPivxAddr, ecdsa_verify_addr -class SignMessage_dlg(QDialog): +class SignMessageDlg(QDialog): def __init__(self, main_wnd): - QDialog.__init__(self, parent=main_wnd) + super().__init__(parent=main_wnd) self.setWindowTitle('Sign/Verify Message') self.initUI(main_wnd) def initUI(self, main_wnd): - self.ui = Ui_SignMessageDlg() + self.ui = UiSignMessageDlg() self.ui.setupUi(self, main_wnd) -class Ui_SignMessageDlg(object): +class UiSignMessageDlg: def setupUi(self, SignMessageDlg, main_wnd): SignMessageDlg.setModal(True) SignMessageDlg.setMinimumWidth(600) @@ -61,7 +61,7 @@ def setupUi(self, SignMessageDlg, main_wnd): class TabSign: def __init__(self, main_wnd): self.main_wnd = main_wnd - self.ui = TabSign_gui() + self.ui = TabSignGui() self.loadAddressComboBox() # connect signals/buttons self.ui.addressComboBox.currentIndexChanged.connect(lambda: self.onChangeSelectedAddress()) @@ -102,9 +102,9 @@ def findPubKey(self): pk = device.scanForPubKey(self.hwAcc, self.spath, self.currIsTestnet) if pk is None: - mess = "Unable to find public key. The action was refused on the device or another application " - mess += "might have taken over the USB communication with the device.

" - mess += "The operation was canceled." + mess = ("Unable to find public key. The action was refused on the device or another application " + "might have taken over the USB communication with the device.

" + "The operation was canceled.") myPopUp_sb(self.main_wnd, QMessageBox.Critical, 'PET4L - PK not found', mess) return self.updateGenericAddress(pk) @@ -124,10 +124,9 @@ def findSpath_done(self): addy = self.ui.addressLineEdit.text().strip() starting_spath = self.curr_starting_spath spath_count = self.curr_spath_count - mess = "Scanned addresses %d to %d of HW account %d.
" % ( - starting_spath, starting_spath + spath_count - 1, self.hwAcc) - mess += "Unable to find the address %s.
Maybe it's on a different account.

" % addy - mess += "Do you want to scan %d more addresses of account n.%d ?" % (spath_count, self.hwAcc) + mess = (f"Scanned addresses {starting_spath} to {starting_spath + spath_count - 1} of HW account {self.hwAcc}.
" + f"Unable to find the address {addy}.
Maybe it's on a different account.

" + f"Do you want to scan {spath_count} more addresses of account n.{self.hwAcc} ?") ans = myPopUp(self.main_wnd, QMessageBox.Question, 'PET4L - spath search', mess) if ans == QMessageBox.Yes: # Look for 10 more addresses @@ -165,9 +164,8 @@ def onSave(self): "All Files (*);; Text Files (*.txt)", options=options) try: if fileName: - save_file = open(fileName, 'w', encoding="utf-8") - save_file.write(self.ui.signatureTextEdt.toPlainText()) - save_file.close() + with open(fileName, 'w', encoding="utf-8") as save_file: + save_file.write(self.ui.signatureTextEdt.toPlainText()) myPopUp_sb(self.main_wnd, QMessageBox.Information, 'PET4L - saved', "Signature saved to file") return except Exception as e: @@ -188,7 +186,7 @@ def onSearchPK(self): if not checkPivxAddr(addy, self.currIsTestnet): net = "testnet" if self.currIsTestnet else "mainnet" - mess = "PIVX address not valid. Insert valid PIVX %s address" % net + mess = f"PIVX address not valid. Insert valid PIVX {net} address" myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'PET4L - invalid address', mess) return @@ -251,9 +249,9 @@ def setSignEnabled(self, enabled): self.ui.hiddenLine.setVisible(not enabled) tooltip = "" if not enabled: - tooltip = "You need to find the address PK in your hardware device first.\n" \ - "Insert the account number (usually 0) and either a PIVX address\n" \ - "or the spath_id (address number) and click 'Search HW'." + tooltip = ("You need to find the address PK in your hardware device first.\n" + "Insert the account number (usually 0) and either a PIVX address\n" + "or the spath_id (address number) and click 'Search HW'.") self.ui.addressLabel.setText("") self.ui.signBtn.setToolTip(tooltip) @@ -263,14 +261,14 @@ def updateGenericAddress(self, pk): # double check address addy = self.ui.addressLineEdit.text().strip() if addy != genericAddy: - mess = "Error! retrieved address (%s) different from input (%s)" % (genericAddy, addy) + mess = f"Error! retrieved address ({genericAddy}) different from input ({addy})" myPopUp_sb(self.main_wnd, QMessageBox.Critical, 'PET4L - address mismatch', mess) self.ui.addressLabel.setText("") return # update generic address self.setSignEnabled(True) self.currAddress = genericAddy - self.currHwPath = "%d'/0/%d" % (self.hwAcc, self.spath) + self.currHwPath = f"{self.hwAcc}'/0/{self.spath}" self.ui.addressLabel.setText(self.currAddress) self.ui.editBtn.setVisible(True) @@ -278,7 +276,7 @@ def updateGenericAddress(self, pk): class TabVerify: def __init__(self, main_wnd): self.main_wnd = main_wnd - self.ui = TabVerify_gui() + self.ui = TabVerifyGui() # connect signals/buttons self.ui.verifyBtn.clicked.connect(lambda: self.onVerify()) @@ -307,21 +305,21 @@ def onVerify(self): self.ui.signatureTextEdt.toPlainText(), self.ui.addressLineEdit.text().strip()) except Exception as e: - mess = "Error decoding signature:\n" + str(e) + mess = f"Error decoding signature:\n{e}" myPopUp_sb(self.main_wnd, QMessageBox.Warning, 'PET4L - invalid signature', mess) ok = False if ok: mess = "Signature OK" else: mess = "Signature doesn't verify" - mess = "" + mess + "" + mess = f"{mess}" self.ui.resultLabel.setText(mess) self.ui.resultLabel.setVisible(True) -class TabSign_gui(QWidget): +class TabSignGui(QWidget): def __init__(self): - QWidget.__init__(self) + super().__init__() layout = QVBoxLayout() layout.setContentsMargins(10, 10, 10, 10) layout.setSpacing(13) @@ -426,7 +424,7 @@ def __init__(self): self.copyBtn.setVisible(False) row5.addWidget(self.copyBtn) self.saveBtn = QPushButton("Save") - self.saveBtn.setToolTip("Save signature to ca file") + self.saveBtn.setToolTip("Save signature to a file") self.saveBtn.setVisible(False) row5.addWidget(self.saveBtn) layout.addLayout(row5) @@ -434,9 +432,9 @@ def __init__(self): self.setLayout(layout) -class TabVerify_gui(QWidget): +class TabVerifyGui(QWidget): def __init__(self): - QWidget.__init__(self) + super().__init__() layout = QVBoxLayout() layout.setContentsMargins(10, 10, 10, 10) layout.setSpacing(13) diff --git a/src/qt/gui_tabRewards.py b/src/qt/gui_tabRewards.py index 31c1da5..944385d 100644 --- a/src/qt/gui_tabRewards.py +++ b/src/qt/gui_tabRewards.py @@ -8,16 +8,18 @@ import os.path from PyQt5.QtCore import Qt -from PyQt5.Qt import QLabel, QFormLayout, QDoubleSpinBox, QTableWidget, QTableWidgetItem, QAbstractItemView, QHeaderView, QSpinBox -from PyQt5.QtWidgets import QWidget, QPushButton, QHBoxLayout, QGroupBox, QVBoxLayout -from PyQt5.QtWidgets import QLineEdit, QComboBox, QProgressBar +from PyQt5.QtWidgets import ( + QWidget, QPushButton, QHBoxLayout, QGroupBox, QVBoxLayout, QLineEdit, QComboBox, + QProgressBar, QLabel, QFormLayout, QDoubleSpinBox, QTableWidget, QTableWidgetItem, + QAbstractItemView, QHeaderView, QSpinBox +) sys.path.append(os.path.join(os.path.dirname(__file__), 'src')) -class TabRewards_gui(QWidget): +class TabRewardsGui(QWidget): def __init__(self, imgDir, *args, **kwargs): - QWidget.__init__(self) + super().__init__(*args, **kwargs) self.imgDir = imgDir self.initRewardsForm() mainVertical = QVBoxLayout() @@ -34,6 +36,7 @@ def initRewardsForm(self): layout.setContentsMargins(10, 10, 10, 10) layout.setSpacing(13) layout.setFieldGrowthPolicy(QFormLayout.AllNonFixedFieldsGrow) + # --- ROW 1 line1 = QHBoxLayout() line1.addWidget(QLabel("Account HW")) @@ -69,6 +72,7 @@ def initRewardsForm(self): self.btn_reload.setToolTip("Reload data from ledger device") line1.addWidget(self.btn_reload) layout.addRow(line1) + # --- ROW 2: address and copy btn hBox = QHBoxLayout() self.addySelect = QComboBox() @@ -79,6 +83,7 @@ def initRewardsForm(self): self.btn_Copy.setToolTip("Copy address to clipboard") hBox.addWidget(self.btn_Copy) layout.addRow(hBox) + # --- ROW 3: UTXOs self.rewardsList = QVBoxLayout() self.rewardsList.statusLabel = QLabel('Reload Rewards') @@ -86,7 +91,6 @@ def initRewardsForm(self): self.rewardsList.addWidget(self.rewardsList.statusLabel) self.rewardsList.box = QTableWidget() self.rewardsList.box.setMinimumHeight(230) - # self.rewardsList.box.setMaximumHeight(140) self.rewardsList.box.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) self.rewardsList.box.setSelectionMode(QAbstractItemView.MultiSelection) self.rewardsList.box.setSelectionBehavior(QAbstractItemView.SelectRows) @@ -95,6 +99,7 @@ def initRewardsForm(self): self.rewardsList.box.setRowCount(0) self.rewardsList.box.horizontalHeader().setSectionResizeMode(2, QHeaderView.Stretch) self.rewardsList.box.verticalHeader().hide() + item = QTableWidgetItem() item.setText("PIVs") item.setTextAlignment(Qt.AlignCenter) @@ -111,8 +116,10 @@ def initRewardsForm(self): item.setText("TX Output N") item.setTextAlignment(Qt.AlignCenter) self.rewardsList.box.setHorizontalHeaderItem(3, item) + self.rewardsList.addWidget(self.rewardsList.box) layout.addRow(self.rewardsList) + # --- ROW 3 hBox2 = QHBoxLayout() self.btn_selectAllRewards = QPushButton("Select All") @@ -129,6 +136,7 @@ def initRewardsForm(self): hBox2.addWidget(self.selectedRewardsLine) hBox2.addStretch(1) layout.addRow(hBox2) + # --- ROW 4 hBox3 = QHBoxLayout() self.destinationLine = QLineEdit() @@ -145,6 +153,7 @@ def initRewardsForm(self): self.btn_sendRewards = QPushButton("Send") hBox3.addWidget(self.btn_sendRewards) layout.addRow(QLabel("Destination Address"), hBox3) + hBox4 = QHBoxLayout() hBox4.addStretch(1) self.loadingLine = QLabel("Preparing TX. Completed: ") @@ -157,8 +166,10 @@ def initRewardsForm(self): self.loadingLine.hide() self.loadingLinePercent.hide() layout.addRow(hBox4) + # --- Set Layout self.rewardsForm.setLayout(layout) + # --- ROW 5 self.btn_Cancel = QPushButton("Clear") From 0e3a8bab81f96626f9f5028e9a8ee61832ab53bd Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Tue, 23 Jul 2024 10:26:06 -0500 Subject: [PATCH 02/11] libs, misc, utils fstrings and simplification --- src/misc.py | 250 +++++++++++++++++++------------------------- src/pivx_b58.py | 14 ++- src/pivx_hashlib.py | 26 +++-- src/pivx_parser.py | 71 ++++++------- src/utils.py | 21 ++-- 5 files changed, 165 insertions(+), 217 deletions(-) diff --git a/src/misc.py b/src/misc.py index 6e377ec..8c7792d 100644 --- a/src/misc.py +++ b/src/misc.py @@ -11,6 +11,7 @@ from contextlib import redirect_stdout from ipaddress import ip_address from urllib.parse import urlparse +from typing import Any, Callable, Dict, Optional, Type import simplejson as json from PyQt5.QtCore import QObject, pyqtSignal, QSettings @@ -19,13 +20,13 @@ from constants import log_File, DefaultCache, wqueue, MAX_INPUTS_NO_WARNING -def add_defaultKeys_to_dict(dictObj, defaultObj): +def add_defaultKeys_to_dict(dictObj: Dict[str, Any], defaultObj: Dict[str, Any]) -> None: for key in defaultObj: if key not in dictObj: dictObj[key] = defaultObj[key] -QT_MESSAGE_TYPE = { +QT_MESSAGE_TYPE: Dict[str, Type[QMessageBox.Icon]] = { "info": QMessageBox.Information, "warn": QMessageBox.Warning, "crit": QMessageBox.Critical, @@ -33,71 +34,69 @@ def add_defaultKeys_to_dict(dictObj, defaultObj): } -def checkRPCstring(urlstring): +def checkRPCstring(urlstring: str) -> bool: try: o = urlparse(urlstring) - if o.scheme is None or o.scheme == '': - raise Exception("Wrong protocol. Set either http or https.") - if o.netloc is None or o.netloc == '': - raise Exception("Malformed host network location part.") - if o.port is None or o.port == '': - raise Exception("Wrong IP port number") - if o.username is None: - raise Exception("Malformed username") - if o.password is None: - raise Exception("Malformed password") + if not o.scheme: + raise ValueError("Wrong protocol. Set either http or https.") + if not o.netloc: + raise ValueError("Malformed host network location part.") + if not o.port: + raise ValueError("Wrong IP port number") + if not o.username: + raise ValueError("Malformed username") + if not o.password: + raise ValueError("Malformed password") return True - except Exception as e: error_msg = "Unable to parse URL" printException(getCallerName(), getFunctionName(), error_msg, e) return False -def checkTxInputs(parentWindow, num_of_inputs): +def checkTxInputs(parentWindow: Any, num_of_inputs: int) -> Optional[int]: if num_of_inputs == 0: myPopUp_sb(parentWindow, "warn", 'Transaction NOT sent', "No UTXO to send") return None if num_of_inputs > MAX_INPUTS_NO_WARNING: - warning = "Warning: Trying to spend %d inputs.\nA few minutes could be required " \ - "for the transaction to be prepared and signed.\n\nThe hardware device must remain unlocked " \ - "during the whole time (it's advised to disable the auto-lock feature)\n\n" \ - "Do you wish to proceed?" % num_of_inputs - title = "PET4L - spending more than %d inputs" % MAX_INPUTS_NO_WARNING + warning = (f"Warning: Trying to spend {num_of_inputs} inputs.\n" + "A few minutes could be required for the transaction to be prepared and signed.\n\n" + "The hardware device must remain unlocked during the whole time " + "(it's advised to disable the auto-lock feature)\n\n" + "Do you wish to proceed?") + title = f"PET4L - spending more than {MAX_INPUTS_NO_WARNING} inputs" return myPopUp(parentWindow, "warn", title, warning) return QMessageBox.Yes -def clean_for_html(text): +def clean_for_html(text: Optional[str]) -> str: if text is None: return "" return text.replace("<", "{").replace(">", "}") -def clear_screen(): +def clear_screen() -> None: os.system('clear') -def getCallerName(inDecorator=False): +def getCallerName(inDecorator: bool = False) -> Optional[str]: try: - if inDecorator: - return sys._getframe(3).f_code.co_name - return sys._getframe(2).f_code.co_name + frame = sys._getframe(3 if inDecorator else 2) + return frame.f_code.co_name except Exception: return None -def getFunctionName(inDecorator=False): +def getFunctionName(inDecorator: bool = False) -> Optional[str]: try: - if inDecorator: - return sys._getframe(2).f_code.co_name - return sys._getframe(1).f_code.co_name + frame = sys._getframe(2 if inDecorator else 1) + return frame.f_code.co_name except Exception: return None -def getRemotePET4Lversion(): +def getRemotePET4Lversion() -> str: import requests try: resp = requests.get("https://raw.githubusercontent.com/PIVX-Project/PET4L/master/src/version.txt") @@ -105,78 +104,65 @@ def getRemotePET4Lversion(): data = resp.json() return data['number'] else: - raise Exception - + raise ValueError("Invalid response from GitHub") except Exception: redirect_print("Invalid response getting version from GitHub\n") return "0.0.0" -def getVersion(): - version_file = os.path.join( - os.path.dirname(os.path.abspath(__file__)), 'version.txt') +def getVersion() -> Dict[str, Any]: + version_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'version.txt') with open(version_file, encoding="utf-8") as data_file: data = json.load(data_file) - return data -def getTxidTxidn(txid, txidn): +def getTxidTxidn(txid: Optional[str], txidn: Optional[int]) -> Optional[str]: if txid is None or txidn is None: return None else: - return txid + '-' + str(txidn) + return f"{txid}-{txidn}" -def initLogs(): +def initLogs() -> None: filename = log_File filemode = 'w' format = '%(asctime)s - %(levelname)s - %(threadName)s | %(message)s' level = logging.DEBUG - logging.basicConfig(filename=filename, - filemode=filemode, - format=format, - level=level - ) + logging.basicConfig(filename=filename, filemode=filemode, format=format, level=level) -def ipport(ip, port): +def ipport(ip: Optional[str], port: Optional[str]) -> Optional[str]: if ip is None or port is None: return None elif ip.endswith('.onion'): - return ip + ':' + port + return f"{ip}:{port}" else: ipAddr = ip_address(ip) if ipAddr.version == 4: - return ip + ':' + port + return f"{ip}:{port}" elif ipAddr.version == 6: - return "[" + ip + "]:" + port + return f"[{ip}]:{port}" else: - raise Exception("invalid IP version number") + raise ValueError("Invalid IP version number") -def myPopUp(parentWindow, messType, messTitle, messText, defaultButton=QMessageBox.No): - if messType in QT_MESSAGE_TYPE: - type = QT_MESSAGE_TYPE[messType] - else: - type = QMessageBox.Question - mess = QMessageBox(type, messTitle, messText, defaultButton, parent=parentWindow) +def myPopUp(parentWindow: Any, messType: str, messTitle: str, messText: str, defaultButton: QMessageBox.StandardButton = QMessageBox.No) -> int: + message_type = QT_MESSAGE_TYPE.get(messType, QMessageBox.Question) + mess = QMessageBox(message_type, messTitle, messText, defaultButton, parent=parentWindow) mess.setStandardButtons(QMessageBox.Yes | QMessageBox.No) mess.setDefaultButton(defaultButton) return mess.exec_() -def myPopUp_sb(parentWindow, messType, messTitle, messText, singleButton=QMessageBox.Ok): - if messType in QT_MESSAGE_TYPE: - type = QT_MESSAGE_TYPE[messType] - else: - type = QMessageBox.Information - mess = QMessageBox(type, messTitle, messText, singleButton, parent=parentWindow) +def myPopUp_sb(parentWindow: Any, messType: str, messTitle: str, messText: str, singleButton: QMessageBox.StandardButton = QMessageBox.Ok) -> int: + message_type = QT_MESSAGE_TYPE.get(messType, QMessageBox.Information) + mess = QMessageBox(message_type, messTitle, messText, singleButton, parent=parentWindow) mess.setStandardButtons(singleButton | singleButton) return mess.exec_() -def is_hex(s): +def is_hex(s: str) -> bool: try: int(s, 16) return True @@ -184,122 +170,99 @@ def is_hex(s): return False -def now(): +def now() -> int: return int(time.time()) -def persistCacheSetting(cache_key, cache_value): +def persistCacheSetting(cache_key: str, cache_value: Any) -> Any: settings = QSettings('PIVX', 'PET4L') if not settings.contains(cache_key): - printDbg("Cache key %s not found" % str(cache_key)) + printDbg(f"Cache key {cache_key} not found") printOK("Adding new cache key to settings...") - - if type(cache_value) in [list, dict]: - settings.setValue(cache_key, json.dumps(cache_value)) - else: - settings.setValue(cache_key, cache_value) - + settings.setValue(cache_key, json.dumps(cache_value) if isinstance(cache_value, (list, dict)) else cache_value) return cache_value -def printDbg(what): +def printDbg(what: str) -> None: logging.info(what) log_line = printDbg_msg(what) redirect_print(log_line) -def printDbg_msg(what): +def printDbg_msg(what: str) -> str: what = clean_for_html(what) timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(now())) - log_line = '{} : {}
'.format(timestamp, what) + log_line = f'{timestamp} : {what}
' return log_line -def printError( - caller_name, - function_name, - what -): - logging.error("%s | %s | %s" % (caller_name, function_name, what)) +def printError(caller_name: Optional[str], function_name: Optional[str], what: str) -> None: + logging.error(f"{caller_name} | {function_name} | {what}") log_line = printException_msg(caller_name, function_name, what, None, True) redirect_print(log_line) -def printException( - caller_name, - function_name, - err_msg, - errargs=None -): +def printException(caller_name: Optional[str], function_name: Optional[str], err_msg: str, errargs: Optional[Any] = None) -> None: what = err_msg if errargs is not None: - what += " ==> %s" % str(errargs) - logging.warning("%s | %s | %s" % (caller_name, function_name, what)) + what += f" ==> {errargs}" + logging.warning(f"{caller_name} | {function_name} | {what}") text = printException_msg(caller_name, function_name, err_msg, errargs) redirect_print(text) -def printException_msg( - caller_name, - function_name, - err_msg, - errargs=None, - is_error=False -): - if is_error: - msg = 'ERROR
' - else: - msg = 'EXCEPTION
' - msg += 'caller : %s
' % caller_name - msg += 'function : %s
' % function_name +def printException_msg(caller_name: Optional[str], function_name: Optional[str], err_msg: str, errargs: Optional[Any] = None, is_error: bool = False) -> str: + msg = 'ERROR
' if is_error else 'EXCEPTION
' + msg += f'caller : {caller_name}
' + msg += f'function : {function_name}
' msg += '' if errargs: - msg += 'err: %s
' % str(errargs) - - msg += '===> %s

' % err_msg + msg += f'err: {errargs}
' + msg += f'===> {err_msg}
' return msg -def printOK(what): +def printOK(what: str) -> None: logging.debug(what) - msg = '===> ' + what + '
' + msg = f'===> {what}
' redirect_print(msg) -def splitString(text, n): +def splitString(text: str, n: int) -> str: arr = [text[i:i + n] for i in range(0, len(text), n)] return '\n'.join(arr) -def readCacheSettings(): +def readCacheSettings() -> Dict[str, Any]: settings = QSettings('PIVX', 'PET4L') try: - cache = {} - cache["lastAddress"] = settings.value('cache_lastAddress', DefaultCache["lastAddress"], type=str) - cache["window_width"] = settings.value('cache_winWidth', DefaultCache["window_width"], type=int) - cache["window_height"] = settings.value('cache_winHeight', DefaultCache["window_height"], type=int) - cache["splitter_x"] = settings.value('cache_splitterX', DefaultCache["splitter_x"], type=int) - cache["splitter_y"] = settings.value('cache_splitterY', DefaultCache["splitter_y"], type=int) - cache["console_hidden"] = settings.value('cache_consoleHidden', DefaultCache["console_hidden"], type=bool) - cache["selectedHW_index"] = settings.value('cache_HWindex', DefaultCache["selectedHW_index"], type=int) - cache["selectedRPC_index"] = settings.value('cache_RPCindex', DefaultCache["selectedRPC_index"], type=int) - cache["isTestnetRPC"] = settings.value('cache_isTestnetRPC', DefaultCache["isTestnetRPC"], type=bool) - cache["hwAcc"] = settings.value('cache_hwAcc', DefaultCache["hwAcc"], type=int) - cache["spathFrom"] = settings.value('cache_spathFrom', DefaultCache["spathFrom"], type=int) - cache["spathTo"] = settings.value('cache_spathTo', DefaultCache["spathTo"], type=int) - cache["intExt"] = settings.value('cache_intExt', DefaultCache["intExt"], type=int) + cache = { + "lastAddress": settings.value('cache_lastAddress', DefaultCache["lastAddress"], type=str), + "window_width": settings.value('cache_winWidth', DefaultCache["window_width"], type=int), + "window_height": settings.value('cache_winHeight', DefaultCache["window_height"], type=int), + "splitter_x": settings.value('cache_splitterX', DefaultCache["splitter_x"], type=int), + "splitter_y": settings.value('cache_splitterY', DefaultCache["splitter_y"], type=int), + "console_hidden": settings.value('cache_consoleHidden', DefaultCache["console_hidden"], type=bool), + "selectedHW_index": settings.value('cache_HWindex', DefaultCache["selectedHW_index"], type=int), + "selectedRPC_index": settings.value('cache_RPCindex', DefaultCache["selectedRPC_index"], type=int), + "isTestnetRPC": settings.value('cache_isTestnetRPC', DefaultCache["isTestnetRPC"], type=bool), + "hwAcc": settings.value('cache_hwAcc', DefaultCache["hwAcc"], type=int), + "spathFrom": settings.value('cache_spathFrom', DefaultCache["spathFrom"], type=int), + "spathTo": settings.value('cache_spathTo', DefaultCache["spathTo"], type=int), + "intExt": settings.value('cache_intExt', DefaultCache["intExt"], type=int) + } add_defaultKeys_to_dict(cache, DefaultCache) return cache - except: + except Exception: return DefaultCache -def redirect_print(what): +def redirect_print(what: str) -> None: with redirect_stdout(WriteStream(wqueue)): print(what) -def saveCacheSettings(cache): +def saveCacheSettings(cache: Dict[str, Any]) -> None: settings = QSettings('PIVX', 'PET4L') settings.setValue('cache_lastAddress', cache.get('lastAddress')) settings.setValue('cache_winWidth', cache.get('window_width')) @@ -316,28 +279,25 @@ def saveCacheSettings(cache): settings.setValue('cache_intExt', cache.get('intExt')) -def sec_to_time(seconds): - days = seconds // 86400 - seconds -= days * 86400 - hrs = seconds // 3600 - seconds -= hrs * 3600 - mins = seconds // 60 - seconds -= mins * 60 - return "{} days, {} hrs, {} mins, {} secs".format(days, hrs, mins, seconds) +def sec_to_time(seconds: int) -> str: + days, seconds = divmod(seconds, 86400) + hrs, seconds = divmod(seconds, 3600) + mins, seconds = divmod(seconds, 60) + return f"{days} days, {hrs} hrs, {mins} mins, {seconds} secs" -def timeThis(function, *args): +def timeThis(function: Callable[..., Any], *args: Any) -> tuple[Optional[Any], Optional[float]]: try: - start = time.clock() + start = time.perf_counter() val = function(*args) - end = time.clock() + end = time.perf_counter() return val, (end - start) except Exception: return None, None class DisconnectedException(Exception): - def __init__(self, message, hwDevice): + def __init__(self, message: str, hwDevice: Any): # Call the base class constructor super().__init__(message) # clear device @@ -345,14 +305,14 @@ def __init__(self, message, hwDevice): # Stream object to redirect sys.stdout and sys.stderr to a queue -class WriteStream(object): - def __init__(self, queue): +class WriteStream: + def __init__(self, queue: Any): self.queue = queue - def write(self, text): + def write(self, text: str) -> None: self.queue.put(text) - def flush(self): + def flush(self) -> None: pass @@ -361,11 +321,11 @@ def flush(self): class WriteStreamReceiver(QObject): mysignal = pyqtSignal(str) - def __init__(self, queue, *args, **kwargs): - QObject.__init__(self, *args, **kwargs) + def __init__(self, queue: Any, *args: Any, **kwargs: Any): + super().__init__(*args, **kwargs) self.queue = queue - def run(self): + def run(self) -> None: while True: text = self.queue.get() self.mysignal.emit(text) diff --git a/src/pivx_b58.py b/src/pivx_b58.py index 1518dda..fda81be 100644 --- a/src/pivx_b58.py +++ b/src/pivx_b58.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# -*- coding: utf-8 -*-import sys +# -*- coding: utf-8 -*- # Copyright (c) 2017-2019 Random.Zebra (https://github.com/random-zebra/) # Distributed under the MIT software license, see the accompanying # file LICENSE.txt or http://www.opensource.org/licenses/mit-license.php. @@ -8,17 +8,16 @@ __b58base = len(__b58chars) b58chars = __b58chars -long = int _bchr = lambda x: bytes([x]) _bord = lambda x: x -def b58encode(v): +def b58encode(v: bytes) -> str: """ encode v, which is a string of bytes, to base58. """ long_value = 0 - for (i, c) in enumerate(v[::-1]): + for i, c in enumerate(v[::-1]): long_value += (256 ** i) * _bord(c) result = '' @@ -40,11 +39,10 @@ def b58encode(v): return (__b58chars[0] * nPad) + result -def b58decode(v, length=None): - """ decode v into a string of len bytes - """ +def b58decode(v: str, length: int = None) -> bytes: + """ decode v into a string of len bytes """ long_value = 0 - for (i, c) in enumerate(v[::-1]): + for i, c in enumerate(v[::-1]): long_value += __b58chars.find(c) * (__b58base ** i) result = bytes() diff --git a/src/pivx_hashlib.py b/src/pivx_hashlib.py index 628d7f2..58164d0 100644 --- a/src/pivx_hashlib.py +++ b/src/pivx_hashlib.py @@ -12,20 +12,20 @@ from pivx_b58 import b58encode, b58decode -def double_sha256(data): +def double_sha256(data: bytes) -> bytes: return hashlib.sha256(hashlib.sha256(data).digest()).digest() -def single_sha256(data): +def single_sha256(data: bytes) -> bytes: return hashlib.sha256(data).digest() -def generate_privkey(isTestnet=False): +def generate_privkey(isTestnet: bool = False) -> str: """ Based on Andreas Antonopolous work from 'Mastering Bitcoin'. """ valid = False - privkey = 0 + privkey = "" while not valid: privkey = bitcoin.random_key() decoded_private_key = bitcoin.decode_privkey(privkey, 'hex') @@ -33,20 +33,20 @@ def generate_privkey(isTestnet=False): return base58fromhex(privkey, isTestnet) -def base58fromhex(hexstr, isTestnet): +def base58fromhex(hexstr: str, isTestnet: bool) -> str: base58_secret = TESTNET_WIF_PREFIX if isTestnet else WIF_PREFIX data = bytes([base58_secret]) + bytes.fromhex(hexstr) checksum = bitcoin.bin_dbl_sha256(data)[0:4] return b58encode(data + checksum) -def pubkey_to_address(pubkey, isTestnet=False, isCold=False): +def pubkey_to_address(pubkey: str, isTestnet: bool = False, isCold: bool = False) -> str: pubkey_bin = bytes.fromhex(pubkey) pkey_hash = bitcoin.bin_hash160(pubkey_bin) return pubkeyhash_to_address(pkey_hash, isTestnet, isCold) -def pubkeyhash_to_address(pkey_hash, isTestnet=False, isCold=False): +def pubkeyhash_to_address(pkey_hash: bytes, isTestnet: bool = False, isCold: bool = False) -> str: if isCold: base58_secret = TESTNET_STAKE_MAGIC_BYTE if isTestnet else STAKE_MAGIC_BYTE else: @@ -56,24 +56,22 @@ def pubkeyhash_to_address(pkey_hash, isTestnet=False, isCold=False): return b58encode(data + checksum) -def wif_to_privkey(string): - wif_compressed = 52 == len(string) +def wif_to_privkey(string: str) -> str | None: + wif_compressed = len(string) == 52 pvkeyencoded = b58decode(string).hex() wifversion = pvkeyencoded[:2] checksum = pvkeyencoded[-8:] vs = bytes.fromhex(pvkeyencoded[:-8]) check = double_sha256(vs)[0:4] - if (wifversion == WIF_PREFIX.to_bytes(1, byteorder='big').hex() and checksum == check.hex()) \ - or (wifversion == TESTNET_WIF_PREFIX.to_bytes(1, byteorder='big').hex() and checksum == check.hex()): + if (wifversion == WIF_PREFIX.to_bytes(1, byteorder='big').hex() and checksum == check.hex()) or \ + (wifversion == TESTNET_WIF_PREFIX.to_bytes(1, byteorder='big').hex() and checksum == check.hex()): if wif_compressed: privkey = pvkeyencoded[2:-10] - else: privkey = pvkeyencoded[2:-8] return privkey - else: - return None + return None diff --git a/src/pivx_parser.py b/src/pivx_parser.py index ce95c88..14b7e55 100644 --- a/src/pivx_parser.py +++ b/src/pivx_parser.py @@ -10,11 +10,11 @@ class HexParser: - def __init__(self, hex_str): + def __init__(self, hex_str: str): self.cursor = 0 self.hex_str = hex_str - def readInt(self, nbytes, byteorder="big", signed=False): + def readInt(self, nbytes: int, byteorder: str = "big", signed: bool = False) -> int: if self.cursor + nbytes * 2 > len(self.hex_str): raise Exception("HexParser range error") b = bytes.fromhex(self.hex_str[self.cursor:self.cursor + nbytes * 2]) @@ -22,7 +22,7 @@ def readInt(self, nbytes, byteorder="big", signed=False): self.cursor += nbytes * 2 return res - def readVarInt(self): + def readVarInt(self) -> int: r = self.readInt(1) if r == 253: return self.readInt(2, "little") @@ -32,7 +32,7 @@ def readVarInt(self): return self.readInt(8, "little") return r - def readString(self, nbytes, byteorder="big"): + def readString(self, nbytes: int, byteorder: str = "big") -> str: if self.cursor + nbytes * 2 > len(self.hex_str): raise Exception("HexParser range error") res = self.hex_str[self.cursor:self.cursor + nbytes * 2] @@ -43,18 +43,19 @@ def readString(self, nbytes, byteorder="big"): return res -def IsCoinBase(vin): +def IsCoinBase(vin: dict) -> bool: return vin["txid"] == "0" * 64 and vin["vout"] == 4294967295 and vin["scriptSig"]["hex"][:2] != "c2" -def ParseTxInput(p): - vin = {} - vin["txid"] = p.readString(32, "little") - vin["vout"] = p.readInt(4, "little") - script_len = p.readVarInt() - vin["scriptSig"] = {} - vin["scriptSig"]["hex"] = p.readString(script_len, "big") - vin["sequence"] = p.readInt(4, "little") +def ParseTxInput(p: HexParser) -> dict: + vin = { + "txid": p.readString(32, "little"), + "vout": p.readInt(4, "little"), + "scriptSig": { + "hex": p.readString(p.readVarInt(), "big") + }, + "sequence": p.readInt(4, "little") + } if IsCoinBase(vin): del vin["txid"] del vin["vout"] @@ -64,13 +65,14 @@ def ParseTxInput(p): return vin -def ParseTxOutput(p, isTestnet=False): - vout = {} - vout["value"] = p.readInt(8, "little") - script_len = p.readVarInt() - vout["scriptPubKey"] = {} - vout["scriptPubKey"]["hex"] = p.readString(script_len, "big") - vout["scriptPubKey"]["addresses"] = [] +def ParseTxOutput(p: HexParser, isTestnet: bool = False) -> dict: + vout = { + "value": p.readInt(8, "little"), + "scriptPubKey": { + "hex": p.readString(p.readVarInt(), "big"), + "addresses": [] + } + } try: locking_script = bytes.fromhex(vout["scriptPubKey"]["hex"]) # add addresses only if P2PKH, P2PK or P2CS @@ -83,37 +85,28 @@ def ParseTxOutput(p, isTestnet=False): return vout -def ParseTx(hex_string, isTestnet=False): +def ParseTx(hex_string: str, isTestnet: bool = False) -> dict: p = HexParser(hex_string) - tx = {} - - tx["version"] = p.readInt(4, "little") - - num_of_inputs = p.readVarInt() - tx["vin"] = [] - for i in range(num_of_inputs): - tx["vin"].append(ParseTxInput(p)) - - num_of_outputs = p.readVarInt() - tx["vout"] = [] - for i in range(num_of_outputs): - tx["vout"].append(ParseTxOutput(p, isTestnet)) - - tx["locktime"] = p.readInt(4, "little") + tx = { + "version": p.readInt(4, "little"), + "vin": [ParseTxInput(p) for _ in range(p.readVarInt())], + "vout": [ParseTxOutput(p, isTestnet) for _ in range(p.readVarInt())], + "locktime": p.readInt(4, "little") + } return tx -def IsPayToColdStaking(rawtx, out_n): +def IsPayToColdStaking(rawtx: str, out_n: int) -> tuple[bool, bool]: tx = ParseTx(rawtx) script = tx['vout'][out_n]["scriptPubKey"]["hex"] return utils.IsPayToColdStaking(bytes.fromhex(script)), IsCoinStake(tx) -def IsCoinStake(json_tx): +def IsCoinStake(json_tx: dict) -> bool: return json_tx['vout'][0]["scriptPubKey"]["hex"] == "" -def GetDelegatedStaker(rawtx, out_n, isTestnet): +def GetDelegatedStaker(rawtx: str, out_n: int, isTestnet: bool) -> str: tx = ParseTx(rawtx) script = tx['vout'][out_n]["scriptPubKey"]["hex"] if not utils.IsPayToColdStaking(bytes.fromhex(script)): diff --git a/src/utils.py b/src/utils.py index 911ebb7..d7c16e0 100644 --- a/src/utils.py +++ b/src/utils.py @@ -35,13 +35,13 @@ def b64encode(text): def checkPivxAddr(address, isTestnet=False): try: - # check leading char 'D' or (for testnet) 'x' or 'y' + # Check leading char 'D' or (for testnet) 'x' or 'y' if isTestnet and address[0] not in P2PKH_PREFIXES_TNET + P2SH_PREFIXES_TNET: return False if not isTestnet and address[0] not in P2PKH_PREFIXES + P2SH_PREFIXES: return False - # decode and verify checksum + # Decode and verify checksum addr_bin = bytes.fromhex(b58decode(address).hex()) addr_bin_check = bin_dbl_sha256(addr_bin[0:-4])[0:4] if addr_bin[-4:] != addr_bin_check: @@ -58,13 +58,13 @@ def compose_tx_locking_script(dest_address, isTestnet=False): :param dest_address: destination address in Base58Check format :return: sequence of opcodes and its arguments, defining logic of the locking script """ - pubkey_hash = bytearray.fromhex(b58check_to_hex(dest_address)) # convert address to a public key hash + pubkey_hash = bytearray.fromhex(b58check_to_hex(dest_address)) # Convert address to a public key hash if len(pubkey_hash) != 20: raise Exception('Invalid length of the public key hash: ' + str(len(pubkey_hash))) if (((not isTestnet) and (dest_address[0] in P2PKH_PREFIXES)) or (isTestnet and (dest_address[0] in P2PKH_PREFIXES_TNET))): - # sequence of opcodes/arguments for p2pkh (pay-to-public-key-hash) + # Sequence of opcodes/arguments for p2pkh (pay-to-public-key-hash) scr = OP_DUP + \ OP_HASH160 + \ int.to_bytes(len(pubkey_hash), 1, byteorder='little') + \ @@ -73,7 +73,7 @@ def compose_tx_locking_script(dest_address, isTestnet=False): OP_CHECKSIG elif (((not isTestnet) and (dest_address[0] in P2SH_PREFIXES)) or (isTestnet and (dest_address[0] in P2SH_PREFIXES_TNET))): - # sequence of opcodes/arguments for p2sh (pay-to-script-hash) + # Sequence of opcodes/arguments for p2sh (pay-to-script-hash) scr = OP_HASH160 + \ int.to_bytes(len(pubkey_hash), 1, byteorder='little') + \ pubkey_hash + \ @@ -182,7 +182,7 @@ def ipmap(ip, port): ipv6map += a else: - raise Exception("invalid version number (%d)" % ipAddr.version) + raise Exception("Invalid version number (%d)" % ipAddr.version) ipv6map += int(port).to_bytes(2, byteorder='big').hex() if len(ipv6map) != 36: @@ -210,16 +210,16 @@ def num_to_varint(a): def read_varint(buffer, offset): - if (buffer[offset] < 0xfd): + if buffer[offset] < 0xfd: value_size = 1 value = buffer[offset] - elif (buffer[offset] == 0xfd): + elif buffer[offset] == 0xfd: value_size = 3 value = int.from_bytes(buffer[offset + 1: offset + 3], byteorder='little') - elif (buffer[offset] == 0xfe): + elif buffer[offset] == 0xfe: value_size = 5 value = int.from_bytes(buffer[offset + 1: offset + 5], byteorder='little') - elif (buffer[offset] == 0xff): + elif buffer[offset] == 0xff: value_size = 9 value = int.from_bytes(buffer[offset + 1: offset + 9], byteorder='little') else: @@ -236,7 +236,6 @@ def serialize_input_str(tx, prevout_n, sequence, script_sig): s.append(', ') if tx == '00' * 32 and prevout_n == 0xffffffff: s.append('coinbase %s' % script_sig) - else: script_sig2 = script_sig if len(script_sig2) > 24: From 2adbf041c540f4e8086ed8d4a05a44c98dc9841e Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Tue, 23 Jul 2024 14:00:09 -0500 Subject: [PATCH 03/11] Cleanup API, RPC, BB, CryptoID --- src/apiClient.py | 7 ++- src/blockbookClient.py | 24 +++---- src/cryptoIDClient.py | 39 +++++------- src/hwdevice.py | 32 +++++----- src/ledgerClient.py | 80 ++++++++++-------------- src/rpcClient.py | 138 +++++++++++++---------------------------- src/trezorClient.py | 64 +++++-------------- 7 files changed, 136 insertions(+), 248 deletions(-) diff --git a/src/apiClient.py b/src/apiClient.py index 2205db8..a54e903 100644 --- a/src/apiClient.py +++ b/src/apiClient.py @@ -31,9 +31,10 @@ def process_api_exceptions_int(*args, **kwargs): class ApiClient: - def __init__(self, isTestnet=False): - self.isTestnet = isTestnet - self.api = BlockBookClient(isTestnet) + def __init__(self, main_wnd): + self.main_wnd = main_wnd + self.isTestnet = main_wnd.isTestnetRPC + self.api = BlockBookClient(main_wnd, self.isTestnet) @process_api_exceptions def getAddressUtxos(self, address): diff --git a/src/blockbookClient.py b/src/blockbookClient.py index 4348542..d6d55b7 100644 --- a/src/blockbookClient.py +++ b/src/blockbookClient.py @@ -15,11 +15,8 @@ def process_blockbook_exceptions_int(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: - if client.isTestnet: - new_url = "https://testnet.fuzzbawls.pw" - else: - new_url = "https://zkbitcoin.com/" - message = "BlockBook Client exception on %s\nTrying backup server %s" % (client.url, new_url) + new_url = "https://testnet.fuzzbawls.pw" if client.isTestnet else "https://zkbitcoin.com/" + message = f"BlockBook Client exception on {client.url}\nTrying backup server {new_url}" printException(getCallerName(True), getFunctionName(True), message, str(e)) try: client.url = new_url @@ -33,21 +30,18 @@ def process_blockbook_exceptions_int(*args, **kwargs): class BlockBookClient: - def __init__(self, isTestnet=False): + def __init__(self, main_wnd, isTestnet=False): + self.main_wnd = main_wnd self.isTestnet = isTestnet - if isTestnet: - self.url = "https://testnet.rockdev.org/" - else: - self.url = "https://explorer.rockdev.org/" + self.url = "https://testnet.rockdev.org/" if isTestnet else "https://explorer.rockdev.org/" def checkResponse(self, method, param=""): - url = self.url + "/api/%s" % method - if param != "": - url += "/%s" % param + url = f"{self.url}/api/{method}" + if param: + url += f"/{param}" resp = requests.get(url, data={}, verify=True) if resp.status_code == 200: - data = resp.json() - return data + return resp.json() raise Exception("Invalid response") @process_blockbook_exceptions diff --git a/src/cryptoIDClient.py b/src/cryptoIDClient.py index 45fb180..fbfef04 100644 --- a/src/cryptoIDClient.py +++ b/src/cryptoIDClient.py @@ -11,7 +11,6 @@ api_keys = ["b62b40b5091e", "f1d66708a077", "ed85c85c0126", "ccc60d06f737"] - def process_cryptoID_exceptions(func): def process_cryptoID_exceptions_int(*args, **kwargs): try: @@ -20,27 +19,23 @@ def process_cryptoID_exceptions_int(*args, **kwargs): message = "CryptoID Client exception" printException(getCallerName(True), getFunctionName(True), message, str(e)) return None - return process_cryptoID_exceptions_int - def UTXOS_cryptoID_to_trezor(utxos): # convert JSON labels new_utxos = [] for u in utxos: - new_u = {} - new_u["txid"] = u["tx_hash"] - new_u["vout"] = u["tx_ouput_n"] - new_u["satoshis"] = u["value"] - new_u["confirmations"] = u["confirmations"] - new_u["script"] = u["script"] + new_u = { + "txid": u["tx_hash"], + "vout": u["tx_ouput_n"], + "satoshis": u["value"], + "confirmations": u["confirmations"], + "script": u["script"] + } new_utxos.append(new_u) - return new_utxos - class CryptoIDClient: - def __init__(self, isTestnet=False): if isTestnet: raise Exception("\nNo CryptoID Testnet server\n") @@ -53,24 +48,24 @@ def checkResponse(self, parameters): parameters['key'] = key resp = requests.get(self.url, params=parameters) if resp.status_code == 200: - data = resp.json() - return data + return resp.json() return None @process_cryptoID_exceptions def getAddressUtxos(self, address): - self.parameters = {} - self.parameters['q'] = 'unspent' - self.parameters['active'] = address + self.parameters = { + 'q': 'unspent', + 'active': address + } res = self.checkResponse(self.parameters) if res is None: return None - else: - return UTXOS_cryptoID_to_trezor(res['unspent_outputs']) + return UTXOS_cryptoID_to_trezor(res['unspent_outputs']) @process_cryptoID_exceptions def getBalance(self, address): - self.parameters = {} - self.parameters['q'] = 'getbalance' - self.parameters['a'] = address + self.parameters = { + 'q': 'getbalance', + 'a': address + } return self.checkResponse(self.parameters) diff --git a/src/hwdevice.py b/src/hwdevice.py index 0b270e3..b93af8e 100644 --- a/src/hwdevice.py +++ b/src/hwdevice.py @@ -14,31 +14,28 @@ from time import sleep from trezorClient import TrezorApi - def check_api_init(func): def func_int(*args, **kwargs): hwDevice = args[0] if hwDevice.api is None: - logging.warning("%s: hwDevice.api is None" % func.__name__) + logging.warning(f"{func.__name__}: hwDevice.api is None") raise Exception("HW device: client not initialized") return func(*args, **kwargs) - return func_int - class HWdevice(QObject): # signal: sig1 (thread) is done - emitted by signMessageFinish sig1done = pyqtSignal(str) def __init__(self, main_wnd, *args, **kwargs): printDbg("HW: Initializing Class...") - QObject.__init__(self, *args, **kwargs) + super().__init__(*args, **kwargs) self.main_wnd = main_wnd self.api = None printOK("HW: Class initialized") def initDevice(self, hw_index): - printDbg("HW: initializing hw device with index %d" % hw_index) + printDbg(f"HW: initializing hw device with index {hw_index}") if hw_index >= len(HW_devices): raise Exception("Invalid HW index") @@ -53,7 +50,7 @@ def initDevice(self, hw_index): self.api.initDevice() self.sig1done = self.api.sig1done self.api.sig_disconnected.connect(self.main_wnd.clearHWstatus) - printOK("HW: hw device with index %d initialized" % hw_index) + printOK(f"HW: hw device with index {hw_index} initialized") @check_api_init def clearDevice(self): @@ -68,14 +65,15 @@ def clearDevice(self): @check_api_init def getStatus(self): printDbg("HW: checking device status...") - printOK("Status: %d" % self.api.status) + printOK(f"Status: {self.api.status}") return self.api.model, self.api.status, self.api.messages[self.api.status] - def prepare_transfer_tx(self, caller, bip32_path, utxos_to_spend, dest_address, tx_fee, isTestnet=False): + def prepare_transfer_tx(self, caller, bip32_path, utxos_to_spend, dest_address, tx_fee, isTestnet=False): rewardsArray = [] - mnode = {} - mnode['path'] = bip32_path - mnode['utxos'] = utxos_to_spend + mnode = { + 'path': bip32_path, + 'utxos': utxos_to_spend + } rewardsArray.append(mnode) self.prepare_transfer_tx_bulk(caller, rewardsArray, dest_address, tx_fee, isTestnet) @@ -86,17 +84,17 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i @check_api_init def scanForAddress(self, hwAcc, spath, intExt=0, isTestnet=False): - printOK("HW: Scanning for Address n. %d on account n. %d" % (spath, hwAcc)) + printOK(f"HW: Scanning for Address n. {spath} on account n. {hwAcc}") return self.api.scanForAddress(hwAcc, spath, intExt, isTestnet) @check_api_init def scanForBip32(self, account, address, starting_spath=0, spath_count=10, isTestnet=False): - printOK("HW: Scanning for Bip32 path of address: %s" % address) + printOK(f"HW: Scanning for Bip32 path of address: {address}") found = False spath = -1 for i in range(starting_spath, starting_spath + spath_count): - printDbg("HW: checking path... %d'/0/%d" % (account, i)) + printDbg(f"HW: checking path... {account}'/0/{i}") curr_addr = self.api.scanForAddress(account, i, isTestnet) if curr_addr == address: @@ -106,11 +104,11 @@ def scanForBip32(self, account, address, starting_spath=0, spath_count=10, isTes sleep(0.01) - return (found, spath) + return found, spath @check_api_init def scanForPubKey(self, account, spath, isTestnet=False): - printOK("HW: Scanning for PubKey of address n. %d on account n. %d" % (spath, account)) + printOK(f"HW: Scanning for PubKey of address n. {spath} on account n. {account}") return self.api.scanForPubKey(account, spath, isTestnet) @check_api_init diff --git a/src/ledgerClient.py b/src/ledgerClient.py index e0c46bb..ac71283 100644 --- a/src/ledgerClient.py +++ b/src/ledgerClient.py @@ -29,10 +29,10 @@ def process_ledger_exceptions_int(*args, **kwargs): except BTChipException as e: printDbg('Error while communicating with Ledger hardware wallet.') e.message = 'Error while communicating with Ledger hardware wallet.' - if (e.sw in (0x6f01, 0x6d00, 0x6700, 0x6faa)): + if e.sw in (0x6f01, 0x6d00, 0x6700, 0x6faa): e.message = 'Make sure the PIVX app is open on your Ledger device.' e.message += '
If there is a program (such as Ledger Bitcoin Wallet) interfering with the USB communication, close it first.' - elif (e.sw == 0x6982): + elif e.sw == 0x6982: e.message = 'Enter the PIN on your Ledger device.' printException(getCallerName(True), getFunctionName(True), e.message, e.args) raise DisconnectedException(e.message, hwDevice) @@ -63,7 +63,7 @@ class LedgerApi(QObject): sigTxdone = pyqtSignal(bytearray, str) # signal: sigtx (thread) is done (aborted) - emitted by signTxFinish sigTxabort = pyqtSignal() - # signal: tx_progress percent - emitted by perepare_transfer_tx_bulk + # signal: tx_progress percent - emitted by prepare_transfer_tx_bulk tx_progress = pyqtSignal(int) # signal: sig_progress percent - emitted by signTxSign sig_progress = pyqtSignal(int) @@ -71,8 +71,8 @@ class LedgerApi(QObject): sig_disconnected = pyqtSignal(str) def __init__(self, main_wnd, *args, **kwargs): + super().__init__(*args, **kwargs) self.main_wnd = main_wnd - QObject.__init__(self, *args, **kwargs) self.model = [x[0] for x in HW_devices].index("LEDGER Nano") self.messages = [ 'Device not initialized.', @@ -96,9 +96,9 @@ def initDevice(self): printDbg("Ledger Initialized") self.status = 1 ver = self.chip.getFirmwareVersion() - printOK("Ledger HW device connected [v. %s]" % str(ver.get('version'))) + printOK(f"Ledger HW device connected [v. {ver.get('version')}]") # Check device is unlocked - bip32_path = MPATH + "%d'/0/%d" % (0, 0) + bip32_path = f"{MPATH}{0}'/0/{0}" _ = self.chip.getWalletPublicKey(bip32_path) self.status = 2 self.sig_progress.connect(self.updateSigProgress) @@ -125,8 +125,7 @@ def append_inputs_to_TX(self, utxo, bip32_path): utxo_tx_index = utxo['vout'] if utxo_tx_index < 0 or utxo_tx_index > len(prev_transaction.outputs): - raise Exception('Incorrect value of outputIndex for UTXO %s-%d' % - (utxo['txid'], utxo['vout'])) + raise Exception(f'Incorrect value of outputIndex for UTXO {utxo["txid"]}-{utxo["vout"]}') trusted_input = self.chip.getTrustedInput(prev_transaction, utxo_tx_index) self.trusted_inputs.append(trusted_input) @@ -136,10 +135,10 @@ def append_inputs_to_TX(self, utxo, bip32_path): pubkey_hash = bin_hash160(curr_pubkey) pubkey_hash_from_script = extract_pkh_from_locking_script(prev_transaction.outputs[utxo_tx_index].script) if pubkey_hash != pubkey_hash_from_script: - text = "Error: The hashes for the public key for the BIP32 path (%s), and the UTXO locking script do not match." % bip32_path + text = f"Error: The hashes for the public key for the BIP32 path ({bip32_path}), and the UTXO locking script do not match." text += "Your signed transaction will not be validated by the network.\n" - text += "pubkey_hash: %s\n" % pubkey_hash.hex() - text += "pubkey_hash_from_script: %s\n" % pubkey_hash_from_script.hex() + text += f"pubkey_hash: {pubkey_hash.hex()}\n" + text += f"pubkey_hash_from_script: {pubkey_hash_from_script.hex()}\n" printDbg(text) self.arg_inputs.append({ @@ -156,18 +155,15 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i with self.lock: # For each UTXO create a Ledger 'trusted input' self.trusted_inputs = [] - # https://klmoney.wordpress.com/bitcoin-dissecting-transactions-part-2-building-a-transaction-by-hand) + # https://klmoney.wordpress.com/bitcoin-dissecting-transactions-part-2-building-a-transaction-by-hand self.arg_inputs = [] self.amount = 0 - num_of_sigs = sum([len(mnode['utxos']) for mnode in rewardsArray]) + num_of_sigs = sum(len(mnode['utxos']) for mnode in rewardsArray) curr_utxo_checked = 0 for mnode in rewardsArray: # Add proper HW path (for current device) on each utxo - if isTestnet: - mnode['path'] = MPATH_TESTNET + mnode['path'] - else: - mnode['path'] = MPATH + mnode['path'] + mnode['path'] = (MPATH_TESTNET if isTestnet else MPATH) + mnode['path'] # Create a TX input with each utxo for utxo in mnode['utxos']: @@ -198,10 +194,10 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i self.mBox2 = QMessageBox(caller) self.messageText = "

Confirm transaction on your device, with the following details:

" - # messageText += "From bip32_path: %s

" % str(bip32_path) - self.messageText += "

Payment to:
%s

" % dest_address - self.messageText += "

Net amount:
%s PIV

" % str(round(self.amount / 1e8, 8)) - self.messageText += "

Fees:
%s PIV

" % str(round(int(tx_fee) / 1e8, 8)) + # messageText += f"From bip32_path: {bip32_path}

" + self.messageText += f"

Payment to:
{dest_address}

" + self.messageText += f"

Net amount:
{round(self.amount / 1e8, 8)} PIV

" + self.messageText += f"

Fees:
{round(int(tx_fee) / 1e8, 8)} PIV

" messageText = self.messageText + "Signature Progress: 0 %" self.mBox2.setText(messageText) self.mBox2.setIconPixmap(caller.ledgerImg.scaledToHeight(200, Qt.SmoothTransformation)) @@ -215,23 +211,15 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i @process_ledger_exceptions def scanForAddress(self, hwAcc, spath, intExt=0, isTestnet=False): with self.lock: - if not isTestnet: - curr_path = MPATH + "%d'/%d/%d" % (hwAcc, intExt, spath) - curr_addr = self.chip.getWalletPublicKey(curr_path).get('address')[12:-2] - else: - curr_path = MPATH_TESTNET + "%d'/%d/%d" % (hwAcc, intExt, spath) - pubkey = compress_public_key(self.chip.getWalletPublicKey(curr_path).get('publicKey')).hex() - curr_addr = pubkey_to_address(pubkey, isTestnet) + curr_path = (MPATH_TESTNET if isTestnet else MPATH) + f"{hwAcc}'/{intExt}/{spath}" + curr_addr = self.chip.getWalletPublicKey(curr_path).get('address')[12:-2] if not isTestnet else pubkey_to_address(compress_public_key(self.chip.getWalletPublicKey(curr_path).get('publicKey')).hex(), isTestnet) return curr_addr @process_ledger_exceptions def scanForPubKey(self, account, spath, isTestnet=False): - hwpath = "%d'/0/%d" % (account, spath) - if isTestnet: - curr_path = MPATH_TESTNET + hwpath - else: - curr_path = MPATH + hwpath + hwpath = f"{account}'/0/{spath}" + curr_path = (MPATH_TESTNET if isTestnet else MPATH) + hwpath with self.lock: nodeData = self.chip.getWalletPublicKey(curr_path) @@ -240,10 +228,7 @@ def scanForPubKey(self, account, spath, isTestnet=False): @process_ledger_exceptions def signMess(self, caller, hwpath, message, isTestnet=False): - if isTestnet: - path = MPATH_TESTNET + hwpath - else: - path = MPATH + hwpath + path = (MPATH_TESTNET if isTestnet else MPATH) + hwpath # Ledger doesn't accept characters other that ascii printable: # https://ledgerhq.github.io/btchip-doc/bitcoin-technical.html#_sign_message message = message.encode('ascii', 'ignore') @@ -251,9 +236,11 @@ def signMess(self, caller, hwpath, message, isTestnet=False): # Connection pop-up mBox = QMessageBox(caller) - warningText = "Another application (such as Ledger Wallet app) has probably taken over " - warningText += "the communication with the Ledger device.

To continue, close that application and " - warningText += "click the Retry button.\nTo cancel, click the Abort button" + warningText = ( + "Another application (such as Ledger Wallet app) has probably taken over " + "the communication with the Ledger device.

To continue, close that application and " + "click the Retry button.\nTo cancel, click the Abort button" + ) mBox.setText(warningText) mBox.setWindowTitle("WARNING") mBox.setStandardButtons(QMessageBox.Retry | QMessageBox.Abort) @@ -274,8 +261,7 @@ def signMess(self, caller, hwpath, message, isTestnet=False): printOK('Signing Message') self.mBox = QMessageBox(caller.ui) - messageText = "Check display of your hardware device\n\n- message hash:\n\n%s\n\n-path:\t%s\n" % ( - message_sha, path) + messageText = f"Check display of your hardware device\n\n- message hash:\n\n{message_sha}\n\n-path:\t{path}\n" self.mBox.setText(messageText) self.mBox.setIconPixmap(caller.ledgerImg.scaledToHeight(200, Qt.SmoothTransformation)) self.mBox.setWindowTitle("CHECK YOUR LEDGER") @@ -314,13 +300,13 @@ def signMessageFinish(self): printOK("Message signed") sig1 = work.hex() else: - printDbg('client.signMessageSign() returned invalid response (code 3): ' + self.signature.hex()) + printDbg(f'client.signMessageSign() returned invalid response (code 3): {self.signature.hex()}') sig1 = "None" else: - printDbg('client.signMessageSign() returned invalid response (code 2): ' + self.signature.hex()) + printDbg(f'client.signMessageSign() returned invalid response (code 2): {self.signature.hex()}') sig1 = "None" else: - printDbg('client.signMessageSign() returned invalid response (code 1): ' + self.signature.hex()) + printDbg(f'client.signMessageSign() returned invalid response (code 1): {self.signature.hex()}') sig1 = "None" else: printOK("Signature refused by the user") @@ -377,13 +363,13 @@ def signTxFinish(self): self.mBox2.accept() if self.tx_raw is not None: - # Signal to be catched by FinishSend on TabRewards / dlg_sewwpAll + # Signal to be caught by FinishSend on TabRewards / dlg_sewwpAll self.sigTxdone.emit(self.tx_raw, str(round(self.amount / 1e8, 8))) else: printOK("Transaction refused by the user") self.sigTxabort.emit() def updateSigProgress(self, percent): - messageText = self.messageText + "Signature Progress: " + str(percent) + " %" + messageText = f"{self.messageText}Signature Progress: {percent} %" self.mBox2.setText(messageText) QApplication.processEvents() diff --git a/src/rpcClient.py b/src/rpcClient.py index 64e8db6..cb8477d 100644 --- a/src/rpcClient.py +++ b/src/rpcClient.py @@ -19,7 +19,6 @@ def process_RPC_exceptions_int(*args, **kwargs): try: args[0].httpConnection.connect() return func(*args, **kwargs) - except Exception as e: message = "Exception in RPC client" printException(getCallerName(True), getFunctionName(True), message, str(e)) @@ -35,11 +34,11 @@ def process_RPC_exceptions_int(*args, **kwargs): class RpcClient: - def __init__(self, rpc_protocol, rpc_host, rpc_user, rpc_password): + def __init__(self, rpc_protocol: str, rpc_host: str, rpc_user: str, rpc_password: str): # Lock for threads self.lock = threading.RLock() - self.rpc_url = "%s://%s:%s@%s" % (rpc_protocol, rpc_user, rpc_password, rpc_host) + self.rpc_url = f"{rpc_protocol}://{rpc_user}:{rpc_password}@{rpc_host}" host, port = rpc_host.split(":") if rpc_protocol == "https": @@ -50,65 +49,47 @@ def __init__(self, rpc_protocol, rpc_host, rpc_user, rpc_password): self.conn = AuthServiceProxy(self.rpc_url, timeout=1000, connection=self.httpConnection) @process_RPC_exceptions - def getBlockCount(self): - n = 0 + def getBlockCount(self) -> int: with self.lock: - n = self.conn.getblockcount() - - return n + return self.conn.getblockcount() @process_RPC_exceptions - def getBlockHash(self, blockNum): - h = None + def getBlockHash(self, blockNum: int) -> str: with self.lock: - h = self.conn.getblockhash(blockNum) - - return h + return self.conn.getblockhash(blockNum) @process_RPC_exceptions - def getBudgetVotes(self, proposal): - votes = {} + def getBudgetVotes(self, proposal: str) -> dict: with self.lock: - votes = self.conn.getbudgetvotes(proposal) - - return votes + return self.conn.getbudgetvotes(proposal) @process_RPC_exceptions - def getFeePerKb(self): - res = MINIMUM_FEE + def getFeePerKb(self) -> float: with self.lock: # get transaction data from last 200 blocks feePerKb = float(self.conn.getfeeinfo(200)['feeperkb']) - res = (feePerKb if feePerKb > MINIMUM_FEE else MINIMUM_FEE) - - return res + return feePerKb if feePerKb > MINIMUM_FEE else MINIMUM_FEE @process_RPC_exceptions - def getMNStatus(self, address): - mnStatus = None + def getMNStatus(self, address: str) -> dict: with self.lock: mnStatusList = self.conn.listmasternodes(address) if not mnStatusList: return None mnStatus = mnStatusList[0] mnStatus['mnCount'] = self.conn.getmasternodecount()['enabled'] - - return mnStatus + return mnStatus @process_RPC_exceptions - def getMasternodeCount(self): - ans = None + def getMasternodeCount(self) -> dict: with self.lock: - ans = self.conn.getmasternodecount() - - return ans + return self.conn.getmasternodecount() @process_RPC_exceptions - def getMasternodes(self): + def getMasternodes(self) -> dict: printDbg("RPC: Getting masternode list...") mnList = {} score = [] - masternodes = [] with self.lock: masternodes = self.conn.listmasternodes() @@ -120,7 +101,6 @@ def getMasternodes(self): else: lastpaid_ago = now() - mn.get('lastpaid') mn['score'] = min(lastpaid_ago, mn.get('activetime')) - else: mn['score'] = 0 @@ -138,17 +118,13 @@ def getMasternodes(self): return mnList @process_RPC_exceptions - def getNextSuperBlock(self): - n = 0 + def getNextSuperBlock(self) -> int: with self.lock: - n = self.conn.getnextsuperblock() - - return n + return self.conn.getnextsuperblock() @process_RPC_exceptions - def getProposalsProjection(self): + def getProposalsProjection(self) -> list: printDbg("RPC: Getting proposals projection...") - data = [] proposals = [] with self.lock: # get budget projection JSON data @@ -156,11 +132,12 @@ def getProposalsProjection(self): for p in data: # create proposal-projection dictionary - new_proposal = {} - new_proposal['Name'] = p.get('Name') - new_proposal['Allotted'] = float(p.get("Alloted")) - new_proposal['Votes'] = p.get('Yeas') - p.get('Nays') - new_proposal['Total_Allotted'] = float(p.get('TotalBudgetAlloted')) + new_proposal = { + 'Name': p.get('Name'), + 'Allotted': float(p.get("Alloted")), + 'Votes': p.get('Yeas') - p.get('Nays'), + 'Total_Allotted': float(p.get('TotalBudgetAlloted')) + } # append dictionary to list proposals.append(new_proposal) @@ -168,27 +145,20 @@ def getProposalsProjection(self): return proposals @process_RPC_exceptions - def getProtocolVersion(self): - res = DEFAULT_PROTOCOL_VERSION + def getProtocolVersion(self) -> int: with self.lock: prot_version = self.conn.getinfo().get('protocolversion') - res = int(prot_version) - - return res + return int(prot_version) if prot_version else DEFAULT_PROTOCOL_VERSION @process_RPC_exceptions - def getRawTransaction(self, txid): - res = None + def getRawTransaction(self, txid: str) -> str: with self.lock: - res = self.conn.getrawtransaction(txid) - - return res + return self.conn.getrawtransaction(txid) @process_RPC_exceptions - def getStatus(self): + def getStatus(self) -> tuple[bool, str, int, float, bool]: status = False - statusMess = "Unable to connect to a PIVX RPC server.\n" - statusMess += "Either the local PIVX wallet is not open, or the remote RPC server is not responding." + statusMess = "Unable to connect to a PIVX RPC server.\nEither the local PIVX wallet is not open, or the remote RPC server is not responding." n = 0 response_time = None with self.lock: @@ -204,58 +174,38 @@ def getStatus(self): return status, statusMess, n, response_time, isTestnet @process_RPC_exceptions - def isBlockchainSynced(self): - res = False - response_time = None + def isBlockchainSynced(self) -> tuple[bool, float]: with self.lock: status, response_time = timeThis(self.conn.mnsync, 'status') if status is not None: - res = status.get("IsBlockchainSynced") - - return res, response_time + return status.get("IsBlockchainSynced"), response_time + return False, response_time @process_RPC_exceptions - def mnBudgetRawVote(self, mn_tx_hash, mn_tx_index, proposal_hash, vote, time, vote_sig): - res = None + def mnBudgetRawVote(self, mn_tx_hash: str, mn_tx_index: int, proposal_hash: str, vote: str, time: int, vote_sig: str) -> str: with self.lock: - res = self.conn.mnbudgetrawvote(mn_tx_hash, mn_tx_index, proposal_hash, vote, time, vote_sig) - - return res + return self.conn.mnbudgetrawvote(mn_tx_hash, mn_tx_index, proposal_hash, vote, time, vote_sig) @process_RPC_exceptions - def decodemasternodebroadcast(self, work): + def decodemasternodebroadcast(self, work: str) -> str: printDbg("RPC: Decoding masternode broadcast...") - res = "" with self.lock: - res = self.conn.decodemasternodebroadcast(work.strip()) - - return res + return self.conn.decodemasternodebroadcast(work.strip()) @process_RPC_exceptions - def relaymasternodebroadcast(self, work): + def relaymasternodebroadcast(self, work: str) -> str: printDbg("RPC: Relaying masternode broadcast...") - res = "" with self.lock: - res = self.conn.relaymasternodebroadcast(work.strip()) - - return res + return self.conn.relaymasternodebroadcast(work.strip()) @process_RPC_exceptions - def sendRawTransaction(self, tx_hex): - dbg_mess = "RPC: Sending raw transaction" - dbg_mess += "..." - printDbg(dbg_mess) - tx_id = None + def sendRawTransaction(self, tx_hex: str) -> str: + printDbg("RPC: Sending raw transaction...") with self.lock: - tx_id = self.conn.sendrawtransaction(tx_hex, True) - - return tx_id + return self.conn.sendrawtransaction(tx_hex, True) @process_RPC_exceptions - def verifyMessage(self, pivxaddress, signature, message): + def verifyMessage(self, pivxaddress: str, signature: str, message: str) -> bool: printDbg("RPC: Verifying message...") - res = False with self.lock: - res = self.conn.verifymessage(pivxaddress, signature, message) - - return res + return self.conn.verifymessage(pivxaddress, signature, message) diff --git a/src/trezorClient.py b/src/trezorClient.py index cef6ee9..4eaa4e7 100644 --- a/src/trezorClient.py +++ b/src/trezorClient.py @@ -23,7 +23,7 @@ from threads import ThreadFuns from txCache import TxCache -from qt.dlg_pinMatrix import PinMatrix_dlg +from qt.dlg_pinMatrix import PinMatrixDlg def process_trezor_exceptions(func): @@ -47,21 +47,15 @@ def process_trezor_exceptions_int(*args, **kwargs): class TrezorApi(QObject): - # signal: sig1 (thread) is done - emitted by signMessageFinish sig1done = pyqtSignal(str) - # signal: sigtx (thread) is done - emitted by signTxFinish sigTxdone = pyqtSignal(bytearray, str) - # signal: sigtx (thread) is done (aborted) - emitted by signTxFinish sigTxabort = pyqtSignal() - # signal: tx_progress percent - emitted by perepare_transfer_tx_bulk tx_progress = pyqtSignal(int) - # signal: sig_progress percent - emitted by signTxSign sig_progress = pyqtSignal(int) - # signal: sig_disconnected -emitted with DisconnectedException sig_disconnected = pyqtSignal(str) def __init__(self, model, main_wnd, *args, **kwargs): - QObject.__init__(self, *args, **kwargs) + super().__init__(*args, **kwargs) self.model = model # index of HW_devices self.main_wnd = main_wnd self.messages = [ @@ -71,7 +65,6 @@ def __init__(self, model, main_wnd, *args, **kwargs): "Wrong device model detected.", "Wrong PIN inserted" ] - # Device Lock for threads self.lock = threading.RLock() self.status = 0 self.client = None @@ -83,9 +76,7 @@ def append_inputs_to_TX(self, utxo, bip32_path, inputs): if utxo['staker'] != "": printException(getCallerName(), getFunctionName(), "Unable to sing P2CS on Trezor", "") return - # Update amount self.amount += int(utxo['satoshis']) - # Add input address_n = parse_path(bip32_path) prev_hash = binascii.unhexlify(utxo['txid']) it = trezor_proto.TxInputType( @@ -120,9 +111,7 @@ def initDevice(self): self.status = 0 devices = enumerate_devices() if not len(devices): - # No device connected return - # Use the first device for now d = devices[0] ui = TrezorUi() try: @@ -138,13 +127,10 @@ def initDevice(self): model = self.client.features.model or "1" if not self.checkModel(model): self.status = 3 - self.messages[3] = "Wrong device model (%s) detected.\nLooking for model %s." % ( - HW_devices[self.model][0], model - ) + self.messages[3] = f"Wrong device model ({model}) detected.\nLooking for model {HW_devices[self.model][0]}." return required_version = MINIMUM_FIRMWARE_VERSION[model] - printDbg("Current version is %s (minimum required: %s)" % (str(self.client.version), str(required_version))) - # Check device is unlocked + printDbg(f"Current version is {str(self.client.version)} (minimum required: {str(required_version)})") bip32_path = parse_path(MPATH + "%d'/0/%d" % (0, 0)) _ = btc.get_address(self.client, 'PIVX', bip32_path, False) self.status = 2 @@ -161,7 +147,6 @@ def load_prev_txes(self, rewardsArray): json_tx = ParseTx(raw_tx) txes[prev_hash] = self.json_to_tx(json_tx) - # completion percent emitted curr_utxo_checked += 1 completion = int(95 * curr_utxo_checked / num_of_txes) self.tx_progress.emit(completion) @@ -176,7 +161,6 @@ def json_to_tx(self, jtx): t.bin_outputs = [self.json_to_bin_output(output) for output in jtx["vout"]] return t - def json_to_input(self, input): i = trezor_proto.TxInputType() if "coinbase" in input: @@ -190,7 +174,6 @@ def json_to_input(self, input): i.sequence = input["sequence"] return i - def json_to_bin_output(self, output): o = trezor_proto.TxOutputBinType() o.amount = int(output["value"]) @@ -204,13 +187,11 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i self.amount = 0 for mnode in rewardsArray: - # Add proper HW path (for current device) on each utxo if isTestnet: mnode['path'] = MPATH_TESTNET + mnode['path'] else: mnode['path'] = MPATH + mnode['path'] - # Create a TX input with each utxo for utxo in mnode['utxos']: self.append_inputs_to_TX(utxo, mnode['path'], inputs) @@ -230,10 +211,9 @@ def prepare_transfer_tx_bulk(self, caller, rewardsArray, dest_address, tx_fee, i self.mBox2 = QMessageBox(caller) self.messageText = "

Signing transaction...

" - # messageText += "From bip32_path: %s

" % str(bip32_path) - self.messageText += "

Payment to:
%s

" % dest_address - self.messageText += "

Net amount:
%s PIV

" % str(round(self.amount / 1e8, 8)) - self.messageText += "

Fees:
%s PIV

" % str(round(int(tx_fee) / 1e8, 8)) + self.messageText += f"

Payment to:
{dest_address}

" + self.messageText += f"

Net amount:
{str(round(self.amount / 1e8, 8))} PIV

" + self.messageText += f"

Fees:
{str(round(int(tx_fee) / 1e8, 8))} PIV

" messageText = self.messageText + "Signature Progress: 0 %" self.mBox2.setText(messageText) self.setBoxIcon(self.mBox2, caller) @@ -281,17 +261,14 @@ def signMess(self, caller, hwpath, message, isTestnet=False): path = MPATH_TESTNET + hwpath else: path = MPATH + hwpath - # Connection pop-up self.mBox = QMessageBox(caller) - messageText = "Check display of your hardware device\n\n- message:\n\n%s\n\n-path:\t%s\n" % ( - splitString(message, 32), path) + messageText = f"Check display of your hardware device\n\n- message:\n\n{splitString(message, 32)}\n\n-path:\t{path}\n" self.mBox.setText(messageText) self.setBoxIcon(self.mBox, caller) self.mBox.setWindowTitle("CHECK YOUR TREZOR") self.mBox.setStandardButtons(QMessageBox.NoButton) self.mBox.show() - # Sign message ThreadFuns.runInThread(self.signMessageSign, (path, message, isTestnet), self.signMessageFinish) @process_trezor_exceptions @@ -331,29 +308,23 @@ def signTxSign(self, ctrl, inputs, outputs, txes, isTestnet=False): def signTxFinish(self): self.mBox2.accept() if self.tx_raw is not None: - # Signal to be catched by FinishSend on TabRewards / dlg_sewwpAll self.sigTxdone.emit(self.tx_raw, str(round(self.amount / 1e8, 8))) - else: printOK("Transaction refused by the user") self.sigTxabort.emit() def updateSigProgress(self, percent): - # -1 simply adds a waiting message to the actual progress if percent == -1: t = self.mBox2.text() messageText = t + "
Please confirm action on your Trezor device..." else: - messageText = self.messageText + "Signature Progress: " + str(percent) + " %" + messageText = self.messageText + f"Signature Progress: {percent} %" self.mBox2.setText(messageText) QApplication.processEvents() -# From trezorlib.btc def sign_tx(sig_percent, client, coin_name, inputs, outputs, details=None, prev_txes=None): - # set up a transactions dict txes = {None: trezor_proto.TransactionType(inputs=inputs, outputs=outputs)} - # preload all relevant transactions ahead of time for inp in inputs: if inp.script_type not in ( trezor_proto.InputScriptType.SPENDP2SHWITNESS, @@ -379,13 +350,11 @@ def sign_tx(sig_percent, client, coin_name, inputs, outputs, details=None, prev_ res = client.call(signtx) - # Prepare structure for signatures signatures = [None] * len(inputs) serialized_tx = b"" def copy_tx_meta(tx): tx_copy = trezor_proto.TransactionType(**tx) - # clear fields tx_copy.inputs_cnt = len(tx.inputs) tx_copy.inputs = [] tx_copy.outputs_cnt = len(tx.bin_outputs or tx.outputs) @@ -397,10 +366,9 @@ def copy_tx_meta(tx): R = trezor_proto.RequestType - percent = 0 # Used for signaling progress. 1-10 for inputs/outputs, 10-100 for sigs. + percent = 0 sig_percent.emit(percent) while isinstance(res, trezor_proto.TxRequest): - # If there's some part of signed transaction, let's add it if res.serialized: if res.serialized.serialized_tx: serialized_tx += res.serialized.serialized_tx @@ -409,16 +377,14 @@ def copy_tx_meta(tx): idx = res.serialized.signature_index sig = res.serialized.signature if signatures[idx] is not None: - raise ValueError("Signature for index %d already filled" % idx) + raise ValueError(f"Signature for index {idx} already filled") signatures[idx] = sig - # emit completion percent percent = 10 + int(90 * (idx + 1) / len(signatures)) sig_percent.emit(percent) if res.request_type == R.TXFINISHED: break - # Device asked for one more information, let's process it. current_tx = txes[res.details.tx_hash] if res.request_type == R.TXMETA: @@ -434,7 +400,6 @@ def copy_tx_meta(tx): res = client.call(trezor_proto.TxAck(tx=msg)) elif res.request_type == R.TXOUTPUT: - # Update just one percent then display additional waiting message (emitting -1) if percent == 9: percent += 1 sig_percent.emit(percent) @@ -466,10 +431,9 @@ def copy_tx_meta(tx): return signatures, serialized_tx -class TrezorUi(object): +class TrezorUi: def __init__(self): self.prompt_shown = False - pass def get_pin(self, code=None) -> str: if code == PIN_CURRENT: @@ -481,7 +445,7 @@ def get_pin(self, code=None) -> str: else: desc = "PIN" - pin = ask_for_pin_callback("Please enter {}".format(desc)) + pin = ask_for_pin_callback(f"Please enter {desc}") if pin is None: raise exceptions.Cancelled return pin @@ -500,7 +464,7 @@ def button_request(self, msg_code): def ask_for_pin_callback(msg, hide_numbers=True): - dlg = PinMatrix_dlg(title=msg, fHideBtns=hide_numbers) + dlg = PinMatrixDlg(title=msg, fHideBtns=hide_numbers) if dlg.exec_(): return dlg.getPin() else: From 13fe7fbfe8e3c5984a25afc9b2a7ec1fd84dd97c Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Tue, 23 Jul 2024 14:05:37 -0500 Subject: [PATCH 04/11] Better type hinting, fstrings in threads and mainapp --- src/constants.py | 8 ++-- src/mainApp.py | 16 ++++---- src/mainWindow.py | 40 ++++++++++---------- src/qt/guiHeader.py | 6 +-- src/tabRewards.py | 84 ++++++++++++++++++++---------------------- src/threads.py | 24 +++++++++--- src/watchdogThreads.py | 4 +- src/workerThread.py | 5 ++- 8 files changed, 96 insertions(+), 91 deletions(-) diff --git a/src/constants.py b/src/constants.py index 222b648..bc38d39 100644 --- a/src/constants.py +++ b/src/constants.py @@ -7,7 +7,7 @@ import os from queue import Queue -wqueue = Queue() # type: Queue[str] +wqueue: Queue[str] = Queue() APPDATA_DIRNAME = ".PET4L-DATA" @@ -21,7 +21,7 @@ TESTNET_MAGIC_BYTE = 139 TESTNET_STAKE_MAGIC_BYTE = 73 DEFAULT_PROTOCOL_VERSION = 70915 -MINIMUM_FEE = 0.0001 # minimum PIV/kB +MINIMUM_FEE = 0.0001 # minimum PIV/kB SECONDS_IN_2_MONTHS = 60 * 24 * 60 * 60 MAX_INPUTS_NO_WARNING = 75 starting_width = 1033 @@ -50,8 +50,8 @@ trusted_RPC_Servers = [ ["https", "lithuania.fuzzbawls.pw:8080", "spmtUser", "WUss6sr8956S5Paex254"], ["https", "latvia.fuzzbawls.pw:8080", "spmtUser", "8X88u7TuefPm7mQaJY52"], - ["https", "charlotte.fuzzbawls.pw:8080", "spmtUser", "ZyD936tm9dvqmMP8A777"]] - + ["https", "charlotte.fuzzbawls.pw:8080", "spmtUser", "ZyD936tm9dvqmMP8A777"] +] HW_devices = [ # (model name, api index) diff --git a/src/mainApp.py b/src/mainApp.py index e0b9419..de9c74b 100644 --- a/src/mainApp.py +++ b/src/mainApp.py @@ -17,8 +17,8 @@ from misc import printDbg, initLogs, saveCacheSettings, readCacheSettings, getVersion from mainWindow import MainWindow from constants import user_dir, SECONDS_IN_2_MONTHS -from qt.dlg_configureRPCservers import ConfigureRPCservers_dlg -from qt.dlg_signmessage import SignMessage_dlg +from qt.dlg_configureRPCservers import ConfigureRPCserversDlg +from qt.dlg_signmessage import SignMessageDlg class ServiceExit(Exception): @@ -30,7 +30,7 @@ class ServiceExit(Exception): def service_shutdown(signum, frame): - print('Caught signal %d' % signum) + print(f'Caught signal {signum}') raise ServiceExit @@ -54,7 +54,7 @@ def __init__(self, imgDir, app, start_args): # Get version and title self.version = getVersion() - self.title = 'PET4L - PIVX Emergency Tool For Ledger - v.%s-%s' % (self.version['number'], self.version['tag']) + self.title = f'PET4L - PIVX Emergency Tool For Ledger - v.{self.version["number"]}-{self.version["tag"]}' # Open database self.db = Database(self) @@ -107,7 +107,7 @@ def initUI(self, imgDir): self.show() self.activateWindow() - def closeEvent(self, *args, **kwargs): + def closeEvent(self, event): # Terminate the running threads. # Set the shutdown flag on each thread to trigger a clean shutdown of each thread. self.mainWindow.myRpcWd.shutdown_flag.set() @@ -133,16 +133,16 @@ def closeEvent(self, *args, **kwargs): # Adios print("Bye Bye.") - return QMainWindow.closeEvent(self, *args, **kwargs) + return super().closeEvent(event) def onEditRPCServer(self): # Create Dialog - ui = ConfigureRPCservers_dlg(self) + ui = ConfigureRPCserversDlg(self) if ui.exec(): printDbg("Configuring RPC Servers...") def onSignVerifyMessage(self): # Create Dialog - ui = SignMessage_dlg(self.mainWindow) + ui = SignMessageDlg(self.mainWindow) if ui.exec(): printDbg("Sign/Verify message...") diff --git a/src/mainWindow.py b/src/mainWindow.py index 727b793..87ae791 100644 --- a/src/mainWindow.py +++ b/src/mainWindow.py @@ -13,7 +13,7 @@ from PyQt5.QtCore import pyqtSignal, Qt, QThread from PyQt5.QtGui import QPixmap, QColor, QPalette, QTextCursor, QFont, QIcon from PyQt5.QtWidgets import QWidget, QPushButton, QHBoxLayout, QGroupBox, QVBoxLayout, \ - QFileDialog, QTextEdit, QTabWidget, QLabel, QSplitter + QFileDialog, QTextEdit, QTabWidget, QLabel, QSplitter, QAction, QMenuBar from apiClient import ApiClient from constants import starting_height, DefaultCache, wqueue @@ -41,9 +41,8 @@ class MainWindow(QWidget): # signal: UTXO list loading percent (emitted by load_utxos_thread in tabRewards) sig_UTXOsLoading = pyqtSignal(int) - def __init__(self, parent, imgDir): - super(QWidget, self).__init__(parent) + super().__init__(parent) self.parent = parent self.imgDir = imgDir self.runInThread = ThreadFuns.runInThread @@ -79,7 +78,7 @@ def __init__(self, parent, imgDir): self.hwdevice = HWdevice(self) # -- init Api Client - self.apiClient = ApiClient(self.isTestnetRPC) + self.apiClient = ApiClient(self) # Pass 'self' as main_wnd reference # -- Create Queue to redirect stdout self.queue = wqueue @@ -257,7 +256,7 @@ def checkVersion(self, ctrl): (remote_version[0] == local_version[0] and remote_version[1] > local_version[1]) or \ (remote_version[0] == local_version[0] and remote_version[1] == local_version[1] and remote_version[2] > local_version[2]): - self.versionMess = 'New Version Available: %s ' % (self.gitVersion) + self.versionMess = f'New Version Available: {self.gitVersion} ' self.versionMess += '(download)' else: self.versionMess = "You have the latest version of PET4L" @@ -265,7 +264,7 @@ def checkVersion(self, ctrl): def updateVersion(self): if self.versionMess is not None: self.versionLabel.setText(self.versionMess) - printOK("Remote version: %s" % str(self.gitVersion)) + printOK(f"Remote version: {self.gitVersion}") def onChangeSelectedHW(self, i): # Clear status @@ -288,14 +287,13 @@ def onSaveConsole(self): timestamp = strftime('%Y-%m-%d_%H-%M-%S', gmtime(now())) options = QFileDialog.Options() options |= QFileDialog.DontUseNativeDialog - fileName, _ = QFileDialog.getSaveFileName(self, "Save Logs to file", "PET4L_Logs_%s.txt" % timestamp, "All Files (*);; Text Files (*.txt)", options=options) + fileName, _ = QFileDialog.getSaveFileName(self, f"Save Logs to file PET4L_Logs_{timestamp}.txt", "All Files (*);; Text Files (*.txt)", options=options) try: if fileName: - printOK("Saving logs to %s" % fileName) - log_file = open(fileName, 'w+', encoding="utf-8") - log_text = self.consoleArea.toPlainText() - log_file.write(log_text) - log_file.close() + printOK(f"Saving logs to {fileName}") + with open(fileName, 'w+', encoding="utf-8") as log_file: + log_text = self.consoleArea.toPlainText() + log_file.write(log_text) except Exception as e: err_msg = "error writing Log file" @@ -315,14 +313,14 @@ def onToggleConsole(self): def showHWstatus(self): self.updateHWleds() - myPopUp_sb(self, "info", 'PET4L - hw check', "%s" % self.hwStatusMess) + myPopUp_sb(self, "info", 'PET4L - hw check', f"{self.hwStatusMess}") def showRPCstatus(self, server_index, fDebug): # Update displayed status only if selected server is not changed if server_index == self.header.rpcClientsBox.currentIndex(): self.updateRPCled(fDebug) if fDebug: - myPopUp_sb(self, "info", 'PET4L - rpc check', "%s" % self.rpcStatusMess) + myPopUp_sb(self, "info", 'PET4L - rpc check', f"{self.rpcStatusMess}") def updateHWleds(self): if self.hwStatus == 1: @@ -342,7 +340,7 @@ def updateHWstatus(self, ctrl): printDbg(str(e)) pass - printDbg("status:%s - mess: %s" % (self.hwStatus, self.hwStatusMess)) + printDbg(f"status:{self.hwStatus} - mess: {self.hwStatusMess}") def updateLastBlockLabel(self): text = '--' @@ -370,9 +368,9 @@ def updateLastBlockPing(self): color = "green" self.header.lastPingIcon.setPixmap(self.connGreen_icon) if self.rpcResponseTime is not None: - self.header.responseTimeLabel.setText("%.3f" % self.rpcResponseTime) - self.header.responseTimeLabel.setStyleSheet("color: %s" % color) - self.header.lastPingIcon.setStyleSheet("color: %s" % color) + self.header.responseTimeLabel.setText(f"{self.rpcResponseTime:.3f}") + self.header.responseTimeLabel.setStyleSheet(f"color: {color}") + self.header.lastPingIcon.setStyleSheet(f"color: {color}") def updateRPCled(self, fDebug=False): if self.rpcConnected: @@ -403,7 +401,7 @@ def updateRPClist(self): # Add public servers (italics) italicsFont = QFont("Times", italic=True) for s in public_servers: - url = s["protocol"] + "://" + s["host"].split(':')[0] + url = f"{s['protocol']}://{s['host'].split(':')[0]}" self.header.rpcClientsBox.addItem(url, s) self.header.rpcClientsBox.setItemData(self.getServerListIndex(s), italicsFont, Qt.FontRole) # Add Local Wallet (bold) @@ -413,7 +411,7 @@ def updateRPClist(self): self.header.rpcClientsBox.setItemData(self.getServerListIndex(custom_servers[0]), boldFont, Qt.FontRole) # Add custom servers for s in custom_servers[1:]: - url = s["protocol"] + "://" + s["host"].split(':')[0] + url = f"{s['protocol']}://{s['host'].split(':')[0]}" self.header.rpcClientsBox.addItem(url, s) # reset index if self.parent.cache['selectedRPC_index'] >= self.header.rpcClientsBox.count(): @@ -428,7 +426,7 @@ def updateRPClist(self): def updateRPCstatus(self, ctrl, fDebug=False): rpc_index, rpc_protocol, rpc_host, rpc_user, rpc_password = self.getRPCserver() if fDebug: - printDbg("Trying to connect to RPC %s://%s..." % (rpc_protocol, rpc_host)) + printDbg(f"Trying to connect to RPC {rpc_protocol}://{rpc_host}...") try: rpcClient = RpcClient(rpc_protocol, rpc_host, rpc_user, rpc_password) diff --git a/src/qt/guiHeader.py b/src/qt/guiHeader.py index fdaf92e..75f2b80 100644 --- a/src/qt/guiHeader.py +++ b/src/qt/guiHeader.py @@ -12,7 +12,7 @@ class GuiHeader(QWidget): def __init__(self, caller, *args, **kwargs): - QWidget.__init__(self) + super().__init__(*args, **kwargs) layout = QHBoxLayout() layout.setContentsMargins(0, 0, 0, 0) # --- 1) Check Box @@ -28,7 +28,7 @@ def __init__(self, caller, *args, **kwargs): self.button_checkRpc.setToolTip("try to connect to RPC server") self.centralBox.addWidget(self.button_checkRpc, 0, 2) self.rpcLed = QLabel() - self.rpcLed.setToolTip("%s" % caller.rpcStatusMess) + self.rpcLed.setToolTip(f"{caller.rpcStatusMess}") self.rpcLed.setPixmap(caller.ledGrayH_icon) self.centralBox.addWidget(self.rpcLed, 0, 3) self.lastPingBox = QWidget() @@ -65,7 +65,7 @@ def __init__(self, caller, *args, **kwargs): self.button_checkHw.setToolTip("try to connect to Hardware Wallet") self.centralBox.addWidget(self.button_checkHw, 1, 2) self.hwLed = QLabel() - self.hwLed.setToolTip("status: %s" % caller.hwStatusMess) + self.hwLed.setToolTip(f"status: {caller.hwStatusMess}") self.hwLed.setPixmap(caller.ledGrayH_icon) self.centralBox.addWidget(self.hwLed, 1, 3) layout.addLayout(self.centralBox) diff --git a/src/tabRewards.py b/src/tabRewards.py index 4e6c1c7..555751e 100644 --- a/src/tabRewards.py +++ b/src/tabRewards.py @@ -15,7 +15,7 @@ from misc import printDbg, printError, printException, getCallerName, getFunctionName, \ persistCacheSetting, myPopUp, myPopUp_sb, DisconnectedException, checkTxInputs from pivx_parser import ParseTx, IsPayToColdStaking, GetDelegatedStaker -from qt.gui_tabRewards import TabRewards_gui +from qt.gui_tabRewards import TabRewardsGui from threads import ThreadFuns from txCache import TxCache from utils import checkPivxAddr @@ -34,7 +34,7 @@ def __init__(self, caller): self.suggestedFee = MINIMUM_FEE # --- Initialize GUI - self.ui = TabRewards_gui(self.caller.imgDir) + self.ui = TabRewardsGui(self.caller.imgDir) self.caller.tabRewards = self.ui self.ui.btn_Copy.setIcon(self.caller.copy_icon) @@ -48,14 +48,14 @@ def __init__(self, caller): self.updateFee() # Connect GUI buttons - self.ui.addySelect.currentIndexChanged.connect(lambda: self.onChangeSelected()) - self.ui.rewardsList.box.itemClicked.connect(lambda: self.updateSelection()) - self.ui.btn_reload.clicked.connect(lambda: self.loadSelection()) - self.ui.btn_selectAllRewards.clicked.connect(lambda: self.onSelectAllRewards()) - self.ui.btn_deselectAllRewards.clicked.connect(lambda: self.onDeselectAllRewards()) - self.ui.btn_sendRewards.clicked.connect(lambda: self.onSendRewards()) - self.ui.btn_Cancel.clicked.connect(lambda: self.onCancel()) - self.ui.btn_Copy.clicked.connect(lambda: self.onCopy()) + self.ui.addySelect.currentIndexChanged.connect(self.onChangeSelected) + self.ui.rewardsList.box.itemClicked.connect(self.updateSelection) + self.ui.btn_reload.clicked.connect(self.loadSelection) + self.ui.btn_selectAllRewards.clicked.connect(self.onSelectAllRewards) + self.ui.btn_deselectAllRewards.clicked.connect(self.onDeselectAllRewards) + self.ui.btn_sendRewards.clicked.connect(self.onSendRewards) + self.ui.btn_Cancel.clicked.connect(self.onCancel) + self.ui.btn_Copy.clicked.connect(self.onCopy) # Connect Signals self.caller.sig_UTXOsLoading.connect(self.update_loading_utxos) @@ -72,7 +72,7 @@ def display_utxos(self): rewards = self.caller.parent.db.getRewardsList(self.curr_addr) if rewards is not None: - def item(value): + def item(value: str) -> QTableWidgetItem: item = QTableWidgetItem(value) item.setTextAlignment(Qt.AlignCenter) item.setFlags(Qt.ItemIsEnabled | Qt.ItemIsSelectable) @@ -93,17 +93,17 @@ def item(value): self.ui.rewardsList.box.showRow(row) if utxo['staker'] != "": self.ui.rewardsList.box.item(row, 2).setIcon(self.caller.coldStaking_icon) - self.ui.rewardsList.box.item(row, 2).setToolTip("Staked by %s" % utxo['staker']) + self.ui.rewardsList.box.item(row, 2).setToolTip(f"Staked by {utxo['staker']}") # make immature rewards unselectable if utxo['coinstake']: required = 16 if self.caller.isTestnetRPC else 101 if utxo['confirmations'] < required: - for i in range(0, 4): + for i in range(4): self.ui.rewardsList.box.item(row, i).setFlags(Qt.NoItemFlags) ttip = self.ui.rewardsList.box.item(row, i).toolTip() self.ui.rewardsList.box.item(row, i).setToolTip( - ttip + "\n(Immature - %d confirmations required)" % required) + ttip + f"\n(Immature - {required} confirmations required)") self.ui.rewardsList.box.resizeColumnsToContents() @@ -115,15 +115,12 @@ def item(value): if not self.caller.rpcConnected: self.ui.resetStatusLabel('PIVX wallet not connected') else: - self.ui.resetStatusLabel('Found no Rewards for %s' % self.curr_addr) + self.ui.resetStatusLabel(f'Found no Rewards for {self.curr_addr}') - def getSelection(self): + def getSelection(self) -> list: # Get selected rows indexes items = self.ui.rewardsList.box.selectedItems() - rows = set() - for i in range(0, len(items)): - row = items[i].row() - rows.add(row) + rows = {item.row() for item in items} indexes = list(rows) # Get UTXO info from DB for each selection = [] @@ -139,7 +136,7 @@ def loadSelection(self): printDbg("Checking HW device") if self.caller.hwStatus != 2: myPopUp_sb(self.caller, "crit", 'PET4L - hw device check', "Connect to HW device first") - printDbg("Unable to connect - hw status: %d" % self.caller.hwStatus) + printDbg(f"Unable to connect - hw status: {self.caller.hwStatus}") return None self.ui.addySelect.clear() @@ -159,7 +156,7 @@ def loadSelection_thread(self, ctrl): self.caller.parent.cache["intExt"] = persistCacheSetting('cache_intExt', intExt) for i in range(spathFrom, spathTo + 1): - path = "%d'/%d/%d" % (hwAcc, intExt, i) + path = f"{hwAcc}'/{intExt}/{i}" address = self.caller.hwdevice.scanForAddress(hwAcc, i, intExt, isTestnet) try: balance = self.caller.apiClient.getBalance(address) @@ -167,9 +164,9 @@ def loadSelection_thread(self, ctrl): print(e) balance = 0 - itemLine = "%s -- %s" % (path, address) - if (balance): - itemLine += " [%s PIV]" % str(balance) + itemLine = f"{path} -- {address}" + if balance: + itemLine += f" [{balance} PIV]" self.ui.addySelect.addItem(itemLine, [path, address, balance]) @@ -192,7 +189,7 @@ def load_utxos_thread(self, ctrl): # Get raw tx u['rawtx'] = TxCache(self.caller)[u['txid']] if u['rawtx'] is None: - printDbg("Unable to get raw TX with hash=%s from RPC server." % u['txid']) + printDbg(f"Unable to get raw TX with hash={u['txid']} from RPC server.") # Don't save UTXO if raw TX is unavailable utxos.remove(u) u['staker'] = "" @@ -263,7 +260,7 @@ def onSendRewards(self): self.caller.onCheckHw() if self.caller.hwStatus != 2: myPopUp_sb(self.caller, "crit", 'PET4L - hw device check', "Connect to HW device first") - printDbg("Unable to connect to hardware device. The device status is: %d" % self.caller.hwStatus) + printDbg(f"Unable to connect to hardware device. The device status is: {self.caller.hwStatus}") return None # SEND @@ -277,15 +274,15 @@ def SendRewards(self, inputs=None, gui=None): # re-connect signals try: self.caller.hwdevice.api.sigTxdone.disconnect() - except: + except Exception: pass try: self.caller.hwdevice.api.sigTxabort.disconnect() - except: + except Exception: pass try: self.caller.hwdevice.api.tx_progress.disconnect() - except: + except Exception: pass self.caller.hwdevice.api.sigTxdone.connect(gui.FinishSend) self.caller.hwdevice.api.sigTxabort.connect(gui.AbortSend) @@ -301,7 +298,7 @@ def SendRewards(self, inputs=None, gui=None): num_of_inputs = len(self.selectedRewards) else: # bulk send - num_of_inputs = sum([len(x['utxos']) for x in inputs]) + num_of_inputs = sum(len(x['utxos']) for x in inputs) ans = checkTxInputs(self.caller, num_of_inputs) if ans is None or ans == QMessageBox.No: # emit sigTxAbort and return @@ -310,9 +307,9 @@ def SendRewards(self, inputs=None, gui=None): # LET'S GO if inputs is None: - printDbg("Sending from PIVX address %s to PIVX address %s " % (self.curr_addr, self.dest_addr)) + printDbg(f"Sending from PIVX address {self.curr_addr} to PIVX address {self.dest_addr} ") else: - printDbg("Sweeping rewards to PIVX address %s " % self.dest_addr) + printDbg(f"Sweeping rewards to PIVX address {self.dest_addr} ") self.ui.rewardsList.statusLabel.hide() printDbg("Preparing transaction. Please wait...") self.ui.loadingLine.show() @@ -374,10 +371,9 @@ def FinishSend(self, serialized_tx, amount_to_send): decodedTx = ParseTx(tx_hex, self.caller.isTestnetRPC) destination = decodedTx.get("vout")[0].get("scriptPubKey").get("addresses")[0] amount = decodedTx.get("vout")[0].get("value") - message = '

Broadcast signed transaction?

Destination address:
%s

' % destination - message += '

Amount: %s PIV
' % str(round(amount / 1e8, 8)) - message += 'Fees: %s PIV
Size: %d Bytes

' % ( - str(round(self.currFee / 1e8, 8)), len(tx_hex) / 2) + message = f'

Broadcast signed transaction?

Destination address:
{destination}

' + message += f'

Amount: {round(amount / 1e8, 8)} PIV
' + message += f'Fees: {round(self.currFee / 1e8, 8)} PIV
Size: {len(tx_hex) / 2} Bytes

' except Exception as e: printException(getCallerName(), getFunctionName(), "decoding exception", str(e)) message = '

Unable to decode TX- Broadcast anyway?

' @@ -392,7 +388,7 @@ def FinishSend(self, serialized_tx, amount_to_send): txid = self.caller.rpcClient.sendRawTransaction(tx_hex) if txid is None: raise Exception("Unable to send TX - connection to RPC server lost.") - printDbg("Transaction sent. ID: %s" % txid) + printDbg(f"Transaction sent. ID: {txid}") mess2_text = "

Transaction successfully sent.

" mess2 = QMessageBox(QMessageBox.Information, 'transaction Sent', mess2_text) mess2.setDetailedText(txid) @@ -431,15 +427,15 @@ def updateSelection(self, clicked_item=None): self.selectedRewards = self.getSelection() numOfInputs = len(self.selectedRewards) if numOfInputs: - for i in range(0, numOfInputs): - total += int(self.selectedRewards[i].get('satoshis')) + for reward in self.selectedRewards: + total += int(reward.get('satoshis')) # update suggested fee and selected rewards estimatedTxSize = (44 + numOfInputs * 148) * 1.0 / 1000 # kB feePerKb = self.caller.rpcClient.getFeePerKb() self.suggestedFee = round(feePerKb * estimatedTxSize, 8) - printDbg("estimatedTxSize is %s kB" % str(estimatedTxSize)) - printDbg("suggested fee is %s PIV (%s PIV/kB)" % (str(self.suggestedFee), str(feePerKb))) + printDbg(f"estimatedTxSize is {estimatedTxSize} kB") + printDbg(f"suggested fee is {self.suggestedFee} PIV ({feePerKb} PIV/kB)") self.ui.selectedRewardsLine.setText(str(round(total / 1e8, 8))) @@ -450,8 +446,6 @@ def updateSelection(self, clicked_item=None): def update_loading_utxos(self, percent): if percent < 100: - self.ui.resetStatusLabel('Checking explorer... %d%%' % percent) + self.ui.resetStatusLabel(f'Checking explorer... {percent}%') else: self.display_utxos() - - diff --git a/src/threads.py b/src/threads.py index 8ba0225..e248a5a 100644 --- a/src/threads.py +++ b/src/threads.py @@ -8,28 +8,41 @@ Based on project: https://github.com/Bertrand256/dash-masternode-tool """ + import threading import traceback from functools import partial from workerThread import WorkerThread +from typing import Callable, Any, Optional class ThreadFuns: @staticmethod - def runInThread(worker_fun, worker_fun_args, on_thread_finish=None, on_thread_exception=None, - skip_raise_exception=False): + def runInThread( + worker_fun: Callable[..., Any], + worker_fun_args: tuple, + on_thread_finish: Optional[Callable[[], None]] = None, + on_thread_exception: Optional[Callable[[Exception], None]] = None, + skip_raise_exception: bool = False + ) -> threading.Thread: """ Run a function inside a thread. :param worker_fun: reference to function to be executed inside a thread :param worker_fun_args: arguments passed to a thread function :param on_thread_finish: function to be called after thread finishes its execution + :param on_thread_exception: function to be called if an exception occurs in the thread :param skip_raise_exception: Exception raised inside the 'worker_fun' will be passed to the calling thread if: - on_thread_exception is a valid function (it's exception handler) - skip_raise_exception is False :return: reference to a thread object """ - def on_thread_finished_int(thread_arg, on_thread_finish_arg, skip_raise_exception_arg, on_thread_exception_arg): + def on_thread_finished_int( + thread_arg: WorkerThread, + on_thread_finish_arg: Optional[Callable[[], None]], + skip_raise_exception_arg: bool, + on_thread_exception_arg: Optional[Callable[[Exception], None]] + ): if thread_arg.worker_exception: if on_thread_exception_arg: on_thread_exception_arg(thread_arg.worker_exception) @@ -44,14 +57,13 @@ def on_thread_finished_int(thread_arg, on_thread_finish_arg, skip_raise_exceptio # starting thread from another thread causes an issue of not passing arguments' # values to on_thread_finished_int function, so on_thread_finish is not called st = traceback.format_stack() - print('Running thread from inside another thread. Stack: \n' + ''.join(st)) + print(f'Running thread from inside another thread. Stack: \n{"".join(st)}') thread = WorkerThread(worker_fun=worker_fun, worker_fun_args=worker_fun_args) # in Python 3.5 local variables sometimes are removed before calling on_thread_finished_int # so we have to bind that variables with the function ref - bound_on_thread_finished = partial(on_thread_finished_int, thread, on_thread_finish, skip_raise_exception, - on_thread_exception) + bound_on_thread_finished = partial(on_thread_finished_int, thread, on_thread_finish, skip_raise_exception, on_thread_exception) thread.finished.connect(bound_on_thread_finished) thread.daemon = True diff --git a/src/watchdogThreads.py b/src/watchdogThreads.py index 574c866..9bc016e 100644 --- a/src/watchdogThreads.py +++ b/src/watchdogThreads.py @@ -12,13 +12,13 @@ from misc import printOK -class CtrlObject(object): +class CtrlObject: pass class RpcWatchdog(QObject): def __init__(self, control_tab, timer_off=10, timer_on=120, *args, **kwargs): - QObject.__init__(self, *args, **kwargs) + super().__init__(*args, **kwargs) self.firstLoop = True self.shutdown_flag = Event() self.control_tab = control_tab diff --git a/src/workerThread.py b/src/workerThread.py index 6b07dd2..0f8ba50 100644 --- a/src/workerThread.py +++ b/src/workerThread.py @@ -13,7 +13,7 @@ from misc import printError -class CtrlObject(object): +class CtrlObject: pass @@ -23,7 +23,7 @@ class WorkerThread(QThread): """ def __init__(self, worker_fun, worker_fun_args): - QThread.__init__(self) + super().__init__() self.worker_fun = worker_fun self.worker_fun_args = worker_fun_args # prepare control object passed to external thread function @@ -44,4 +44,5 @@ def run(self): self.worker_result = self.worker_fun(self.ctrl_obj, *self.worker_fun_args) except Exception as e: printError("worker thread", "run", str(e)) + self.worker_exception = e self.stop() From 073863c306d9959ac15a48a1ce079e279ceed237 Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Wed, 18 Sep 2024 08:23:33 -0500 Subject: [PATCH 05/11] Fix some linting --- src/mainWindow.py | 2 +- src/qt/gui_tabRewards.py | 4 ++-- src/threads.py | 10 +++++----- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/mainWindow.py b/src/mainWindow.py index 87ae791..87ae2fe 100644 --- a/src/mainWindow.py +++ b/src/mainWindow.py @@ -13,7 +13,7 @@ from PyQt5.QtCore import pyqtSignal, Qt, QThread from PyQt5.QtGui import QPixmap, QColor, QPalette, QTextCursor, QFont, QIcon from PyQt5.QtWidgets import QWidget, QPushButton, QHBoxLayout, QGroupBox, QVBoxLayout, \ - QFileDialog, QTextEdit, QTabWidget, QLabel, QSplitter, QAction, QMenuBar + QFileDialog, QTextEdit, QTabWidget, QLabel, QSplitter from apiClient import ApiClient from constants import starting_height, DefaultCache, wqueue diff --git a/src/qt/gui_tabRewards.py b/src/qt/gui_tabRewards.py index 944385d..91cc2d0 100644 --- a/src/qt/gui_tabRewards.py +++ b/src/qt/gui_tabRewards.py @@ -9,8 +9,8 @@ from PyQt5.QtCore import Qt from PyQt5.QtWidgets import ( - QWidget, QPushButton, QHBoxLayout, QGroupBox, QVBoxLayout, QLineEdit, QComboBox, - QProgressBar, QLabel, QFormLayout, QDoubleSpinBox, QTableWidget, QTableWidgetItem, + QWidget, QPushButton, QHBoxLayout, QGroupBox, QVBoxLayout, QLineEdit, QComboBox, + QProgressBar, QLabel, QFormLayout, QDoubleSpinBox, QTableWidget, QTableWidgetItem, QAbstractItemView, QHeaderView, QSpinBox ) diff --git a/src/threads.py b/src/threads.py index e248a5a..5f422c9 100644 --- a/src/threads.py +++ b/src/threads.py @@ -19,9 +19,9 @@ class ThreadFuns: @staticmethod def runInThread( - worker_fun: Callable[..., Any], - worker_fun_args: tuple, - on_thread_finish: Optional[Callable[[], None]] = None, + worker_fun: Callable[..., Any], + worker_fun_args: tuple, + on_thread_finish: Optional[Callable[[], None]] = None, on_thread_exception: Optional[Callable[[Exception], None]] = None, skip_raise_exception: bool = False ) -> threading.Thread: @@ -38,9 +38,9 @@ def runInThread( """ def on_thread_finished_int( - thread_arg: WorkerThread, + thread_arg: WorkerThread, on_thread_finish_arg: Optional[Callable[[], None]], - skip_raise_exception_arg: bool, + skip_raise_exception_arg: bool, on_thread_exception_arg: Optional[Callable[[Exception], None]] ): if thread_arg.worker_exception: From ddc4ec0a85ca99dabc714c3e0d88e60edfb5ab86 Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Wed, 18 Sep 2024 08:47:55 -0500 Subject: [PATCH 06/11] Update flake,mypy to reflect python 3.10 --- .github/workflows/main.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 939734a..f6dbc2e 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -26,9 +26,9 @@ jobs: pip install setuptools==53.0.0 pip install -r requirements.txt pip install importlib-metadata==4.13.0 - pip install flake8==3.8.4 - pip install mypy==0.781 - pip install vulture==2.3 + pip install flake8==4.0.0 + pip install mypy==0.920 + pip install vulture==2.4 - name: Lint run: | From 6ca5c842fffe128d59d01c3540210aa2a6988e29 Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Wed, 18 Sep 2024 08:50:50 -0500 Subject: [PATCH 07/11] Fix some errors by linter --- src/misc.py | 4 ++-- src/pivx_b58.py | 2 +- src/rpcClient.py | 2 +- src/threads.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/misc.py b/src/misc.py index 8c7792d..4d62ddf 100644 --- a/src/misc.py +++ b/src/misc.py @@ -11,7 +11,7 @@ from contextlib import redirect_stdout from ipaddress import ip_address from urllib.parse import urlparse -from typing import Any, Callable, Dict, Optional, Type +from typing import Any, Callable, Dict, Optional, Type, Tuple import simplejson as json from PyQt5.QtCore import QObject, pyqtSignal, QSettings @@ -286,7 +286,7 @@ def sec_to_time(seconds: int) -> str: return f"{days} days, {hrs} hrs, {mins} mins, {seconds} secs" -def timeThis(function: Callable[..., Any], *args: Any) -> tuple[Optional[Any], Optional[float]]: +def timeThis(function: Callable[..., Any], *args: Any) -> Tuple[Optional[Any], Optional[float]]: try: start = time.perf_counter() val = function(*args) diff --git a/src/pivx_b58.py b/src/pivx_b58.py index fda81be..093aa23 100644 --- a/src/pivx_b58.py +++ b/src/pivx_b58.py @@ -61,6 +61,6 @@ def b58decode(v: str, length: int = None) -> bytes: result = _bchr(0) * nPad + result if length is not None and len(result) != length: - return None + return b'' return result diff --git a/src/rpcClient.py b/src/rpcClient.py index cb8477d..146462a 100644 --- a/src/rpcClient.py +++ b/src/rpcClient.py @@ -75,7 +75,7 @@ def getMNStatus(self, address: str) -> dict: with self.lock: mnStatusList = self.conn.listmasternodes(address) if not mnStatusList: - return None + return {} mnStatus = mnStatusList[0] mnStatus['mnCount'] = self.conn.getmasternodecount()['enabled'] return mnStatus diff --git a/src/threads.py b/src/threads.py index 5f422c9..d0c3113 100644 --- a/src/threads.py +++ b/src/threads.py @@ -39,7 +39,7 @@ def runInThread( def on_thread_finished_int( thread_arg: WorkerThread, - on_thread_finish_arg: Optional[Callable[[], None]], + on_thread_finish_arg: Optional[Callable[[], None]], skip_raise_exception_arg: bool, on_thread_exception_arg: Optional[Callable[[Exception], None]] ): From fbf490d31f3153ad2ec879531b3fc9bcef48fa11 Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Wed, 18 Sep 2024 09:01:36 -0500 Subject: [PATCH 08/11] Fix byteorder --- src/pivx_parser.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pivx_parser.py b/src/pivx_parser.py index 14b7e55..b55ca02 100644 --- a/src/pivx_parser.py +++ b/src/pivx_parser.py @@ -14,7 +14,7 @@ def __init__(self, hex_str: str): self.cursor = 0 self.hex_str = hex_str - def readInt(self, nbytes: int, byteorder: str = "big", signed: bool = False) -> int: + def readInt(self, nbytes: int, byteorder: str = 'big', signed: bool = False) -> int: if self.cursor + nbytes * 2 > len(self.hex_str): raise Exception("HexParser range error") b = bytes.fromhex(self.hex_str[self.cursor:self.cursor + nbytes * 2]) From 3b1539d982c3f3f1f410888cfb6a733d8e5c6341 Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Fri, 25 Oct 2024 09:35:13 -0500 Subject: [PATCH 09/11] Fix type annotations --- src/pivx_parser.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/pivx_parser.py b/src/pivx_parser.py index b55ca02..6bf6af1 100644 --- a/src/pivx_parser.py +++ b/src/pivx_parser.py @@ -4,6 +4,7 @@ # Distributed under the MIT software license, see the accompanying # file LICENSE.txt or http://www.opensource.org/licenses/mit-license.php. +from typing import Dict, Tuple from misc import getCallerName, getFunctionName, printException import utils from pivx_hashlib import pubkeyhash_to_address @@ -43,11 +44,11 @@ def readString(self, nbytes: int, byteorder: str = "big") -> str: return res -def IsCoinBase(vin: dict) -> bool: +def IsCoinBase(vin: Dict[str, any]) -> bool: return vin["txid"] == "0" * 64 and vin["vout"] == 4294967295 and vin["scriptSig"]["hex"][:2] != "c2" -def ParseTxInput(p: HexParser) -> dict: +def ParseTxInput(p: HexParser) -> Dict[str, any]: vin = { "txid": p.readString(32, "little"), "vout": p.readInt(4, "little"), @@ -65,7 +66,7 @@ def ParseTxInput(p: HexParser) -> dict: return vin -def ParseTxOutput(p: HexParser, isTestnet: bool = False) -> dict: +def ParseTxOutput(p: HexParser, isTestnet: bool = False) -> Dict[str, any]: vout = { "value": p.readInt(8, "little"), "scriptPubKey": { @@ -85,7 +86,7 @@ def ParseTxOutput(p: HexParser, isTestnet: bool = False) -> dict: return vout -def ParseTx(hex_string: str, isTestnet: bool = False) -> dict: +def ParseTx(hex_string: str, isTestnet: bool = False) -> Dict[str, any]: p = HexParser(hex_string) tx = { "version": p.readInt(4, "little"), @@ -96,13 +97,13 @@ def ParseTx(hex_string: str, isTestnet: bool = False) -> dict: return tx -def IsPayToColdStaking(rawtx: str, out_n: int) -> tuple[bool, bool]: +def IsPayToColdStaking(rawtx: str, out_n: int) -> Tuple[bool, bool]: tx = ParseTx(rawtx) script = tx['vout'][out_n]["scriptPubKey"]["hex"] return utils.IsPayToColdStaking(bytes.fromhex(script)), IsCoinStake(tx) -def IsCoinStake(json_tx: dict) -> bool: +def IsCoinStake(json_tx: Dict[str, any]) -> bool: return json_tx['vout'][0]["scriptPubKey"]["hex"] == "" From e17d54bdd6f2d2d8a78aea27ca99328b0ef125b5 Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Fri, 25 Oct 2024 10:07:48 -0500 Subject: [PATCH 10/11] Small fixes resolve type-checking errors Update requirements Update more deps adjust types More type hints Update deps and type hints Pin python 3.10.6 for bugfix Fix flags Fix port type port conversion Set union --- .github/workflows/main.yaml | 4 ++-- requirements.txt | 2 ++ src/pivx_parser.py | 41 +++++++++++++++++++++++++++---------- src/rpcClient.py | 14 ++++++++++++- src/tabRewards.py | 2 +- 5 files changed, 48 insertions(+), 15 deletions(-) diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index f6dbc2e..c697947 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -17,7 +17,7 @@ jobs: - name: Initialize Python uses: actions/setup-python@v5 with: - python-version: '3.10' + python-version: '3.10.6' - name: Install Dependencies run: | @@ -66,7 +66,7 @@ jobs: - name: Setup Python 3.10 uses: actions/setup-python@v5 with: - python-version: '3.10' + python-version: '3.10.6' - name: Setup pip cache uses: actions/cache@v4 with: diff --git a/requirements.txt b/requirements.txt index 340430b..6c1baca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,5 @@ PyQt5>=5.15.10 requests==2.32.2 simplejson==3.19.2 ecdsa==0.19.0 +types-requests>=2.28.11,<3.0 +types-simplejson>=3.18.0,<4.0 diff --git a/src/pivx_parser.py b/src/pivx_parser.py index 6bf6af1..d970c28 100644 --- a/src/pivx_parser.py +++ b/src/pivx_parser.py @@ -4,7 +4,7 @@ # Distributed under the MIT software license, see the accompanying # file LICENSE.txt or http://www.opensource.org/licenses/mit-license.php. -from typing import Dict, Tuple +from typing import Dict, Tuple, List, Literal, Any, TypedDict from misc import getCallerName, getFunctionName, printException import utils from pivx_hashlib import pubkeyhash_to_address @@ -15,7 +15,7 @@ def __init__(self, hex_str: str): self.cursor = 0 self.hex_str = hex_str - def readInt(self, nbytes: int, byteorder: str = 'big', signed: bool = False) -> int: + def readInt(self, nbytes: int, byteorder: Literal['little', 'big'] = 'big', signed: bool = False) -> int: if self.cursor + nbytes * 2 > len(self.hex_str): raise Exception("HexParser range error") b = bytes.fromhex(self.hex_str[self.cursor:self.cursor + nbytes * 2]) @@ -44,12 +44,32 @@ def readString(self, nbytes: int, byteorder: str = "big") -> str: return res -def IsCoinBase(vin: Dict[str, any]) -> bool: +class VinType(TypedDict, total=False): + txid: str + vout: int + scriptSig: Dict[str, str] + sequence: int + coinbase: str + + +class VoutType(TypedDict): + value: int + scriptPubKey: Dict[str, Any] + + +class TxType(TypedDict): + version: int + vin: List[VinType] + vout: List[VoutType] + locktime: int + + +def IsCoinBase(vin: VinType) -> bool: return vin["txid"] == "0" * 64 and vin["vout"] == 4294967295 and vin["scriptSig"]["hex"][:2] != "c2" -def ParseTxInput(p: HexParser) -> Dict[str, any]: - vin = { +def ParseTxInput(p: HexParser) -> VinType: + vin: VinType = { "txid": p.readString(32, "little"), "vout": p.readInt(4, "little"), "scriptSig": { @@ -66,8 +86,8 @@ def ParseTxInput(p: HexParser) -> Dict[str, any]: return vin -def ParseTxOutput(p: HexParser, isTestnet: bool = False) -> Dict[str, any]: - vout = { +def ParseTxOutput(p: HexParser, isTestnet: bool = False) -> VoutType: + vout: VoutType = { "value": p.readInt(8, "little"), "scriptPubKey": { "hex": p.readString(p.readVarInt(), "big"), @@ -76,7 +96,6 @@ def ParseTxOutput(p: HexParser, isTestnet: bool = False) -> Dict[str, any]: } try: locking_script = bytes.fromhex(vout["scriptPubKey"]["hex"]) - # add addresses only if P2PKH, P2PK or P2CS if len(locking_script) in [25, 35, 51]: add_bytes = utils.extract_pkh_from_locking_script(locking_script) address = pubkeyhash_to_address(add_bytes, isTestnet) @@ -86,9 +105,9 @@ def ParseTxOutput(p: HexParser, isTestnet: bool = False) -> Dict[str, any]: return vout -def ParseTx(hex_string: str, isTestnet: bool = False) -> Dict[str, any]: +def ParseTx(hex_string: str, isTestnet: bool = False) -> TxType: p = HexParser(hex_string) - tx = { + tx: TxType = { "version": p.readInt(4, "little"), "vin": [ParseTxInput(p) for _ in range(p.readVarInt())], "vout": [ParseTxOutput(p, isTestnet) for _ in range(p.readVarInt())], @@ -103,7 +122,7 @@ def IsPayToColdStaking(rawtx: str, out_n: int) -> Tuple[bool, bool]: return utils.IsPayToColdStaking(bytes.fromhex(script)), IsCoinStake(tx) -def IsCoinStake(json_tx: Dict[str, any]) -> bool: +def IsCoinStake(json_tx: TxType) -> bool: return json_tx['vout'][0]["scriptPubKey"]["hex"] == "" diff --git a/src/rpcClient.py b/src/rpcClient.py index 146462a..b1b91dd 100644 --- a/src/rpcClient.py +++ b/src/rpcClient.py @@ -9,6 +9,7 @@ import http.client as httplib import ssl import threading +from typing import Union, Optional from constants import DEFAULT_PROTOCOL_VERSION, MINIMUM_FEE from misc import getCallerName, getFunctionName, printException, printDbg, now, timeThis @@ -40,7 +41,18 @@ def __init__(self, rpc_protocol: str, rpc_host: str, rpc_user: str, rpc_password self.rpc_url = f"{rpc_protocol}://{rpc_user}:{rpc_password}@{rpc_host}" - host, port = rpc_host.split(":") + if ":" in rpc_host: + host, port_str = rpc_host.split(":") + try: + port = int(port_str) + except ValueError: + raise ValueError(f"Invalid port specified: {port_str}") + else: + host = rpc_host + port = None + + self.httpConnection: Union[httplib.HTTPConnection, httplib.HTTPSConnection] + if rpc_protocol == "https": self.httpConnection = httplib.HTTPSConnection(host, port, timeout=20, context=ssl._create_unverified_context()) else: diff --git a/src/tabRewards.py b/src/tabRewards.py index 555751e..d25c20f 100644 --- a/src/tabRewards.py +++ b/src/tabRewards.py @@ -75,7 +75,7 @@ def display_utxos(self): def item(value: str) -> QTableWidgetItem: item = QTableWidgetItem(value) item.setTextAlignment(Qt.AlignCenter) - item.setFlags(Qt.ItemIsEnabled | Qt.ItemIsSelectable) + item.setFlags(item.flags() | Qt.ItemIsEnabled | Qt.ItemIsSelectable) return item # Clear up old list From eda9053f6d83cc51b6ae43e88a37ebac85cb3ad5 Mon Sep 17 00:00:00 2001 From: Liquid369 Date: Sat, 26 Oct 2024 09:22:36 -0500 Subject: [PATCH 11/11] Fix linter --- src/misc.py | 26 ++++++++++++++++++++------ src/rpcClient.py | 2 +- src/tabRewards.py | 3 +-- 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/misc.py b/src/misc.py index 4d62ddf..9fc962a 100644 --- a/src/misc.py +++ b/src/misc.py @@ -8,16 +8,17 @@ import os import sys import time +import io from contextlib import redirect_stdout from ipaddress import ip_address from urllib.parse import urlparse -from typing import Any, Callable, Dict, Optional, Type, Tuple +from typing import Any, Callable, Dict, Optional, Tuple import simplejson as json from PyQt5.QtCore import QObject, pyqtSignal, QSettings from PyQt5.QtWidgets import QMessageBox -from constants import log_File, DefaultCache, wqueue, MAX_INPUTS_NO_WARNING +from constants import log_File, DefaultCache, MAX_INPUTS_NO_WARNING def add_defaultKeys_to_dict(dictObj: Dict[str, Any], defaultObj: Dict[str, Any]) -> None: @@ -26,7 +27,7 @@ def add_defaultKeys_to_dict(dictObj: Dict[str, Any], defaultObj: Dict[str, Any]) dictObj[key] = defaultObj[key] -QT_MESSAGE_TYPE: Dict[str, Type[QMessageBox.Icon]] = { +QT_MESSAGE_TYPE: Dict[str, QMessageBox.Icon] = { "info": QMessageBox.Information, "warn": QMessageBox.Warning, "crit": QMessageBox.Critical, @@ -158,7 +159,7 @@ def myPopUp(parentWindow: Any, messType: str, messTitle: str, messText: str, def def myPopUp_sb(parentWindow: Any, messType: str, messTitle: str, messText: str, singleButton: QMessageBox.StandardButton = QMessageBox.Ok) -> int: message_type = QT_MESSAGE_TYPE.get(messType, QMessageBox.Information) mess = QMessageBox(message_type, messTitle, messText, singleButton, parent=parentWindow) - mess.setStandardButtons(singleButton | singleButton) + mess.setStandardButtons(singleButton) return mess.exec_() @@ -258,8 +259,9 @@ def readCacheSettings() -> Dict[str, Any]: def redirect_print(what: str) -> None: - with redirect_stdout(WriteStream(wqueue)): - print(what) + with io.StringIO() as buffer: + with redirect_stdout(buffer): + print(what) def saveCacheSettings(cache: Dict[str, Any]) -> None: @@ -316,6 +318,18 @@ def flush(self) -> None: pass +class WrapperWriteStream: + def __init__(self, write_stream: WriteStream): + self.write_stream = write_stream + + def write(self, message: str) -> int: + self.write_stream.write(message) + return len(message) + + def flush(self) -> None: + self.write_stream.flush() + + # QObject (to be run in QThread) that blocks until data is available # and then emits a QtSignal to the main thread. class WriteStreamReceiver(QObject): diff --git a/src/rpcClient.py b/src/rpcClient.py index b1b91dd..11b8456 100644 --- a/src/rpcClient.py +++ b/src/rpcClient.py @@ -9,7 +9,7 @@ import http.client as httplib import ssl import threading -from typing import Union, Optional +from typing import Union from constants import DEFAULT_PROTOCOL_VERSION, MINIMUM_FEE from misc import getCallerName, getFunctionName, printException, printDbg, now, timeThis diff --git a/src/tabRewards.py b/src/tabRewards.py index d25c20f..48023f2 100644 --- a/src/tabRewards.py +++ b/src/tabRewards.py @@ -7,8 +7,7 @@ import threading import simplejson as json -from PyQt5.Qt import QApplication -from PyQt5.QtCore import Qt +from PyQt5.Qt import QApplication, Qt from PyQt5.QtWidgets import QMessageBox, QTableWidgetItem, QHeaderView from constants import MINIMUM_FEE