Monday, May 10, 2010

Building a stock quote server in Erlang using Ejabberd, XMPP, Bosh, Exmpp, Strophe and Yaws

Recently, I have been building a stock quote server at work that publishes financial data using Ejabberd, XMPP, PubSub, Exmpp and Bosh on the server side and the Strophe library on the web application front end. Here I describe a simplified implementation of the quote server using Yahoo Quotes.



Installation


Download Ejabberd and go through the installation wizard. You will be asked for your host name, an admin account and password, and whether ejabberd will run in a clustered environment. For this tutorial, we will run ejabberd on a single node. Once installed, you can start the ejabberd server using


 /Applications/ejabberd-2.1.3/bin/ejabberdctl start

As I am using a Mac, the actual path on your machine may be different. Ejabberd comes with a web-based admin tool, which you can access at


 http://<your-host-name>:5280/admin


where you can see the available nodes, users, etc.








Registering Users


We will create two users, producer and consumer: the former publishes stock quotes and the latter subscribes to quotes on the web side, i.e.,


 sudo /Applications/ejabberd-2.1.3/bin/ejabberdctl register producer producer
 sudo /Applications/ejabberd-2.1.3/bin/ejabberdctl register consumer consumer


Debugging with Psi


You can debug XMPP communications using a Jabber client such as Psi, which you can download. After installing it, specify your local host name as the server.









You can then log in as consumer@<your-host-name> with the password consumer. As we will be using the PubSub protocol, you can discover the available nodes or topics using General->Service Discovery from the menu.









Downloading Sample Code


I have stored all the code needed for this example at http://github.com/bhatti/FQPubSub, which you can check out using:


 git clone git@github.com:bhatti/FQPubSub.git

The sample code depends on the exmpp, lhttpc, jsonerl, and yaws modules, so after downloading the code, check out the dependent modules using


 git submodule init
 git submodule update

These commands check out the dependent modules into the deps directory.


Building Sample Code


Before building, ensure you have the make and autoconf tools installed, then replace paraclete.local with <your-host-name> in docroot/index.html and src/quote_utils.hrl. Then type the following command


 make

to build all the sample code and dependent libraries.


Starting Web Server


Though the web code, including the Strophe library and Javascript, can be run directly in the browser, you can start Yaws to serve the application as follows:


 erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run web_server start

Note that the web server keeps running in the foreground, so open a separate shell before typing the above command.



Publishing Quotes


Open two separate shells and type the following command in the first shell:


 erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run quote_publisher start AAPL

and the following command in the second shell:


 erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run quote_publisher start IBM

These commands start Erlang processes that poll Yahoo Quotes about every two seconds (the publisher loop shown later uses a 2000 ms receive timeout) and publish the quotes on the nodes AAPL and IBM respectively.



Next, point your browser to http://<your-host-name>:8000/ and add the “IBM” and “AAPL” symbols. You will then see quotes for both symbols.


Code under the hood


Now that you are able to run the example, let’s take a look at how the code works:


Client library for Yahoo Finance


Though we use our own real-time stock quote feed at work, for this sample I implemented the stock quote feed using Yahoo Finance. The src/yquote_client.hrl and src/yquote_client.erl files define the client API for accessing the Yahoo Finance service. Here is the Erlang code that requests a quote over HTTP and parses the response:



    %%%-------------------------------------------------------------------
    %%% File    : yquote_client.erl
    %%% Author  : Shahzad Bhatti
    %%% Purpose : Wrapper Library for Yahoo Stock Quotes
    %%% Created : May 8, 2010
    %%%-------------------------------------------------------------------

    -module(yquote_client).

    -author('bhatti@plexobject.com').

    -export([
        quote/1
    ]).

    -record(quote, {
        symbol,
        price,
        change,
        volume,
        avg_daily_volume,
        stock_exchange,
        market_cap,
        book_value,
        ebitda,
        dividend_per_share,
        dividend_yield,
        earnings_per_share,
        week_52_high,
        week_52_low,
        day_50_moving_avg,
        day_200_moving_avg,
        price_earnings_ratio,
        price_earnings_growth_ratio,
        price_sales_ratio,
        price_book_ratio,
        short_ratio}).

    %% Fetch one CSV line for Symbol and map its fields onto a #quote{} record.
    quote(Symbol) ->
        inets:start(),
        %% Synchronous GET with a 5 second timeout (older OTP releases expose
        %% the same call as http:request/4).
        {ok, {_Status, _Headers, Response}} =
            httpc:request(get, {url(Symbol), []},
                          [{timeout, 5000}], [{sync, true}]),

        %% Fields are comma separated; the line is terminated by CR/LF.
        Values = re:split(Response, "[,\r\n]"),
        #quote{
            symbol = list_to_binary(Symbol),
            price = to_float(lists:nth(1, Values)),
            change = to_float(lists:nth(2, Values)),
            volume = to_integer(lists:nth(3, Values)),
            avg_daily_volume = to_integer(lists:nth(4, Values)),
            stock_exchange = lists:nth(5, Values), % to_string
            market_cap = to_float(lists:nth(6, Values)), % B
            book_value = to_float(lists:nth(7, Values)),
            ebitda = to_float(lists:nth(8, Values)), % B
            dividend_per_share = to_float(lists:nth(9, Values)),
            dividend_yield = to_float(lists:nth(10, Values)),
            earnings_per_share = to_float(lists:nth(11, Values)),
            week_52_high = to_float(lists:nth(12, Values)),
            week_52_low = to_float(lists:nth(13, Values)),
            day_50_moving_avg = to_float(lists:nth(14, Values)),
            day_200_moving_avg = to_float(lists:nth(15, Values)),
            price_earnings_ratio = to_float(lists:nth(16, Values)),
            price_earnings_growth_ratio = to_float(lists:nth(17, Values)),
            price_sales_ratio = to_float(lists:nth(18, Values)),
            price_book_ratio = to_float(lists:nth(19, Values)),
            short_ratio = to_float(lists:nth(20, Values))}.

    %% The f= parameter selects the 20 fields, in the order they are read above.
    url(Symbol) ->
        "http://finance.yahoo.com/d/quotes.csv?s=" ++ Symbol ++ "&f=l1c1va2xj1b4j4dyekjm3m4rr5p5p6s7".

    %% Convert a field to a number, expanding the B (x 1,000,000,000) and
    %% M (x 1,000,000) suffixes; missing values ("N/A") become -1.
    to_float(<<"N/A">>) ->
        -1;
    to_float(Bin) ->
        {Multiplier, Bin1} = case bin_ends_with(Bin, <<$B>>) of
            true ->
                {1000000000, bin_replace(Bin, <<$B>>, <<>>)};
            false ->
                case bin_ends_with(Bin, <<$M>>) of
                    true ->
                        {1000000, bin_replace(Bin, <<$M>>, <<>>)};
                    false ->
                        {1, Bin}
                end
        end,
        L = binary_to_list(Bin1),
        list_to_float(L) * Multiplier.


Note that I am omitting some code in the above listing (for example, the to_integer, bin_ends_with and bin_replace helpers), as I just wanted to highlight the HTTP request and parsing code.
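
To make the suffix handling in to_float concrete, here is a minimal sketch of the same idea in Javascript, the language of the web client later in this post. It is not part of FQPubSub: the names parseYahooNumber and splitQuoteLine are hypothetical, and the sketch assumes that fields arrive as plain strings such as "235.86", "2B", "2.5M" or "N/A".

```javascript
// Hypothetical helpers mirroring the Erlang to_float/1 above; not part of FQPubSub.
// Assumes each field is a plain string such as "235.86", "2B", "2.5M" or "N/A".
function parseYahooNumber(field) {
  var text = String(field).trim();
  if (text === 'N/A') {
    return -1; // same sentinel the Erlang code uses for missing values
  }
  var multiplier = 1;
  var last = text.charAt(text.length - 1);
  if (last === 'B') {
    multiplier = 1e9; // billions
    text = text.slice(0, -1);
  } else if (last === 'M') {
    multiplier = 1e6; // millions
    text = text.slice(0, -1);
  }
  return parseFloat(text) * multiplier;
}

// Split one CSV response line on commas and line breaks,
// the same separators the Erlang code passes to re:split/2.
function splitQuoteLine(line) {
  return line.split(/[,\r\n]/);
}
```

Unlike list_to_float, parseFloat also accepts values without a decimal point, so a field such as "235" needs no special handling in this sketch.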


Publishing the Stock Quote


I used the exmpp library to communicate with the XMPP server in Erlang. Here is the code for publishing the quotes using the Bosh/XMPP protocol:



    %%%-------------------------------------------------------------------
    %%% File    : quote_publisher.erl
    %%% Author  : Shahzad Bhatti
    %%% Purpose : OTP server for publishing quotes
    %%% Created : May 8, 2010
    %%%-------------------------------------------------------------------
    -module(quote_publisher).

    -export([
        start/1,
        start/5,
        stop/1]).

    -export([init/5]).

    -include_lib("quote_utils.hrl").

    -record(state, {session, jid, service=?TEST_XMPP_PUBSUB, symbol}).

    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    %% APIs
    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    start(Symbol) ->
        start(?TEST_XMPP_SERVER, ?TEST_XMPP_PORT, ?PRODUCER_USERNAME,
              ?PRODUCER_PASSWORD, Symbol).

    start(Host, Port, User, Password, Symbol) ->
        spawn(?MODULE, init, [Host, Port, User, Password, Symbol]).

    stop(Pid) ->
        Pid ! stop.

    %% Connect, make sure the node for this symbol exists, then start polling.
    init(Host, Port, User, Password, Symbol) ->
        {ok, {MySession, MyJID}} = quote_utils:connect(Host, Port, User, Password),
        State = #state{session=MySession, jid=MyJID, symbol = Symbol},
        create_symbol_node(State),
        loop(State).

    %% Incoming packets are ignored; when nothing arrives for 2000 ms the
    %% process publishes a fresh quote.
    loop(#state{session=MySession}=State) ->
        receive
            stop ->
                quote_utils:disconnect(MySession);
            #received_packet{packet_type=message} ->
                loop(State);
            _Other ->
                loop(State)
        after 2000 ->
            publish_quote(State),
            loop(State)
        end.

    %% Create the pubsub node named after the symbol and wait for the reply
    %% that carries the id of the request.
    create_symbol_node(#state{session=MySession, jid=MyJID, service = Service,
                              symbol = Symbol}) ->
        IQ = exmpp_client_pubsub:create_node(Service, Symbol),
        PacketId = exmpp_session:send_packet(MySession, exmpp_stanza:set_sender(IQ, MyJID)),
        PacketId2 = erlang:binary_to_list(PacketId),
        receive #received_packet{id=PacketId2, raw_packet=Raw} ->
            case exmpp_iq:is_error(Raw) of
                true -> {error, Raw};
                _ -> ok
            end
        end.

    %% Fetch the quote, wrap its JSON form in a data element, publish it on
    %% the symbol's node and wait for the matching acknowledgement.
    publish_quote(#state{session=MySession, jid=MyJID, service = Service, symbol = Symbol}) ->
        Quote = yquote_client:quote(Symbol),
        JsonQuote = ?record_to_json(quote, Quote),
        M = exmpp_xml:element(?QUOTE_DATA),
        IQ = exmpp_client_pubsub:publish(Service, Symbol, exmpp_xml:append_cdata(M,
                                                                               JsonQuote)),
        PacketId = exmpp_session:send_packet(MySession, exmpp_stanza:set_sender(IQ, MyJID)),
        PacketId2 = erlang:binary_to_list(PacketId),
        receive #received_packet{id=PacketId2, raw_packet=Raw} ->
            case exmpp_iq:is_error(Raw) of
                true -> error;
                _ -> ok
            end
        end.


In the above code, a process is created for each symbol, which periodically polls the stock quote and publishes it to the XMPP node using the pubsub protocol over Bosh. Note that a unique node is created for each symbol, and the node must be created before anyone can publish or subscribe to it. Also note that the publish/subscribe APIs use a request/ack protocol, so after sending a request, the process waits for the acknowledgement of that request.
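
The request/ack pattern can be illustrated independently of exmpp. The following Javascript sketch is only an illustration under stated assumptions: AckTracker, its methods and the transport callback are hypothetical names and are not part of exmpp, Strophe or FQPubSub. It shows the bookkeeping that the Erlang selective receive on PacketId performs implicitly, namely remembering the id of each request and matching it against the id of the incoming reply.

```javascript
// Hypothetical request/ack bookkeeping; names are illustrative only,
// not taken from exmpp, Strophe or FQPubSub.
function AckTracker() {
  this.nextId = 1;
  this.pending = {}; // request id -> callback waiting for the ack
}

// Register a request, hand it to the transport function, and return
// the id that the acknowledgement must echo back.
AckTracker.prototype.send = function (transport, payload, onAck) {
  var id = 'req-' + this.nextId++;
  this.pending[id] = onAck;
  transport({ id: id, payload: payload });
  return id;
};

// Deliver an incoming response. Returns false when no request with
// that id is waiting, true after the waiting callback has been called.
AckTracker.prototype.receive = function (response) {
  var onAck = this.pending[response.id];
  if (!onAck) {
    return false;
  }
  delete this.pending[response.id];
  onAck(response.type === 'error' ? 'error' : 'ok');
  return true;
};
```

The Erlang publisher blocks until the matching reply arrives, whereas this sketch is callback based; both rely on the id to pair a reply with its request.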



Here are some utility functions used by the publisher:


    -module(quote_utils).

    -include_lib("quote_utils.hrl").

    -export([
        init_session/2,
        connect/4,
        disconnect/1]).

    bosh_url(Host, Port) ->
        "http://" ++ Host ++ ":" ++ integer_to_list(Port) ++ "/http-bind".

    connect(Host, _Port, User, Password) ->
        safe_start_apps(),
        MySession = exmpp_session:start({1,0}),
        exmpp_xml:start_parser(),
        %% Create XMPP ID (Session Key):
        MyJID = exmpp_jid:make(User, Host, random),
        %% Create a new session with basic (digest) authentication:
        exmpp_session:auth_basic_digest(MySession, MyJID, Password),

        %% Bosh is served on ejabberd's HTTP port (5280), so _Port is not used here.
        {ok, _StreamId, _Features} = exmpp_session:connect_BOSH(MySession, bosh_url(Host, 5280), Host, []),
        try quote_utils:init_session(MySession, Password)
        catch
            _:Error -> io:format("got error: ~p~n", [Error]), {error, Error}
        end,
        {ok, {MySession, MyJID}}.

    init_session(MySession, Password) ->
        %% Login with defined JID / Authentication:
        try exmpp_session:login(MySession, "PLAIN")
        catch
            throw:{auth_error, 'not-authorized'} ->
                %% Try creating a new user:
                io:format("Register~n",[]),
                %% In a real life client, we should trap error case here
                %% and print the correct message.
                exmpp_session:register_account(MySession, Password),
                %% After registration, retry to login:
                exmpp_session:login(MySession)
        end,
        %% We explicitly send presence:
        exmpp_session:send_packet(MySession, exmpp_presence:set_status(exmpp_presence:available(), "Ready to publish!!!")),
        ok.

    disconnect(MySession) ->
        exmpp_session:stop(MySession).

    %% Starting an application that is already running fails the ok match in
    %% start_apps/0; that error is logged and otherwise ignored.
    safe_start_apps() ->
        try start_apps()
        catch
            _:Error -> io:format("apps already started : ~p~n", [Error]), {error, Error}
        end.

    start_apps() ->
        ok = application:start(exmpp),
        ok = application:start(crypto),
        ok = application:start(ssl),
        ok = application:start(lhttpc).



Note that the above code auto-registers users, which is not recommended for production use.


Javascript code using Strophe library


The web application depends on jQuery, Strophe and Strophe Pubsub. These libraries are included in the docroot directory and imported by index.html. The Strophe library and ejabberd 2.1.3 support cross-domain scripting, so the Bosh service doesn’t need to be on the same domain/port, but it must have a /crossdomain.xml policy file that allows access from wherever index.html lives. The Javascript initializes the connection parameters as follows (you will have to change HOST):



    <script type="text/javascript">
    // The BOSH_SERVICE here doesn't need to be on the same domain/port, but
    // it must have a /crossdomain.xml policy file that allows access from
    // wherever index.html lives.
    // TODO: REPLACE paraclete.local with your <host-name>
    var HOST = 'paraclete.local';
    var JID = 'consumer@' + HOST;
    var PASSWORD = 'consumer';
    var BOSH_SERVICE = 'http://' + HOST + ':5280/http-bind'; //'/xmpp-httpbind'
    var PUBSUB = 'pubsub.' + HOST;
    var connection = null;
    var autoReconnect = true;
    var hasQuotes = {};      // symbols that already have a table row
    var subscriptions = {};  // symbols we are currently subscribed to

    function log(msg) {
        $('#log').append('<div></div>').append(document.createTextNode(msg));
    }

    function rawInput(data) {
        //log('RECV: ' + data);
    }

    function rawOutput(data) {
        //log('SENT: ' + data);
    }

    function onQuote(stanza) {
        //log('onQuote###### ' + stanza);
        try {
            $(stanza).find('event items item data').each(function(idx, elem) {
                var quote = jQuery.parseJSON($(elem).text());
                //{"price":235.86,"change":-10.39,"volume":59857756,"avg_daily_volume":20775600,"stock_exchange":[78,97,115,100,97,113,78,77],"market_cap":2.146e+11,
                //"book_value":43.257,"ebitda":1.5805e+10,"dividend_per_share":0.0,"dividend_yield":-1,"earnings_per_share":11.796,"week_52_high":272.46,"week_52_low":119.38,
                //"day_50_moving_avg":245.206,"day_200_moving_avg":214.119,"price_earnings_ratio":20.88,"price_earnings_growth_ratio":1.05,"price_sales_ratio":4.38,
                //"price_book_ratio":5.69,"short_ratio":0.7}
                if (hasQuotes[quote.symbol] != undefined) {
                    // Row already exists: update its cells in place.
                    $('#price_' + quote.symbol).text(quote.price);
                    $('#change_' + quote.symbol).text(quote.change);
                    $('#volume_' + quote.symbol).text(quote.volume);
                } else {
                    // First quote for this symbol: append a new row.
                    hasQuotes[quote.symbol] = true;
                    $('#quotesTable > tbody:last').append('<tr id="quote_' +
                        quote.symbol + '"><td>' + quote.symbol +
                        '</td><td id="price_' + quote.symbol + '">' + quote.price +
                        '</td><td id="change_' + quote.symbol + '" class="class_change_' + quote.symbol + '">' +
                        quote.change + '</td><td id="volume_' +
                        quote.symbol + '">' +
                        quote.volume + '</td></tr>');
                }

                if (quote.change < 0) {
                    $('.class_change_' + quote.symbol).css('color', 'red');
                } else {
                    $('.class_change_' + quote.symbol).css('color', 'green');
                }
            });
        } catch (e) {
            log(e);
        }
        return true;
    }

    function handleSubscriptionChange(stanza) {
        //log("***handleSubscriptionChange Received: " + stanza);
    }

    function onConnect(status) {
        if (status == Strophe.Status.CONNECTING) {
            log('Strophe is connecting.');
        } else if (status == Strophe.Status.CONNFAIL) {
            log('Strophe failed to connect.');
            $('#connect').val('connect');
        } else if (status == Strophe.Status.DISCONNECTING) {
            log('Strophe is disconnecting.');
        } else if (status == Strophe.Status.DISCONNECTED) {
            if (autoReconnect) {
                // connect() is asynchronous; onConnect reports the outcome.
                log("Streaming disconnected. Trying to reconnect...");
                connection.connect(JID, PASSWORD, onConnect);
            } else {
                log('Strophe is disconnected.');
                $('#connect').val('connect');
                //publishEvent( "streamingDisconnected" );
            }
        } else if (status == Strophe.Status.CONNECTED) {
            log('Strophe is connected.');
            //log('QUOTE_BOT: Send a message to ' + connection.jid + ' to talk to me.');
            connection.addHandler(onMessage, null, 'message', null, null, null);
            connection.send($pres().tree());
            //publishEvent( "streamingConnected" );
        }
    }

    function subscribe(symbol) {
        if (subscriptions[symbol]) return;
        try {
            connection.pubsub.subscribe(JID, PUBSUB, symbol, [], onQuote, handleSubscriptionChange);
            subscriptions[symbol] = true;
            log("Subscribed to " + symbol);
        } catch (e) {
            alert(e);
        }
    }

    function unsubscribe(symbol) {
        if (!subscriptions[symbol]) return;
        try {
            connection.pubsub.unsubscribe(JID, PUBSUB, symbol, handleSubscriptionChange);
            subscriptions[symbol] = false;
            log("Unsubscribed from " + symbol);
        } catch (e) {
            alert(e);
        }
    }

    function onMessage(msg) {
        var to = msg.getAttribute('to');
        var from = msg.getAttribute('from');
        var type = msg.getAttribute('type');
        var elems = msg.getElementsByTagName('body');

        if (type == "chat" && elems.length > 0) {
            var body = elems[0];
            log('QUOTE_BOT: I got a message from ' + from + ': ' + Strophe.getText(body));
            var reply = $msg({to: from, from: to, type: 'chat'}).cnode(Strophe.copyElement(body));
            connection.send(reply.tree());
            log('QUOTE_BOT: I sent ' + from + ': ' + Strophe.getText(body));
        }
        // we must return true to keep the handler alive.
        // returning false would remove it after it finishes.
        return true;
    }

    $(document).ready(function () {
        connection = new Strophe.Connection(BOSH_SERVICE);
        connection.rawInput = rawInput;
        connection.rawOutput = rawOutput;
        connection.connect(JID, PASSWORD, onConnect);
        //connection.disconnect();
        $('#add_symbol').bind('click', function () {
            var symbol = $('#symbol').get(0).value;
            subscribe(symbol);
        });
    });
    </script>

When the document is loaded, the connection to the ejabberd server is established. Here are the form and table that are used to add subscriptions and display the current quote information for the symbols:


    <form name='symbols'>
        <label for='symbol'>Symbol:</label>
        <input type='text' id='symbol'/>
        <input type='button' id='add_symbol' value='add' />
    </form>
    <hr />
    <div id='log'></div>
    <table id="quotesTable" width="600" border="2" bordercolor="#333333">
        <thead>
            <tr>
                <th>Symbol</th>
                <th>Price</th>
                <th>Change</th>
                <th>Volume</th>
            </tr>
        </thead>
        <tbody>
        </tbody>
    </table>


When the add button is clicked, it calls the subscribe function, which in turn sends a subscription request to the ejabberd server. When a new quote is received, the onQuote function is called, which inserts a row in the table when a new symbol is added or updates the quote information if the row already exists.
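
The insert-or-update decision made by onQuote can be separated from the DOM. The sketch below is an illustration only: applyQuote and decodeCharCodes are hypothetical names that do not exist in FQPubSub, rows are kept in a plain object keyed by symbol, and the stock_exchange field is assumed to arrive as an array of character codes, as it does in the sample JSON in the onQuote comment.

```javascript
// Hypothetical helpers, not part of FQPubSub.
// The sample JSON shows stock_exchange as an array of character codes,
// so turn such arrays back into a string and pass other values through.
function decodeCharCodes(value) {
  if (Array.isArray(value)) {
    return String.fromCharCode.apply(null, value);
  }
  return value;
}

// Insert a row for an unseen symbol or overwrite the existing one.
// Returns 'inserted' or 'updated' so the caller knows which DOM action to take.
function applyQuote(rows, quote) {
  var known = Object.prototype.hasOwnProperty.call(rows, quote.symbol);
  rows[quote.symbol] = {
    price: quote.price,
    change: quote.change,
    volume: quote.volume,
    exchange: decodeCharCodes(quote.stock_exchange),
    color: quote.change < 0 ? 'red' : 'green' // same rule as onQuote
  };
  return known ? 'updated' : 'inserted';
}
```

Keeping this logic free of jQuery calls makes it possible to exercise it without a browser; the page code would then only translate 'inserted' into an appended table row and 'updated' into new cell text.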


Conclusion


Ejabberd, XMPP, exmpp, Bosh and Strophe provide a robust and mature solution for messaging and are especially suitable for building highly scalable and interactive web applications. Though the above code is fairly simple, the same design principles can be used to support a large number of stock quote updates. As we need to send stock quotes for tens of thousands of symbols on every tick within a fraction of a second, Erlang provides a very scalable solution, where each symbol is simply served by an Erlang process. Finally, I am still learning more about Ejabberd’s clustering, security, and other features so that it can truly survive the production load, so I would love to hear any feedback you might have on similar solutions.
