|
@@ -4,11 +4,12 @@ import json
|
|
|
import socket
|
|
import socket
|
|
|
import socketserver
|
|
import socketserver
|
|
|
import logging
|
|
import logging
|
|
|
|
|
+from collections import namedtuple
|
|
|
from threading import Thread, Lock
|
|
from threading import Thread, Lock
|
|
|
from queue import Queue, PriorityQueue
|
|
from queue import Queue, PriorityQueue
|
|
|
from binascii import unhexlify, hexlify
|
|
from binascii import unhexlify, hexlify
|
|
|
from uuid import UUID, uuid4
|
|
from uuid import UUID, uuid4
|
|
|
-from typing import Callable, List
|
|
|
|
|
|
|
+from typing import Callable, List, Optional
|
|
|
|
|
|
|
|
from .block import GENESIS_BLOCK_HASH
|
|
from .block import GENESIS_BLOCK_HASH
|
|
|
|
|
|
|
@@ -51,6 +52,7 @@ class PeerConnection:
|
|
|
|
|
|
|
|
def send_peers(self):
|
|
def send_peers(self):
|
|
|
""" Sends all known peers to this peer. """
|
|
""" Sends all known peers to this peer. """
|
|
|
|
|
+ logging.debug("%s > peer *", self.peer_addr)
|
|
|
for peer in self.proto.peers:
|
|
for peer in self.proto.peers:
|
|
|
if peer.peer_addr is not None:
|
|
if peer.peer_addr is not None:
|
|
|
self.send_msg("peer", list(peer.peer_addr))
|
|
self.send_msg("peer", list(peer.peer_addr))
|
|
@@ -158,7 +160,6 @@ class PeerConnection:
|
|
|
return
|
|
return
|
|
|
buf += tmp
|
|
buf += tmp
|
|
|
length = int(buf)
|
|
length = int(buf)
|
|
|
- logging.debug("expecting json obj of length %d", length)
|
|
|
|
|
buf = bytearray(length)
|
|
buf = bytearray(length)
|
|
|
read = 0
|
|
read = 0
|
|
|
while length > read:
|
|
while length > read:
|
|
@@ -170,7 +171,6 @@ class PeerConnection:
|
|
|
obj = json.loads(buf.decode())
|
|
obj = json.loads(buf.decode())
|
|
|
msg_type = obj['msg_type']
|
|
msg_type = obj['msg_type']
|
|
|
msg_param = obj['msg_param']
|
|
msg_param = obj['msg_param']
|
|
|
- logging.debug("received %s", obj['msg_type'])
|
|
|
|
|
|
|
|
|
|
if msg_type == 'myport':
|
|
if msg_type == 'myport':
|
|
|
addr = self.socket.getpeername()
|
|
addr = self.socket.getpeername()
|
|
@@ -213,6 +213,8 @@ class Protocol:
|
|
|
:vartype peers: List[PeerConnection]
|
|
:vartype peers: List[PeerConnection]
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
|
|
+ _dummy_peer = namedtuple("DummyPeerConnection", ["peer_addr"])("self")
|
|
|
|
|
+
|
|
|
def __init__(self, bootstrap_peers: 'List[tuple]',
|
|
def __init__(self, bootstrap_peers: 'List[tuple]',
|
|
|
primary_block: 'Block', listen_port: int=0, listen_addr: str=""):
|
|
primary_block: 'Block', listen_port: int=0, listen_addr: str=""):
|
|
|
"""
|
|
"""
|
|
@@ -231,6 +233,7 @@ class Protocol:
|
|
|
self._callback_counter = 0
|
|
self._callback_counter = 0
|
|
|
self._callback_counter_lock = Lock()
|
|
self._callback_counter_lock = Lock()
|
|
|
|
|
|
|
|
|
|
+
|
|
|
class IncomingHandler(socketserver.BaseRequestHandler):
|
|
class IncomingHandler(socketserver.BaseRequestHandler):
|
|
|
""" Handler for incoming P2P connections. """
|
|
""" Handler for incoming P2P connections. """
|
|
|
proto = self
|
|
proto = self
|
|
@@ -255,6 +258,7 @@ class Protocol:
|
|
|
|
|
|
|
|
def broadcast_primary_block(self, block: 'Block'):
|
|
def broadcast_primary_block(self, block: 'Block'):
|
|
|
""" Notifies all peers and local listeners of a new primary block. """
|
|
""" Notifies all peers and local listeners of a new primary block. """
|
|
|
|
|
+ logging.debug("* > block %s", hexlify(block.hash))
|
|
|
self._primary_block = block.to_json_compatible()
|
|
self._primary_block = block.to_json_compatible()
|
|
|
for peer in self.peers:
|
|
for peer in self.peers:
|
|
|
peer.send_msg("block", self._primary_block)
|
|
peer.send_msg("block", self._primary_block)
|
|
@@ -262,10 +266,11 @@ class Protocol:
|
|
|
|
|
|
|
|
def broadcast_transaction(self, trans: 'Transaction'):
|
|
def broadcast_transaction(self, trans: 'Transaction'):
|
|
|
""" Notifies all peers and local listeners of a new transaction. """
|
|
""" Notifies all peers and local listeners of a new transaction. """
|
|
|
|
|
+ logging.debug("* > transaction %s", hexlify(trans.get_hash()))
|
|
|
for peer in self.peers:
|
|
for peer in self.peers:
|
|
|
peer.send_msg("transaction", trans.to_json_compatible())
|
|
peer.send_msg("transaction", trans.to_json_compatible())
|
|
|
|
|
|
|
|
- def received(self, msg_type: str, msg_param, peer: PeerConnection, prio: int=1):
|
|
|
|
|
|
|
+ def received(self, msg_type: str, msg_param, peer: Optional[PeerConnection], prio: int=1):
|
|
|
"""
|
|
"""
|
|
|
Called by a PeerConnection when a new message was received.
|
|
Called by a PeerConnection when a new message was received.
|
|
|
|
|
|
|
@@ -275,6 +280,10 @@ class Protocol:
|
|
|
:param prio: The priority of the message. (Should be lower for locally generated events
|
|
:param prio: The priority of the message. (Should be lower for locally generated events
|
|
|
than for remote events, to make sure self-mined blocks get handled first.)
|
|
than for remote events, to make sure self-mined blocks get handled first.)
|
|
|
"""
|
|
"""
|
|
|
|
|
+
|
|
|
|
|
+ if peer is None:
|
|
|
|
|
+ peer = self._dummy_peer
|
|
|
|
|
+
|
|
|
with self._callback_counter_lock:
|
|
with self._callback_counter_lock:
|
|
|
counter = self._callback_counter + 1
|
|
counter = self._callback_counter + 1
|
|
|
self._callback_counter = counter
|
|
self._callback_counter = counter
|
|
@@ -289,7 +298,7 @@ class Protocol:
|
|
|
except:
|
|
except:
|
|
|
logging.exception("unhandled exception in event handler")
|
|
logging.exception("unhandled exception in event handler")
|
|
|
try:
|
|
try:
|
|
|
- if peer is not None:
|
|
|
|
|
|
|
+ if peer is not self._dummy_peer:
|
|
|
peer.close()
|
|
peer.close()
|
|
|
except OSError:
|
|
except OSError:
|
|
|
pass
|
|
pass
|
|
@@ -301,16 +310,18 @@ class Protocol:
|
|
|
|
|
|
|
|
TODO: detect duplicate connections to other peers (needs TLS or something similar)
|
|
TODO: detect duplicate connections to other peers (needs TLS or something similar)
|
|
|
"""
|
|
"""
|
|
|
|
|
+ logging.debug("%s < id %s", sender.peer_addr, uuid)
|
|
|
for peer in self.peers:
|
|
for peer in self.peers:
|
|
|
if peer._sent_uuid == uuid:
|
|
if peer._sent_uuid == uuid:
|
|
|
peer.close()
|
|
peer.close()
|
|
|
sender.close()
|
|
sender.close()
|
|
|
break
|
|
break
|
|
|
|
|
|
|
|
- def received_peer(self, peer_addr: list, _):
|
|
|
|
|
|
|
+ def received_peer(self, peer_addr: list, sender):
|
|
|
""" Information about a peer has been received. """
|
|
""" Information about a peer has been received. """
|
|
|
|
|
|
|
|
peer_addr = tuple(peer_addr)
|
|
peer_addr = tuple(peer_addr)
|
|
|
|
|
+ logging.debug("%s < peer %s", sender.peer_addr, peer_addr)
|
|
|
if len(self.peers) >= MAX_PEERS:
|
|
if len(self.peers) >= MAX_PEERS:
|
|
|
# TODO: maintain list of known, not connected peers
|
|
# TODO: maintain list of known, not connected peers
|
|
|
return
|
|
return
|
|
@@ -322,33 +333,40 @@ class Protocol:
|
|
|
# TODO: if the other peer also just learned of us, we can end up with two connections (one from each direction)
|
|
# TODO: if the other peer also just learned of us, we can end up with two connections (one from each direction)
|
|
|
self.peers.append(PeerConnection(peer_addr, self))
|
|
self.peers.append(PeerConnection(peer_addr, self))
|
|
|
|
|
|
|
|
- def received_myport(self, _, sender: PeerConnection):
|
|
|
|
|
|
|
+ def received_myport(self, port: int, sender: PeerConnection):
|
|
|
|
|
+ logging.debug("%s < myport %s", sender.peer_addr, port)
|
|
|
for peer in self.peers:
|
|
for peer in self.peers:
|
|
|
if peer.is_connected and peer is not sender:
|
|
if peer.is_connected and peer is not sender:
|
|
|
if peer.peer_addr == sender.peer_addr:
|
|
if peer.peer_addr == sender.peer_addr:
|
|
|
sender.close()
|
|
sender.close()
|
|
|
else:
|
|
else:
|
|
|
|
|
+ logging.debug("%s > peer %s", peer.peer_addr, sender.peer_addr)
|
|
|
peer.send_msg("peer", list(sender.peer_addr))
|
|
peer.send_msg("peer", list(sender.peer_addr))
|
|
|
|
|
|
|
|
def received_getblock(self, block_hash: str, peer: PeerConnection):
|
|
def received_getblock(self, block_hash: str, peer: PeerConnection):
|
|
|
""" We received a request for a new block from a certain peer. """
|
|
""" We received a request for a new block from a certain peer. """
|
|
|
|
|
+ logging.debug("%s < getblock %s", peer.peer_addr, block_hash)
|
|
|
for handler in self.block_request_handlers:
|
|
for handler in self.block_request_handlers:
|
|
|
block = handler(unhexlify(block_hash))
|
|
block = handler(unhexlify(block_hash))
|
|
|
if block is not None:
|
|
if block is not None:
|
|
|
peer.send_msg("block", block.to_json_compatible())
|
|
peer.send_msg("block", block.to_json_compatible())
|
|
|
break
|
|
break
|
|
|
|
|
|
|
|
- def received_block(self, block: dict, _):
|
|
|
|
|
|
|
+ def received_block(self, block: dict, sender: PeerConnection):
|
|
|
""" Someone sent us a block. """
|
|
""" Someone sent us a block. """
|
|
|
|
|
+ block = Block.from_json_compatible(block)
|
|
|
|
|
+ logging.debug("%s < block %s", sender.peer_addr, hexlify(block.hash))
|
|
|
for handler in self.block_receive_handlers:
|
|
for handler in self.block_receive_handlers:
|
|
|
- handler(Block.from_json_compatible(block))
|
|
|
|
|
|
|
+ handler(block)
|
|
|
|
|
|
|
|
- def received_transaction(self, transaction: dict, _):
|
|
|
|
|
|
|
+ def received_transaction(self, transaction: dict, sender: PeerConnection):
|
|
|
""" Someone sent us a transaction. """
|
|
""" Someone sent us a transaction. """
|
|
|
|
|
+ transaction = Transaction.from_json_compatible(transaction)
|
|
|
|
|
+ logging.debug("%s < transaction %s", sender.peer_addr, hexlify(transaction.get_hash()))
|
|
|
for handler in self.trans_receive_handlers:
|
|
for handler in self.trans_receive_handlers:
|
|
|
- handler(Transaction.from_json_compatible(transaction))
|
|
|
|
|
|
|
+ handler(transaction)
|
|
|
|
|
|
|
|
- def received_disconnected(self, _, peer):
|
|
|
|
|
|
|
+ def received_disconnected(self, _, peer: PeerConnection):
|
|
|
"""
|
|
"""
|
|
|
Removes a disconnected peer from our list of connected peers.
|
|
Removes a disconnected peer from our list of connected peers.
|
|
|
|
|
|
|
@@ -360,6 +378,7 @@ class Protocol:
|
|
|
|
|
|
|
|
def send_block_request(self, block_hash: bytes):
|
|
def send_block_request(self, block_hash: bytes):
|
|
|
""" Sends a request for a block to all our peers. """
|
|
""" Sends a request for a block to all our peers. """
|
|
|
|
|
+ logging.debug("* > getblock %s", hexlify(block_hash))
|
|
|
for peer in self.peers:
|
|
for peer in self.peers:
|
|
|
peer.send_msg("getblock", hexlify(block_hash).decode())
|
|
peer.send_msg("getblock", hexlify(block_hash).decode())
|
|
|
|
|
|