Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swaps: Update submarine swap nostr relays dynamically and remove redundant query #9640

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
dynamically update relays and remove redundant nostr query, store last
swapserver relays in file
f321x committed Mar 21, 2025
commit 21e3fd91ddf2866b100b7bd0fbea01b3d287b0d7
120 changes: 74 additions & 46 deletions electrum/submarine_swaps.py
Original file line number Diff line number Diff line change
@@ -202,6 +202,7 @@ def __init__(self, *, wallet: 'Abstract_Wallet', lnworker: 'LNWallet'):
self.prepayments[swap.prepay_hash] = bytes.fromhex(k)
self.is_server = False # overriden by swapserver plugin if enabled
self.is_initialized = asyncio.Event()
self.pairs_updated = asyncio.Event()

def start_network(self, network: 'Network'):
assert network
@@ -922,7 +923,15 @@ def update_pairs(self, pairs):
self.percentage = pairs.percentage
self._min_amount = pairs.min_amount
self._max_amount = pairs.max_amount
self.is_initialized.set()
self.trigger_pairs_updated_threadsafe()

def trigger_pairs_updated_threadsafe(self):
def trigger():
self.is_initialized.set()
self.pairs_updated.set()
self.pairs_updated.clear()
loop = get_asyncio_loop()
loop.call_soon_threadsafe(trigger)

def get_max_amount(self) -> int:
"""in satoshis"""
@@ -1065,7 +1074,7 @@ def create_claim_txin(
*,
txin: PartialTxInput,
swap: SwapData,
) -> PartialTransaction:
) -> Tuple[PartialTxInput, Optional[int]]:
if swap.is_reverse: # successful reverse swap
locktime = None
# preimage will be set in sign_tx
@@ -1280,11 +1289,11 @@ def __init__(self, config, sm, keypair):
self.private_key = keypair.privkey
self.nostr_private_key = to_nip19('nsec', keypair.privkey.hex())
self.nostr_pubkey = keypair.pubkey.hex()[2:]
self.dm_replies = defaultdict(asyncio.Future) # type: Dict[bytes, asyncio.Future]
self.dm_replies = defaultdict(asyncio.Future) # type: Dict[str, asyncio.Future]
self.ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
self.relay_manager = None
self.taskgroup = OldTaskGroup()
self.server_relays = None
self._last_swapserver_relays = self._load_last_swapserver_relays() # type: Optional[Sequence[str]]

def __enter__(self):
asyncio.run_coroutine_threadsafe(self.main_loop(), self.network.asyncio_loop)
@@ -1311,8 +1320,8 @@ async def main_loop(self):
else:
tasks = [
self.check_direct_messages(),
self.receive_offers(),
self.get_pairs(),
self.update_relays()
]
try:
async with self.taskgroup as group:
@@ -1333,7 +1342,11 @@ async def stop(self):

@property
def relays(self):
return self.network.config.NOSTR_RELAYS.split(',')
our_relays = self.config.NOSTR_RELAYS.split(',') if self.config.NOSTR_RELAYS else []
if self.sm.is_server:
return our_relays
last_swapserver_relays = self._last_swapserver_relays or []
return list(set(our_relays + last_swapserver_relays))

def get_relay_manager(self):
assert get_running_loop() == get_asyncio_loop(), f"this must be run on the asyncio thread!"
@@ -1396,7 +1409,7 @@ async def publish_offer(self, sm):
private_key=self.nostr_private_key)
self.logger.info(f"published offer {event_id}")

async def send_direct_message(self, pubkey: str, relays, content: str) -> str:
async def send_direct_message(self, pubkey: str, content: str) -> str:
our_private_key = aionostr.key.PrivateKey(self.private_key)
recv_pubkey_hex = aionostr.util.from_nip19(pubkey)['object'].hex() if pubkey.startswith('npub') else pubkey
encrypted_msg = our_private_key.encrypt_message(content, recv_pubkey_hex)
@@ -1411,27 +1424,29 @@ async def send_direct_message(self, pubkey: str, relays, content: str) -> str:

@log_exceptions
async def send_request_to_server(self, method: str, request_data: dict) -> dict:
self.logger.debug(f"swapserver req: method: {method} relays: {self.relays}")
request_data['method'] = method
request_data['relays'] = self.config.NOSTR_RELAYS
server_pubkey = self.config.SWAPSERVER_NPUB
event_id = await self.send_direct_message(server_pubkey, self.server_relays, json.dumps(request_data))
event_id = await self.send_direct_message(server_pubkey, json.dumps(request_data))
response = await self.dm_replies[event_id]
return response

