The server interface
The server interface is created using a Python web server framework called Sanic, which helps to code the web server logic in an asynchronous way:
class Server(object): def __init__(self): self.app = Sanic() self.blockchain = Blockchain() self.sockets = [] self.app.add_route(self.blocks, '/blocks', methods=['GET']) self.app.add_route(self.mine_block, '/mineBlock', methods=['POST']) self.app.add_route(self.peers, '/peers', methods=['GET']) self.app.add_route(self.add_peer, '/addPeer', methods=['POST']) self.app.add_websocket_route(self.p2p_handler, '/')
The preceding code snippet of the Server class creates four REST API interfaces for displaying the blocks, mining (creating) a new block, displaying the node's current peers, and adding a new peer. We will be using endpoints created using the add_route methods to interact with our node.
We will be using the WebSocket protocol to perform P2P communication between the nodes because it allows bidirectional communication between the connected nodes and maintains an active connection without creating too much load on the web server.
All the API interfaces defined in the preceding snippet invoke the following methods:
async def mine_block(self, request): try: newBlock = self.blockchain.generate_next_block(request.json["data"]) except KeyError as e: return json({"status": False, "message": "pass value in data key"}) self.blockchain.add_block(newBlock) await self.broadcast(self.response_latest_msg()) return json(newBlock)
The preceding method implements the functionality required to generate a new block to append. Since it is an HTTP POST method, it accepts a parameter called data, which was sent in the HTTP request body. This data is used to create the new block. This method will broadcast the newly created block, as mentioned earlier in the application design section.
Blocks present in the local blockchain are formatted and returned by this method:
async def blocks(self, request): return json(self.blockchain.blocks)
Whenever the user wants to add a peer explicitly, they will use the following interface:
async def add_peer(self, request):
asyncio.ensure_future(self.connect_to_peers
([request.json["peer"]]),
loop=asyncio.get_event_loop()) return json({"status": True})
This method adds an asynchronous connect_to_peers task to the event loop of the web server. All the socket information of the connected peers are maintained for future broadcasting.
The following implementation fetches address and port information from socket objects from all the peers that are maintained by the node after the initial connection:
async def peers(self, request): peers = map(lambda x: "{}:{}".format(x.remote_address[0], x.remote_address[1]), self.sockets) return json(peers)]
async def connect_to_peers(self, newPeers): for peer in newPeers: logger.info(peer) try: ws = await websockets.connect(peer) await self.init_connection(ws) except Exception as e: logger.info(str(e))
The connect_to_peers method initializes WebSocket connections with all known peers. This method is also invoked when a new peer is added using the HTTP interface. Initial block synchronization is performed using the init_connection function. p2p-handler is a WebSocket handler that listens to the connection. It creates a socket and performs block synchronization using init_connection:
async def p2p_handler(self, request, ws): logger.info('listening websocket p2p port on: %d' % port) try: await self.init_connection(ws) except (ConnectionClosed): await self.connection_closed(ws)
Socket information is removed when a peer gets disconnected:
async def connection_closed(self, ws): logger.critical("connection failed to peer") self.sockets.remove(ws)
Block synchronization is performed by the init_connection method by sending a query message requesting the last block of the peer's blockchain. It also adds a message handler that continuously listens to the messages from its peers:
async def init_connection(self, ws): self.sockets.append(ws) await ws.send(JSON.dumps(self.query_chain_length_msg())) while True: await self.init_message_handler(ws)
The message handler on each node listens for three types of message. Two of them are query messages to which the node responds by querying the local blockchain. The RESPONSE_BLOCKCHAIN message is a response received from the query made by the local node. This message will be further processed by handle_blockchain_response:
async def init_message_handler(self, ws): data = await ws.recv() message = JSON.loads(data) logger.info('Received message: {}'.format(data)) await { QUERY_LATEST: self.send_latest_msg, QUERY_ALL: self.send_chain_msg, RESPONSE_BLOCKCHAIN: self.handle_blockchain_response }[message["type"]](ws, message)
The following methods serialize the response data to be sent to the peer for its query message requests:
async def send_latest_msg(self, ws, *args): await ws.send(JSON.dumps(self.response_latest_msg())) async def send_chain_msg(self, ws, *args): await ws.send(JSON.dumps(self.response_chain_msg()))
The entire blockchain is sent to the connected peer when it queries the blockchain:
def response_chain_msg(self): return { 'type': RESPONSE_BLOCKCHAIN, 'data': JSON.dumps([block.dict() for block in self.blockchain.blocks])}
The following code snippet fetches the last block from the blockchain and formats it into JSON so it can be sent via the socket channel:
def response_latest_msg(self): return { 'type': RESPONSE_BLOCKCHAIN, 'data': JSON.dumps([self.blockchain.get_latest_block().dict()]) }
The following code snippets are the query request messages used to query connected peers for the blockchain:
def query_chain_length_msg(self): return {'type': QUERY_LATEST} def query_all_msg(self): return {'type': QUERY_ALL}
The following method is invoked in the message handler that is continuously listening to the peer's requests and responses. This method is invoked to process the blockchain information sent by the peers:
async def handle_blockchain_response(self, ws, message): received_blocks = sorted(JSON.loads(message["data"]), key=lambda k: k['index']) latest_block_received = received_blocks[-1] latest_block_held = self.blockchain.get_latest_block() if latest_block_received["index"] > latest_block_held.index: logger.info('blockchain possibly behind. We got: ' + str(latest_block_held.index) + ' Peer got: ' + str(latest_block_received["index"]))
If the last index of the block is not greater than the local blockchain's last index, it will be rejected because the node is only interested in the longest chain:
if latest_block_held.hash == latest_block_received["previous_hash"]: logger.info("We can append the received block to our chain")
self.blockchain.blocks.append
(Block(**latest_block_received)) await self.broadcast(self.response_latest_msg())
The last block received is appended to the local blockchain if it satisfies the hash conditions. If it is satisfied, then it's broadcasted to all the peers:
elif len(received_blocks) == 1: logger.info("We have to query the chain from our peer") await self.broadcast(self.query_all_msg()) else: logger.info("Received blockchain is longer than current blockchain") await self.replace_chain(received_blocks)
If the last block doesn't satisfy the hash condition, then the local blockchain may be behind by more than one block. The local copy will then be replaced with the received blockchain if it is a full chain. Otherwise, a message is broadcasted to query the entire blockchain.
When a node receives a blockchain that is longer than the current local copy, it will validate the entire blockchain and then replace the entire local blockchain:
async def replace_chain(self, newBlocks): try: if self.blockchain.is_valid_chain(newBlocks) and len(newBlocks) > len(self.blockchain.blocks): logger.info('Received blockchain is valid. Replacing current blockchain with ' 'received blockchain') self.blockchain.blocks = [Block(**block) for block in newBlocks] await self.broadcast(self.response_latest_msg()) else: logger.info('Received blockchain invalid') except Exception as e: logger.info("Error in replace chain" + str(e))
The broadcast method then sends all the requests and responses to every peer connected to the node through the established socket connection:
async def broadcast(self, message): for socket in self.sockets: await socket.send(JSON.dumps(message))