As an experiment to test queue brokers, I set out to write some applications that would benefit from asynchronous processing between frontend and backend systems. Web frameworks such as Rails, CakePHP, CodeIgniter, Django and web.py can be relieved of a huge amount of processing by the use of queues (and, by extension, end up with a nicer architecture).

So I decided to make a simple page rating system, using one of the many 5-star rating JavaScript plugins available on the web. None of the components of this system is mandatory: I also managed to run it with a Rails frontend instead of the PHP one I show here, and the result was almost the same.

The basic idea is that the rating JavaScript sends an AJAX request to a program, which in turn gathers all the info needed to assemble a simple message. This message is posted to the queue, which is polled by a Python script that reads it and, from time to time, saves the summarized information to a database.

The message itself is the MD5 hash of the URL followed by a colon and the score set by the user. The calculations used to weight this score were taken from a post by Leah Culver, but one could really use other data to make it better.
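To make the message format concrete, here is a minimal Python 3 sketch of building and parsing a `hash:score` message. The function names `build_message` and `parse_message` are made up for illustration; the real system builds the message in rate.php and splits it in the poller.

```python
import hashlib

def build_message(url, rating):
    """Return '<md5 of url>:<rating>', the same shape rate.php posts to the queue."""
    digest = hashlib.md5(url.encode("utf-8")).hexdigest()
    return "%s:%d" % (digest, int(rating))

def parse_message(message):
    """Split a message back into (hash, score), as the poller does."""
    url_hash, score = message.split(":")
    return url_hash, int(score)

msg = build_message("https://zenmachine.wordpress.com", 4)
print(parse_message(msg))
```

Since an MD5 hex digest never contains a colon, splitting on ":" is safe for this format.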

There is also the need for a way to get the current score, to show it in the JavaScript rating plugin. For now I’m reading it from the database, but I think Memcached could be used to avoid constant reads. The poller could invalidate the cache, or even update it, after updating the database.
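The caching idea could look roughly like the Python 3 sketch below. Everything in it is an assumption for illustration: `ScoreCache` is an in-memory stand-in for a Memcached client, `db` is a plain dict standing in for the ratings table, and `read_score` / `commit_score` are hypothetical names.

```python
class ScoreCache:
    """In-memory stand-in for Memcached (hypothetical; swap in a real client)."""
    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

def read_score(cache, db, url_hash):
    """Frontend side: serve (score, clicks) from the cache, falling back to the db."""
    cached = cache.get(url_hash)
    if cached is not None:
        return cached
    row = db.get(url_hash)  # db is a dict standing in for the ratings table
    if row is not None:
        cache.set(url_hash, row)
    return row

def commit_score(cache, db, url_hash, score, clicks):
    """Poller side: update the database first, then refresh the cached value."""
    old_score, old_clicks = db.get(url_hash, (0, 0))
    db[url_hash] = (old_score + score, old_clicks + clicks)
    cache.set(url_hash, db[url_hash])

cache, db = ScoreCache(), {}
commit_score(cache, db, "abc", 4, 1)
commit_score(cache, db, "abc", 5, 1)
print(read_score(cache, db, "abc"))
```

Updating the cache from the poller (instead of only invalidating it) means the frontend rarely has to touch the database at all, at the cost of the poller knowing about the cache.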

Check this drawing, it’s pretty simple itself:

rating system diagram


So here is what is needed:

– PHP5 and Apache (or any other web server), with MySQL enabled.
– STOMP clients for everybody. I’ve been using StompCLI for PHP and stomp.py for Python, both very simple and easy to use (just unpack and it’s ready to go).
– ActiveMQ (needs Java) or any other broker (I’ve tested it with RabbitMQ, which is written in Erlang, w00t).

By the way, I’m using STOMP as the queue protocol. There are loads of alternatives, but I chose this one (lots of clients, works wonderfully with ActiveMessaging and Rails, etc.).

I had to patch stomp.py, because I found out that a “pause” method would help me a lot. You may use the one I shipped in the bundle, or patch the original with a small diff file. The pause method is only used to let data buffer in the queue while the poller copies and clears its transitory buffer; the copy is then committed to MySQL.
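For comparison, the same "take a snapshot while nothing else writes" effect can be sketched without patching the client, by guarding the buffers with a lock. This is an alternative technique, not what the poller in this post does, and the `RatingBuffer` class is a made-up name (Python 3).

```python
import threading

class RatingBuffer:
    """Thread-safe score/click buffers that can be swapped out atomically."""
    def __init__(self):
        self._lock = threading.Lock()
        self._totals = {}
        self._clicks = {}

    def add(self, url_hash, score):
        """Called by the listener thread for every incoming message."""
        with self._lock:
            self._totals[url_hash] = self._totals.get(url_hash, 0) + score
            self._clicks[url_hash] = self._clicks.get(url_hash, 0) + 1

    def swap(self):
        """Called by the commit thread: hand back the buffers and start fresh ones."""
        with self._lock:
            snapshot = (self._totals, self._clicks)
            self._totals, self._clicks = {}, {}
        return snapshot

buf = RatingBuffer()
buf.add("abc", 4)
buf.add("abc", 5)
totals, clicks = buf.swap()
print(totals, clicks)
```

With this approach the listener only blocks for the instant it takes to swap two dicts, so messages keep being consumed while the database write happens.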

Basically, instead of a frontend writing straight to the database, the workflow is broken into three main pieces: from the frontend to the queue, from the queue to a local hash, and from the hash to the database.
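The three pieces can be simulated in a few lines of Python 3. This is only a sketch under loud assumptions: `queue.Queue` stands in for the broker, an in-memory SQLite database stands in for MySQL, the table layout is guessed from the queries used later in this post (ratings.sql is not shown here), and `drain` / `flush` are hypothetical names.

```python
import queue
import sqlite3

def drain(q, totals, clicks):
    """Piece 2: move every pending 'hash:score' message into the local hashes."""
    while True:
        try:
            message = q.get_nowait()
        except queue.Empty:
            return
        url_hash, score = message.split(":")
        totals[url_hash] = totals.get(url_hash, 0) + int(score)
        clicks[url_hash] = clicks.get(url_hash, 0) + 1

def flush(db, totals, clicks):
    """Piece 3: add the buffered sums to the ratings table, then empty the buffers."""
    for url_hash, score in totals.items():
        cur = db.execute(
            "UPDATE ratings SET score = score + ?, clicks = clicks + ? WHERE hash = ?",
            (score, clicks[url_hash], url_hash))
        if cur.rowcount == 0:  # first rating for this page
            db.execute(
                "INSERT INTO ratings (hash, score, clicks) VALUES (?, ?, ?)",
                (url_hash, score, clicks[url_hash]))
    db.commit()
    totals.clear()
    clicks.clear()

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE ratings (hash TEXT PRIMARY KEY, score INTEGER, clicks INTEGER)")

q = queue.Queue()
for m in ("abc:4", "abc:5", "def:3"):
    q.put(m)  # Piece 1: the frontend only enqueues

totals, clicks = {}, {}
drain(q, totals, clicks)
flush(db, totals, clicks)
print(db.execute("SELECT hash, score, clicks FROM ratings ORDER BY hash").fetchall())
```

The point of the middle step is that many clicks on the same page collapse into a single database write per flush.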

Download the ActiveMQ binary release and run it in a terminal window:

$ ./bin/activemq

Download this zip file for a complete distribution. WordPress doesn’t allow zip file uploads, so I had to use external hosting. Let me know in the comments if it has gone offline. You may also get everything from its original place (but even then you will need the patch, PHP files and Python poller…):

StompCLI (PHP): http://stompcli.googlecode.com/files/stompcli-php5-1.0-20080916.tar.gz
Stomp.py (Python): http://www.briggs.net.nz/log/projects/stomppy/
The javascript stuff: http://php.scripts.psu.edu/rja171/widgets/rating.zip

Create a database named ratings, with a proper user and password, and load the provided dump to create the ratings tables (file ratings.sql).

Put the ratings directory under your Apache/PHP root, and edit r.php and getscore.php to set the right username/password for your MySQL account.

Copy the python files to a place where you can run them. Set the right user/password for MySQL in sc.py and run in another terminal with:

$ python sc.py

Use your browser to navigate to the following url:

http://localhost/ratings/i.php?p=https://zenmachine.wordpress.com

The upper frame is a rating toolbar. The original page is shown in the bigger frame. It would benefit from an HTML tag rewrite, to prepend all href links with the rating URL.
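The href rewrite mentioned above is not implemented in this post. A rough Python 3 sketch of the idea, assuming the page HTML has already been fetched somehow and using a hypothetical `rewrite_links` helper, might look like this. A regular expression is used for brevity; a real HTML parser would be more robust.

```python
import re
from urllib.parse import quote, urljoin

# Rating frameset URL taken from the example in this post
RATING_URL = "http://localhost/ratings/i.php?p="

# Matches the href attribute of <a> tags, with either quote style
HREF_RE = re.compile(r'(<a\b[^>]*?\bhref=)(["\'])(.*?)\2', re.IGNORECASE | re.DOTALL)

def rewrite_links(html, base_url):
    """Prefix every <a href> with the rating URL, resolving relative links first."""
    def repl(match):
        prefix, quote_char, target = match.groups()
        absolute = urljoin(base_url, target)
        return "%s%s%s%s%s" % (
            prefix, quote_char, RATING_URL, quote(absolute, safe=""), quote_char)
    return HREF_RE.sub(repl, html)

print(rewrite_links('<a href="/about">About</a>', "https://zenmachine.wordpress.com"))
```

Because the rewritten links would be clicked inside the lower frame, they would probably also need `target="_top"` so the whole frameset is replaced instead of nesting one inside another.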

As you click on the stars, you can monitor how the website is rated by checking the URL:

http://localhost/ratings/getscore.php?p=https://zenmachine.wordpress.com

or by reloading the upper frame.

Besides the HTML parsing and automatic frame reload, it would be nice to experiment with caching in some parts, such as when r.php queries the database for the current score and click count. Ah, and other rating algorithms would be nice too.
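As one example of another rating algorithm, a Bayesian average is a common choice. It is not what this post implements, and the `prior_mean` and `prior_weight` values below are arbitrary assumptions. It pulls pages with very few clicks toward a neutral score, so a single 5-star click does not outrank a page with a hundred 4.9-star clicks (Python 3 sketch).

```python
def bayesian_average(score_sum, clicks, prior_mean=3.0, prior_weight=5):
    """Average that behaves as if prior_weight extra votes of prior_mean were cast."""
    return (prior_mean * prior_weight + score_sum) / (prior_weight + clicks)

# One 5-star click versus one hundred clicks summing to 490
print(bayesian_average(5, 1))
print(bayesian_average(490, 100))
```

It only needs the summed score and the click count, which are the two values the ratings table already stores.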

rate.php (the action to be called in AJAX mode)


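<?php
        require_once("Stomp.php");
        // Cast the rating to an integer so only a number reaches the queue
        $rating = (int) $_REQUEST["rating"];
        $con = new Stomp("tcp://localhost:61613");
        $con->connect();
        // Message format: md5(url):rating
        $con->send("/queue/ratings", md5($_REQUEST["p"]).":".$rating);
        $con->disconnect();
        // echoes back
        echo $rating;

?>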

The python poller


import time
import sys
import stomp
import MySQLdb
from threading import Thread   
import copy
import signal

class MyListener(object):

    def __init__(self, t, c):
        # shared dicts: summed scores and click counts, keyed by URL hash
        self.tmpHash = t
        self.clicks = c

    def on_error(self, headers, message):
        print 'received an error %s' % message

    def on_message(self, headers, message):
        print 'received a message %s' % message
        (hash, score) = message.split(":")

        if hash in self.tmpHash:
            self.tmpHash[hash] = int(self.tmpHash[hash]) + int(score)
        else:
            self.tmpHash[hash] = int(score)
            self.clicks[hash] = 0

        if hash in self.clicks:
            self.clicks[hash] = int(self.clicks[hash]) + 1

        print 'total: %s clicks %s' % (self.tmpHash[hash], self.clicks[hash])

class CommitScores(Thread):
    def __init__ (self, tmpHash, clickHash, stompconn):
        Thread.__init__(self)
        self.tmpHash = tmpHash
        self.clicks = clickHash
        #change username and password here:
        self.dbconn=MySQLdb.connect('localhost', 'username', 'passwd')
        self.dbconn.select_db('ratings')
        self.conn = stompconn
        print "CommitScores started"

    def run(self):
        while 1:
            time.sleep(10)

            # let the queue hold it for a while
            self.conn.toggle_pause()

            c = copy.deepcopy(self.tmpHash)
            r = copy.deepcopy(self.clicks)
            self.tmpHash.clear()
            self.clicks.clear()

            # unpause
            self.conn.toggle_pause()

            hashes = c.keys()
            cursor = self.dbconn.cursor()
            for k in hashes:
                # parameterized queries: let MySQLdb do the quoting
                cursor.execute("SELECT score, clicks from ratings where hash=%s", (k,))
                rs = cursor.fetchone()
                if rs is None:
                    cursor.execute("INSERT into ratings (hash, score, clicks) VALUES (%s, %s, %s)", (k, c[k], r[k]))
                else:
                    s = int(rs[0]) + int(c[k])
                    t = int(rs[1]) + int(r[k])
                    cursor.execute("UPDATE ratings set score = %s, clicks = %s where hash=%s", (s, t, k))
            # needed on transactional tables (InnoDB); harmless on MyISAM
            self.dbconn.commit()
            cursor.close()

# main

if __name__ == "__main__":

    tmpHash={}
    tmpClicks={}

    signal.signal(signal.SIGINT, signal.SIG_DFL)

    conn = stomp.Connection()
    conn.add_listener(MyListener(tmpHash, tmpClicks))
    conn.start()
    conn.connect()

    conn.subscribe(destination='/queue/ratings', ack='auto')

    # start commit thread
    sc = CommitScores(tmpHash, tmpClicks, conn)
    sc.start()

    time.sleep(100)
    sc.join()

    conn.disconnect()

i.php (the frameset assembler)

<html>
<head>
<title>rate my page</title>
</head>
<FRAMESET ROWS="10%,*" FRAMEBORDER="0" FRAMESPACING="2">
<FRAME SRC="r.php?p=<?php echo urlencode($_REQUEST["p"]); ?>" NAME="superior" NORESIZE SCROLLING="NO">
<FRAME SRC="<?php echo htmlspecialchars($_REQUEST["p"]); ?>" NAME="central" MARGINWIDTH="2" MARGINHEIGHT="3" NORESIZE SCROLLING="YES">
<noframes>
<body>
</body>
</noframes>
</FRAMESET>
</html>

r.php (the upper frame)

<?php
if (empty($_REQUEST["p"])) die ("No URL");
$link = mysql_connect('localhost', 'username', 'password') or  die('Could not connect: ' . mysql_error());
mysql_select_db('ratings', $link) or die('Could not select database.');
$hash = md5($_REQUEST["p"]);
$res = mysql_query ("SELECT score, clicks from ratings WHERE hash=\"".$hash."\"", $link) or die ("query error: ".mysql_error());
$row = mysql_fetch_row($res);

// A page that has never been rated has no row yet: show zero stars
$score_sum = $row ? $row[0] : 0;
$clicks = $row ? $row[1] : 0;

$score = ($clicks > 0) ? round(($score_sum * 2 / $clicks)/2) : 0;

mysql_close($link);
?>

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=iso-8859-1" />
<title>Rating</title>
<script type="text/javascript" src="jquery.js"></script>
<script type="text/javascript" src="rating.js"></script>
	<link rel="stylesheet" type="text/css" href="rating.css" />

<script type="text/javascript">
$(document).ready(function() {
	$('#rate').rating('rate.php?p=<?php echo urlencode($_REQUEST["p"]); ?>', {maxvalue:5, curvalue:<?php echo $score; ?>});
});
</script>
</head>
<body>
<div id="rate" class="rating">&nbsp;</div>
</body>
</html>

getscore.php

<?php
if (empty($_REQUEST["p"])) die ("No URL");

$link = mysql_connect('localhost', 'username', 'password') or  die('Could not connect: ' . mysql_error());

mysql_select_db('ratings', $link) or die('Could not select database.');

$hash = md5($_REQUEST["p"]);

print "query: SELECT score, clicks from ratings WHERE hash=".$hash."\n";

$res = mysql_query ("SELECT score, clicks from ratings WHERE hash=\"".$hash."\"", $link) or die ("query error: ".mysql_error());

$row = mysql_fetch_row($res);

// A page that has never been rated has no row yet
$score_sum = $row ? $row[0] : 0;
$clicks = $row ? $row[1] : 0;

$score = ($clicks > 0) ? round(($score_sum * 2 / $clicks)/2) : 0;

echo "score: ".$score;

mysql_close($link);

?>

stomp.py diff:

--- stomp.py	2008-11-28 11:53:21.000000000 -0200
+++ stomp.py-pause	2008-11-28 11:47:28.000000000 -0200
@@ -340,6 +340,7 @@
         no frames will be received by the connection.
         """
         self.__running = True
+	self.__paused = False
         thread.start_new_thread(self.__receiver_loop, ())
         self.__attempt_connection()

@@ -369,7 +370,13 @@
             return self.__socket is not None and self.__socket.getsockname()[1] != 0
         except socket.error:
             return False
-        
+    #
+    # Pause queue reading
+    #
+    
+    def toggle_pause(self):
+	self.__paused=not self.__paused
+    
     #
     # Manage objects listening to incoming frames
     #
@@ -452,6 +459,9 @@
     #
     __content_length_re = re.compile('^content-length[:]\\s*(?P<value>[0-9]+)', re.MULTILINE)

+
+
+
     def __merge_headers(self, header_map_list):
         """
         Helper function for combining multiple header maps into one.
@@ -576,6 +586,8 @@

                     if self.__socket is None:
                         break
+		    # continue if reading is paused
+		    if self.__paused == True: continue

                     try:
                         try:

Bye