async def receive_offers(self):
async def get_pairs(self):
await self.is_connected.wait()
query = {
"kinds": [self.USER_STATUS_NIP38],
"limit":10,
"#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
"#r": [f"net:{constants.net.NET_NAME}"],
"since": int(time.time()) - self.OFFER_UPDATE_INTERVAL_SEC
"since": int(time.time()) - 60 * 60,
"until": int(time.time()) + 60 * 60,
}
async for event in self.relay_manager.get_events(query, single_event=False, only_stored=False):
try:
content = json.loads(event.content)
tags = {k: v for k, v in event.tags}
except Exception as e:
self.logger.debug(f"failed to parse event: {e}")
continue
if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
continue
@@ -1440,8 +1455,9 @@ async def receive_offers(self):
# check if this is the most recent event for this pubkey
pubkey = event.pubkey
ts = self._offers.get(pubkey, {}).get('timestamp', 0)
if event.created_at <= ts:
#print('skipping old event', pubkey[0:10], event.id)
if (event.created_at <= ts
or event.created_at > time.time() + 60 * 60
or event.created_at < time.time() - 60 * 60):
continue
try:
pow_bits = get_nostr_ann_pow_amount(
@@ -1456,40 +1472,30 @@ async def receive_offers(self):
content['pow_bits'] = pow_bits
content['pubkey'] = pubkey
content['timestamp'] = event.created_at
server_relays = content['relays'].split(',') if 'relays' in content else []
content['relays'] = server_relays[:10] # limit to 10 relays
self._offers[pubkey] = content
if self.config.SWAPSERVER_NPUB == pubkey:
pairs = self._parse_offer(content)
self.sm.update_pairs(pairs)
# mirror event to other relays
server_relays = content['relays'].split(',') if 'relays' in content else []
await self.taskgroup.spawn(self.rebroadcast_event(event, server_relays))

async def get_pairs(self):
if not self.config.SWAPSERVER_NPUB:
return
query = {
"kinds": [self.USER_STATUS_NIP38],
"authors": [self.config.SWAPSERVER_NPUB],
"#d": [f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}"],
"#r": [f"net:{constants.net.NET_NAME}"],
"since": int(time.time()) - self.OFFER_UPDATE_INTERVAL_SEC,
"limit": 1
}
async for event in self.relay_manager.get_events(query, single_event=True, only_stored=False):
try:
content = json.loads(event.content)
tags = {k: v for k, v in event.tags}
except Exception:
continue
if tags.get('d') != f"electrum-swapserver-{self.NOSTR_EVENT_VERSION}":
continue
if tags.get('r') != f"net:{constants.net.NET_NAME}":
continue
# check if this is the most recent event for this pubkey
pubkey = event.pubkey
content['pubkey'] = pubkey
content['timestamp'] = event.created_at
self.logger.info(f'received offer from {age(event.created_at)}')
pairs = self._parse_offer(content)
self.sm.update_pairs(pairs)
self.server_relays = content['relays'].split(',')
async def update_relays(self):
"""
Update the relays when update_pairs is called.
This ensures we try to connect to the same relays as the ones announced by the swap server.
"""
while True:
previous_relays = self._last_swapserver_relays
await self.sm.pairs_updated.wait()
latest_known_relays = self._offers[self.config.SWAPSERVER_NPUB]['relays']
if latest_known_relays != previous_relays:
self.logger.debug(f"swapserver relays changed, updating relay list.")
# store the latest known relays to a file
self._store_last_swapserver_relays(latest_known_relays)
# update the relay manager
await self.relay_manager.update_relays(self.relays)

async def rebroadcast_event(self, event: Event, server_relays: Sequence[str]):
"""If the relays of the origin server are different from our relays we rebroadcast the
@@ -1533,7 +1539,6 @@ async def handle_request(self, request):
event_id = request.pop('event_id')
event_pubkey = request.pop('event_pubkey')
self.logger.info(f'handle_request: id={event_id} {method} {request}')
relays = request.pop('relays').split(',')
if method == 'addswapinvoice':
r = self.sm.server_add_swap_invoice(request)
elif method == 'createswap':
@@ -1543,5 +1548,28 @@ async def handle_request(self, request):
else:
raise Exception(method)
r['reply_to'] = event_id
self.logger.info(f'sending response id={event_id}')
await self.send_direct_message(event_pubkey, relays, json.dumps(r))
self.logger.debug(f'sending response id={event_id}')
await self.send_direct_message(event_pubkey, json.dumps(r))

def _store_last_swapserver_relays(self, relays: Sequence[str]):
self._last_swapserver_relays = relays
if not self.config.path or not relays:
return
storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
try:
with open(storage_path, 'w', encoding="utf-8") as f:
json.dump(relays, f, indent=4, sort_keys=True) # type: ignore
except Exception:
self.logger.exception(f"failed to write last swapserver relays to {storage_path}")

def _load_last_swapserver_relays(self) -> Optional[Sequence[str]]:
storage_path = os.path.join(self.config.path, 'recent_swapserver_relays')
if not os.path.exists(storage_path):
return None
try:
with open(storage_path, 'r', encoding="utf-8") as f:
relays = json.load(f)
except Exception:
self.logger.exception(f"failed to read last swapserver relays from {storage_path}")
return None
return relays