protocol.py 10 KB


  1. import json
  2. from enum import Enum
  3. import socket
  4. import socketserver
  5. from threading import Thread, Lock
  6. import logging
  7. from queue import Queue, PriorityQueue
  8. from binascii import unhexlify, hexlify
  9. from .block import Block
  10. from .transaction import Transaction
# Names exported by ``from <this module> import *``.
__all__ = ['Protocol', 'PeerConnection']
# Peer-count threshold; compared against ``len(peers)`` before a new
# connection is accepted or opened.
MAX_PEERS = 10
# Greeting bytes each side sends right after connecting; the connection is
# only used further if the same bytes are received back.
HELLO_MSG = b"bl0ckch41n"
# NOTE(review): the two calls below run at import time and affect the whole
# process, not just this module.
logging.basicConfig(level=logging.INFO)
# Default timeout, in seconds, for socket objects created after this call.
socket.setdefaulttimeout(30)
  16. class PeerConnection:
  17. """
  18. Handles the low-level socket connection to one other peer.
  19. :ivar peer_addr: The self-reported address one can use to connect to this peer.
  20. :ivar _sock_addr: The address our socket is or will be connected to.
  21. :ivar socket: The socket object we use to communicate with our peer.
  22. :ivar proto: The Protocol instance this peer connection belongs to.
  23. :ivar is_connected: A boolean indicating the current connection status.
  24. :ivar outgoing_msgs: A queue of messages we want to send to this peer.
  25. """
  26. def __init__(self, peer_addr, proto, socket=None):
  27. self.peer_addr = None
  28. self._sock_addr = peer_addr
  29. self.socket = socket
  30. self.proto = proto
  31. self.is_connected = False
  32. self.outgoing_msgs = Queue()
  33. Thread(target=self.run, daemon=True).start()
  34. def send_peers(self):
  35. """ Sends all known peers to this peer. """
  36. for peer in self.proto.peers:
  37. if peer.peer_addr is not None:
  38. self.send_msg("peer", list(peer.peer_addr))
  39. def run(self):
  40. """
  41. Creates a connection, handles the handshake, then hands off to the reader and writer threads.
  42. Does not return until the writer thread does.
  43. """
  44. if self.socket is None:
  45. logging.info("connecting to peer %s", repr(self._sock_addr))
  46. self.socket = socket.create_connection(self._sock_addr)
  47. self.socket.sendall(HELLO_MSG)
  48. if self.socket.recv(len(HELLO_MSG)) != HELLO_MSG:
  49. return
  50. self.is_connected = True
  51. self.send_msg("myport", self.proto.server.server_address[1])
  52. self.send_msg("block", self.proto._primary_block)
  53. self.send_peers()
  54. # TODO: broadcast this new peer to our current peers, under certain circumstances
  55. Thread(target=self.reader_thread, daemon=True).start()
  56. self.writer_thread()
  57. def close_on_error(fn):
  58. """ A decorator that closes both threads if one dies. """
  59. def wrapper(self, *args, **kwargs):
  60. try:
  61. fn(self, *args, **kwargs)
  62. except Exception:
  63. logging.exception("exception in reader/writer thread")
  64. self.close()
  65. return wrapper
  66. def close(self):
  67. if not self.is_connected:
  68. return
  69. logging.info("closing connection to peer %s", self._sock_addr)
  70. while not self.outgoing_msgs.empty():
  71. self.outgoing_msgs.get_nowait()
  72. self.outgoing_msgs.put(None)
  73. self.is_connected = False
  74. if self in self.proto.peers:
  75. self.proto.peers.remove(self)
  76. self.socket.close()
  77. def send_msg(self, msg_type, msg_param):
  78. """
  79. Sends a message to this peer.
  80. :msg_type: The type of message.
  81. :msg_param: the JSON-compatible parameter of this message
  82. """
  83. if not self.is_connected:
  84. return
  85. self.outgoing_msgs.put({'msg_type': msg_type, 'msg_param': msg_param})
  86. @close_on_error
  87. def writer_thread(self):
  88. """ The writer thread takes messages from our message queue and sends them to the peer. """
  89. while True:
  90. item = self.outgoing_msgs.get()
  91. if item is None:
  92. break
  93. logging.debug("sending %s", item['msg_type'])
  94. data = json.dumps(item, 4).encode()
  95. self.socket.sendall(str(len(data)).encode() + b"\n")
  96. self.socket.sendall(data)
  97. self.outgoing_msgs.task_done()
  98. @close_on_error
  99. def reader_thread(self):
  100. """ The reader thread reads messages from the socket and passes them to the protocol to handle. """
  101. while True:
  102. buf = b""
  103. while not buf or buf[-1] != ord('\n'):
  104. try:
  105. tmp = self.socket.recv(1)
  106. except socket.timeout as e:
  107. if buf:
  108. raise e
  109. continue
  110. if not tmp:
  111. return
  112. buf += tmp
  113. length = int(buf)
  114. logging.debug("expecting json obj of length %d", length)
  115. buf = bytearray(length)
  116. read = 0
  117. while length > read:
  118. tmp = self.socket.recv_into(memoryview(buf)[read:])
  119. if not tmp:
  120. return
  121. read += tmp
  122. obj = json.loads(buf.decode())
  123. msg_type = obj['msg_type']
  124. msg_param = obj['msg_param']
  125. logging.debug("received %s", obj['msg_type'])
  126. if msg_type == 'myport':
  127. self.peer_addr = (self._sock_addr[0],) + (int(msg_param),) + self._sock_addr[2:]
  128. else:
  129. self.proto.received(msg_type, msg_param, self)
  130. class SocketServer(socketserver.TCPServer):
  131. allow_reuse_address = True
  132. def serve_forever_bg(self):
  133. Thread(target=self.serve_forever, daemon=True).start()
  134. def close_request(self, request):
  135. pass
  136. def shutdown_request(self, request):
  137. pass
  138. class Protocol:
  139. """
  140. Manages connections to our peers. Allows sending messages to them and has event handlers
  141. for handling messages from other peers.
  142. """
  143. def __init__(self, bootstrap_peers, primary_block, listen_port=0, listen_addr=""):
  144. """
  145. :param bootstrap_peers: network addresses of peers where we bootstrap the P2P network from
  146. :param primary_block: the head of the primary block chain
  147. :param listen_port: the port where other peers should be able to reach us
  148. :param listen_addr: the address where other peers should be able to reach us
  149. """
  150. self.block_receive_handlers = []
  151. self.trans_receive_handlers = []
  152. self.block_request_handlers = []
  153. self._primary_block = primary_block.to_json_compatible()
  154. self.peers = []
  155. self._callback_queue = PriorityQueue()
  156. self._callback_counter = 0
  157. self._callback_counter_lock = Lock()
  158. class IncomingHandler(socketserver.BaseRequestHandler):
  159. """ Handler for incoming P2P connections. """
  160. proto = self
  161. def handle(self):
  162. logging.info("connection from peer %s", repr(self.client_address))
  163. if len(self.proto.peers) > MAX_PEERS:
  164. logging.warn("too many connections: rejecting peer %s", repr(self.client_address))
  165. self.request.close()
  166. # TODO: separate limits for incoming and outgoing connections
  167. return
  168. conn = PeerConnection(self.client_address, self.proto, self.request)
  169. self.proto.peers.append(conn)
  170. self.server = SocketServer((listen_addr, listen_port), IncomingHandler)
  171. self.server.serve_forever_bg()
  172. # we want to do this only after we opened our listening socket
  173. self.peers.extend([PeerConnection(peer, self) for peer in bootstrap_peers])
  174. Thread(target=self._main_thread, daemon=True).start()
  175. def broadcast_primary_block(self, block: Block):
  176. """ Notifies all peers and local listeners of a new primary block. """
  177. self._primary_block = block.to_json_compatible()
  178. for peer in self.peers:
  179. peer.send_msg("block", self._primary_block)
  180. self.received('block', self._primary_block, None, 0)
  181. def broadcast_transaction(self, trans: Transaction):
  182. """ Notifies all peers and local listeners of a new transaction. """
  183. for peer in self.peers:
  184. peer.send_msg("transaction", trans.to_json_compatible())
  185. def received(self, msg_type, msg_param, peer, prio=1):
  186. """ Called by a PeerConnection when a new message was received. """
  187. with self._callback_counter_lock:
  188. counter = self._callback_counter + 1
  189. self._callback_counter = counter
  190. self._callback_queue.put((prio, counter, msg_type, msg_param, peer))
  191. def _main_thread(self):
  192. while True:
  193. _, _, msg_type, msg_param, peer = self._callback_queue.get()
  194. try:
  195. getattr(self, 'received_' + msg_type)(msg_param, peer)
  196. except Exception:
  197. logging.exception("unhandled exception in event handler")
  198. try:
  199. peer.close()
  200. except OSError:
  201. pass
  202. def received_peer(self, peer_addr, _):
  203. """ Information about a peer has been received. """
  204. peer_addr = tuple(peer_addr)
  205. if len(self.peers) >= MAX_PEERS:
  206. return
  207. for peer in self.peers:
  208. if peer.peer_addr == peer_addr:
  209. return
  210. # TODO: if the other peer also just learned of us, we can end up with two connections (one from each direction)
  211. self.peers.append(PeerConnection(peer_addr, self))
  212. def received_getblock(self, block_hash, peer):
  213. """ We received a request for a new block from a certain peer. """
  214. for handler in self.block_request_handlers:
  215. block = handler(unhexlify(block_hash))
  216. if block is not None:
  217. peer.send_msg("block", block.to_json_compatible())
  218. break
  219. def received_block(self, block, _):
  220. """ Someone sent us a block. """
  221. for handler in self.block_receive_handlers:
  222. handler(Block.from_json_compatible(block))
  223. def received_transaction(self, transaction, _):
  224. """ Someone sent us a transaction. """
  225. for handler in self.trans_receive_handlers:
  226. handler(Transaction.from_json_compatible(transaction))
  227. def send_block_request(self, block_hash: bytes):
  228. """ Sends a request for a block to all our peers. """
  229. for peer in self.peers:
  230. peer.send_msg("getblock", hexlify(block_hash).decode())