فهرست منبع

retry block downloads when no response for some time

Malte Kraus 8 سال پیش
والد
کامیت
80c18aee99
1فایلهای تغییر یافته به همراه95 افزوده شده و 28 حذف شده
  1. 95 28
      src/chainbuilder.py

+ 95 - 28
src/chainbuilder.py

@@ -6,18 +6,63 @@ candidate for an even longer chain that it attempts to download and verify.
 import threading
 import threading
 import logging
 import logging
 from typing import List, Dict, Callable, Optional
 from typing import List, Dict, Callable, Optional
-from datetime import datetime
+from datetime import datetime, timedelta
 
 
 from .block import GENESIS_BLOCK, GENESIS_BLOCK_HASH, Block
 from .block import GENESIS_BLOCK, GENESIS_BLOCK_HASH, Block
 from .blockchain import Blockchain
 from .blockchain import Blockchain
 
 
 __all__ = ['ChainBuilder']
 __all__ = ['ChainBuilder']
 
 
-class PartialChain:
+class BlockRequest:
+    """
+    Stores information about a pending block request and the partial chains that depend on it.
+
+    :ivar partial_chains: The partial chains that wait for this request. These partial chains are
+                          ordered by age, youngest first.
+    :vartype partial_chains: List[List[Block]]
+    :ivar _last_update: The time and date of the last block request to our peers.
+    :vartype _last_update: datetime
+    :ivar _request_count: The number of requests to our peers we have sent.
+    :vartype _request_count: int
+    """
+
+    BLOCK_REQUEST_RETRY_INTERVAL = timedelta(minutes=1)
+    """ The approximate interval after which a block request will be retried. """
+    BLOCK_REQUEST_RETRY_COUNT = 3
+    """ The number of failed requests of a block until we give up and delete the depending partial chains. """
+
+
     def __init__(self):
     def __init__(self):
-        self.blocks = []
-        self.last_update = datetime.utcnow()
-        # TODO: delete partial chains after some time
+        self.partial_chains = [[]]
+        self.clear()
+
+    def clear(self):
+        """ Clears the download count and last update time of this request. """
+        self._last_update = datetime(1970, 1, 1)
+        self._request_count = 0
+
+    def send_request(self, protocol: 'Protocol'):
+        """ Sends a request for the next required block to the given `protocol`. """
+        self._request_count += 1
+        self._last_update = datetime.utcnow()
+        protocol.send_block_request(self.partial_chains[0][-1].prev_block_hash)
+        logging.debug("asking for another block %d (attempt %d)", max(len(r) for r in self.partial_chains), self._request_count)
+
+    def timeout_reached(self) -> bool:
+        """ Returns a bool indicating whether all attempts to download this block have failed. """
+        return self._request_count > self.BLOCK_REQUEST_RETRY_COUNT
+
+    def checked_retry(self, protocol: 'Protocol'):
+        """
+        Retries sending this request, if no response was received for a certain time or if no
+        request was sent yet.
+        """
+
+        if self._last_update + self.BLOCK_REQUEST_RETRY_INTERVAL < datetime.utcnow():
+            if self._request_count >= self.BLOCK_REQUEST_RETRY_COUNT:
+                self._request_count += 1
+            else:
+                self.send_request(protocol)
 
 
 class ChainBuilder:
 class ChainBuilder:
     """
     """
@@ -27,7 +72,7 @@ class ChainBuilder:
     :ivar primary_block_chain: The longest fully validated block chain we know of.
     :ivar primary_block_chain: The longest fully validated block chain we know of.
     :vartype primary_block_chain: Blockchain
     :vartype primary_block_chain: Blockchain
     :ivar _block_requests: A dict from block hashes to lists of partial chains waiting for that block.
     :ivar _block_requests: A dict from block hashes to lists of partial chains waiting for that block.
-    :vartype _block_requests: Dict[bytes, List[PartialChain]]
+    :vartype _block_requests: Dict[bytes, BlockRequest]
     :ivar block_cache: A cache of received blocks, not bound to any one specific block chain.
     :ivar block_cache: A cache of received blocks, not bound to any one specific block chain.
     :vartype block_cache: Dict[bytes, Block]
     :vartype block_cache: Dict[bytes, Block]
     :ivar unconfirmed_transactions: Known transactions that are not part of the primary block chain.
     :ivar unconfirmed_transactions: Known transactions that are not part of the primary block chain.
@@ -39,7 +84,7 @@ class ChainBuilder:
     :vartype protocol: Protocol
     :vartype protocol: Protocol
     """
     """
 
 
-    def __init__(self, protocol):
+    def __init__(self, protocol: 'Protocol'):
         self.primary_block_chain = Blockchain()
         self.primary_block_chain = Blockchain()
         self._block_requests = {}
         self._block_requests = {}
         # TODO: delete some old checkpoints
         # TODO: delete some old checkpoints
@@ -94,20 +139,37 @@ class ChainBuilder:
             handler()
             handler()
 
 
         self._blockchain_checkpoints[chain.head.hash] = chain
         self._blockchain_checkpoints[chain.head.hash] = chain
