mining.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. """ Functionality for mining new blocks. """
  2. import json
  3. import os
  4. import sys
  5. import signal
  6. import select
  7. from threading import Thread, Condition
  8. from typing import Callable, Tuple, List
  9. from .proof_of_work import ProofOfWork
  10. from .chainbuilder import ChainBuilder
  11. from .block import Block
  12. from . import mining_strategy
  13. __all__ = ['Miner']
  14. signal.signal(signal.SIGCHLD, signal.SIG_IGN)
  15. def exit_on_pipe_close(pipe):
  16. """ Waits until the pipe `pipe` can no longer be used, then kills this process. """
  17. poller = select.poll()
  18. poller.register(pipe, select.POLLERR)
  19. poller.poll()
  20. os._exit(1)
  21. def start_process(func: Callable) -> Tuple[int, int]:
  22. """
  23. Starts a function in a forked process, and writes its result to a pipe.
  24. :param func: The function the background process should run.
  25. :rval: A tuple of the pipe where the result will be written to and the process id of the
  26. forked process.
  27. """
  28. rx, wx = os.pipe()
  29. pid = os.fork()
  30. if pid == 0: # child
  31. try:
  32. os.close(0)
  33. os.closerange(3, wx)
  34. os.closerange(wx + 1, 2 ** 16)
  35. Thread(target=exit_on_pipe_close, args=(wx,), daemon=True).start()
  36. res = func().to_json_compatible()
  37. with os.fdopen(wx, "w") as fp:
  38. json.dump(res, sys.stdout)
  39. json.dump(res, fp)
  40. except Exception:
  41. import traceback
  42. traceback.print_exc()
  43. os._exit(1)
  44. os._exit(0)
  45. else: # parent
  46. os.close(wx)
  47. return rx, pid
  48. def wait_for_result(pipes: List[int], cls: type):
  49. """
  50. Waits for one of the pipes in `pipes` to become ready, reads a JSON object from that pipe
  51. and calls `cls.from_json_compatible()` to build the return value.
  52. All pipes are closed once this function returns.
  53. :param pipes: The list of pipes to wait for.
  54. :param cls: The class (with a `from_json_compatible` method) to return instances of.
  55. :rtype: An instance of `cls`.
  56. """
  57. ready, _, _ = select.select(pipes, [], [])
  58. for p in pipes:
  59. if p != ready[0]:
  60. os.close(p)
  61. with os.fdopen(ready[0], "r") as fp:
  62. return cls.from_json_compatible(json.load(fp))
  63. class Miner:
  64. """
  65. Management of a background process that mines for new blocks.
  66. The miner process is forked for each new proof of work that needs to be performed. The
  67. completed block is sent back JSON-serialized through a pipe that is opened for that purpose.
  68. When that pipe is closed by the parent process prematurely, the proof of work process knows it
  69. is no longer needed and exits.
  70. To start the mining process, `start_mining` needs to be called once. After that, the mining
  71. will happen automatically, with the mined block switching every time the chainbuilder finds a
  72. new primary block chain.
  73. To stop the mining process, there is the `shutdown` method. Once stopped, mining cannot be
  74. resumed (except by creating a new `Miner`).
  75. :ivar proto: The protocol where newly mined blocks will be sent to.
  76. :vartype proto: Protocol
  77. :ivar chainbuilder: The chain builder used by :any:`start_mining` to find the primary chain.
  78. :vartype chainbuilder: ChainBuilder
  79. :ivar _cur_miner_pipes: Pipes where worker processes will write their results to.
  80. :vartype _cur_miner_pipes: Optional[List[int]]
  81. :ivar _cur_miner_pids: Process ids of our worker processes.
  82. :vartype _cur_miner_pids: List[int]
  83. :ivar reward_pubkey: The public key to which mining fees and block rewards should be sent to.
  84. :vartype reward_pubkey: Key
  85. """
  86. def __init__(self, proto, reward_pubkey):
  87. self.proto = proto
  88. self.chainbuilder = ChainBuilder(proto)
  89. self.chainbuilder.chain_change_handlers.append(self._chain_changed)
  90. self._cur_miner_pids = []
  91. self._cur_miner_pipes = None
  92. self.reward_pubkey = reward_pubkey
  93. self._stopped = False
  94. self._started = False
  95. self._miner_cond = Condition()
  96. def _miner_thread(self):
  97. def wait_for_miner():
  98. with self._miner_cond:
  99. while self._cur_miner_pipes is None:
  100. if self._stopped:
  101. return None, None
  102. self._miner_cond.wait()
  103. pipes = self._cur_miner_pipes
  104. pids = self._cur_miner_pids
  105. self._cur_miner_pipes = None
  106. return pipes, pids
  107. while True:
  108. rxs, pids = wait_for_miner()
  109. if rxs is None:
  110. return
  111. try:
  112. block = wait_for_result(rxs, Block)
  113. self.proto.broadcast_primary_block(block)
  114. except json.JSONDecodeError:
  115. pass
  116. with self._miner_cond:
  117. if self._cur_miner_pids == pids:
  118. for pid in pids:
  119. try:
  120. os.kill(pid, signal.SIGKILL)
  121. except ProcessLookupError:
  122. pass
  123. self._cur_miner_pids = []
  124. def start_mining(self):
  125. """ Start mining on a new block. """
  126. with self._miner_cond:
  127. if not self._started:
  128. Thread(target=self._miner_thread, daemon=True).start()
  129. self._started = True
  130. # TODO: accessing the chainbuilder is problematic if start_mining was not called from the protocol's main thread
  131. chain = self.chainbuilder.primary_block_chain
  132. transactions = self.chainbuilder.unconfirmed_transactions.values()
  133. block = mining_strategy.create_block(chain, transactions, self.reward_pubkey)
  134. self._stop_mining_for_now()
  135. self._cur_miner_pipes = []
  136. miner = ProofOfWork(block)
  137. rx, pid = start_process(miner.run)
  138. self._cur_miner_pids.append(pid)
  139. self._cur_miner_pipes.append(rx)
  140. self._miner_cond.notify()
  141. def _chain_changed(self):
  142. if not self._stopped and self._started:
  143. self.start_mining()
  144. def _stop_mining_for_now(self):
  145. for pid in self._cur_miner_pids:
  146. try:
  147. os.kill(pid, signal.SIGKILL)
  148. except ProcessLookupError:
  149. pass
  150. self._cur_miner_pids = []
  151. def shutdown(self):
  152. """ Stop all mining. """
  153. self._stopped = True
  154. with self._miner_cond:
  155. self._stop_mining_for_now()
  156. self._miner_cond.notify()
  157. self.chainbuilder.chain_change_handlers.remove(self._chain_changed)