mining.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. """ Functionality for mining new blocks. """
  2. import json
  3. import os
  4. import sys
  5. import signal
  6. import time
  7. import select
  8. from threading import Thread, Condition
  9. from typing import Optional, Callable, Tuple, List
  10. from .proof_of_work import ProofOfWork
  11. from .chainbuilder import ChainBuilder
  12. from .block import Block
  13. from . import mining_strategy
  14. __all__ = ['Miner']
  15. signal.signal(signal.SIGCHLD, signal.SIG_IGN)
  16. def exit_on_pipe_close(pipe):
  17. """ Waits until the pipe `pipe` can no longer be used, then kills this process. """
  18. poller = select.poll()
  19. poller.register(pipe, select.POLLERR)
  20. poller.poll()
  21. os._exit(1)
  22. def start_process(func: Callable) -> Tuple[int, int]:
  23. """
  24. Starts a function in a forked process, and writes its result to a pipe.
  25. :param func: The function the background process should run.
  26. :rval: A tuple of the pipe where the result will be written to and the process id of the
  27. forked process.
  28. """
  29. rx, wx = os.pipe()
  30. pid = os.fork()
  31. if pid == 0: # child
  32. try:
  33. # pytest opens some weird file descriptors...
  34. import src
  35. if not src._run_from_test:
  36. os.close(0)
  37. os.closerange(3, wx)
  38. os.closerange(wx + 1, 2**16)
  39. Thread(target=exit_on_pipe_close, args=(wx,), daemon=True).start()
  40. res = func().to_json_compatible()
  41. with os.fdopen(wx, "w") as fp:
  42. json.dump(res, sys.stdout)
  43. json.dump(res, fp)
  44. except Exception:
  45. import traceback
  46. traceback.print_exc()
  47. os._exit(1)
  48. os._exit(0)
  49. else: # parent
  50. os.close(wx)
  51. return rx, pid
  52. def wait_for_result(pipes: List[int], cls: type):
  53. """
  54. Waits for one of the pipes in `pipes` to become ready, reads a JSON object from that pipe
  55. and calls `cls.from_json_compatible()` to build the return value.
  56. All pipes are closed once this function returns.
  57. :param pipes: The list of pipes to wait for.
  58. :param cls: The class (with a `from_json_compatible` method) to return instances of.
  59. :rtype: An instance of `cls`.
  60. """
  61. ready, _, _ = select.select(pipes, [], [])
  62. for p in pipes:
  63. if p != ready[0]:
  64. os.close(p)
  65. with os.fdopen(ready[0], "r") as fp:
  66. return cls.from_json_compatible(json.load(fp))
  67. class Miner:
  68. """
  69. Management of a background thread that mines for new blocks.
  70. :ivar proto: The protocol where newly mined blocks will be sent to.
  71. :vartype proto: Protocol
  72. :ivar chainbuilder: The chain builder used by :any:`start_mining` to find the primary chain.
  73. :vartype chainbuilder: ChainBuilder
  74. :ivar _cur_miner_pipes: Pipes where worker processes will write their results to.
  75. :vartype _cur_miner_pipes: Optional[List[int]]
  76. :ivar _cur_miner_pids: Process ids of our worker processes.
  77. :vartype _cur_miner_pids: List[int]
  78. :ivar reward_pubkey: The public key to which mining fees and block rewards should be sent to.
  79. :vartype reward_pubkey: Signing
  80. """
  81. def __init__(self, proto, reward_pubkey):
  82. self.proto = proto
  83. self.chainbuilder = ChainBuilder(proto)
  84. self.chainbuilder.chain_change_handlers.append(self._chain_changed)
  85. self._cur_miner_pids = []
  86. self._cur_miner_pipes = None
  87. self.reward_pubkey = reward_pubkey
  88. self._stopped = False
  89. self._miner_cond = Condition()
  90. Thread(target=self._miner_thread, daemon=True).start()
  91. def _miner_thread(self):
  92. def wait_for_miner():
  93. with self._miner_cond:
  94. while self._cur_miner_pipes is None:
  95. if self._stopped:
  96. return None, None
  97. self._miner_cond.wait()
  98. pipes = self._cur_miner_pipes
  99. pids = self._cur_miner_pids
  100. self._cur_miner_pipes = None
  101. return pipes, pids
  102. while True:
  103. rxs, pids = wait_for_miner()
  104. if rxs is None:
  105. return
  106. try:
  107. block = wait_for_result(rxs, Block)
  108. self.proto.broadcast_primary_block(block)
  109. except json.JSONDecodeError:
  110. pass
  111. with self._miner_cond:
  112. if self._cur_miner_pids == pids:
  113. for pid in pids:
  114. try:
  115. os.kill(pid, signal.SIGKILL)
  116. except ProcessLookupError:
  117. pass
  118. self._cur_miner_pids = []
  119. def start_mining(self):
  120. """ Start mining on a new block. """
  121. chain = self.chainbuilder.primary_block_chain
  122. transactions = self.chainbuilder.unconfirmed_transactions.values()
  123. block = mining_strategy.create_block(chain, transactions, self.reward_pubkey)
  124. with self._miner_cond:
  125. self._stop_mining_for_now()
  126. self._cur_miner_pipes = []
  127. miner = ProofOfWork(block)
  128. rx, pid = start_process(miner.run)
  129. self._cur_miner_pids.append(pid)
  130. self._cur_miner_pipes.append(rx)
  131. self._miner_cond.notify()
  132. def _chain_changed(self):
  133. if not self._stopped:
  134. self.start_mining()
  135. def _stop_mining_for_now(self):
  136. for pid in self._cur_miner_pids:
  137. try:
  138. os.kill(pid, signal.SIGKILL)
  139. except ProcessLookupError:
  140. pass
  141. self._cur_miner_pids = []
  142. def stop_mining(self):
  143. """ Stop all mining. """
  144. self._stopped = True
  145. with self._miner_cond:
  146. self._stop_mining_for_now()
  147. self._miner_cond.notify()
  148. from .protocol import Protocol
  149. from .chainbuilder import ChainBuilder
  150. from .crypto import Signing