protocol.py 13 KB


  1. """ Implementation of the P2P protocol. """
  2. import json
  3. import socket
  4. import socketserver
  5. import logging
  6. from threading import Thread, Lock
  7. from queue import Queue, PriorityQueue
  8. from binascii import unhexlify, hexlify
  9. from uuid import UUID, uuid4
  10. from typing import Callable, List
# Names exported by a star-import of this module.
__all__ = ['Protocol', 'PeerConnection', 'MAX_PEERS', 'HELLO_MSG']
MAX_PEERS = 10
""" The maximum number of peers that we connect to."""
HELLO_MSG = b"bl0ckch41n"
""" The hello message two peers use to make sure they are speaking the same protocol. """
# TODO: set this centrally
# NOTE: this is a process-wide setting: every socket created after this module is
# imported gets a 30 second default timeout, not only the sockets opened here.
socket.setdefaulttimeout(30)
  18. class PeerConnection:
  19. """
  20. Handles the low-level socket connection to one other peer.
  21. :ivar peer_addr: The self-reported address one can use to connect to this peer.
  22. :ivar param: The self-reported address one can use to connect to this peer.
  23. :ivar _sock_addr: The address our socket is or will be connected to.
  24. :ivar socket: The socket object we use to communicate with our peer.
  25. :param sock: A socket object we should use to communicate with our peer.
  26. :ivar proto: The Protocol instance this peer connection belongs to.
  27. :ivar is_connected: A boolean indicating the current connection status.
  28. :ivar outgoing_msgs: A queue of messages we want to send to this peer.
  29. """
  30. def __init__(self, peer_addr: tuple, proto: 'Protocol', sock: socket.socket=None):
  31. self.peer_addr = None
  32. self._sock_addr = peer_addr
  33. self.socket = sock
  34. self.proto = proto
  35. self.is_connected = False
  36. self._sent_uuid = str(uuid4())
  37. self.outgoing_msgs = Queue()
  38. Thread(target=self.run, daemon=True).start()
  39. def send_peers(self):
  40. """ Sends all known peers to this peer. """
  41. for peer in self.proto.peers:
  42. if peer.peer_addr is not None:
  43. self.send_msg("peer", list(peer.peer_addr))
  44. def run(self):
  45. """
  46. Creates a connection, handles the handshake, then hands off to the reader and writer threads.
  47. Does not return until the writer thread does.
  48. """
  49. if self.socket is None:
  50. logging.info("connecting to peer %s", repr(self._sock_addr))
  51. self.socket = socket.create_connection(self._sock_addr)
  52. self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  53. self.socket.sendall(HELLO_MSG)
  54. if self.socket.recv(len(HELLO_MSG)) != HELLO_MSG:
  55. return
  56. self.is_connected = True
  57. self.send_msg("myport", self.proto.server.server_address[1])
  58. self.send_msg("block", self.proto._primary_block)
  59. self.send_msg("id", self._sent_uuid)
  60. self.send_peers()
  61. Thread(target=self.reader_thread, daemon=True).start()
  62. self.writer_thread()
  63. def close_on_error(fn: Callable):
  64. """ A decorator that closes both threads if one dies. """
  65. def wrapper(self, *args, **kwargs):
  66. try:
  67. fn(self, *args, **kwargs)
  68. except Exception:
  69. logging.exception("exception in reader/writer thread")
  70. self.close()
  71. return wrapper
  72. def close(self):
  73. """ Closes the connection to this peer. """
  74. # TODO: use locks to avoid the race conditions here
  75. if not self.is_connected:
  76. return
  77. logging.info("closing connection to peer %s", self._sock_addr)
  78. while not self.outgoing_msgs.empty():
  79. self.outgoing_msgs.get_nowait()
  80. self.outgoing_msgs.put(None)
  81. self.is_connected = False
  82. if self in self.proto.peers:
  83. self.proto.peers.remove(self)
  84. self.socket.close()
  85. def send_msg(self, msg_type: str, msg_param):
  86. """
  87. Sends a message to this peer.
  88. :msg_type: The type of message.
  89. :msg_param: the JSON-compatible parameter of this message
  90. """
  91. if not self.is_connected:
  92. return
  93. self.outgoing_msgs.put({'msg_type': msg_type, 'msg_param': msg_param})
  94. @close_on_error
  95. def writer_thread(self):
  96. """ The writer thread takes messages from our message queue and sends them to the peer. """
  97. while True:
  98. item = self.outgoing_msgs.get()
  99. if item is None:
  100. break
  101. data = json.dumps(item, indent=4).encode()
  102. self.socket.sendall(str(len(data)).encode() + b"\n" + data)
  103. self.outgoing_msgs.task_done()
  104. @close_on_error
  105. def reader_thread(self):
  106. """
  107. The reader thread reads messages from the socket and passes them to the protocol to handle.
  108. """
  109. while True:
  110. buf = b""
  111. while not buf or buf[-1] != ord('\n'):
  112. try:
  113. tmp = self.socket.recv(1)
  114. except socket.timeout as e:
  115. if buf:
  116. raise e
  117. continue
  118. if not tmp:
  119. return
  120. buf += tmp
  121. length = int(buf)
  122. logging.debug("expecting json obj of length %d", length)
  123. buf = bytearray(length)
  124. read = 0
  125. while length > read:
  126. tmp = self.socket.recv_into(memoryview(buf)[read:])
  127. if not tmp:
  128. return
  129. read += tmp
  130. obj = json.loads(buf.decode())
  131. msg_type = obj['msg_type']
  132. msg_param = obj['msg_param']
  133. logging.debug("received %s", obj['msg_type'])
  134. if msg_type == 'myport':
  135. addr = self.socket.getpeername()
  136. self.peer_addr = (addr[0],) + (int(msg_param),) + addr[2:]
  137. self.proto.received(msg_type, msg_param, self)
  138. class SocketServer(socketserver.TCPServer):
  139. """
  140. A TCP socketserver that calls does not close the connections on its own.
  141. """
  142. allow_reuse_address = True
  143. """ Make sure the server can be restarted without delays. """
  144. def serve_forever_bg(self):
  145. """ Runs the server forever in a background thread. """
  146. logging.info("listening on %s", self.server_address)
  147. Thread(target=self.serve_forever, daemon=True).start()
  148. def close_request(self, request):
  149. pass
  150. def shutdown_request(self, request):
  151. pass
  152. class Protocol:
  153. """
  154. Manages connections to our peers. Allows sending messages to them and has event handlers
  155. for handling messages from other peers.
  156. :ivar block_receive_handlers: Event handlers that get called when a new block is received.
  157. :vartype block_receive_handlers: List[Callable]
  158. :ivar trans_receive_handlers: Event handlers that get called when a new transaction is received.
  159. :vartype trans_receive_handlers: List[Callable]
  160. :ivar block_request_handlers: Event handlers that get called when a block request is received.
  161. :vartype block_request_handlers: List[Callable]
  162. :ivar peers: The peers we are connected to.
  163. :vartype peers: List[PeerConnection]
  164. """
  165. def __init__(self, bootstrap_peers: 'List[tuple]',
  166. primary_block: 'Block', listen_port: int=0, listen_addr: str=""):
  167. """
  168. :param bootstrap_peers: network addresses of peers where we bootstrap the P2P network from
  169. :param primary_block: the head of the primary block chain
  170. :param listen_port: the port where other peers should be able to reach us
  171. :param listen_addr: the address where other peers should be able to reach us
  172. """
  173. self.block_receive_handlers = []
  174. self.trans_receive_handlers = []
  175. self.block_request_handlers = []
  176. self._primary_block = primary_block.to_json_compatible()
  177. self.peers = []
  178. self._callback_queue = PriorityQueue()
  179. self._callback_counter = 0
  180. self._callback_counter_lock = Lock()
  181. class IncomingHandler(socketserver.BaseRequestHandler):
  182. """ Handler for incoming P2P connections. """
  183. proto = self
  184. def handle(self):
  185. logging.info("connection from peer %s", repr(self.client_address))
  186. if len(self.proto.peers) > MAX_PEERS:
  187. logging.warning("too many connections: rejecting peer %s",
  188. repr(self.client_address))
  189. self.request.close()
  190. # TODO: separate limits for incoming and outgoing connections
  191. return
  192. conn = PeerConnection(self.client_address, self.proto, self.request)
  193. self.proto.peers.append(conn)
  194. self.server = SocketServer((listen_addr, listen_port), IncomingHandler)
  195. self.server.serve_forever_bg()
  196. # we want to do this only after we opened our listening socket
  197. self.peers.extend([PeerConnection(peer, self) for peer in bootstrap_peers])
  198. Thread(target=self._main_thread, daemon=True).start()
  199. def broadcast_primary_block(self, block: 'Block'):
  200. """ Notifies all peers and local listeners of a new primary block. """
  201. self._primary_block = block.to_json_compatible()
  202. for peer in self.peers:
  203. peer.send_msg("block", self._primary_block)
  204. self.received('block', self._primary_block, None, 0)
  205. def broadcast_transaction(self, trans: 'Transaction'):
  206. """ Notifies all peers and local listeners of a new transaction. """
  207. for peer in self.peers:
  208. peer.send_msg("transaction", trans.to_json_compatible())
  209. def received(self, msg_type: str, msg_param, peer: PeerConnection, prio: int=1):
  210. """
  211. Called by a PeerConnection when a new message was received.
  212. :param msg_type: The message type identifier.
  213. :param msg_param: The JSON-compatible object that was received.
  214. :param peer: The peer who sent us the message.
  215. :param prio: The priority of the message. (Should be lower for locally generated events
  216. than for remote events, to make sure self-mined blocks get handled first.)
  217. """
  218. with self._callback_counter_lock:
  219. counter = self._callback_counter + 1
  220. self._callback_counter = counter
  221. self._callback_queue.put((prio, counter, msg_type, msg_param, peer))
  222. def _main_thread(self):
  223. """ The main loop of the one thread where all incoming events are handled. """
  224. while True:
  225. _, _, msg_type, msg_param, peer = self._callback_queue.get()
  226. try:
  227. getattr(self, 'received_' + msg_type)(msg_param, peer)
  228. except:
  229. logging.exception("unhandled exception in event handler")
  230. try:
  231. if peer is not None:
  232. peer.close()
  233. except OSError:
  234. pass
  235. def received_id(self, uuid: str, sender: PeerConnection):
  236. """
  237. A unique connection id was received. We use this to detect and close connections to
  238. ourselves.
  239. TODO: detect duplicate connections to other peers (needs TLS or something similar)
  240. """
  241. for peer in self.peers:
  242. if peer._sent_uuid == uuid:
  243. peer.close()
  244. sender.close()
  245. break
  246. def received_peer(self, peer_addr: list, _):
  247. """ Information about a peer has been received. """
  248. peer_addr = tuple(peer_addr)
  249. if len(self.peers) >= MAX_PEERS:
  250. # TODO: maintain list of known, not connected peers
  251. return
  252. for peer in self.peers:
  253. if peer.peer_addr == peer_addr:
  254. return
  255. # TODO: if the other peer also just learned of us, we can end up with two connections (one from each direction)
  256. self.peers.append(PeerConnection(peer_addr, self))
  257. def received_myport(self, _, sender: PeerConnection):
  258. for peer in self.peers:
  259. if peer.is_connected and peer is not sender:
  260. peer.send_msg("peer", list(sender.peer_addr))
  261. def received_getblock(self, block_hash: str, peer: PeerConnection):
  262. """ We received a request for a new block from a certain peer. """
  263. for handler in self.block_request_handlers:
  264. block = handler(unhexlify(block_hash))
  265. if block is not None:
  266. peer.send_msg("block", block.to_json_compatible())
  267. break
  268. def received_block(self, block: dict, _):
  269. """ Someone sent us a block. """
  270. for handler in self.block_receive_handlers:
  271. handler(Block.from_json_compatible(block))
  272. def received_transaction(self, transaction: dict, _):
  273. """ Someone sent us a transaction. """
  274. for handler in self.trans_receive_handlers:
  275. handler(Transaction.from_json_compatible(transaction))
  276. def send_block_request(self, block_hash: bytes):
  277. """ Sends a request for a block to all our peers. """
  278. for peer in self.peers:
  279. peer.send_msg("getblock", hexlify(block_hash).decode())
  280. from .block import Block
  281. from .transaction import Transaction