Mondo Visione Worldwide Financial Markets Intelligence

FTSE Mondo Visione Exchanges Index:

The eight rules of real-time stream processing

Date 14/07/2006

Mike Stonebraker, Ugur Cetintemel and Stanley Zdonik
CTO, Senior Architect and Chief Architect, Streambase Systems

On Wall Street and other global exchanges, electronic trading volumes are growing exponentially. Market data feeds can generate tens of thousands of messages per second, and the Options Price Reporting Authority (OPRA) - which aggregates all the quotes and trades from the options exchanges - estimates peak rates of 62,000 messages per second in 2005, with rates doubling every year.

This dramatic escalation in feed volumes is stressing or breaking traditional feed processing systems. Also, in electronic trading, a latency of even one second is unacceptable, and the trading operation whose engine has the most current data will maximise arbitrage profits. This fact is causing financial services companies to require millisecond-level processing of feed data - or faster - with very low latency.

Until now, off-the-shelf system software to manage streaming data has been largely unavailable for applications to process tens of thousands of messages/ second. Previously available approaches for developing stream processing applications include the following:

  • Custom code: Traditionally, custom coding has been the only solution to support low-latency high-volume streaming applications. However, resorting to a custom-coded solution is far from ideal in most environments because the result is inflexible, costly to develop and maintain, and often difficult to modify in response to new feature requests.
  • Traditional database systems: These offer general data management functionality and are designed to handle applications on static data ranging from Online Transaction Processing (OLTP) to data warehousing. However, in order to reliably store large, finite data-sets and process human-initiated queries, the data is stored on disk and indexed before any query processing can take place and outputs produced (see Figure 1).

Figure 1: Traditional relational database system

The indexing, storage steps, and ad hoc query execution in this approach all introduce latencies that are unacceptable for fast data streams.

  • Main-memory/specialised databases: These are high-throughput versions of traditional database systems, which can avoid going to disk for most operations, given sufficient memory. Despite being much faster than traditional DBMS systems, these systems cannot scale to high stream rates as they still fundamentally 'shoehorn' stream processing into the prevailing processing model inherent in all relational databases.
  • Rules engines: Rules engines were first proposed by the artificial intelligence (AI) community in the 1970s, with various types marketed in recent years to the financial services community. A rule is a condition/action pair, where the action is enabled whenever its condition is met. When rules are simple, the paradigm works well; however, as the size of a rule set grows (such as tick-by-tick monitoring of multiple securities for specific movement patterns across price, volume, and time), it quickly becomes unmanageable. Since multiple rules can be enabled whenever the data changes, it becomes very difficult to manage the flow of control and execution ordering of a large intertwined set of rules. Furthermore, rules engines are not architecturally optimised for low latency and comprehensive state management.
  • Point solutions: These typically provide a specific application suitable for solving one specific problem (e.g. algorithmic trading), provided that the applications underlying data model, programming and user interfaces, and analytic capabilities exactly meet the needs of the user. Unfortunately, this often is not the case. Such point solutions are also typically unable to meet the demanding real-time processing needs of multiple organisations across the firm beyond one problem area.

Recently, several technologies have emerged - including off-the-shelf stream processing engines - specifically to overcome these limitations and address the challenges of processing high-volume, real-time data without requiring the use of custom code. Within several years, it is expected that these engines will be as ubiquitous for processing real-time data as relational databases are for processing stored data. This paper outlines eight rules that such systems should follow to excel at a variety of streaming applications.

Rule 1: Keep the data moving

To achieve low-latency, a system must be able to perform message processing without having to first store and retrieve the data. As described above, a costly storage operation adds a great deal of unnecessary latency to the process (e.g. committing a database record requires a disk write of a log record). It is inefficient to require such a time-intensive operation before message processing can occur. Hence, the first requirement is to process messages in-stream without requiring data storage. See Figure 2 for an architectural example of this processing paradigm.

Figure 2: Stream processing engine processes data continuously on-the-fly, with optional storage

