Mastering Node.js(Second Edition)
上QQ阅读APP看书,第一时间看更新

Building a Twitter feed using file events

Let's apply what we've learned. The goal is to create a server that a client can connect to and receive updates from Twitter. We will first create a process to query Twitter for any messages with the hashtag #nodejs, and write any found messages to a tweets.txt file in 140-byte chunks. We will then create a network server that broadcasts these messages to a single client. Those broadcasts will be triggered by write events on the tweets.txt file. Whenever a write occurs, 140-byte chunks are asynchronously read from the last-known client read pointer. This will happen until we reach the end of the file, broadcasting as we go. Finally, we will create a simple client.html page, which asks for, receives, and displays these messages.

While this example is certainly contrived, it demonstrates:

  •  Listening to the filesystem for changes, and responding to those events
  • Using data stream events for reading and writing files
  • Responding to network events
  • Using timeouts for polling state
  • Using a Node server itself as a network event broadcaster

To handle server broadcasting, we are going to use the Server Sent Events (SSE) protocol, a new protocol being standardized as part of HTML5.

We're first going to create a Node server that listens for changes on a file and broadcasts any new content to the client. Open your editor and create a file server.js:

let fs = require("fs");
let http = require('http');

let theUser = null;
let userPos = 0;
let tweetFile = "tweets.txt";

We will be accepting a single user connection, whose pointer will be theUser. The userPos will store the last position this client read from in tweetFile:

http.createServer((request, response) => {
response.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Access-Control-Allow-Origin': '*'
});

theUser = response;

response.write(':' + Array(2049).join(' ') + '\n');
response.write('retry: 2000\n');

response.socket.on('close', () => {
theUser = null;
});
}).listen(8080);

Create an HTTP server listening on port 8080, which will listen for and handle a single connection, storing the response argument, representing the pipe connecting the server to the client. The response argument implements the writable stream interface, allowing us to write messages to the client:

let sendNext = function(fd) {
let buffer = Buffer.alloc(140);
fs.read(fd, buffer, 0, 140, userPos * 140, (err, num) => {
if (!err && num > 0 && theUser) {
++userPos;
theUser.write(`data: ${buffer.toString('utf-8', 0, num)}\n\n`);
return process.nextTick(() => {
sendNext(fd);
});
}
});
};

We create a function to send the client messages. We will be pulling buffers of 140 bytes out of the readable stream bound to our tweets.txt file, incrementing our file position counter by one on each read. We write this buffer to the writable stream binding our server to the client. When done, we queue up a repeat call of the same function using nextTick, repeating until we get an error, receive no data, or the client disconnects:

function start() {
fs.open(tweetFile, 'r', (err, fd) => {
if (err) {
return setTimeout(start, 1000);
}
fs.watch(tweetFile, (event, filename) => {
if (event === "change") {
sendNext(fd);
}
});
  });
};

start();

Finally, we start the process by opening the tweets.txt file and watching for any changes, calling sendNext whenever new tweets are written. When we start the server, there may not yet exist a file to read from, so we poll using setTimeout until one exists.

Now that we have a server looking for file changes to broadcast, we need to generate data. We first install the TWiT Twitter package for Node, via npm.

We then create a process whose sole job is to write new data to a file:

const fs = require("fs");
const Twit = require('twit');

let twit = new Twit({
consumer_key: 'your key',
consumer_secret: 'your secret',
access_token: 'your token',
access_token_secret: 'your secret token'
});

To use this example, you will need a Twitter Developer account. Alternatively, there is also the option of changing the relevant code to simply write random 140-byte strings to tweets.txt: require("crypto").randomBytes(70).toString('hex'):

let tweetFile = "tweets.txt";
let writeStream = fs.createWriteStream(tweetFile, {
flags: "a" // indicate that we want to (a)ppend to the file
});

This establishes a stream pointer to the same file that our server will be watching.
We will be writing to this file:

let cleanBuffer = function(len) {
let buf = Buffer.alloc(len);
buf.fill('\0');
return buf;
};

Because Twitter messages are never longer than 140 bytes, we can simplify the read/write operation by always writing 140-byte chunks, even if some of that space is empty. Once we receive updates, we will create a buffer that is number of messages x 140 bytes wide, and write those 140-byte chunks to this buffer:

let check = function() {
twit.get('search/tweets', {
q: '#nodejs since:2013-01-01'
}, (err, reply) => {
let buffer = cleanBuffer(reply.statuses.length * 140);
reply.statuses.forEach((obj, idx) => {
buffer.write(obj.text, idx*140, 140);
});
writeStream.write(buffer);
})
setTimeout(check, 10000);
};

check();

We now create a function that will be asked every 10 seconds to check for messages containing the hashtag #nodejs. Twitter returns an array of message objects. The one object property we are interested in is the #text of the message. Calculate the number of bytes necessary to represent these new messages (140 x message count), fetch a clean buffer, and fill it with 140-byte chunks until all messages are written. Finally, this data is written to our tweets.txt file, causing a change event to occur that our server is notified of.

The final piece is the client page itself. This is a rather simple page, and how it operates should be familiar to the reader. The only thing to note is the use of SSE that listens to port 8080 on localhost. It should be clear how, on receipt of a new tweet from the server, a list element is added to the unordered list container #list:

<!DOCTYPE html>
<html>
<head>
<title></title>
</head>

<script>

window.onload = () => {
let list = document.getElementById("list");
let evtSource = new EventSource("http://localhost:8080/events");

evtSource.onmessage = (e) => {
let newElement = document.createElement("li");
newElement.innerHTML = e.data;
list.appendChild(newElement);
}
}

</script>
<body>

<ul id="list"></ul>

</body>
</html>

To read more about SSE, refer to Chapter 6, Creating Real-time Applications,
or you can visit: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events.