+        self._retry_expired_requests()
+        self._clean_block_requests()
+
+        self.protocol.broadcast_primary_block(chain.head)
 
 
-        # stop trying to build shorter block chains
+    def _retry_expired_requests(self):
+        """ Sends new block requests to our peers for unanswered pending requests. """
+        for request in self._block_requests.values():
+            request.checked_retry(self.protocol)
+
+    def _clean_block_requests(self):
+        """
+        Deletes partial chains and block requests when they are shorter than the primary block
+        chain or all download attempts failed.
+        """
+        # TODO: call this regularly when not mining
         block_requests = {}
         block_requests = {}
-        for block_hash, requests in self._block_requests.items():
+        for block_hash, request in self._block_requests.items():
+            if request.timeout_reached():
+                logging.info("giving up on a block")
+                continue
+
             new_requests = []
             new_requests = []
-            for partial_chain in requests:
-                if partial_chain.blocks[0].height > chain.head.height:
+            for partial_chain in request.partial_chains:
+                if partial_chain[0].height > self.primary_block_chain.head.height:
                     new_requests.append(partial_chain)
                     new_requests.append(partial_chain)
             if new_requests:
             if new_requests:
-                block_requests[block_hash] = new_requests
+                request.partial_chains = new_requests
+                block_requests[block_hash] = request
         self._block_requests = block_requests
         self._block_requests = block_requests
 
 
-        self.protocol.broadcast_primary_block(chain.head)
-
     def new_block_received(self, block: 'Block'):
     def new_block_received(self, block: 'Block'):
         """ Event handler that is called by the network layer when a block is received. """
         """ Event handler that is called by the network layer when a block is received. """
         self._assert_thread_safety()
         self._assert_thread_safety()
@@ -116,42 +178,47 @@ class ChainBuilder:
             return
             return
         self.block_cache[block.hash] = block
         self.block_cache[block.hash] = block
 
 
+        self._retry_expired_requests()
+
         if block.hash not in self._block_requests:
         if block.hash not in self._block_requests:
             if block.height > self.primary_block_chain.head.height:
             if block.height > self.primary_block_chain.head.height:
-                self._block_requests.setdefault(block.hash, []).append(PartialChain())
+                if block.hash not in self._block_requests:
+                    self._block_requests[block.hash] = BlockRequest()
             else:
             else:
                 return
                 return
 
 
-        requests = self._block_requests[block.hash]
+        request = self._block_requests[block.hash]
         del self._block_requests[block.hash]
         del self._block_requests[block.hash]
         while True:
         while True:
-            for partial_chain in requests:
-                partial_chain.blocks.append(block)
-                partial_chain.last_update = datetime.utcnow()
+            for partial_chain in request.partial_chains:
+                partial_chain.append(block)
             if block.prev_block_hash not in self.block_cache or block.prev_block_hash in self._blockchain_checkpoints:
             if block.prev_block_hash not in self.block_cache or block.prev_block_hash in self._blockchain_checkpoints:
                 break
                 break
             block = self.block_cache[block.prev_block_hash]
             block = self.block_cache[block.prev_block_hash]
-        self._block_requests.setdefault(block.prev_block_hash, []).extend(requests)
+        if block.prev_block_hash in self._block_requests:
+            chains = request.partial_chains
+            request = self._block_requests[block.prev_block_hash]
+            request.partial_chains.extend(chains)
+        else:
+            request.clear()
+            self._block_requests[block.prev_block_hash] = request
 
 
         if block.prev_block_hash in self._blockchain_checkpoints:
         if block.prev_block_hash in self._blockchain_checkpoints:
             winner = self.primary_block_chain
             winner = self.primary_block_chain
-            for partial_chain in requests:
+            for partial_chain in request.partial_chains:
                 chain = self._blockchain_checkpoints[block.prev_block_hash]
                 chain = self._blockchain_checkpoints[block.prev_block_hash]
-                for b in partial_chain.blocks[::-1]:
+                for b in partial_chain[::-1]:
                     next_chain = chain.try_append(b)
                     next_chain = chain.try_append(b)
                     if next_chain is None:
                     if next_chain is None:
+                        logging.warning("invalid block")
                         break
                         break
                     chain = next_chain
                     chain = next_chain
                 if chain.head.height > winner.head.height:
                 if chain.head.height > winner.head.height:
                     winner = chain
                     winner = chain
+            del self._block_requests[block.prev_block_hash]
             if winner is not self.primary_block_chain:
             if winner is not self.primary_block_chain:
                 self._new_primary_block_chain(winner)
                 self._new_primary_block_chain(winner)
-        else:
-            # TODO: only do this if we have no pending requests for this block
-            self.protocol.send_block_request(block.prev_block_hash)
-            logging.debug("asking for another block %d", max(len(r.blocks) for r in requests))
-            self._block_requests[block.prev_block_hash] = requests
-
+        request.checked_retry(self.protocol)
 
 
 from .protocol import Protocol
 from .protocol import Protocol
 from .block import Block
 from .block import Block