Prechádzať zdrojové kódy

use a message queue for simple thread safety

Malte Kraus 8 rokov pred
rodič
commit
817dc83e90
2 zmenil súbory, kde vykonal 37 pridanie a 14 odobranie
  1. 1 1
      src/mining.py
  2. 36 13
      src/protocol.py

+ 1 - 1
src/mining.py

@@ -22,7 +22,7 @@ class Miner:
         while True:
             miner = self.cur_miner
             if miner is None:
-                sleep(1)
+                sleep(0)
             else:
                 block = miner.run()
                 self.cur_miner = None

+ 36 - 13
src/protocol.py

@@ -2,9 +2,9 @@ import json
 from enum import Enum
 import socket
 import socketserver
-from threading import Thread
+from threading import Thread, Lock
 import logging
-from queue import Queue
+from queue import Queue, PriorityQueue
 from binascii import unhexlify, hexlify
 
 from .block import Block
@@ -77,15 +77,19 @@ class PeerConnection:
             except Exception:
                 logging.exception("exception in reader/writer thread")
 
-            while not self.outgoing_msgs.empty():
-                self.outgoing_msgs.get_nowait()
-            self.outgoing_msgs.put(None)
-            self.is_connected = False
-            if self in self.proto.peers:
-                self.proto.peers.remove(self)
-            self.socket.close()
+            self.close()
+
         return wrapper
 
+    def close(self):
+        while not self.outgoing_msgs.empty():
+            self.outgoing_msgs.get_nowait()
+        self.outgoing_msgs.put(None)
+        self.is_connected = False
+        if self in self.proto.peers:
+            self.proto.peers.remove(self)
+        self.socket.close()
+
     def send_msg(self, msg_type, msg_param):
         """
         Sends a message to this peer.
@@ -106,7 +110,6 @@ class PeerConnection:
             if item is None:
                 break
             logging.debug("sending %s", item['msg_type'])
-            #print(repr(item))
             data = json.dumps(item, 4).encode()
             self.socket.sendall(str(len(data)).encode() + b"\n")
             self.socket.sendall(data)
@@ -172,6 +175,9 @@ class Protocol:
         self.block_request_handlers = []
         self._primary_block = primary_block.to_json_compatible()
         self.peers = []
+        self._callback_queue = PriorityQueue()
+        self._callback_counter = 0
+        self._callback_counter_lock = Lock()
 
         class IncomingHandler(socketserver.BaseRequestHandler):
             """ Handler for incoming P2P connections. """
@@ -189,16 +195,33 @@ class Protocol:
         # we want to do this only after we opened our listening socket
         self.peers.append(PeerConnection(bootstrap_peer, self))
 
+        Thread(target=self._main_thread, daemon=True).start()
+
     def broadcast_primary_block(self, block):
         """ Notifies all peers and local listeners of a new primary block. """
         self._primary_block = block.to_json_compatible()
         for peer in self.peers:
             peer.send_msg("block", self._primary_block)
-        self.received_block(self._primary_block, None)
+        self.received('block', self._primary_block, None, 0)
 
-    def received(self, msg_type, msg_param, peer):
+    def received(self, msg_type, msg_param, peer, prio=1):
         """ Called by a PeerConnection when a new message was received. """
-        getattr(self, 'received_' + msg_type)(msg_param, peer)
+        with self._callback_counter_lock:
+            counter = self._callback_counter + 1
+            self._callback_counter = counter
+        self._callback_queue.put((prio, counter, msg_type, msg_param, peer))
+
+    def _main_thread(self):
+        while True:
+            _, _, msg_type, msg_param, peer = self._callback_queue.get()
+            try:
+                getattr(self, 'received_' + msg_type)(msg_param, peer)
+            except Exception:
+                logging.exception("unhandled exception in event handler")
+                try:
+                    peer.close()
+                except OSError:
+                    pass
 
     def received_peer(self, peer_addr, _):
         """ Information about a peer has been received. """