CS11 Erlang - Lab 5

By this point you should have two components for working with RSS 2.0 feeds: an rss_parse module that allows you to parse and analyze RSS 2.0 feed XML documents, and an rss_queue module that manages and updates a sequence of RSS feed items. The rss_queue module detects whether an incoming item is new, an updated version of an existing item, or an old version that you have already seen.

This week you will complete most of the rss_queue functionality, including allowing a queue to receive RSS feed items from the Internet, and also allowing a queue to receive RSS feed items from other queues. This will form the basis of our feed aggregator. (The only feature that your queues will be missing after this week will be the ability to filter items based on various criteria.)

Retrieving and Aggregating RSS Feeds

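Given how we have constructed our RSS queues so far, it turns out that we can add Internet feed-reading to our system very easily. RSS queues will have two modes of operation. In standalone mode, the queue simply receives feed items as messages from other processes, such as other queues. In URL-reading mode, the queue periodically retrieves feed items from a specific RSS feed URL.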

It would be pretty annoying to implement these two modes directly in the rss_queue module, so we will take an easier approach. We have already designed our queues to receive incoming items as messages from another process, so our URL-reading capability will use this same interface. For queues that read from a URL, we will add another Erlang process to the mix, called an rss_reader process. Its sole purpose is to periodically read from the specified URL and send the retrieved items to a specific rss_queue process. This pair of processes will form a unit that can populate a queue from an RSS feed URL.

You can see that a particular rss_queue and rss_reader pair are dependent on each other. If the rss_reader dies then the queue can no longer receive feed items. Likewise, if the rss_queue dies then the reader no longer has a place to send its data. Therefore, we will link these processes together, so that if one dies, it automatically terminates the other one as well. This way, if an error causes one process in the pair to terminate unexpectedly, the other process will automatically be cleaned up.
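
Here is a minimal sketch of this linking behavior. It uses a hypothetical link_pair_demo module with stand-in processes rather than the real rss_queue and rss_reader. The "queue" spawns its "reader" with spawn_link/1, so an abnormal exit in either process also takes down the other.

```erlang
-module(link_pair_demo).
-export([start_pair/0]).

%% Spawns a stand-in "queue" process, which in turn spawns and links a
%% stand-in "reader". Returns {QueuePid, ReaderPid}.
start_pair() ->
    Parent = self(),
    Queue = spawn(fun() ->
                      %% spawn_link/1 creates the reader and links it to
                      %% this queue process in one atomic step.
                      R = spawn_link(fun idle/0),
                      Parent ! {reader, self(), R},
                      idle()
                  end),
    receive
        {reader, Queue, Reader} -> {Queue, Reader}
    end.

%% Both stand-ins just block until an exit signal kills them.
idle() ->
    receive
    after infinity -> ok
    end.
```

From the shell, first run {Q, R} = link_pair_demo:start_pair(). Calling exit(R, crash) should then terminate the reader and, through the link, the queue as well. Once the exit signal has propagated, is_process_alive(Q) returns false.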

Reading URL Data

To read from an RSS feed URL, you will need to use the httpc:request/1 function. This function is very simple to use. It handles all of the nuances of talking to the remote web server, and the response comes back as a composite data structure containing the HTTP response code, the response headers, the actual data, and so forth.

The main caveat is that httpc:request/1 will fail until Erlang's Internet services (the inets application) have been started in the Erlang VM by calling inets:start/0. You can call this function multiple times, but every call after the first returns an error tuple rather than ok.

You can actually try it out on the Erlang command line:

    1> inets:start().
    ok
    2> httpc:request("http://rss.cnn.com/rss/cnn_topstories.rss").
    {ok,{{"HTTP/1.1",200,"OK"},
    ...

    3> inets:start().
    {error,{already_started,inets}}
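
Because every call after the first returns an error tuple, a reader that calls inets:start/0 on startup may want to treat that case as success. Here is a minimal sketch in a hypothetical inets_helper module:

```erlang
-module(inets_helper).
-export([ensure_inets/0]).

%% Hypothetical helper: starts inets if needed and treats the
%% {error, {already_started, inets}} result as success. Any other
%% result fails the case expression and crashes the caller.
ensure_inets() ->
    case inets:start() of
        ok -> ok;
        {error, {already_started, inets}} -> ok
    end.
```

Newer OTP releases also provide application:ensure_all_started/1. It returns {ok, Started} whether or not inets was already running, so you could use that instead.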

One of your tasks for this week will be to extract the actual response body from the composite data that comes back from this function. This is not too complicated, but you will probably find the anonymous variable "_" useful for this task.

Of course, you also need to check the HTTP response code to make sure that everything went OK. If you look at a table of HTTP response codes, you will notice that any code of 400 or above indicates an error. Codes in the 300s mean that some extra work, such as following a redirect, may be needed to retrieve the requested data. However, ignore all of that detail: we only care about getting back a 200 "OK" response, and we will treat anything else as an error.

Thus, you will need to check that the numeric status code is 200, and then extract the response body.
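
One way to do both checks at once is to pattern-match directly on the value returned by httpc:request/1. With default options, that value is either {ok, {{Version, Status, Reason}, Headers, Body}} or {error, Reason}. The module name, the response_body/1 function, and its error tuples are hypothetical choices for illustration. They are not part of the lab's required interface.

```erlang
-module(http_body_example).
-export([response_body/1]).

%% Hypothetical helper: Response is the value returned by httpc:request/1.
%% Only a 200 status yields the body; anything else is reported as an error.
response_body({ok, {{_Version, 200, _Reason}, _Headers, Body}}) ->
    {ok, Body};
response_body({ok, {{_Version, Status, Reason}, _Headers, _Body}}) ->
    {error, {http_status, Status, Reason}};
response_body({error, Reason}) ->
    {error, Reason}.
```

In a reader you might instead use a case expression on this result and call exit/1 with the error reason. That matches what the lab asks for when the status is not 200.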

Queue Subscribers

Here is an example of how we might configure our RSS readers and queues into a system to aggregate items from CNN's and BBC's news feeds, as well as the digg.com story feed:

The messages that flow between rss_queue processes are simply new or updated RSS feed items. That is, if a queue receives an item that it already has, it simply ignores it. But, if a queue receives an item that is either completely new, or one that is an updated version of an existing item, the queue propagates it forward to the next queue(s) in the chain.

Again, given how we have implemented our rss_queue processes, queues really don't care where their messages come from. However, they don't yet have the ability to propagate messages on to the next queue in the chain. Thus, we need to introduce the notion of a subscription to a queue. A queue Q1 can subscribe to another queue Q2, thus telling Q2 that it should forward all new/updated feed items to Q1.

This means that we need to introduce a new state value into our rss_queue implementation, and that is a set of the PIDs of subscribers to that queue. We will also need a way for a queue to subscribe to another queue, and we will need to update our "add-item" message handler to forward along new/updated items to all subscribers.

But there is one other challenge with this approach: a queue process may subscribe to a number of other queues and then die. In this case, all of the queue's subscriptions should be cleaned up automatically, so that subsequent send attempts only go to queues that are still running.

Of course, you can use process-linking to implement such a feature, or you could also use process-monitoring to do this. When a queue Q1 subscribes to another queue Q2, Q2 can either monitor or link to Q1, so that if Q1 dies, Q2 can automatically unsubscribe Q1.

Note that if you use linking to handle unsubscriptions, you need to make sure that a (queue, reader) pair will still terminate properly if the queue's reader dies. To receive termination signals as messages, the queue must call process_flag(trap_exit, true), but then it won't automatically die if its reader dies. One solution is for the queue to remember its reader's PID (if it has a reader) and exit when it receives an 'EXIT' message from that PID. An even simpler solution may be to use process monitoring for subscribers instead.
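
Here is a minimal sketch of the monitoring approach. It assumes a hypothetical subscriber_monitor_demo module that tracks only its subscriber set and holds no feed items. The queue calls erlang:monitor/2 when a PID subscribes, and the resulting {'DOWN', Ref, process, Pid, Reason} message removes that PID. The get_subscribers message exists only so the behavior can be observed.

```erlang
-module(subscriber_monitor_demo).
-export([start/0, init/0]).

%% Starts a stand-in queue that only tracks its subscribers.
start() ->
    spawn(?MODULE, init, []).

init() ->
    loop(sets:new()).

loop(Subscribers) ->
    receive
        {subscribe, Pid} ->
            case sets:is_element(Pid, Subscribers) of
                true ->
                    %% Already subscribed: ignore the request.
                    loop(Subscribers);
                false ->
                    %% Monitor the subscriber so we hear about its death.
                    erlang:monitor(process, Pid),
                    loop(sets:add_element(Pid, Subscribers))
            end;
        {'DOWN', _MonitorRef, process, Pid, _Reason} ->
            %% The subscriber has terminated: drop it from the set.
            loop(sets:del_element(Pid, Subscribers));
        {get_subscribers, From} ->
            %% Hypothetical query message, used here only for testing.
            From ! {subscribers, self(), sets:to_list(Subscribers)},
            loop(Subscribers)
    end.
```

A monitor fires even when the subscriber exits normally, so the set is cleaned up no matter how the subscriber stops. The monitoring queue itself is never killed by a subscriber's death.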

Logging

Once you have all of these processes interacting, and the RSS readers periodically fetching feeds from various URLs, you will need some kind of logging to make it possible to understand what is going on. Erlang has an error_logger module that provides an extensible logging capability for you to use. However, to make it even nicer to use, you should grab a copy of the provided logging.hrl file and use it in your project. It provides several macros that you can use in your code. You can look at the actual definitions of these macros to see how they work, but you should be able to use them like this:

    -include("logging.hrl").

    ...
    ?INFO("Reading URL ~s~n", [Url]).

Such a line would produce the following result:

    =INFO REPORT==== 11-Feb-2009::20:08:15 ===
    rss_reader <0.81.0>:  Reading URL http://rss.cnn.com/rss/cnn_topstories.rss
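
The provided logging.hrl is not reproduced here, so the following is only a guess at how such a macro could be written, not its actual contents. This hypothetical INFO macro prefixes each message with the calling module and PID using error_logger:info_msg/2. The output should be roughly the same shape as the report above.

```erlang
-module(logging_sketch).
-export([demo/1]).

%% Hypothetical macro, NOT the contents of the provided logging.hrl:
%% prefixes each message with the calling module and PID.
-define(INFO(Format, Args),
        error_logger:info_msg("~p ~p:  " ++ Format, [?MODULE, self() | Args])).

demo(Url) ->
    ?INFO("Reading URL ~s~n", [Url]).
```

Recent OTP releases route error_logger through the logger module, which may hide info-level reports by default. In that case, logger:set_primary_config(level, info) should make them visible.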

Your Tasks

Now that we have covered the general idea of this assignment, let's get down to what you need to implement for this week's lab. Create a new directory for this week's work, ~/cs11/erlang/lab5, and copy your files from lab 4 into this directory to work with. You will definitely be changing a few of these files for this week's lab.

The RSS Reader

Implement a new rss_reader module, which will contain the process that reads RSS feeds and sends the items to an rss_queue process. The process's operation is very simple:

  1. Create a start(Url, QPid) function that starts a new reader process. The first argument is the URL of the RSS feed to periodically retrieve, and the second argument is the PID of the rss_queue process to send the results to.

    This function should just spawn a new process that calls a server(Url, QPid) function, which implements the main loop of the rss_reader process. Use the MFA version of spawn (spawn/3), and use ?MODULE for the module argument. Note that this requires server/2 to be exported.

  2. The server function will implement a simple loop, performing these operations:

    1. First, retrieve the feed from the specified URL, using the httpc:request/1 function.
    2. If the response code is 200, get out the response body and parse it into XML using xmerl_scan:string/1.
    3. Once the body is parsed, verify that it is an RSS 2.0 feed, using your rss_parse:is_rss2_feed/1 function from lab 3.
    4. If all of the above steps complete successfully, use your rss_queue:add_feed/2 helper from last week to send all items from the feed to the queue paired with this reader.
    5. Finally, wait for a time interval, then go back to the start and do it all again.

      Normally, RSS feed readers update their feeds on a long time-interval, anywhere from 15 minutes to an hour. However, this won't help you test things very well, so while developing you can set this interval to 60 seconds or something similar. Define a macro constant, e.g. RETRIEVE_INTERVAL, so that you can easily change this timeout value without having to hunt through your code.

      You can use the after clause of the receive statement to wait for this time interval. Note that a receive statement doesn't actually need to receive any messages; you can use it to implement a simple timeout as well:

          1> receive after 1000 -> ok end.
          (one second passes...)
          ok
          2>
  3. Make sure that your reader terminates if any errors occur in the above processing. You don't need to be clever about catching exceptions here. Any exception will kill the process, and its death will automatically propagate to the linked queue. However, you do need to make sure that the process terminates in both of these situations: when the HTTP response code is not 200, and when the retrieved document is not an RSS 2.0 feed.

    In these cases, call the exit/1 BIF with an appropriate reason indicating what happened.

    (Of course, other errors will also terminate the process as long as your code does not handle them. Examples include the httpc:request/1 call returning {error, Reason} when your code pattern-matches on {ok, ...}, or the XML parse failing. If you want to be really cool, you can catch these errors and report a friendlier error message, but this is not required.)

  4. Make sure that you use both logging and edoc commenting in your rss_reader process. Make it easy to understand your code, both by clearly documenting its operation, and also by reporting what it is doing as it operates.
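
The shape of the reader's main loop can be sketched as follows. To keep the sketch runnable without network access, it takes a hypothetical FetchFun argument instead of the lab's Url and QPid arguments. FetchFun performs one retrieval cycle and returns ok or {error, Reason}. The module name and that fun are assumptions. The MFA spawn with ?MODULE, the RETRIEVE_INTERVAL macro, the after-only receive, and the call to exit/1 follow the steps above.

```erlang
-module(reader_loop_sketch).
-export([start/1, start/2, server/2]).

%% Short interval for development; use 30+ minutes for long-running tests.
-define(RETRIEVE_INTERVAL, 60000).

%% FetchFun is a hypothetical stand-in for one retrieval cycle
%% (httpc:request/1, status check, xmerl_scan:string/1, RSS 2.0 check,
%% rss_queue:add_feed/2). It returns ok or {error, Reason}.
start(FetchFun) ->
    start(FetchFun, ?RETRIEVE_INTERVAL).

%% MFA form of spawn; server/2 must be exported for this to work.
start(FetchFun, Interval) ->
    spawn(?MODULE, server, [FetchFun, Interval]).

server(FetchFun, Interval) ->
    case FetchFun() of
        ok ->
            ok;
        {error, Reason} ->
            %% Terminate with a descriptive reason; a linked queue will
            %% receive this exit signal too.
            exit(Reason)
    end,
    %% A receive with no patterns, only an after clause, is a pure timeout.
    receive
    after Interval -> ok
    end,
    server(FetchFun, Interval).
```

In your real rss_reader, start/2 would take (Url, QPid), and the body of FetchFun would become the httpc:request/1, status check, xmerl_scan:string/1, rss_parse:is_rss2_feed/1, and rss_queue:add_feed/2 steps.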

Once your reader is implemented, it's time to update the RSS queue to provide this week's capabilities.

RSS Queue Updates

This week there are two main things you need to do with your RSS queue component. First, you need to provide a way to start up a queue process so that it is coupled together with a reader process. (Your previous work should already allow a queue to start up in the stand-alone mode that is also needed.) Second, you need to provide a way for a queue to have subscribers, so that new RSS feed items can be forwarded along to other queues.

  1. Your rss_queue from last time should already have a start/0 function that starts a queue in "standalone" mode. Add a new start/1 function that takes a single argument, a URL, and starts up an rss_queue process that has a linked reader process feeding it RSS items, as discussed above.

    A few details make this task slightly complicated. First, for the queue and reader to be linked, the queue process really should spawn the reader itself, rather than having the start/1 function spawn the reader. One approach is to factor your rss_queue server's operation into an "initialization" phase and a "server-loop" phase. Specifically, you can create an init function, and have both of your start functions spawn a separate process that calls init. Then, init can call your server function with the appropriate arguments.

    Also, to make it simpler to implement your init function, you can make it take a list of arguments, instead of having two versions with different numbers of arguments. One version can be init([]), which starts the queue in standalone mode, and the other can be init([Url]), which also takes care of spawning a reader process and then linking to the reader.

    Once you have this function set up, you should be able to start an rss_queue process with an associated reader, and it should be able to populate the queue with feed items, and periodically update the contents, straight from an Internet RSS feed. Cool!

  2. To support subscribers, you will need to add a new state value Subscribers to your queue, which simply maintains a set of queue-PIDs. You should use the sets module to make your life very easy. This module properly maintains a set of values, ensures that there are no duplicates, and lets you test whether an element is already in the set with sets:is_element/2. You can call sets:new/0 to create a new set, and then use sets:add_element/2, sets:del_element/2, etc., to manipulate the set. (You might notice that the documentation for the sets module says that "the representation of a set is not defined." However, if you try out the module from the Erlang shell, you will discover that, at least in the Erlang releases this lab was written for, a set is really a record tuple tagged with the atom set. Pretty neat.)

    Note that everywhere your server makes a tail-call to itself, you will need to add this new state value. It might be a good idea to move to a tuple or even a record for representing the state as a single composite value, especially given what we will be doing next week. (Hint hint...)

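  3. Handle a new message {subscribe, QPid}, which allows a queue to subscribe to another queue's feed items. You need to be clever in this handler: first make sure that the specified PID is not already a subscriber. If it is already a subscriber, just ignore the message; you are already done. However, if the specified PID is not already a subscriber, you need to perform the following tasks:

    1. Link to or monitor the new subscriber, so that it is automatically unsubscribed if it dies.
    2. Add the subscriber's PID to the Subscribers set.
    3. Send every item currently in the queue to the new subscriber, so that it starts out up to date. An easy way to do this is to create a fun that takes an RSS item and calls your rss_queue:add_item/2 helper with the subscriber's PID. Then pass this fun and the queue's list of items to lists:foreach/2.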

  4. Also handle a new message {unsubscribe, QPid}, which removes the specified PID from the set of subscribers. This handler should also properly handle the situation where the queue is not actually a subscriber, and just ignore the request.

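    If the specified queue-PID is in fact a subscriber, don't forget to also unlink or demonitor the subscriber's PID. (Note that erlang:demonitor/1 takes the reference returned by erlang:monitor/2, not a PID. If you use monitors, you will need to keep track of those references as well.)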

  5. Update your queue to handle 'EXIT' or 'DOWN' messages from the subscriber queues, depending on whether you chose to link to, or monitor, the subscriber queues. The handler for these messages simply needs to remove the dead queue's PID from the subscriber set. You know that the incoming PID belongs to a process that has already terminated.

  6. Your rss_queue process should already support an {add_item, RSSItem} message. Update the code for this message so that if the incoming RSS item is either a new item or an updated version of an existing item, the RSS item is also forwarded to every subscriber in the queue's set of subscribers. Obviously, if the incoming RSSItem is neither new nor updated, don't notify any subscribers.

    As with your subscribe handler above, you can perform this task relatively easily by creating a fun that calls your rss_queue:add_item/2 helper, but this time the fun takes the queue-PID as its argument instead of the item. Then you can get a list of all subscriber PIDs by using sets:to_list/1, and pass the fun and that list to lists:foreach/2, as before. Sweet.

  7. As with the reader, make sure that you use logging statements in your queue code to make it clear what is going on. Also make sure to write edoc comments for the new functions you added.
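
The sketch below pulls several of these steps together in a hypothetical queue_sketch module:

* Both start functions spawn a process that runs init/1.
* init([Url]) starts and links a stand-in reader from inside the queue process.
* The state lives in a record.
* New items are forwarded to subscribers with lists:foreach/2 and sets:to_list/1.

The sketch is deliberately simplified:

* "New" just means "not already in the list"; last week's new-versus-updated logic is not reproduced.
* Raw messages stand in for your rss_queue:add_item/2 helper.
* The subscribe handler omits linking/monitoring and the initial item transfer.

```erlang
-module(queue_sketch).
-export([start/0, start/1, init/1]).

%% All state in one record, as suggested above.
-record(state, {items = [], subscribers = sets:new()}).

%% Both start functions spawn a process that runs init/1.
start() ->
    spawn(?MODULE, init, [[]]).

start(Url) ->
    spawn(?MODULE, init, [[Url]]).

%% Standalone mode: no reader.
init([]) ->
    loop(#state{});
%% URL mode: the queue process itself starts and links its reader.
init([Url]) ->
    Self = self(),
    %% Stand-in for rss_reader:start(Url, self()) followed by link/1;
    %% spawn_link/1 spawns and links in one step.
    _Reader = spawn_link(fun() -> fake_reader(Url, Self) end),
    loop(#state{}).

loop(State = #state{items = Items, subscribers = Subs}) ->
    receive
        {add_item, Item} ->
            case lists:member(Item, Items) of
                true ->
                    %% Already seen: do not forward.
                    loop(State);
                false ->
                    %% New item: store it and forward it to every subscriber.
                    Forward = fun(Pid) -> Pid ! {add_item, Item} end,
                    lists:foreach(Forward, sets:to_list(Subs)),
                    loop(State#state{items = [Item | Items]})
            end;
        {subscribe, Pid} ->
            %% Simplified: the lab's handler must also link/monitor the
            %% subscriber and send it the queue's existing items.
            loop(State#state{subscribers = sets:add_element(Pid, Subs)});
        {get_items, From} ->
            %% Hypothetical query message, oldest item first.
            From ! {items, self(), lists:reverse(Items)},
            loop(State)
    end.

%% Hypothetical reader: sends one fake "item" (the URL) and then idles.
fake_reader(Url, QPid) ->
    QPid ! {add_item, Url},
    receive
    after infinity -> ok
    end.
```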

Once all of these tasks are completed, you should be done updating your RSS queue component for this week. And, in fact, you are ready to build simple aggregated RSS feeds that pull their items directly from the Internet.

Testing

Once you have finished all of the above coding, you should try to set up a set of queues like the ones in the picture above. Here are some RSS feeds to try:

One important note: if you want to test your program by leaving it running for several hours to watch how it responds to new items being posted, set the retrieve interval to at least 30 minutes. Some servers are set up to ban clients that request a feed more often than once every 30 to 60 minutes. You have been warned... :-)

Write up a testing script, a series of Erlang statements that sets up a set of reader-queues and aggregator queues. Include this script as a testing.xml file in your final submission so that we can test your work.


Copyright (C) 2009, California Institute of Technology. All rights reserved.
Last updated February 12, 2009.