@@ -202,6 +202,7 @@ def __init__(self, *, wallet: 'Abstract_Wallet', lnworker: 'LNWallet'):
202
202
self .prepayments [swap .prepay_hash ] = bytes .fromhex (k )
203
203
self .is_server = False # overriden by swapserver plugin if enabled
204
204
self .is_initialized = asyncio .Event ()
205
+ self .pairs_updated = asyncio .Event ()
205
206
206
207
def start_network (self , network : 'Network' ):
207
208
assert network
@@ -922,7 +923,15 @@ def update_pairs(self, pairs):
922
923
self .percentage = pairs .percentage
923
924
self ._min_amount = pairs .min_amount
924
925
self ._max_amount = pairs .max_amount
925
- self .is_initialized .set ()
926
+ self .trigger_pairs_updated ()
927
+
928
+ def trigger_pairs_updated (self ):
929
+ def trigger ():
930
+ self .is_initialized .set ()
931
+ self .pairs_updated .set ()
932
+ self .pairs_updated .clear ()
933
+ loop = get_asyncio_loop ()
934
+ loop .call_soon_threadsafe (trigger )
926
935
927
936
def get_max_amount (self ) -> int :
928
937
"""in satoshis"""
@@ -1065,7 +1074,7 @@ def create_claim_txin(
1065
1074
* ,
1066
1075
txin : PartialTxInput ,
1067
1076
swap : SwapData ,
1068
- ) -> PartialTransaction :
1077
+ ) -> Tuple [ PartialTxInput , Optional [ int ]] :
1069
1078
if swap .is_reverse : # successful reverse swap
1070
1079
locktime = None
1071
1080
# preimage will be set in sign_tx
@@ -1280,11 +1289,10 @@ def __init__(self, config, sm, keypair):
1280
1289
self .private_key = keypair .privkey
1281
1290
self .nostr_private_key = to_nip19 ('nsec' , keypair .privkey .hex ())
1282
1291
self .nostr_pubkey = keypair .pubkey .hex ()[2 :]
1283
- self .dm_replies = defaultdict (asyncio .Future ) # type: Dict[bytes , asyncio.Future]
1292
+ self .dm_replies = defaultdict (asyncio .Future ) # type: Dict[str , asyncio.Future]
1284
1293
self .ssl_context = ssl .create_default_context (purpose = ssl .Purpose .SERVER_AUTH , cafile = ca_path )
1285
1294
self .relay_manager = None
1286
1295
self .taskgroup = OldTaskGroup ()
1287
- self .server_relays = None
1288
1296
1289
1297
def __enter__ (self ):
1290
1298
asyncio .run_coroutine_threadsafe (self .main_loop (), self .network .asyncio_loop )
@@ -1311,8 +1319,8 @@ async def main_loop(self):
1311
1319
else :
1312
1320
tasks = [
1313
1321
self .check_direct_messages (),
1314
- self .receive_offers (),
1315
1322
self .get_pairs (),
1323
+ self .update_relays ()
1316
1324
]
1317
1325
try :
1318
1326
async with self .taskgroup as group :
@@ -1333,12 +1341,16 @@ async def stop(self):
1333
1341
1334
1342
@property
1335
1343
def relays (self ):
1336
- return self .network .config .NOSTR_RELAYS .split (',' )
1344
+ our_relays = self .config .NOSTR_RELAYS .split (',' ) if self .config .NOSTR_RELAYS else []
1345
+ if self .sm .is_server :
1346
+ return our_relays
1347
+ last_swapserver_relays = self .config .LAST_SWAPSERVER_RELAYS .split (',' ) if self .config .LAST_SWAPSERVER_RELAYS else []
1348
+ return list (set (our_relays + last_swapserver_relays ))
1337
1349
1338
1350
def get_relay_manager (self ):
1339
1351
assert get_running_loop () == get_asyncio_loop (), f"this must be run on the asyncio thread!"
1340
1352
if not self .relay_manager :
1341
- if self .network .proxy :
1353
+ if self .network .proxy and self . network . proxy . enabled :
1342
1354
proxy = make_aiohttp_proxy_connector (self .network .proxy , self .ssl_context )
1343
1355
else :
1344
1356
proxy : Optional ['ProxyConnector' ] = None
@@ -1396,7 +1408,7 @@ async def publish_offer(self, sm):
1396
1408
private_key = self .nostr_private_key )
1397
1409
self .logger .info (f"published offer { event_id } " )
1398
1410
1399
- async def send_direct_message (self , pubkey : str , relays , content : str ) -> str :
1411
+ async def send_direct_message (self , pubkey : str , content : str ) -> str :
1400
1412
event_id = await aionostr ._add_event (
1401
1413
self .relay_manager ,
1402
1414
kind = self .NOSTR_DM ,
@@ -1407,27 +1419,29 @@ async def send_direct_message(self, pubkey: str, relays, content: str) -> str:
1407
1419
1408
1420
@log_exceptions
1409
1421
async def send_request_to_server (self , method : str , request_data : dict ) -> dict :
1422
+ self .logger .debug (f"swapserver req: method: { method } relays: { self .relays } " )
1410
1423
request_data ['method' ] = method
1411
- request_data ['relays' ] = self .config .NOSTR_RELAYS
1412
1424
server_pubkey = self .config .SWAPSERVER_NPUB
1413
- event_id = await self .send_direct_message (server_pubkey , self . server_relays , json .dumps (request_data ))
1425
+ event_id = await self .send_direct_message (server_pubkey , json .dumps (request_data ))
1414
1426
response = await self .dm_replies [event_id ]
1415
1427
return response
1416
1428
1417
- async def receive_offers (self ):
1429
+ async def get_pairs (self ):
1418
1430
await self .is_connected .wait ()
1419
1431
query = {
1420
1432
"kinds" : [self .USER_STATUS_NIP38 ],
1421
1433
"limit" :10 ,
1422
1434
"#d" : [f"electrum-swapserver-{ self .NOSTR_EVENT_VERSION } " ],
1423
1435
"#r" : [f"net:{ constants .net .NET_NAME } " ],
1424
- "since" : int (time .time ()) - self .OFFER_UPDATE_INTERVAL_SEC
1436
+ "since" : int (time .time ()) - 60 * 60 ,
1437
+ "until" : int (time .time ()) + 60 * 60 ,
1425
1438
}
1426
1439
async for event in self .relay_manager .get_events (query , single_event = False , only_stored = False ):
1427
1440
try :
1428
1441
content = json .loads (event .content )
1429
1442
tags = {k : v for k , v in event .tags }
1430
1443
except Exception as e :
1444
+ self .logger .debug (f"failed to parse event: { e } " )
1431
1445
continue
1432
1446
if tags .get ('d' ) != f"electrum-swapserver-{ self .NOSTR_EVENT_VERSION } " :
1433
1447
continue
@@ -1436,8 +1450,9 @@ async def receive_offers(self):
1436
1450
# check if this is the most recent event for this pubkey
1437
1451
pubkey = event .pubkey
1438
1452
ts = self ._offers .get (pubkey , {}).get ('timestamp' , 0 )
1439
- if event .created_at <= ts :
1440
- #print('skipping old event', pubkey[0:10], event.id)
1453
+ if (event .created_at <= ts
1454
+ or event .created_at > time .time () + 60 * 60
1455
+ or event .created_at < time .time () - 60 * 60 ):
1441
1456
continue
1442
1457
try :
1443
1458
pow_bits = get_nostr_ann_pow_amount (
@@ -1452,40 +1467,28 @@ async def receive_offers(self):
1452
1467
content ['pow_bits' ] = pow_bits
1453
1468
content ['pubkey' ] = pubkey
1454
1469
content ['timestamp' ] = event .created_at
1470
+ server_relays = content ['relays' ].split (',' ) if 'relays' in content else []
1471
+ content ['relays' ] = ',' .join (server_relays [:10 ]) # limit to 10 relays
1455
1472
self ._offers [pubkey ] = content
1473
+ if self .config .SWAPSERVER_NPUB == pubkey :
1474
+ pairs = self ._parse_offer (content )
1475
+ self .sm .update_pairs (pairs )
1456
1476
# mirror event to other relays
1457
- server_relays = content ['relays' ].split (',' ) if 'relays' in content else []
1458
1477
await self .taskgroup .spawn (self .rebroadcast_event (event , server_relays ))
1459
1478
1460
- async def get_pairs (self ):
1461
- if not self .config .SWAPSERVER_NPUB :
1462
- return
1463
- query = {
1464
- "kinds" : [self .USER_STATUS_NIP38 ],
1465
- "authors" : [self .config .SWAPSERVER_NPUB ],
1466
- "#d" : [f"electrum-swapserver-{ self .NOSTR_EVENT_VERSION } " ],
1467
- "#r" : [f"net:{ constants .net .NET_NAME } " ],
1468
- "since" : int (time .time ()) - self .OFFER_UPDATE_INTERVAL_SEC ,
1469
- "limit" : 1
1470
- }
1471
- async for event in self .relay_manager .get_events (query , single_event = True , only_stored = False ):
1472
- try :
1473
- content = json .loads (event .content )
1474
- tags = {k : v for k , v in event .tags }
1475
- except Exception :
1476
- continue
1477
- if tags .get ('d' ) != f"electrum-swapserver-{ self .NOSTR_EVENT_VERSION } " :
1478
- continue
1479
- if tags .get ('r' ) != f"net:{ constants .net .NET_NAME } " :
1480
- continue
1481
- # check if this is the most recent event for this pubkey
1482
- pubkey = event .pubkey
1483
- content ['pubkey' ] = pubkey
1484
- content ['timestamp' ] = event .created_at
1485
- self .logger .info (f'received offer from { age (event .created_at )} ' )
1486
- pairs = self ._parse_offer (content )
1487
- self .sm .update_pairs (pairs )
1488
- self .server_relays = content ['relays' ].split (',' )
1479
+ async def update_relays (self ):
1480
+ """
1481
+ Update the relays when update_pairs is called.
1482
+ This ensures we try to connect to the same relays as the ones announced by the swap server.
1483
+ """
1484
+ while True :
1485
+ previous_relays = self .config .LAST_SWAPSERVER_RELAYS
1486
+ await self .sm .pairs_updated .wait ()
1487
+ # assign the latest known swapserver relays to the config
1488
+ self .config .LAST_SWAPSERVER_RELAYS = self ._offers [self .config .SWAPSERVER_NPUB ]['relays' ]
1489
+ if self .config .LAST_SWAPSERVER_RELAYS != previous_relays :
1490
+ self .logger .debug (f"updating relays" )
1491
+ await self .relay_manager .update_relays (self .relays )
1489
1492
1490
1493
async def rebroadcast_event (self , event : Event , server_relays : Sequence [str ]):
1491
1494
"""If the relays of the origin server are different from our relays we rebroadcast the
@@ -1528,8 +1531,7 @@ async def handle_request(self, request):
1528
1531
method = request .pop ('method' )
1529
1532
event_id = request .pop ('event_id' )
1530
1533
event_pubkey = request .pop ('event_pubkey' )
1531
- self .logger .info (f'handle_request: id={ event_id } { method } { request } ' )
1532
- relays = request .pop ('relays' ).split (',' )
1534
+ print (f'handle_request: id={ event_id } { method } { request } ' )
1533
1535
if method == 'addswapinvoice' :
1534
1536
r = self .sm .server_add_swap_invoice (request )
1535
1537
elif method == 'createswap' :
@@ -1540,4 +1542,4 @@ async def handle_request(self, request):
1540
1542
raise Exception (method )
1541
1543
r ['reply_to' ] = event_id
1542
1544
self .logger .info (f'sending response id={ event_id } ' )
1543
- await self .send_direct_message (event_pubkey , relays , json .dumps (r ))
1545
+ await self .send_direct_message (event_pubkey , json .dumps (r ))
0 commit comments