Beyond the issue of a DBMS architecture having greater latency than a straight through architecture that does not store the data, an additional latency problem exists. DBMSs are passive - they wait to be told what to do by an application. Hence, the natural way to use a DBMS is to write an application that polls for conditions of interest. Unfortunately, polling results in additional overhead, as well as additional latency, because half the polling interval is added to the processing delay.

Rule 2: Query using SQL on streams (StreamSQL)

In streaming applications, some querying mechanism must be used to issue arbitrary queries on moving data and compute real-time analytics.

Just as the value of SQL has been and remains its ability to issue arbitrary queries against a data store, the same type of querying capability must exist against streams. Historically in financial services, general purpose languages such as C++ or Java have been the workhorse development and programming tools; however, relying on low-level tools such as these results in long development cycles and high maintenance costs. Two higher-level approaches exist: a rules language (limitations described above) or SQL for data streams (StreamSQL).

SQL's success at expressing complex data transformations derives from the fact that it is based on a set of very powerful data processing primitives that do filtering, merging, correlation, and aggregation. SQL is explicit about how these primitives interact so that its meaning can be easily understood independently from runtime conditions. Furthermore, SQL is a widely promulgated standard that is understood by hundreds of thousands of database programmers and is implemented by every serious database management system in commercial use today.

Thus, SQL is the logical starting point for a stream processing engine. Since standard SQL runs queries on records in a finite stored dataset, to deal with continuous event streams and time-based records (tuples), SQL must be extended to become StreamSQL. StreamSQL retains capabilities of SQL while adding new capabilities such as a rich windowing system, the ability to mix stored data with streaming data, and the ability to extend the primitives to include custom logic such as analytic functions. As with SQL, a well-tuned execution strategy must exist that can provide very low response times.

Rule 3: Stream imperfections handle delayed, missing, and out-of-order data

In a conventional database, data is always present before queried against, but in a real-time system, since the data is never stored, the infrastructure must make provision for handling data which is late or delayed, missing, or out-ofsequence.

One characteristic is the ability to time-out individual calculations or computations. For example, consider a simple real-time business analytic which computes the average price of the last tick for a collection of 25 securities. One need only wait for a tick from each security and then output the average price. However, suppose one of the 25 stocks is thinly traded, and no tick for that symbol will be received for the next 10 minutes. This is an example of a computation that must block, waiting for input to complete its calculation. Such input may or may not arrive in a timely fashion. In fact, if the SEC orders a stop to trading in one of the 25 securities, then the calculation will block indefinitely.

In a real-time system, it is never a good idea to allow a programme to wait indefinitely. Hence, every calculation that can block must be allowed to timeout, so that the application can continue with partial data.

Rule 4: Generate predicable outcomes

In a real-time processing infrastructure, the system must process time-series records (tuples) in a consistent manner to ensure the calculations performed on one time-series record do not interfere with the calculations performed on another.

For example, imagine two feeds, one containing TICKS, the other a SPLITS feed which indicates when a stock splits and having the following format:

SPLITS (symbol, time, split_factor)

A typical stream processing application would be to produce the real-time splitadjusted price for a collection of stocks. Hence, the price must be adjusted for the cumulative split_factor that has been seen. The correct answer to this computation can only be produced if messages are processed by the system in ascending time order regardless of when the messages actually arrive. If a split message is processed out-of-order then the split-adjusted price for the stock in question will be wrong for one or more ticks. Hence, it must be possible to control the order of execution to this level of granularity.

Rule 5: Integrate stored and streaming data

Another requirement for streaming applications is that they are capable of storing and accessing current or historical state information, preferably using a familiar standard such as SQL commands.

Storage of state is desired almost universally, whether it is yesterday's business analytics or control strategies to apply in a specific trading situation. In addition, for many situations, events of interest depend partly on real-time data and partly on history, for example:

