persistence.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. """ Functionality for storing and retrieving the miner state on disk. """
  2. import json
  3. import os
  4. import os.path
  5. import tempfile
  6. import gzip
  7. from io import TextIOWrapper
  8. from threading import Condition, Thread
  9. class Persistence:
  10. """
  11. Functionality for storing and retrieving the miner state on disk.
  12. :param path: The path to the storage location. """#TODO
  13. """:param chainbuilder: The chainbuilder to persist.
  14. """
  15. def __init__(self, path: str, chainbuilder: 'ChainBuilder'):
  16. self.chainbuilder = chainbuilder
  17. self.proto = chainbuilder.protocol
  18. self.path = path
  19. self._store_cond = Condition()
  20. self._store_data = None
  21. chainbuilder.chain_change_handlers.append(self.store)
  22. self._loading = False
  23. Thread(target=self._store_thread, daemon=True).start()
  24. def load(self):
  25. """ Loads data from disk. """
  26. self._loading = True
  27. try:
  28. with gzip.open(self.path, "r") as f:
  29. obj = json.load(TextIOWrapper(f))
  30. for block in obj['blocks']:
  31. self.proto.received("block", block, None, 2)
  32. for trans in obj['transactions']:
  33. self.proto.received("transaction", trans, None, 2)
  34. for peer in obj["peers"]:
  35. self.proto.received("peer", peer, None, 2)
  36. finally:
  37. self._loading = False
  38. def store(self):
  39. """
  40. Asynchronously stores current data to disk.
  41. Used as an event handler in the chainbuilder.
  42. """
  43. if self._loading:
  44. return
  45. obj = {
  46. "blocks": [b.to_json_compatible() for b in self.chainbuilder.primary_block_chain.blocks[::-1]],
  47. "transactions": [t.to_json_compatible() for t in self.chainbuilder.unconfirmed_transactions.values()],
  48. "peers": [list(peer.peer_addr) for peer in self.proto.peers if peer.is_connected and peer.peer_addr is not None],
  49. }
  50. with self._store_cond:
  51. self._store_data = obj
  52. self._store_cond.notify()
  53. def _store_thread(self):
  54. while True:
  55. with self._store_cond:
  56. while self._store_data is None:
  57. self._store_cond.wait()
  58. obj = self._store_data
  59. self._store_data = None
  60. with tempfile.NamedTemporaryFile(dir=os.path.dirname(self.path), mode="wb", delete=False) as tmpf:
  61. try:
  62. with TextIOWrapper(gzip.open(tmpf, mode="w")) as f:
  63. json.dump(obj, f, indent=4)
  64. tmpf.close()
  65. os.rename(tmpf.name, self.path)
  66. except Exception as e:
  67. os.unlink(tmpf.name)
  68. raise e
  69. from .chainbuilder import ChainBuilder