Implementation of the HTTP server driver
The HTTP server driver is implemented as a function. It takes an observable of responses as input and returns an observable of requests. Drivers are upside down: their inputs are the outputs of the application, and their outputs are its inputs. The base implementation is the following one:
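```python
import asyncio

from aiohttp import web
from rx.core import AnonymousObservable  # RxPY 1.x

def http_driver(sink, loop):
    app = None
    runner = None

    def on_subscribe(observer):
        nonlocal app  # assign the app variable of http_driver, not a new local
        app = web.Application()
        # start_server, stop_server, add_route and the sink callbacks
        # (on_sink_item, on_sink_error, on_sink_completed) are nested here.
        sink.subscribe(
            on_next=on_sink_item,
            on_error=on_sink_error,
            on_completed=on_sink_completed)

    return AnonymousObservable(on_subscribe)
```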
When called, the http_driver function returns an observable whose subscription function is on_subscribe. The http_driver function declares two variables, app and runner, which are used in several nested functions. The subscription function creates the web application and subscribes to the sink stream. The driver therefore behaves as follows: it returns a source stream, and when someone subscribes to this source observable, the on_subscribe function is called and the driver subscribes to the sink observable in turn. Subscriptions are chained this way: subscribing to the source triggers the subscription to the sink.
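The deferred, chained subscription can be reproduced without RxPY. The sketch below is a minimal illustration under stated assumptions. The Observer and MiniObservable classes are hypothetical stand-ins, not the RxPY API. from_list plays the role of a sink, and echo_driver has the same shape as http_driver. Nothing is subscribed when the driver is called. Subscribing to its source then subscribes to the sink.

```python
from typing import Callable


class Observer:
    """Bundle of callbacks (hypothetical helper, not the RxPY class)."""

    def __init__(self, on_next, on_error=None, on_completed=None):
        self.on_next = on_next
        self.on_error = on_error or (lambda e: None)
        self.on_completed = on_completed or (lambda: None)


class MiniObservable:
    """Hypothetical minimal cold observable: nothing runs before subscribe()."""

    def __init__(self, on_subscribe: Callable[[Observer], None]):
        self._on_subscribe = on_subscribe

    def subscribe(self, on_next, on_error=None, on_completed=None):
        self._on_subscribe(Observer(on_next, on_error, on_completed))


def from_list(items: list):
    """Sink stand-in: emits each item, then completes."""
    def on_subscribe(observer):
        for item in items:
            observer.on_next(item)
        observer.on_completed()
    return MiniObservable(on_subscribe)


def echo_driver(sink, log):
    """Same shape as http_driver: subscribing to the source subscribes to the sink."""
    def on_subscribe(observer):
        log.append('source subscribed')
        sink.subscribe(
            on_next=lambda i: observer.on_next(('echo', i)),
            on_error=observer.on_error,
            on_completed=observer.on_completed)
    return MiniObservable(on_subscribe)


log = []
source = echo_driver(from_list([1, 2]), log)
nothing_yet = list(log)        # [] : building the source subscribes to nothing
received = []
source.subscribe(on_next=received.append,
                 on_completed=lambda: log.append('completed'))
# log == ['source subscribed', 'completed']
# received == [('echo', 1), ('echo', 2)]
```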
Before looking at the implementation of the sink.subscribe callbacks, let's look at three other functions that the implementation needs: starting the server, stopping the server, and adding a route with its handler. These functions are nested in the on_subscribe function. The http_driver function therefore contains two levels of nesting, and closures can capture variables from each level. Variables of http_driver are shared by all subscriptions, while variables of on_subscribe, such as observer, are specific to one subscription.
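The two capture levels can be shown in isolation. In this small sketch, make_driver, on_subscribe and on_item are hypothetical names that mirror the driver's nesting. shared_log lives at the outer level, so every subscription sees it. tag lives at the middle level, so each subscription gets its own value.

```python
def make_driver(name):
    shared_log = []                    # level 1: shared by every subscription

    def on_subscribe(observer_id):
        tag = f"{name}/{observer_id}"  # level 2: one value per subscription

        def on_item(item):
            # Reads both captured levels and appends to the shared list.
            shared_log.append((tag, item))
            return list(shared_log)

        return on_item

    return on_subscribe


subscribe = make_driver('http')
first = subscribe('a')
second = subscribe('b')
first(1)                   # [('http/a', 1)]
seen = second(2)           # [('http/a', 1), ('http/b', 2)]
```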
The function that starts the server is the following one:
```python
def start_server(host, port, app):
    runner = web.AppRunner(app)

    async def _start_server(runner):
        await runner.setup()
        site = web.TCPSite(runner, host, port)
        await site.start()

    # Schedule the start-up on the event loop; this function returns immediately.
    loop.create_task(_start_server(runner))
    return runner
```
It creates the runner, defines a coroutine that starts the server, and schedules the execution of this coroutine on the event loop. Finally, it returns the runner. The implementation of the _start_server coroutine is the same as the one used in Chapter 2, Asynchronous Programming in Python. Refer to it if its content is not clear.
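The split between scheduling and running can be seen without aiohttp. This is a minimal illustration that replaces web.AppRunner with a hypothetical FakeRunner class. start_server returns the runner at once. The runner only becomes ready after the event loop gets a chance to run the scheduled task.

```python
import asyncio


class FakeRunner:
    """Hypothetical stand-in for aiohttp's web.AppRunner; only records its state."""

    def __init__(self):
        self.state = 'created'

    async def setup(self):
        await asyncio.sleep(0)      # pretend to do some asynchronous work
        self.state = 'running'


def start_server(loop):
    runner = FakeRunner()

    async def _start_server(runner):
        await runner.setup()

    # Schedule the coroutine; it only runs once control returns to the loop.
    loop.create_task(_start_server(runner))
    return runner


async def main():
    loop = asyncio.get_running_loop()
    runner = start_server(loop)
    before = runner.state           # the scheduled task has not run yet
    await asyncio.sleep(0.01)       # yield to the event loop
    return before, runner.state


result = asyncio.run(main())        # ('created', 'running')
```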
The function that stops the server has a similar structure:
```python
def stop_server(runner):
    async def _stop_server():
        await runner.cleanup()

    # Schedule the shutdown on the event loop.
    loop.create_task(_stop_server())
```
It declares a coroutine to stop the runner and schedules the execution of this coroutine. The last part of this driver is the creation of new routes and the definition of their handler. Here is its implementation:
```python
def add_route(app, methods, path):
    async def on_request_data(request, path):
        data = await request.read()
        # This future is completed later, when a response item
        # is received on the sink observable.
        response_future = asyncio.Future()
        request_item = {
            'method': request.method,
            'path': path,
            'match_info': request.match_info,
            'data': data,
            'context': response_future,
        }
        observer.on_next(request_item)
        await response_future
        data, status = response_future.result()

        response = web.StreamResponse(status=status, reason=None)
        await response.prepare(request)
        if data is not None:
            await response.write(data)
        return response

    # One route per HTTP method, all sharing the same handler.
    for method in methods:
        app.router.add_route(method, path,
                             lambda r: on_request_data(r, path))
```
It takes three parameters as input:
- A reference to the application
- The HTTP methods associated with this route
- The path of the route
At the end of the function, a route is added for each method provided. All routes share one handler, implemented in the on_request_data coroutine. This coroutine is called each time an HTTP client makes a request on one of the configured routes. For each request, it creates a future and sends an item containing this future on the source observable of the driver. The future is completed with the content of the response when a response item is received. The value of this future is then used to send the HTTP response.
The first line of the coroutine reads the data from the request object. Then, a future is created and a request_item is built with all the request information in it, plus the future in the context field. The item is sent on the source observable, and the coroutine waits for the completion of the future. Once the future has completed (that is, once the await response_future line returns), its result is retrieved. This result is a tuple containing two fields: the payload of the response and its HTTP status.
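The future handshake can be exercised with asyncio alone. This sketch is a simplified on_request_data with no aiohttp request or response objects. Its emit parameter is a hypothetical stand-in for observer.on_next. The loop in main plays the application: it completes each emitted future with a (data, status) tuple, which resumes the suspended handler.

```python
import asyncio


async def on_request_data(emit, method, path, body):
    # Simplified handler without aiohttp: 'emit' stands in for observer.on_next.
    response_future = asyncio.get_running_loop().create_future()
    emit({'method': method, 'path': path, 'data': body,
          'context': response_future})
    await response_future                 # suspended until a response arrives
    data, status = response_future.result()
    return data, status


async def main():
    emitted = []                          # items "sent on the source observable"
    handler = asyncio.create_task(
        on_request_data(emitted.append, 'POST', '/echo', b'hello'))
    await asyncio.sleep(0)                # let the handler run up to its await
    for item in emitted:                  # plays the role of the application
        item['context'].set_result((item['data'].upper(), 200))
    return await handler


result = asyncio.run(main())              # (b'HELLO', 200)
```

The future is created with loop.create_future(), the form the asyncio documentation recommends. The driver's asyncio.Future() behaves the same way inside a running coroutine.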
Then a response object is created with the status retrieved from the future. This aiohttp response object must be prepared before any data can be written to it. If the future contains some data, this data is written to the response object. The response is then returned so that it can be sent as an HTTP response to the client.
Finally, these three functions can be used in the on_next callback of the sink observer:
```python
def on_sink_item(i):
    nonlocal runner
    if i['what'] == 'response':
        # Completing the future resumes the pending on_request_data coroutine.
        response_future = i['context']
        response_future.set_result((i['data'], i['status']))
    elif i['what'] == 'add_route':
        add_route(app, i['methods'], i['path'])
    elif i['what'] == 'start_server':
        runner = start_server(i['host'], i['port'], app)
    elif i['what'] == 'stop_server':
        stop_server(runner)

def on_sink_error(e):
    observer.on_error(e)

def on_sink_completed():
    observer.on_completed()
```
on_sink_item calls one of the previous functions to start the server, stop the server, or add a route. It also sets the result of the future when a response item is received. Setting this result resumes the on_request_data coroutine that is waiting for the response.
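The sketch below feeds a sequence of sink items through the dispatcher, using the item keys from on_sink_item. It swaps the if/elif chain for a dictionary lookup, one possible variant rather than the driver's own code. Unknown items are still ignored. The route path, host, and port values are hypothetical, and the handlers only record how they were called, standing in for add_route, start_server and stop_server.

```python
import asyncio


def make_on_sink_item(calls):
    # Hypothetical stand-ins for add_route, start_server and stop_server:
    # they only record how they were called.
    handlers = {
        'add_route': lambda i: calls.append(('add_route', i['methods'], i['path'])),
        'start_server': lambda i: calls.append(('start_server', i['host'], i['port'])),
        'stop_server': lambda i: calls.append(('stop_server',)),
        # Completing the future resumes the pending request handler.
        'response': lambda i: i['context'].set_result((i['data'], i['status'])),
    }

    def on_sink_item(i):
        handler = handlers.get(i['what'])
        if handler is not None:           # unknown items are ignored, as with if/elif
            handler(i)

    return on_sink_item


async def main():
    calls = []
    on_sink_item = make_on_sink_item(calls)
    future = asyncio.get_running_loop().create_future()
    for item in [
        {'what': 'add_route', 'methods': ['GET'], 'path': '/echo/{what}'},
        {'what': 'start_server', 'host': 'localhost', 'port': 8080},
        {'what': 'response', 'context': future, 'data': b'hi', 'status': 200},
        {'what': 'stop_server'},
    ]:
        on_sink_item(item)
    return calls, await future


calls, response = asyncio.run(main())
```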
Note that the runner variable is declared as nonlocal so that the assignment updates the runner variable of the enclosing http_driver function. Without this statement, the runner = start_server(...) assignment would create a new variable local to on_sink_item, and the runner could not be retrieved later to stop the server.
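The effect of nonlocal can be checked directly. In this small sketch, without_nonlocal and with_nonlocal are hypothetical functions with the same structure as the driver. The string 'runner-1' stands in for the object returned by start_server.

```python
def without_nonlocal():
    runner = None

    def start():
        runner = 'runner-1'   # creates a new variable local to start()

    start()
    return runner             # still None: the outer variable was never assigned


def with_nonlocal():
    runner = None

    def start():
        nonlocal runner
        runner = 'runner-1'   # rebinds the variable of the enclosing function

    start()
    return runner
```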
The two other callbacks forward any errors or completion of the sink stream to the source stream. Some additional logic could be added in the error callback; for example, to stop the server. In any case, forwarding these events is important so that they can be caught downstream by other components or drivers.
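One way to add the extra logic mentioned above is to stop the server before forwarding the error. This is only a sketch. make_sink_callbacks, get_runner and RecordingObserver are hypothetical names. The stop_server argument stands in for the driver's stop_server function. Whatever the cleanup does, the error is still forwarded downstream.

```python
class RecordingObserver:
    """Hypothetical observer that records the events it receives."""

    def __init__(self):
        self.events = []

    def on_next(self, item):
        self.events.append(('next', item))

    def on_error(self, error):
        self.events.append(('error', error))

    def on_completed(self):
        self.events.append(('completed',))


def make_sink_callbacks(observer, stop_server, get_runner):
    def on_sink_error(e):
        runner = get_runner()
        if runner is not None:      # optional extra logic: shut the server down
            stop_server(runner)
        observer.on_error(e)        # always forward the error downstream

    def on_sink_completed():
        observer.on_completed()

    return on_sink_error, on_sink_completed


stopped = []
observer = RecordingObserver()
on_error, on_completed = make_sink_callbacks(
    observer, stopped.append, lambda: 'runner-1')
err = ValueError('boom')
on_error(err)
# stopped == ['runner-1'], observer.events == [('error', err)]
```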