Issue an alert when the volume-weighted average price of IBM shares over the last 10 ticks exceeds the same statistic over the last 50 ticks, as long as this has not happened more than five times in the last seven hours of trading.

For low latency streaming data applications, interfacing with a client-server DBMS connection will add excessive latency and overhead to the application. Therefore, state must be stored in the same operating system address space as the application. As such, a DBMS solution will only satisfy this requirement if the stream processing infrastructure is extended with an embedded DBMS. Hence, the scope of a StreamSQL command must be either a real-time stream, for example TICKS, or a stored table in the embedded DBMS. See Figure 3 for an example of such an architecture.

Figure 3: Stream processing engine with in-process, embedded database

A very popular extension of this requirement comes from firms with electronic trading applications, who want to test a trading algorithm on historical data to see how it would have performed, and also test alternative scenarios. When the algorithm works well on historical data, it is switched over to a live feed seamlessly without application modification.

The need to compute some sort of business analytic on past data, such as starting four hours ago, is another driver for this seamless switching capability. Then, when the calculation 'catches up' to real-time, seamlessly continue with the calculation on real-time data. This requires switching automatically from historical to live data, without the manual intervention of a human.

Rule 6: Guarantee data safety and availability

Since virtually all financial services firms expect their applications to stay up all the time, if a failure occurs (hardware, operating system, software, application), the application needs to failover to backup hardware and keep going.

Any thought of restarting the operating system and recovering the application from a log is not acceptable for real-time processing. Hence, 'tandem-style' hot backup and real-time failover - whereby a secondary system synchs frequently with a primary processing system - is the best reasonable alternative for these types of applications. An example architecture is depicted in Figure 4.

Figure 4: 'Tandem-style' hot back-up and failover ensures high availability for real-time data processing

Rule 7: Partition and scale applications automatically

It must be possible to split an application over multiple processors or machines for scalability, without the developer having to write low-level code.

Distributed operation is becoming increasingly important given the favorable price-performance characteristics of low-cost commodity clusters. As such, it should be possible to split an application over multiple machines for scalability, without the developer having to write low-level code. Stream processing engines must also support multi-threaded operation to take advantage of modern multi-processor (or multi-core) computer architectures. Even on a single-processor machine, multi-threading is crucial to avoid blocking for external events, thereby facilitating low latency.

Rule 8: Process and repond instantaneously

None of the preceding rules will make any difference alone unless an application using these combined capabilities on one infrastructure can 'keep up'.

This means capacity to run at tens to hundreds of thousands of messages per second with latency as low as the millisecond range when needed. It is imperative that any user with a high-volume streaming application carefully test any product to be considered for performance via this workload. Such testing should include simulating high loads as well as connecting the engine to a live feed and monitoring run-time system performance and behaviour.

Summary

This paper has discussed eight rules which characterise the requirements for real-time stream processing. As more and more organisations realise significant benefits of using a stream processing engine vs custom-coding - such as low latency, business agility, greater developer productivity, and flexibility to quickly capture new opportunities - this technology will become even more widespread as a fundamental systems software infrastructure.

Over time, stream processing engines will become as broadly deployed for querying and processing real-time data as relational databases are today for processing stored data. Since a firm's ability to gain competitive advantage is greatly enhanced via adoption of such technologies, IT and application development organisations should consider and evaluate stream processing engines today.

These eight rules serve to illustrate the necessary features required for any product in this category that will be used for high-volume low-latency applications.

Dr Mike Stonebraker has been a pioneer of database research for more than 30 years and is the 2005 Recipient of the IEEE's von Neumann Award. He was a professor of Computer Science at the University of California at Berkeley for 25 years.

Dr Ugur Cetintemel is an assistant professor at the department of Computer Science at Brown University. His research focuses on the architecture and performance of networked databases and information systems.

Dr Stanley Zdonik is also a professor of Computer Science at Brown University, where he has led the advanced data management research group since 1983.

StreamBase Systems is based in Lexington, Massachusetts; see www.streambase.com.