Friday, August 13, 2010

Popular blog entries from my blog (sitemap):


Monday, May 10, 2010

Building a stock quote server in Erlang using Ejabberd, XMPP, Bosh, Exmpp, Strophe and Yaws

Recently, I have been building a stock quote server at work that publishes financial data using Ejabberd, XMPP, PubSub, Exmpp and Bosh on the server side and the Strophe library on the web application front end. I will describe a simplified implementation of the quote server using Yahoo Quotes.



Installation


Download Ejabberd and go through the installation wizard. You will be asked for your host name, an admin account/password and whether ejabberd will be running in a clustered environment. For this tutorial, we will be running ejabberd on a single node. Once installed, you can start the ejabberd server using


 /Applications/ejabberd-2.1.3/bin/ejabberdctl start

As I am using a Mac, the actual path on your machine may be different. Ejabberd comes with a web-based admin tool that you can access using


 http://<your-host-name>:5280/admin


where you can see the available nodes, users, etc.








Registering Users


We will create two users, producer and consumer: the former is used for publishing stock quotes and the latter for subscribing to quotes on the web side, i.e.,


 sudo /Applications/ejabberd-2.1.3/bin/ejabberdctl register producer <your-host-name> producer
 sudo /Applications/ejabberd-2.1.3/bin/ejabberdctl register consumer <your-host-name> consumer


Debugging with Psi


You can debug XMPP communications using a Jabber client such as Psi. After you download and install it, specify your local hostname as the server, e.g.









You can then log in as consumer@<your-host-name> with password consumer. As we will be using the PubSub protocol, you can discover available nodes or topics using General->Service Discovery from the menu, e.g.









Downloading Sample Code


I have stored all the code needed for this example at http://github.com/bhatti/FQPubSub, which you can check out using:


 git clone git@github.com:bhatti/FQPubSub.git

The sample code depends on the exmpp, lhttpc, jsonerl, and yaws modules, so after downloading the code, check out the dependent modules using



 git submodule init
 git submodule update

The above commands will check out the dependent modules into the deps directory.


Building Sample Code


Before building, ensure you have the make and autoconf tools installed, then replace <paraclete.local> with <your-host-name> in docroot/index.html and src/quote_utils.hrl. Then type the following command



 make

to build all the sample code and dependent libraries.


Starting Web Server


Though the web code, including the Strophe library and Javascript, can be run directly in the browser, you can start Yaws to serve the application as follows:


 erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run web_server start

Note that the web server runs continuously, so open a separate shell before typing the above command.



Publishing Quotes


Create two separate shells and type the following command in the first shell:


 erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run quote_publisher start AAPL

and the following command in the second shell:


 erl -pa ebin deps/exmpp/ebin/ deps/lhttpc/ebin/ deps/yaws/ebin -boot start_sasl -run quote_publisher start IBM

The above commands start Erlang processes that poll Yahoo Quotes every two seconds and publish the quotes on the nodes AAPL and IBM respectively.



Next, point your browser to http://<your-host-name>:8000/ and add the “IBM” and “AAPL” symbols; you will then see quotes for both symbols, e.g.


Code under the hood


Now that you are able to run the example, let’s take a look at how the code works:


Client library for Yahoo Finance


Though at work we use our own real-time stock quote feed, for this sample I implemented the quote feed using Yahoo Finance. The src/yquote_client.hrl and src/yquote_client.erl files define the client API for accessing the Yahoo Finance service. Here is the Erlang code for requesting a quote over HTTP and parsing the response:



    %%%-------------------------------------------------------------------
    %%% File : yquote_client.erl
    %%% Author : Shahzad Bhatti
    %%% Purpose : Wrapper Library for Yahoo Stock Quotes
    %%% Created : May 8, 2010
    %%%-------------------------------------------------------------------

    -module(yquote_client).

    -author('bhatti@plexobject.com').

    -export([
        quote/1
    ]).

    -record(quote, {
        symbol,
        price,
        change,
        volume,
        avg_daily_volume,
        stock_exchange,
        market_cap,
        book_value,
        ebitda,
        dividend_per_share,
        dividend_yield,
        earnings_per_share,
        week_52_high,
        week_52_low,
        day_50_moving_avg,
        day_200_moving_avg,
        price_earnings_ratio,
        price_earnings_growth_ratio,
        price_sales_ratio,
        price_book_ratio,
        short_ratio}).

    %% Fetch the CSV quote for Symbol over HTTP and parse it into a #quote{} record.
    quote(Symbol) ->
        inets:start(),
        %% httpc replaces the deprecated http module; the arguments are the same.
        {ok, {_Status, _Headers, Response}} = httpc:request(get, {url(Symbol), []},
            [{timeout, 5000}], [{sync, true}]),

        %% Fields are separated by commas; the response ends with a line break.
        Values = re:split(Response, "[,\r\n]"),
        #quote{
            symbol = list_to_binary(Symbol),
            price = to_float(lists:nth(1, Values)),
            change = to_float(lists:nth(2, Values)),
            volume = to_integer(lists:nth(3, Values)),
            avg_daily_volume = to_integer(lists:nth(4, Values)),
            stock_exchange = lists:nth(5, Values), % to_string
            market_cap = to_float(lists:nth(6, Values)), % B
            book_value = to_float(lists:nth(7, Values)),
            ebitda = to_float(lists:nth(8, Values)), % B
            dividend_per_share = to_float(lists:nth(9, Values)),
            dividend_yield = to_float(lists:nth(10, Values)),
            earnings_per_share = to_float(lists:nth(11, Values)),
            week_52_high = to_float(lists:nth(12, Values)),
            week_52_low = to_float(lists:nth(13, Values)),
            day_50_moving_avg = to_float(lists:nth(14, Values)),
            day_200_moving_avg = to_float(lists:nth(15, Values)),
            price_earnings_ratio = to_float(lists:nth(16, Values)),
            price_earnings_growth_ratio = to_float(lists:nth(17, Values)),
            price_sales_ratio = to_float(lists:nth(18, Values)),
            price_book_ratio = to_float(lists:nth(19, Values)),
            short_ratio = to_float(lists:nth(20, Values))}.

    url(Symbol) ->
        "http://finance.yahoo.com/d/quotes.csv?s=" ++ Symbol ++ "&f=l1c1va2xj1b4j4dyekjm3m4rr5p5p6s7".

    %% Convert a field such as <<"235.86">>, <<"214.6B">> or <<"1.5M">> to a number.
    %% Missing values (<<"N/A">>) are mapped to -1.
    to_float(<<"N/A">>) ->
        -1;
    to_float(Bin) ->
        {Multiplier, Bin1} = case bin_ends_with(Bin, <<$B>>) of
            true ->
                {1000000000, bin_replace(Bin, <<$B>>, <<>>)};
            false ->
                case bin_ends_with(Bin, <<$M>>) of
                    true ->
                        {1000000, bin_replace(Bin, <<$M>>, <<>>)};
                    false ->
                        {1, Bin}
                end
        end,
        L = binary_to_list(Bin1),
        %% list_to_float/1 fails on values without a decimal point (e.g. "235"),
        %% so fall back to list_to_integer/1 in that case.
        Number = case string:to_float(L) of
            {error, no_float} -> list_to_integer(L);
            {Float, _Rest} -> Float
        end,
        Number * Multiplier.

Note that I am omitting some code from the above listing (helpers such as to_integer, bin_ends_with and bin_replace), as I just wanted to highlight the HTTP request and parsing code.
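
The suffix handling in to_float can also be illustrated outside Erlang. The following is a minimal JavaScript sketch, not code from the repository: parseYahooNumber and splitQuoteLine are hypothetical names, and the sketch assumes the same conventions as the listing above (a trailing B means billions, a trailing M means millions, and N/A maps to -1).

```javascript
// Hypothetical helper mirroring the to_float conventions of the Erlang listing.
function parseYahooNumber(text) {
  const value = String(text).trim();
  if (value === 'N/A') {
    return -1; // same sentinel the Erlang code uses for missing values
  }
  const multipliers = { B: 1e9, M: 1e6 };
  const suffix = value.slice(-1);
  if (Object.prototype.hasOwnProperty.call(multipliers, suffix)) {
    return parseFloat(value.slice(0, -1)) * multipliers[suffix];
  }
  return parseFloat(value);
}

// Hypothetical helper: split one CSV line on commas and line breaks,
// dropping the empty fields left by the trailing line break.
function splitQuoteLine(line) {
  return line.split(/[,\r\n]/).filter(function (field) {
    return field !== '';
  });
}
```

For example, splitQuoteLine can be applied to the raw response and parseYahooNumber to each numeric field in turn.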


Publishing the Stock Quote


I used the exmpp library to communicate with the XMPP server from Erlang. Here is the code for publishing the quotes using the Bosh/XMPP protocol:



    %%%-------------------------------------------------------------------
    %%% File : quote_publisher.erl
    %%% Author : Shahzad Bhatti
    %%% Purpose : OTP server for publishing quotes
    %%% Created : May 8, 2010
    %%%-------------------------------------------------------------------
    -module(quote_publisher).

    -export([
        start/1,
        start/5,
        stop/1]).

    -export([init/5]).

    -include_lib("quote_utils.hrl").

    -record(state, {session, jid, service=?TEST_XMPP_PUBSUB, symbol}).

    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    %% APIs
    %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    start(Symbol) ->
        start(?TEST_XMPP_SERVER, ?TEST_XMPP_PORT, ?PRODUCER_USERNAME,
            ?PRODUCER_PASSWORD, Symbol).

    start(Host, Port, User, Password, Symbol) ->
        spawn(?MODULE, init, [Host, Port, User, Password, Symbol]).

    stop(Pid) ->
        Pid ! stop.

    init(Host, Port, User, Password, Symbol) ->
        {ok, {MySession, MyJID}} = quote_utils:connect(Host, Port, User, Password),
        State = #state{session=MySession, jid=MyJID, symbol = Symbol},
        create_symbol_node(State),
        loop(State).

    %% Publish a quote whenever two seconds pass without an incoming message.
    loop(#state{session=MySession}=State) ->
        receive
            stop ->
                quote_utils:disconnect(MySession);
            #received_packet{packet_type=message} ->
                loop(State);
            _Record ->
                loop(State)
        after 2000 ->
            publish_quote(State),
            loop(State)
        end.

    %% Create the pubsub node for the symbol and wait for the server's reply.
    create_symbol_node(#state{session=MySession, jid=MyJID, service = Service,
                              symbol = Symbol}) ->
        IQ = exmpp_client_pubsub:create_node(Service, Symbol),
        PacketId = exmpp_session:send_packet(MySession, exmpp_stanza:set_sender(IQ, MyJID)),
        PacketId2 = erlang:binary_to_list(PacketId),
        receive #received_packet{id=PacketId2, raw_packet=Raw} ->
            case exmpp_iq:is_error(Raw) of
                true -> {error, Raw};
                _ -> ok
            end
        end.

    %% Fetch the latest quote, publish it as JSON and wait for the acknowledgement.
    publish_quote(#state{session=MySession, jid=MyJID, service = Service, symbol = Symbol}) ->
        Quote = yquote_client:quote(Symbol),
        JsonQuote = ?record_to_json(quote, Quote),
        M = exmpp_xml:element(?QUOTE_DATA),
        IQ = exmpp_client_pubsub:publish(Service, Symbol, exmpp_xml:append_cdata(M,
            JsonQuote)),
        PacketId = exmpp_session:send_packet(MySession, exmpp_stanza:set_sender(IQ, MyJID)),
        PacketId2 = erlang:binary_to_list(PacketId),
        receive #received_packet{id=PacketId2, raw_packet=Raw} ->
            case exmpp_iq:is_error(Raw) of
                true -> error;
                _ -> ok
            end
        end.

In the above code, a process is created for each symbol; it periodically polls for the stock quote and publishes it to the XMPP node using the pubsub/bosh protocol. Note that a unique node is created for each symbol, and a node must be created before anyone can publish or subscribe to it. Also note that the publish/subscribe APIs use a request/ack protocol, so after sending a request, the process waits for its acknowledgement.
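
The request/ack pattern (send a stanza, then wait for the reply that carries the same packet id) can be sketched in JavaScript. This is only an illustration under stated assumptions: AckTracker, track and resolve are hypothetical names that belong to neither exmpp nor Strophe, and the network transport is replaced by plain function calls.

```javascript
// Hypothetical helper: correlates outgoing requests with their acks by id.
class AckTracker {
  constructor() {
    this.pending = new Map(); // packet id -> callback waiting for the ack
    this.nextId = 1;
  }

  // Register a request and return the id that should be sent with it.
  track(onAck) {
    const id = 'req-' + this.nextId++;
    this.pending.set(id, onAck);
    return id;
  }

  // Called for every incoming reply; returns false for unknown ids.
  resolve(id, isError) {
    const onAck = this.pending.get(id);
    if (!onAck) {
      return false;
    }
    this.pending.delete(id);
    onAck(isError ? 'error' : 'ok');
    return true;
  }
}
```

The Erlang publisher gets the same effect from a selective receive on the packet id, so replies for other requests stay in the mailbox until they are asked for.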



Here are some utility functions used by the publisher:


    -module(quote_utils).

    -include_lib("quote_utils.hrl").

    -export([
        init_session/2,
        connect/4,
        disconnect/1]).

    bosh_url(Host, Port) ->
        "http://" ++ Host ++ ":" ++ integer_to_list(Port) ++ "/http-bind".

    connect(Host, _Port, User, Password) ->
        safe_start_apps(),
        MySession = exmpp_session:start({1,0}),
        exmpp_xml:start_parser(),
        %% Create XMPP ID (Session Key):
        MyJID = exmpp_jid:make(User, Host, random),
        %% Create a new session with basic (digest) authentication:
        exmpp_session:auth_basic_digest(MySession, MyJID, Password),
        %% Note: the BOSH port 5280 is hard-coded here and _Port is ignored.
        {ok, _StreamId, _Features} = exmpp_session:connect_BOSH(MySession, bosh_url(Host, 5280), Host, []),
        try quote_utils:init_session(MySession, Password)
        catch
            _:Error -> io:format("got error: ~p~n", [Error]), {error, Error}
        end,
        {ok, {MySession, MyJID}}.

    init_session(MySession, Password) ->
        %% Login with defined JID / Authentication:
        try exmpp_session:login(MySession, "PLAIN")
        catch
            throw:{auth_error, 'not-authorized'} ->
                %% Try creating a new user:
                io:format("Register~n",[]),
                %% In a real life client, we should trap error case here
                %% and print the correct message.
                exmpp_session:register_account(MySession, Password),
                %% After registration, retry to login:
                exmpp_session:login(MySession)
        end,
        %% We explicitly send presence:
        exmpp_session:send_packet(MySession, exmpp_presence:set_status(exmpp_presence:available(), "Ready to publish!!!")),
        ok.

    disconnect(MySession) ->
        exmpp_session:stop(MySession).

    %% Starting an application that is already running fails, so log and carry on.
    safe_start_apps() ->
        try start_apps()
        catch
            _:Error -> io:format("apps already started : ~p~n", [Error]), {error, Error}
        end.

    start_apps() ->
        ok = application:start(exmpp),
        ok = application:start(crypto),
        ok = application:start(ssl),
        ok = application:start(lhttpc).


Note that the above code auto-registers users, which is not recommended for production use.


Javascript code using Strophe library


The web application depends on jQuery, Strophe and Strophe Pubsub. These libraries are included in the docroot directory and are imported by index.html. The Strophe library and ejabberd 2.1.3 support cross-domain scripting, so the bosh service doesn’t need to be on the same domain/port, but it must have a /crossdomain.xml policy file that allows access from wherever index.html lives. The Javascript initializes the connection parameters as follows (you will have to change HOST):



    <script type="text/javascript">
    // The BOSH_SERVICE here doesn't need to be on the same domain/port, but
    // it must have a /crossdomain.xml policy file that allows access from
    // wherever crossdomain.html lives.
    // TODO: REPLACE <paraclete.local> with your <host-name>
    var HOST = 'paraclete.local';
    var JID = 'consumer@' + HOST;
    var PASSWORD = 'consumer';
    var BOSH_SERVICE = 'http://' + HOST + ':5280/http-bind'; //'/xmpp-httpbind'
    var PUBSUB = 'pubsub.' + HOST;
    var connection = null;
    var autoReconnect = true;
    var hasQuotes = {};      // symbol -> true once a table row exists
    var subscriptions = {};  // symbol -> true while subscribed

    function log(msg) {
      // Put each message in its own <div>, escaped as text.
      $('#log').append($('<div></div>').text(msg));
    }

    function rawInput(data) {
      //log('RECV: ' + data);
    }

    function rawOutput(data) {
      //log('SENT: ' + data);
    }

    function onQuote(stanza) {
      //log('onQuote###### ' + stanza);
      try {
        $(stanza).find('event items item data').each(function(idx, elem) {
          // Declared locally so that quote does not leak into the global scope.
          var quote = jQuery.parseJSON($(elem).text());
          //{"price":235.86,"change":-10.39,"volume":59857756,"avg_daily_volume":20775600,"stock_exchange":[78,97,115,100,97,113,78,77],"market_cap":2.146e+11,
          //"book_value":43.257,"ebitda":1.5805e+10,"dividend_per_share":0.0,"dividend_yield":-1,"earnings_per_share":11.796,"week_52_high":272.46,"week_52_low":119.38,
          //"day_50_moving_avg":245.206,"day_200_moving_avg":214.119,"price_earnings_ratio":20.88,"price_earnings_growth_ratio":1.05,"price_sales_ratio":4.38,
          //"price_book_ratio":5.69,"short_ratio":0.7}
          if (hasQuotes[quote.symbol] != undefined) {
            // Update the existing cells; id selectors need the '#' prefix.
            $('#price_' + quote.symbol).text(quote.price);
            $('#change_' + quote.symbol).text(quote.change);
            $('#volume_' + quote.symbol).text(quote.volume);
          } else {
            hasQuotes[quote.symbol] = true;
            $('#quotesTable > tbody:last').append('<tr id="quote_' +
              quote.symbol + '"><td>' + quote.symbol +
              '</td><td id="price_' + quote.symbol + '">' + quote.price +
              '</td><td id="change_' + quote.symbol + '" class="class_change_' + quote.symbol + '">' +
              quote.change + '</td><td id="volume_' +
              quote.symbol + '">' +
              quote.volume + '</td></tr>');
          }

          if (quote.change < 0) {
            $('.class_change_' + quote.symbol).css('color', 'red');
          } else {
            $('.class_change_' + quote.symbol).css('color', 'green');
          }
        });
      } catch (e) {
        log(e);
      }
      return true;
    }

    function handleSubscriptionChange(stanza) {
      //log("***handleSubscriptionChange Received: " + stanza);
    }

    function onConnect(status) {
      if (status == Strophe.Status.CONNECTING) {
        log('Strophe is connecting.');
      } else if (status == Strophe.Status.CONNFAIL) {
        log('Strophe failed to connect.');
        $('#connect').val('connect');
      } else if (status == Strophe.Status.DISCONNECTING) {
        log('Strophe is disconnecting.');
      } else if (status == Strophe.Status.DISCONNECTED) {
        if (autoReconnect) {
          log('Streaming disconnected. Trying to reconnect...');
          // Reconnect with the credentials defined above; onConnect logs the outcome.
          connection.connect(JID, PASSWORD, onConnect);
        } else {
          log('Strophe is disconnected.');
          $('#connect').val('connect');
          //publishEvent( "streamingDisconnected" );
        }
      } else if (status == Strophe.Status.CONNECTED) {
        log('Strophe is connected.');
        //log('QUOTE_BOT: Send a message to ' + connection.jid + ' to talk to me.');
        connection.addHandler(onMessage, null, 'message', null, null, null);
        connection.send($pres().tree());
        //publishEvent( "streamingConnected" );
      }
    }

    function subscribe(symbol) {
      if (subscriptions[symbol]) return;
      try {
        connection.pubsub.subscribe(JID, PUBSUB, symbol, [], onQuote, handleSubscriptionChange);
        subscriptions[symbol] = true;
        log("Subscribed to " + symbol);
      } catch (e) {
        alert(e);
      }
    }

    function unsubscribe(symbol) {
      if (!subscriptions[symbol]) return;
      try {
        connection.pubsub.unsubscribe(JID, PUBSUB, symbol, handleSubscriptionChange);
        subscriptions[symbol] = false;
        log("Unsubscribed from " + symbol);
      } catch (e) {
        alert(e);
      }
    }

    function onMessage(msg) {
      var to = msg.getAttribute('to');
      var from = msg.getAttribute('from');
      var type = msg.getAttribute('type');
      var elems = msg.getElementsByTagName('body');

      if (type == "chat" && elems.length > 0) {
        var body = elems[0];
        log('QUOTE_BOT: I got a message from ' + from + ': ' + Strophe.getText(body));
        var reply = $msg({to: from, from: to, type: 'chat'}).cnode(Strophe.copyElement(body));
        connection.send(reply.tree());
        log('QUOTE_BOT: I sent ' + from + ': ' + Strophe.getText(body));
      }
      // we must return true to keep the handler alive.
      // returning false would remove it after it finishes.
      return true;
    }

    $(document).ready(function () {
      connection = new Strophe.Connection(BOSH_SERVICE);
      connection.rawInput = rawInput;
      connection.rawOutput = rawOutput;
      connection.connect(JID, PASSWORD, onConnect);
      //connection.disconnect();
      $('#add_symbol').bind('click', function () {
        var symbol = $('#symbol').get(0).value;
        subscribe(symbol);
      });
    });
    </script>

When the document is loaded, the connection to the ejabberd server is established. Here are the form and table used to add subscriptions and display current quote information for the symbols:


    <form name='symbols'>
      <label for='symbol'>Symbol:</label>
      <input type='text' id='symbol'/>
      <input type='button' id='add_symbol' value='add' />
    </form>
    <hr />
    <div id='log'></div>
    <table id="quotesTable" width="600" border="2" bordercolor="#333333">
      <thead>
        <tr>
          <th>Symbol</th>
          <th>Price</th>
          <th>Change</th>
          <th>Volume</th>
        </tr>
      </thead>
      <tbody>
      </tbody>
    </table>

When the add button is clicked, it calls the subscribe function, which in turn sends a subscription request to the ejabberd server. When a new quote is received, the onQuote function is called; it inserts a row in the table when a new symbol is added, or updates the quote information if the row already exists.
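
The insert-or-update decision made in onQuote can be separated from the DOM. The sketch below is an illustration rather than the code in index.html: QuoteBoard and apply are hypothetical names, and rows are kept in a Map instead of table cells so that the logic can run without a browser.

```javascript
// Hypothetical model of the quotes table: one row per symbol.
class QuoteBoard {
  constructor() {
    this.rows = new Map(); // symbol -> latest row data
  }

  // Returns 'inserted' for a new symbol and 'updated' for a known one.
  apply(quote) {
    const action = this.rows.has(quote.symbol) ? 'updated' : 'inserted';
    this.rows.set(quote.symbol, {
      price: quote.price,
      change: quote.change,
      volume: quote.volume,
      color: quote.change < 0 ? 'red' : 'green' // same colouring rule as onQuote
    });
    return action;
  }
}
```

Keeping this decision in one place makes it easy to check that a repeated symbol never produces a second row.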


Conclusion


Ejabberd, XMPP, exmpp, Bosh and Strophe provide a robust and mature solution for messaging and are especially suitable for web applications that need to be highly scalable and interactive. Though the above code is fairly simple, the same design principles can be used to support a large number of stock quote updates. As we need to send stock quotes for tens of thousands of symbols on every tick within a fraction of a second, Erlang provides a very scalable solution in which each symbol is simply served by an Erlang process. Finally, I am still learning about Ejabberd’s clustering, security, and other features so that it can truly survive production load, so I would love to hear any feedback you might have from similar solutions.

Saturday, April 24, 2010

Favorite fifteen tips from “Rework” book by Jason Fried and DHH

I have long been an admirer of Jason Fried of 37Signals and read his first book, Getting Real. Jason and DHH have put many of the ideas from that book, along with other ideas from their blog Signal vs. Noise, into a new book, Rework. I just finished reading it, and though it reiterates many ideas from “Getting Real” and their blog, those ideas are worth re-reading because many companies today still run on old fallacies. The book consists of thirteen sections and over eighty ideas; here are my favorite ideas from the book:


Failure is not a rite of passage




I have heard the advice from startup folks to “Fail early and fail often.” On the contrary, this book argues that people who learn from mistakes will simply make new mistakes, whereas success shows what actually works. Another related piece of advice in the book is “Reasons to quit”, which shows when you can quit and choose something else. When I read Founders at Work: Stories of Startups’ Early Days, it also showed that most startups don’t stick to their original ideas and move to other ideas based on early feedback.



Planning is Guessing




This is related to another piece of advice from the book, “Your estimates suck”, as planning and estimation are hard, especially in the software business. I have written about software estimation in my earlier blogs; however, most places still equate estimates with commitments. Jason and DHH remind us again that estimates are just guesses made with the best information available at the time.


Workaholism




This is another piece of unorthodox advice that contradicts how most software projects are run. Most companies measure workers’ dedication by how many hours they put in, even when they are not actually producing. This is also common when managers treat estimates as commitments and refuse to admit reality when things change. We are all familiar with the iron triangle of schedule/cost/functionality, sometimes referred to as cost/quality/schedule or cost/resources/schedule. Often business folks are unwilling to change schedule or functionality, which then requires working late hours. This is also related to heroism, which I have blogged about before, and to the book’s advice to go to sleep, as workaholism can result in sleep deprivation, which reduces creativity and productivity.



Scratch your own itch


Most successful businesses started with hobbies, personal interests or problems, and there are tons of examples of this. This advice is also related to eating your own dog food, though that is not mentioned in the book.


Start making something




Jason and DHH remind us of another great point: ideas are cheap, and the real question is how well you execute them.


Draw a line in the sand




One of the key characteristics of the Ruby on Rails software that DHH produced is its strong opinions, which limit variations. Similarly, 37Signals is known for its simple design and limited features. You can differentiate yourself from others by standing for something.



Outside money is Plan Z




Both DHH and Jason have often talked about the downsides of taking money from venture capitalists, and I agree that these days you can start most software startups with minimal money and that raising money can be very distracting. Another related tip is that “building to flip is building to flop”, even though a flip is often how startup founders hope to get out.


Start at the epicenter




This book advises you to focus on your core product. Though the book only briefly mentions this topic, there is a great video of Geoffrey Moore’s presentation at Business of Software 2009 that covers a similar topic. This advice is also related to other tips from the book such as “don’t copy”, “decommoditize your product”, and “focus on you instead of they”, i.e., focus on your core strengths and not your competitors.



Focus on what won’t change




This is great advice for building a business that will last. I remember that when I started working at Amazon, we were told the core values of Amazon, which included a large selection, cheap prices and customer service, and everything we built started from an outside-in focus, i.e., it started with customers.


Get it out there




This is similar to common advice from the startup and agile community, i.e. release early and release often.


Interruption is the enemy of productivity





More and more research is showing that our brain can only focus on one thing at a time, and that constant interruption and multi-tasking hamper productivity. This is also somewhat related to how office space is set up: many agile practices encourage open space with pair programming, and I have found that it prevents concentration. I have found that the private office recommended in Organizational Patterns of Agile Software Development provides less interruption.


Meetings are toxic




This is another hallmark idea of 37Signals, and the book contains a number of tips on making your meetings productive, such as a fixed time, fewer people, a clear agenda, beginning with a specific problem, and ending with action items and someone responsible for them.


Good enough is fine





37Signals is known for its simple design and fewer features. This is related to other advice in the book such as “embrace the constraints”, “throw less at the problem”, “underdo your competitor”, “say no” and “be a curator”. When you have limited resources, you can become more creative. Also, you are better off building half a product, not a half-assed product.


Make tiny decisions




The authors encourage you to make tiny decisions, as big decisions are hard to make and hard to change. This advice is related to other tips such as “decisions are progress”, which encourages you to always make progress, and “quick wins”, which encourages you to build momentum by accomplishing small tasks.


Build an audience




The authors encourage you to build an audience that comes back to you by writing blogs, tweeting and speaking. This is also related to “sell your by-products”, “emulate chefs”, “emulate drug dealers” and “out-teach your competitors”.



Conclusion


Though I skipped many gems of advice on hiring, culture and marketing, I suggest you read the book to learn how to build a long-lasting and successful business.

Thursday, March 18, 2010

Smarter Email appender for Log4j with support of duplicate-removal, summary-report and JMX

I have been using SMTPAppender for a while to notify developers when something breaks on the production site, and for the most part it works well. However, a misconfiguration or service crash can result in a large number of emails. I was struck by this problem at work when my mailbox suddenly received tons of emails from the production site. So I decided to write a slightly more intelligent email appender. My goals for the appender were:

* Throttle emails based on some configured time
* Remove duplicate emails
* Support JMX for dynamic configuration
* Provide summary report with count of errors and their timings

I created a FilteredSMTPAppender class that extends SMTPAppender. FilteredSMTPAppender defines a nested class Stats for keeping track of errors. For each unique exception, it creates an instance of Stats that stores the first and last occurrence of the exception as well as a count. The Stats class uses a hash of the stack trace to identify unique exceptions; however, it ignores the first line, which often contains dynamic information. FilteredSMTPAppender registers itself as an MBean so that it can be configured at runtime. It overrides the append method to capture the event and overrides checkEntryConditions to add filtering. It also changes the layout so that the summary count of error messages is added to the footer of the email message.
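
The duplicate detection and throttling described above can be sketched in a few lines. The example below is written in JavaScript rather than Java purely for illustration; traceChecksum, ErrorThrottle, shouldSend and the simple string hash are hypothetical stand-ins and do not reproduce the appender's actual code.

```javascript
// Hash every stack-trace line except the first, which often holds dynamic data.
function traceChecksum(traceLines) {
  let hash = 0;
  for (const line of traceLines.slice(1)) {
    for (let i = 0; i < line.length; i++) {
      hash = (hash * 31 + line.charCodeAt(i)) | 0; // keep within 32 bits
    }
  }
  return hash;
}

class ErrorThrottle {
  constructor(minIntervalMs) {
    this.minIntervalMs = minIntervalMs;
    this.stats = new Map(); // checksum -> counters and timestamps
  }

  // Returns true when an email should be sent for this trace at time `now` (ms).
  shouldSend(traceLines, now) {
    const key = traceChecksum(traceLines);
    let s = this.stats.get(key);
    if (!s) {
      s = { firstSeen: now, lastSeen: now, lastSent: now, numSeen: 1, numEmails: 1 };
      this.stats.set(key, s);
      return true; // the first occurrence is always reported
    }
    s.numSeen++;
    s.lastSeen = now;
    if (now - s.lastSent > this.minIntervalMs) {
      s.lastSent = now;
      s.numEmails++;
      return true;
    }
    return false; // duplicate inside the throttle window
  }
}
```

The counters kept per checksum (numSeen, numEmails, firstSeen, lastSeen) are the values a summary footer would report.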

FilteredSMTPAppender uses a number of helper classes, such as ServiceJMXBeanImpl for the MBean definition and LRUSortedList to keep a fixed-size cache of exceptions. Here are the listings of LRUSortedList and ServiceJMXBeanImpl.
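
The idea of a fixed-size cache that evicts the least recently used entry can be sketched as follows. This is a JavaScript illustration under assumptions, not a port of LRUSortedList: BoundedCache, get and put are hypothetical names, and the sketch relies on Map iteration order instead of a sorted list with timestamps.

```javascript
// Hypothetical bounded cache: evicts the least recently used key when full.
class BoundedCache {
  constructor(max) {
    this.max = max;
    this.entries = new Map(); // Map iterates in insertion order
  }

  get(key) {
    if (!this.entries.has(key)) {
      return undefined;
    }
    const value = this.entries.get(key);
    // Re-insert to mark the key as most recently used.
    this.entries.delete(key);
    this.entries.set(key, value);
    return value;
  }

  put(key, value) {
    if (this.entries.has(key)) {
      this.entries.delete(key);
    } else if (this.entries.size >= this.max) {
      // The first key in iteration order is the least recently used one.
      const oldest = this.entries.keys().next().value;
      this.entries.delete(oldest);
    }
    this.entries.set(key, value);
  }
}
```

Bounding the cache this way keeps memory use constant even when a failing service produces many distinct exceptions.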

Listing of FilteredSMTPAppender.java




    package com.plexobject.log;

    import java.beans.PropertyChangeEvent;
    import java.beans.PropertyChangeListener;
    import java.util.Comparator;
    import java.util.Date;

    import javax.mail.MessagingException;

    import org.apache.commons.lang.builder.EqualsBuilder;
    import org.apache.commons.lang.time.FastDateFormat;

    import org.apache.log4j.Layout;
    import org.apache.log4j.net.SMTPAppender;
    import org.apache.log4j.spi.LoggingEvent;

    import com.plexobject.jmx.JMXRegistrar;
    import com.plexobject.jmx.impl.ServiceJMXBeanImpl;
    import com.plexobject.metrics.Metric;
    import com.plexobject.metrics.Timer;
    import com.plexobject.util.Configuration;
    import com.plexobject.util.LRUSortedList;

    public class FilteredSMTPAppender extends SMTPAppender {

        private static final String SMTP_FILTER_MIN_DUPLICATE_INTERVAL_SECS = "smtp.filter.min.duplicate.interval.secs";
        private static final int MAX_STATS = Configuration.getInstance().getInteger("smtp.filter.max", 100);
        private static int MIN_DUPLICATE_EMAILS_INTERVAL = Configuration.getInstance().getInteger(SMTP_FILTER_MIN_DUPLICATE_INTERVAL_SECS,
                60); // 1 minute
        private static final Date STARTED = new Date();
        private static final FastDateFormat DATE_FMT = FastDateFormat.getInstance("MM/dd/yy HH:mm");

        final static class Stats implements Comparable<Stats> {

            final int checksum;
            final long firstSeen;
            long lastSeen;
            long lastSent;
            int numSeen;
            int numEmails;

            Stats(LoggingEvent event) {
                StringBuilder sb = new StringBuilder();
                String[] trace = event.getThrowableStrRep();
                // Skip the first line, which often holds dynamic information.
                // The trace is null when the event carries no exception.
                if (trace != null) {
                    for (int i = 1; i < trace.length; i++) {
                        sb.append(trace[i]);
                    }
                }
                checksum = sb.toString().hashCode();
                firstSeen = lastSeen = System.currentTimeMillis();
                // The first occurrence is always emailed, so count it as sent;
                // otherwise the second occurrence would bypass the throttle.
                lastSent = firstSeen;
                numSeen = 1;
                numEmails = 1;
            }

            // Returns true when enough time has passed to send another email.
            boolean check() {
                long current = System.currentTimeMillis();
                long elapsed = current - lastSent;
                numSeen++;
                lastSeen = current;
                if (elapsed > MIN_DUPLICATE_EMAILS_INTERVAL * 1000L) {
                    lastSent = current;
                    numEmails++;
                    return true;
                } else {
                    return false;
                }
            }

            @Override
            public boolean equals(Object object) {
                if (!(object instanceof Stats)) {
                    return false;
                }
                Stats rhs = (Stats) object;
                return new EqualsBuilder().append(this.checksum, rhs.checksum).isEquals();
            }

            @Override
            public int hashCode() {
                return checksum;
            }

            @Override
            public String toString() {
                return " (" + checksum + ") occurred " + numSeen + " times, " + numEmails + " # of emails, first @" + DATE_FMT.format(new Date(firstSeen)) + ", last @" + DATE_FMT.format(new Date(lastSeen)) + " since server started @" + DATE_FMT.format(STARTED);
            }

            @Override
            public int compareTo(Stats other) {
                // Integer.compare avoids the overflow that subtraction can cause.
                return Integer.compare(checksum, other.checksum);
            }
        }

        final static class StatsCmp implements Comparator<Stats> {

            @Override
            public int compare(Stats first, Stats second) {
                return Integer.compare(first.checksum, second.checksum);
            }
        }

        private static final LRUSortedList<Stats> STATS_LIST = new LRUSortedList<Stats>(
                MAX_STATS, new StatsCmp());
        private LoggingEvent event;
        private ServiceJMXBeanImpl mbean;
        private Layout layout;

        public FilteredSMTPAppender() {
            // Register as an MBean so the throttle interval can be changed at runtime.
            mbean = JMXRegistrar.getInstance().register(getClass());
            mbean.addPropertyChangeListener(new PropertyChangeListener() {

                @Override
                public void propertyChange(PropertyChangeEvent event) {
                    try {
                        if (event != null && SMTP_FILTER_MIN_DUPLICATE_INTERVAL_SECS.equalsIgnoreCase(event.getPropertyName())) {
                            MIN_DUPLICATE_EMAILS_INTERVAL = Integer.parseInt((String) event.getNewValue());
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        public void append(LoggingEvent event) {
            // Remember the event so checkEntryConditions can inspect it.
            this.event = event;
            if (layout == null) {
                layout = getLayout();
            }
            super.append(event);
        }

        protected boolean checkEntryConditions() {
            final Timer timer = Metric.newTimer(getClass().getSimpleName() + ".checkEntryConditions");
            try {
                boolean check = true;
                if (event != null) {
                    Stats newStats = new Stats(event);
                    Stats stats = STATS_LIST.get(newStats);
                    if (stats == null) {
                        stats = newStats;
                        STATS_LIST.add(stats);
                    } else {
                        check = stats.check();
                    }
                    if (check) {
                        setMessageFooter(stats);
                    }
                }
                return check && super.checkEntryConditions();
            } finally {
                timer.stop();
            }
        }

        // Wrap the original layout so that the email footer carries the summary.
        private void setMessageFooter(Stats stats) {
            String message = event.getMessage().toString();

            final String footer = "\n\n-------------------------\n" + message + " - " + stats;

            if (layout != null) {
                setLayout(new Layout() {

                    @Override
                    public void activateOptions() {
                        layout.activateOptions();
                    }

                    @Override
                    public String format(LoggingEvent evt) {
                        return layout.format(evt);
                    }

                    @Override
                    public String getFooter() {
                        return footer;
                    }

                    @Override
                    public boolean ignoresThrowable() {
                        return layout.ignoresThrowable();
                    }
                });
            }
        }
    }


Listing of LRUSortedList.java











package com.plexobject.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;

import org.apache.log4j.Logger;

public class LRUSortedList<T> implements List<T> {
    private static final Logger LOGGER = Logger.getLogger(LRUSortedList.class);
    private final int max;
    private final Comparator<T> comparator;

    // elements in comparator order, each paired with its last-update timestamp
    private final List<Pair<Long, T>> list = new ArrayList<Pair<Long, T>>();
    // timestamps in chronological order, each paired with the index used at insertion
    private final List<Pair<Long, Integer>> timestamps = new ArrayList<Pair<Long, Integer>>();

    // comparator to sort by timestamp
    private static final Comparator<Pair<Long, Integer>> CMP = new Comparator<Pair<Long, Integer>>() {
        @Override
        public int compare(Pair<Long, Integer> first, Pair<Long, Integer> second) {
            if (first.getFirst() < second.getFirst()) {
                return -1;
            } else if (first.getFirst() > second.getFirst()) {
                return 1;
            } else {
                return 0;
            }
        }
    };

    public LRUSortedList(int max, Comparator<T> comparator) {
        this.max = max;
        this.comparator = comparator;
    }

    @Override
    public boolean add(T e) {
        if (list.size() > max) {
            removeOldest();
        }
        // add object
        long timestamp = System.nanoTime();
        int insertionIdx = Collections.binarySearch(this, e, comparator);
        if (insertionIdx < 0) {
            // not found: binarySearch returns (-(insertion point) - 1)
            insertionIdx = (-insertionIdx) - 1;
            list.add(insertionIdx, new Pair<Long, T>(timestamp, e));
        } else {
            // found
            list.set(insertionIdx, new Pair<Long, T>(timestamp, e));
        }

        // as timestamps are sorted, we just remove the oldest (first)
        if (timestamps.size() > max) {
            timestamps.remove(0);
        }
        // update timestamp
        Pair<Long, Integer> t = new Pair<Long, Integer>(timestamp, insertionIdx);
        timestamps.add(t);
        return true;
    }

    @Override
    public void add(int index, T element) {
        throw new UnsupportedOperationException(
                "can't add element at arbitrary index, must use add to keep sorted order");
    }

    @Override
    public boolean addAll(Collection<? extends T> c) {
        for (T e : c) {
            add(e);
        }
        return c.size() > 0;
    }

    @Override
    public boolean addAll(int index, Collection<? extends T> c) {
        throw new UnsupportedOperationException(
                "can't add element at arbitrary index, must use addAll to keep sorted order");
    }

    @Override
    public void clear() {
        list.clear();
        // keep the timestamp index in sync with the element list
        timestamps.clear();
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean contains(Object e) {
        if (e == null) {
            return false;
        }
        try {
            return Collections.binarySearch(this, (T) e, comparator) >= 0;
        } catch (ClassCastException ex) {
            LOGGER.error("Unexpected type for contains "
                    + e.getClass().getName() + ": " + e);
            return false;
        }
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        for (Object e : c) {
            if (!contains(e)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public T get(int index) {
        Pair<Long, T> e = list.get(index);
        return e != null ? e.getSecond() : null;
    }

    public T get(Object e) {
        int ndx = indexOf(e);
        if (ndx >= 0) {
            return get(ndx);
        }
        return null;
    }

    @SuppressWarnings("unchecked")
    @Override
    public int indexOf(Object e) {
        try {
            int ndx = Collections.binarySearch(this, (T) e, comparator);
            // binarySearch returns a negative insertion point when the element
            // is absent, whereas the List contract expects -1
            return ndx >= 0 ? ndx : -1;
        } catch (ClassCastException ex) {
            LOGGER.error("Unexpected type for get " + e.getClass().getName()
                    + ": " + e);
            return -1;
        }
    }

    @Override
    public boolean isEmpty() {
        return list.isEmpty();
    }

    @Override
    public Iterator<T> iterator() {
        final Iterator<Pair<Long, T>> it = list.iterator();
        return new Iterator<T>() {

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public T next() {
                Pair<Long, T> e = it.next();
                return e.getSecond();
            }

            @Override
            public void remove() {
                it.remove();
            }
        };
    }

    @Override
    public int lastIndexOf(Object o) {
        for (int i = list.size() - 1; i >= 0; i--) {
            T e = get(i);
            if (e.equals(o)) {
                return i;
            }
        }
        return -1;
    }

    @Override
    public ListIterator<T> listIterator() {
        final ListIterator<Pair<Long, T>> it = list.listIterator();
        return buildListIterator(it);
    }

    @Override
    public ListIterator<T> listIterator(int index) {
        final ListIterator<Pair<Long, T>> it = list.listIterator(index);
        return buildListIterator(it);
    }

    @SuppressWarnings("unchecked")
    @Override
    public boolean remove(Object e) {
        try {
            int ndx = Collections.binarySearch(this, (T) e, comparator);
            if (ndx >= 0) {
                remove(ndx);
                return true;
            } else {
                return false;
            }
        } catch (ClassCastException ex) {
            LOGGER.error("Unexpected type for remove " + e.getClass().getName()
                    + ": " + e);
            return false;
        }
    }

    @Override
    public T remove(int index) {
        Pair<Long, T> e = list.remove(index);
        // look up the matching timestamp entry (only the timestamp is compared)
        Pair<Long, Integer> t = new Pair<Long, Integer>(e.getFirst(), 0);

        int insertionIdx = Collections.binarySearch(timestamps, t, CMP);
        if (insertionIdx >= 0) {
            timestamps.remove(insertionIdx);
        }
        return e != null ? e.getSecond() : null;
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        // try to remove every element; report whether the list changed
        boolean changed = false;
        for (Object e : c) {
            if (remove(e)) {
                changed = true;
            }
        }
        return changed;
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        boolean changed = false;
        // drop every element of this list that is not in c
        for (int i = list.size() - 1; i >= 0; i--) {
            if (!c.contains(get(i))) {
                remove(i);
                changed = true;
            }
        }
        return changed;
    }

    @Override
    public T set(int index, T element) {
        throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
        return list.size();
    }

    @Override
    public List<T> subList(int fromIndex, int toIndex) {
        List<T> tlist = new ArrayList<T>();
        List<Pair<Long, T>> plist = list.subList(fromIndex, toIndex);
        for (Pair<Long, T> e : plist) {
            tlist.add(e.getSecond());
        }
        return tlist;
    }

    @Override
    public Object[] toArray() {
        return subList(0, list.size()).toArray();
    }

    @SuppressWarnings("hiding")
    @Override
    public <T> T[] toArray(T[] a) {
        return subList(0, list.size()).toArray(a);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        Iterator<T> it = iterator();
        while (it.hasNext()) {
            sb.append(it.next()).append(", ");
        }
        return sb.toString();
    }

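    private void removeOldest() {
        // find the element with the oldest timestamp; a linear scan is used
        // because the indexes stored in timestamps go stale after insertions
        int oldestIdx = -1;
        long oldestTime = Long.MAX_VALUE;
        for (int i = 0; i < list.size(); i++) {
            long ts = list.get(i).getFirst();
            if (ts < oldestTime) {
                oldestTime = ts;
                oldestIdx = i;
            }
        }
        if (oldestIdx >= 0) {
            // remove(int) also drops the matching entry from timestamps
            remove(oldestIdx);
        }
    }
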
    private ListIterator<T> buildListIterator(
            final ListIterator<Pair<Long, T>> it) {
        return new ListIterator<T>() {

            @Override
            public void add(T e) {
                it.add(new Pair<Long, T>(System.nanoTime(), e));
            }

            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public boolean hasPrevious() {
                return it.hasPrevious();
            }

            @Override
            public T next() {
                Pair<Long, T> e = it.next();
                return e.getSecond();
            }

            @Override
            public int nextIndex() {
                return it.nextIndex();
            }

            @Override
            public T previous() {
                Pair<Long, T> e = it.previous();
                return e.getSecond();
            }

            @Override
            public int previousIndex() {
                return it.previousIndex();
            }

            @Override
            public void remove() {
                it.remove();
            }

            @Override
            public void set(T e) {
                it.set(new Pair<Long, T>(System.nanoTime(), e));
            }
        };
    }
}
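For comparison, the bounded "evict the least recently used entry" idea behind the list above can also be sketched with the JDK's own LinkedHashMap in access order. This is not the approach used in this post (it gives up the sorted order and binary search), and the class name BoundedLruCache is made up for this illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a size-bounded LRU map built on LinkedHashMap.
public class BoundedLruCache<K, V> extends LinkedHashMap<K, V> {
    private static final long serialVersionUID = 1L;
    private final int maxEntries;

    public BoundedLruCache(int maxEntries) {
        // third argument true = iterate in access order (least recent first)
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // called after every put; returning true evicts the eldest entry
        return size() > maxEntries;
    }

    public static void main(String[] args) {
        BoundedLruCache<String, Integer> cache = new BoundedLruCache<String, Integer>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");    // touch "a" so that "b" becomes the eldest
        cache.put("c", 3); // exceeds the limit, so "b" is evicted
        System.out.println(cache.keySet()); // prints [a, c]
    }
}
```

The trade-off is that lookups are by key rather than by comparator, so it only fits when the statistics can be keyed directly, e.g. by checksum.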

Listing of ServiceJMXBeanImpl.java





package com.plexobject.jmx.impl;

import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.AttributeChangeNotification;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;

import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.log4j.Logger;

import com.plexobject.jmx.ServiceJMXBean;
import com.plexobject.metrics.Metric;
import com.plexobject.util.TimeUtils;

public class ServiceJMXBeanImpl extends NotificationBroadcasterSupport
        implements ServiceJMXBean, NotificationListener {
    private static final Logger LOGGER = Logger
            .getLogger(ServiceJMXBeanImpl.class);
    private Map<String, String> properties = new ConcurrentHashMap<String, String>();
    private final PropertyChangeSupport pcs = new PropertyChangeSupport(this);

    private final String serviceName;
    private AtomicLong totalErrors;
    private AtomicLong totalRequests;

    private AtomicLong sequenceNumber;
    private String state;

    public ServiceJMXBeanImpl(final String serviceName) {
        this.serviceName = serviceName;
        this.totalErrors = new AtomicLong();
        this.totalRequests = new AtomicLong();
        this.sequenceNumber = new AtomicLong();
    }

    @Override
    public double getAverageElapsedTimeInNanoSecs() {
        return Metric.getMetric(getServiceName())
                .getAverageDurationInNanoSecs();
    }

    public String getProperty(final String name) {
        return properties.get(name);
    }

    public void setProperty(final String name, final String value) {
        final String oldValue = properties.put(name, value);
        final Notification notification = new AttributeChangeNotification(this,
                sequenceNumber.incrementAndGet(), TimeUtils
                        .getCurrentTimeMillis(), name + " changed", name,
                "String", oldValue, value);
        // notify JMX listeners, then the local property change listeners
        sendNotification(notification);
        handleNotification(notification, null);
    }

    @Override
    public String getServiceName() {
        return serviceName;
    }

    @Override
    public long getTotalDurationInNanoSecs() {
        return Metric.getMetric(getServiceName()).getTotalDurationInNanoSecs();
    }

    @Override
    public long getTotalErrors() {
        return totalErrors.get();
    }

    public void incrementError() {
        final long oldErrors = totalErrors.getAndIncrement();
        final Notification notification = new AttributeChangeNotification(this,
                sequenceNumber.incrementAndGet(), TimeUtils
                        .getCurrentTimeMillis(), "Errors changed", "Errors",
                "long", oldErrors, oldErrors + 1);
        sendNotification(notification);
    }

    @Override
    public long getTotalRequests() {
        return totalRequests.get();
    }

    public void incrementRequests() {
        final long oldRequests = totalRequests.getAndIncrement();
        final Notification notification = new AttributeChangeNotification(this,
                sequenceNumber.incrementAndGet(), TimeUtils
                        .getCurrentTimeMillis(), "Requests changed",
                "Requests", "long", oldRequests, oldRequests + 1);
        sendNotification(notification);
    }

    @Override
    public MBeanNotificationInfo[] getNotificationInfo() {
        String[] types = new String[] { AttributeChangeNotification.ATTRIBUTE_CHANGE };
        String name = AttributeChangeNotification.class.getName();
        String description = "An attribute of this MBean has changed";
        MBeanNotificationInfo info = new MBeanNotificationInfo(types, name,
                description);

        return new MBeanNotificationInfo[] { info };
    }

    @Override
    public String getState() {
        return state;
    }

    /**
     * @param state
     *            the state to set
     */
    public void setState(String state) {
        this.state = state;
    }

    /**
     * @see java.lang.Object#equals(Object)
     */
    @Override
    public boolean equals(Object object) {
        if (!(object instanceof ServiceJMXBeanImpl)) {
            return false;
        }
        ServiceJMXBeanImpl rhs = (ServiceJMXBeanImpl) object;
        return new EqualsBuilder().append(this.serviceName, rhs.serviceName)
                .isEquals();
    }

    /**
     * @see java.lang.Object#hashCode()
     */
    @Override
    public int hashCode() {
        return new HashCodeBuilder(786529047, 1924536713).append(
                this.serviceName).toHashCode();
    }

    /**
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return new ToStringBuilder(this)
                .append("serviceName", this.serviceName)
                .append("totalErrors", this.totalErrors)
                .append("totalRequests", this.totalRequests)
                .append("state", this.state)
                .append("properties", this.properties).toString();
    }

    public void addPropertyChangeListener(PropertyChangeListener pcl) {
        pcs.addPropertyChangeListener(pcl);
    }

    public void removePropertyChangeListener(PropertyChangeListener pcl) {
        pcs.removePropertyChangeListener(pcl);
    }

    @Override
    public void handleNotification(Notification notification, Object handback) {
        LOGGER.info("Received notification: ClassName: "
                + notification.getClass().getName() + ", Source: "
                + notification.getSource() + ", Type: "
                + notification.getType() + ", Message: "
                + notification.getMessage());
        if (notification instanceof AttributeChangeNotification) {
            // forward attribute changes to the registered property change listeners
            AttributeChangeNotification acn = (AttributeChangeNotification) notification;
            pcs.firePropertyChange(acn.getAttributeName(), acn.getOldValue(),
                    acn.getNewValue());
        }
    }
}
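To make the property-change wiring above easier to follow in isolation, here is a minimal sketch that uses only java.beans. The class RuntimeSetting and the property name "minIntervalSecs" are invented for this illustration; the real appender listens for its own property name and gets its events through the JMX bean:

```java
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.beans.PropertyChangeSupport;

// Hypothetical sketch: a setting that is updated when a named property changes.
public class RuntimeSetting {
    private final PropertyChangeSupport pcs = new PropertyChangeSupport(this);
    private volatile int minIntervalSecs = 60; // assumed default, not from the post

    public RuntimeSetting() {
        pcs.addPropertyChangeListener(new PropertyChangeListener() {
            @Override
            public void propertyChange(PropertyChangeEvent event) {
                // react only to the property we care about
                if ("minIntervalSecs".equalsIgnoreCase(event.getPropertyName())) {
                    minIntervalSecs = Integer.parseInt((String) event.getNewValue());
                }
            }
        });
    }

    // plays the role of setProperty on the JMX bean: fires the change event
    public void setProperty(String name, String oldValue, String newValue) {
        pcs.firePropertyChange(name, oldValue, newValue);
    }

    public int getMinIntervalSecs() {
        return minIntervalSecs;
    }

    public static void main(String[] args) {
        RuntimeSetting setting = new RuntimeSetting();
        setting.setProperty("minIntervalSecs", null, "120");
        System.out.println(setting.getMinIntervalSecs()); // prints 120
    }
}
```

Note that PropertyChangeSupport does not fire an event when the old and new values are equal and non-null, so setting a property to its current value is a no-op for listeners.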


Testing

Finally, here is how you can test this filter:





package com.plexobject;

import java.net.InetAddress;
import java.util.Date;

import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.net.SMTPAppender;

import com.plexobject.log.FilteredSMTPAppender;

public class Main {
    private static final Logger LOGGER = Logger.getLogger(Main.class);

    public static void main(String[] args) {
        SMTPAppender appender = new FilteredSMTPAppender();
        try {
            appender.setTo("bhatti@xxx.com");
            appender.setFrom("bhatti@xxx.com");
            appender.setSMTPHost("smtp.xxx.net");
            appender.setLocationInfo(true);
            appender.setSubject("Error from " + InetAddress.getLocalHost());

            appender.setLayout(new PatternLayout());
            appender.activateOptions();
            LOGGER.addAppender(appender);
        } catch (Exception e) {
            LOGGER.error("Failed to register smtp appender", e);
        }
        // simulate an error every second
        while (true) {
            try {
                throw new Exception("throwing exception at " + new Date());
            } catch (Exception e) {
                LOGGER.error("Logging error at " + new Date(), e);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop the loop
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

The code above generates an error every second, but email is only sent as often as the configured throttling level allows. You can also define all of this in a log4j XML configuration file, e.g.



<!-- Send email when error happens -->
<appender name="APP-EMAIL" class="com.plexobject.log.FilteredSMTPAppender">
    <param name="BufferSize" value="256" />
    <param name="SMTPHost" value="smtp.xxx.net" />
    <param name="From" value="bhatti@xxx.com" />
    <param name="To" value="bhatti@xxx.com" />
    <param name="Subject" value="Production Error" />
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern"
            value="[%d{ISO8601}]%n%n%-5p%n%n%c%n%n%m%n%n" />
    </layout>
    <filter class="org.apache.log4j.varia.StringMatchFilter">
        <param name="StringToMatch" value="My Error"/>
        <param name="AcceptOnMatch" value="false" />
    </filter>
</appender>
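The throttling mentioned above boils down to remembering when each distinct error was last mailed and skipping repeats that arrive too soon. The following is only a rough sketch of that idea, not the appender's actual check() logic (which is part of the Stats class in the listing); the class DuplicateThrottle and the use of String.hashCode() as a stand-in for the checksum are assumptions made for this example:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: suppress repeats of the same message within a minimum interval.
public class DuplicateThrottle {
    private final long minIntervalMillis;
    // last time (in ms) a notification was allowed for each message key
    private final Map<Integer, Long> lastSent = new HashMap<Integer, Long>();

    public DuplicateThrottle(long minIntervalMillis) {
        this.minIntervalMillis = minIntervalMillis;
    }

    // returns true when a notification for this message should go out at nowMillis
    public synchronized boolean shouldSend(String message, long nowMillis) {
        int key = message.hashCode(); // stand-in for the appender's checksum
        Long last = lastSent.get(key);
        if (last != null && nowMillis - last < minIntervalMillis) {
            return false; // same error was mailed too recently
        }
        lastSent.put(key, nowMillis);
        return true;
    }

    public static void main(String[] args) {
        DuplicateThrottle throttle = new DuplicateThrottle(5000);
        System.out.println(throttle.shouldSend("disk full", 0));    // true, first occurrence
        System.out.println(throttle.shouldSend("disk full", 1000)); // false, within 5 seconds
        System.out.println(throttle.shouldSend("disk full", 6000)); // true, interval elapsed
    }
}
```

Passing the current time in as a parameter keeps the sketch easy to test; a real appender would read the clock itself and bound the map, which is what the LRU list is for.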




Summary

I am skipping the other classes, but you can download the entire code from FilteredSMTPAppender.zip. This solution seems to be working for me, but feel free to share your experience with similar problems.