Просмотр исходного кода

run miner in forked process, to avoid the Python GIL

Malte Kraus 8 лет назад
Родитель
Сommit
490af57fd0
3 измененных файлов с 116 добавлено и 21 удалено
  1. 1 0
      src/__init__.py
  2. 112 21
      src/mining.py
  3. 3 0
      tests/test_proto.py

+ 1 - 0
src/__init__.py

@@ -0,0 +1 @@
+_run_from_test = False

+ 112 - 21
src/mining.py

@@ -1,17 +1,84 @@
 """ Functionality for mining new blocks. """
 
+import json
+import os
+import sys
+import signal
+import time
+import select
 from threading import Thread, Condition
-from time import sleep
-from typing import Optional
+from typing import Optional, Callable, Tuple, List
 
 from .proof_of_work import ProofOfWork
 from .chainbuilder import ChainBuilder
+from .block import Block
 from . import mining_strategy
 
 __all__ = ['Miner']
 
-def _yield():
-    sleep(0)
+
+
+signal.signal(signal.SIGCHLD, signal.SIG_IGN)
+
+def exit_on_pipe_close(pipe):
+    """ Waits until the pipe `pipe` can no longer be used, then kills this process. """
+    poller = select.poll()
+    poller.register(pipe, select.POLLERR)
+    poller.poll()
+    os._exit(1)
+
+
+def start_process(func: Callable) -> Tuple[int, int]:
+    """
+    Starts a function in a forked process, and writes its result to a pipe.
+
+    :param func: The function the background process should run.
+    :rval: A tuple of the pipe where the result will be written to and the process id of the
+           forked process.
+    """
+    rx, wx = os.pipe()
+    pid = os.fork()
+    if pid == 0: # child
+        try:
+            # pytest opens some weird file descriptors...
+            import src
+            if not src._run_from_test:
+                os.close(0)
+                os.closerange(3, wx)
+                os.closerange(wx + 1, 2**16)
+
+            Thread(target=exit_on_pipe_close, args=(wx,), daemon=True).start()
+
+            res = func().to_json_compatible()
+            with os.fdopen(wx, "w") as fp:
+                json.dump(res, sys.stdout)
+                json.dump(res, fp)
+        except Exception:
+            import traceback
+            traceback.print_exc()
+            os._exit(1)
+        os._exit(0)
+    else: # parent
+        os.close(wx)
+        return rx, pid
+
+def wait_for_result(pipes: List[int], cls: type):
+    """
+    Waits for one of the pipes in `pipes` to become ready, reads a JSON object from that pipe
+    and calls `cls.from_json_compatible()` to build the return value.
+    All pipes are closed once this function returns.
+
+    :param pipes: The list of pipes to wait for.
+    :param cls: The class (with a `from_json_compatible` method) to return instances of.
+    :rtype: An instance of `cls`.
+    """
+    ready, _, _ = select.select(pipes, [], [])
+    for p in pipes:
+        if p != ready[0]:
+            os.close(p)
+
+    with os.fdopen(ready[0], "r") as fp:
+        return cls.from_json_compatible(json.load(fp))
 
 class Miner:
     """
@@ -21,8 +88,10 @@ class Miner:
     :vartype proto: Protocol
     :ivar chainbuilder: The chain builder used by :any:`start_mining` to find the primary chain.
     :vartype chainbuilder: ChainBuilder
-    :ivar _cur_miner: The proof of work we're currently working on.
-    :vartype _cur_miner: Optional[ProofOfWork]
+    :ivar _cur_miner_pipes: Pipes where worker processes will write their results to.
+    :vartype _cur_miner_pipes: Optional[List[int]]
+    :ivar _cur_miner_pids: Process ids of our worker processes.
+    :vartype _cur_miner_pids: List[int]
     :ivar reward_pubkey: The public key to which mining fees and block rewards should be sent to.
     :vartype reward_pubkey: Signing
     """
@@ -31,7 +100,8 @@ class Miner:
         self.proto = proto
         self.chainbuilder = ChainBuilder(proto)
         self.chainbuilder.chain_change_handlers.append(self._chain_changed)
-        self._cur_miner = None
+        self._cur_miner_pids = []
+        self._cur_miner_pipes = None
         self.reward_pubkey = reward_pubkey
         self._stopped = False
         self._miner_cond = Condition()
@@ -40,22 +110,34 @@ class Miner:
     def _miner_thread(self):
         def wait_for_miner():
             with self._miner_cond:
-                while self._cur_miner is None:
+                while self._cur_miner_pipes is None:
                     if self._stopped:
-                        return None
+                        return None, None
                     self._miner_cond.wait()
-                return self._cur_miner
+                pipes = self._cur_miner_pipes
+                pids = self._cur_miner_pids
+                self._cur_miner_pipes = None
+                return pipes, pids
 
         while True:
-            miner = wait_for_miner()
-            if miner is None:
+            rxs, pids = wait_for_miner()
+            if rxs is None:
                 return
-            block = miner.run()
-            with self._miner_cond:
-                if self._cur_miner == miner:
-                    self._cur_miner = None
-            if block is not None:
+
+            try:
+                block = wait_for_result(rxs, Block)
                 self.proto.broadcast_primary_block(block)
+            except json.JSONDecodeError:
+                pass
+
+            with self._miner_cond:
+                if self._cur_miner_pids == pids:
+                    for pid in pids:
+                        try:
+                            os.kill(pid, signal.SIGKILL)
+                        except ProcessLookupError:
+                            pass
+                    self._cur_miner_pids = []
 
     def start_mining(self):
         """ Start mining on a new block. """
@@ -64,7 +146,13 @@ class Miner:
         block = mining_strategy.create_block(chain, transactions, self.reward_pubkey)
         with self._miner_cond:
             self._stop_mining_for_now()
-            self._cur_miner = ProofOfWork(block)
+            self._cur_miner_pipes = []
+
+            miner = ProofOfWork(block)
+            rx, pid = start_process(miner.run)
+            self._cur_miner_pids.append(pid)
+            self._cur_miner_pipes.append(rx)
+
             self._miner_cond.notify()
 
     def _chain_changed(self):
@@ -72,15 +160,18 @@ class Miner:
             self.start_mining()
 
     def _stop_mining_for_now(self):
-        if self._cur_miner:
-            self._cur_miner.abort()
+        for pid in self._cur_miner_pids:
+            try:
+                os.kill(pid, signal.SIGKILL)
+            except ProcessLookupError:
+                pass
+        self._cur_miner_pids = []
 
     def stop_mining(self):
         """ Stop all mining. """
         self._stopped = True
         with self._miner_cond:
             self._stop_mining_for_now()
-            self._cur_miner = None
             self._miner_cond.notify()
 
 from .protocol import Protocol

+ 3 - 0
tests/test_proto.py

@@ -2,6 +2,9 @@ from time import sleep
 import logging
 #logging.basicConfig(level=logging.DEBUG)
 
+import src
+src._run_from_test = True
+
 from src.protocol import Protocol
 from src.mining import Miner
 from src.block import GENESIS_BLOCK