Why Event Logging Matters
Data-driven decision making is a core part of our culture at Indeed. When we test new changes to our applications and services, whether we’re changing our user interface or our backend algorithms, we want to measure how those changes affect job seekers. For example, when we test a ranking algorithm change, we look at search result click-through rate as a measure of relevance. To compute click-through rate, we need to know which jobs were shown as search results and which of those were clicked.
To that end, we have built a system called logrepo (short for “log repository”) that allows us to track all potentially interesting events that occur on our site. Logrepo is a distributed event logging system that aggregates events from our data centers around the world into one place so we can analyze them.
Once logrepo aggregates events into its central repository, purpose-built analysis tools can process events and deliver important insights that enable us to make decisions based on real usage data. For example, we built a powerful analysis tool called Ramses that provides a way to quickly visualize the results of querying multi-field, multi-dimensional indexes built on time series data from logrepo. This allows us to easily answer questions about how job seekers are interacting with our site and compare metrics across a variety of data segments.
The Ramses graph in Figure 1 shows how an A/B test performed outside the US in terms of search result clicks by new versus returning visitors. This is just a taste of the power of Ramses, a tool that could not exist without a reliable, uniform approach to event log data collection.
As powerful as it is, Ramses is far from the only consumer of logrepo data. We have built numerous business metric dashboards that process logrepo event data, allowing us to choose arbitrary dimensions for the organization of the data presented. We also process logrepo data in map-reduce jobs for analysis and creation of machine learning models that power important features like search result ranking and job recommendations.
What We Log
We log over 500 event types, each with its own particular set of fields. A log event might be anything from a user action to a system performance metric. Figure 2 contains examples of some of the event data that we capture in logrepo. Each application is free to log whatever data it chooses, but there is a set of common fields we collect with most events.
Example Log Data

| Event data | Example fields |
|---|---|
| data common to almost all log events | uid, type, user tracking cookie, active A/B test groups, user agent, referer, and many more |
| logged for every search result page | search terms (what and where), country, # results, page number, # jobs on page, time spent in different backend services, + 60 more |
| logged when a user clicks on a job in search results | uid of source jobsearch event, job id |
| logged when an employer contacts a jobseeker via their Indeed Resume | uid of source resume search event, employer id, email message id, amount charged, country of resume, whether message was sent or blocked (and why), + 30 more |
We had several goals in mind when designing logrepo. We focused on how the system could provide the best possible support for analysis tools that empower our decision-making process.
We want to prevent loss or duplication of log events. While our log data would still be useful even with minor loss or duplication, any degradation in consistency limits the questions that can be answered with the accuracy required to make informed decisions. We require exactly-once delivery semantics for our log events to avoid these limitations.
To complicate matters, we also require logrepo to be able to tolerate failure of a single logrepo server at any time without any loss of consistency or availability. We are willing to give up real-time processing to achieve this consistency.
We also want logrepo to be able to deliver streams of data for a particular event type, in a parseable format, for consumption by log analysis tools like Ramses. We want it to be very simple to implement such tools in any programming language.
Why not use Flume?
Building our own log collection system might seem unnecessary given the number of open source options available today, of which Flume is probably the most popular. Logrepo has been in production at Indeed since 2007, so it predates Flume and several other options. However, even today Flume does not provide the delivery guarantees and consistency characteristics of logrepo. We discuss those differences when explaining our transport mechanism below.
Flume has a few notable advantages. Its strength lies in its modular architecture, which makes it easy to add new sources, collectors, and sinks — such as a sink for HDFS. Replicating logrepo data into HDFS was not an original goal of the system, and maintaining consistency during HDFS replication is a problem we have not yet solved completely.
Another advantage of Flume is that it stores its logs in arrival order, so once you’ve processed the log stream up to a point, you can be sure that all events in the stream before that point have been processed. With logrepo, everything is accessed in timestamp order. Due to the asynchronous nature of the system, you must wait a certain amount of time to ensure that all log events from a time range have arrived. As mentioned above, we are willing to trade off real-time processing for consistency. Our primary use case is analysis rather than monitoring, so several minutes of latency is acceptable.
To understand logrepo, we need to consider five parts of the system:
- Entry Format: how event log entries are structured
- Archival Structure: how log entries are organized into files on disk
- Builder: process that collects entries and converts them into the archival structure
- Transport: process that moves log entries between servers and data centers
- Reader Daemon: process that streams event log entry data for processing
We wanted a standardized entry format to make it easy to build general purpose analysis tools. We chose to encode each entry as key-value pairs using the familiar URL query string format. This format is relatively human readable, handles escaping properly, and offers good tool support.
All log entries have two common fields: uid and type. The uid field is a unique identifier for the log entry that also contains a timestamp, and the type field indicates the specific action and source application that provide the data contained in the entry.
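As a minimal sketch of what this format looks like to a consumer, the following Python uses the standard library's query string helpers to encode and decode one entry. The function names and the sample field values are illustrative assumptions, not part of logrepo itself.

```python
from urllib.parse import parse_qsl, urlencode


def format_entry(fields):
    """Encode a dict of fields as one log line; urlencode handles the escaping."""
    return urlencode(fields)


def parse_entry(line):
    """Decode one URL-query-string log line into a dict of field -> value."""
    # keep_blank_values so that fields logged with empty values are not dropped
    return dict(parse_qsl(line.strip(), keep_blank_values=True))


# Hypothetical entry: uid and type are the two common fields, url is illustrative.
line = format_entry({
    "uid": "15mt000000k1j548",
    "type": "orgClk",
    "url": "http://www.indeed.com/rc/clk",
})
entry = parse_entry(line)
print(line)
print(entry["type"], entry["url"])
```

Because escaping is handled by the encoding itself, values containing `&`, `=`, or newlines cannot break the one-entry-per-line layout.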
The structure used for storing the data on disk is critical for being able to efficiently deduplicate, store, and later read the log entries. Log entries are stored in such a way that all disk IO is sequential for both writes and reads. This gives logrepo a big scalability advantage for sequential access (our primary use case) over a general purpose datastore that is implemented with B+ trees, with the disadvantage of not having good random access performance for single entries (not our primary use case).
In the central repository, log entries are stored one per line in text files, sorted by uid (and therefore timestamp). The path at which each file is stored is determined by the data center, event type, and timestamp of its log entries. The timestamp used to determine the path is the top 25 bits of a 45 bit millisecond timestamp. This leaves 20 bits of the timestamp remaining within each file, so each file corresponds to a timespan of about 17 minutes. There can be multiple files corresponding to each prefix which can be easily merged when read since the entries are sorted.
dc1/orgClk/15mt/0.log4181.seg.gz:
  uid=15mt000000k1j548&type=orgClk&...
  uid=15mt000010k2j72n&type=orgClk&...
  uid=15mt000050k3j7bd&type=orgClk&...
  uid=15mt000060k2j6lu&type=orgClk&...
  uid=15mt0000b0k430co&type=orgClk&...
  uid=15mt0000d0k2i1ed&type=orgClk&...
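The bucket arithmetic described above can be checked with a few lines of Python. This sketch works on plain millisecond timestamps only; how the timestamp is encoded inside a uid is not covered here, and the function names are assumptions made for illustration.

```python
TIMESTAMP_BITS = 45                          # width of the millisecond timestamp
PREFIX_BITS = 25                             # high bits that determine the file path
SUFFIX_BITS = TIMESTAMP_BITS - PREFIX_BITS   # 20 bits remain within each file


def bucket_of(timestamp_ms):
    """Return the 25-bit prefix (the time bucket) for a millisecond timestamp."""
    return timestamp_ms >> SUFFIX_BITS


def bucket_range_ms(bucket):
    """Return the half-open [start, end) millisecond range covered by a bucket."""
    start = bucket << SUFFIX_BITS
    return start, start + (1 << SUFFIX_BITS)


# 2**20 ms per bucket, expressed in minutes
span_minutes = (1 << SUFFIX_BITS) / 60_000
print(round(span_minutes, 2))  # 17.48
```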
The builder converts the raw log entry data into the archival structure. The builder takes potentially new log entries, determines if they are new, and if so adds them. As the logs are accessed in time sorted order, the builder must also keep its output sorted. To maintain performance it is critical that all disk IO done by the builder is sequential.
The logrepo builder buffers incoming log entries and writes them to files. It first sorts the log entries into the appropriate buckets based on the first 25 bits of their timestamps. The builder sorts its entries and flushes them to a file when one of the buckets reaches the limit of the buffer size or a maximum wait time between flushes is exceeded. If the builder finds existing files for the bucket, it compares and merges entries in the buffer and the files, skipping any duplicates. If there are too many files in the bucket, they may be merged into a single larger file to reduce future merge overhead.
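The flush step for a single bucket can be sketched roughly as follows. This is an in-memory illustration under simplifying assumptions (entries are strings, existing files are already-sorted lists of lines), not the builder's actual implementation; `uid_of` and `merge_bucket` are hypothetical names.

```python
import heapq
from itertools import groupby


def uid_of(line):
    """Extract the value of the uid field from a query-string style entry."""
    for pair in line.split("&"):
        key, _, value = pair.partition("=")
        if key == "uid":
            return value
    raise ValueError("entry has no uid field")


def merge_bucket(buffered, existing_files):
    """Merge buffered entries with sorted file contents, skipping duplicate uids.

    buffered: unsorted list of entries waiting to be flushed
    existing_files: list of lists, each already sorted by uid
    """
    runs = [sorted(buffered, key=uid_of)] + [list(f) for f in existing_files]
    # heapq.merge reads every run strictly front to back, like sequential file IO
    merged = heapq.merge(*runs, key=uid_of)
    # equal uids are adjacent after the merge, so keep the first of each group
    return [next(group) for _, group in groupby(merged, key=uid_of)]


buffer = ["uid=c&type=t", "uid=a&type=t", "uid=a&type=t"]
on_disk = [["uid=a&type=t", "uid=b&type=t"]]
print(merge_bucket(buffer, on_disk))
```

Since every input run is consumed in order and the output is produced in order, a file-backed version of the same merge would need only sequential reads and writes.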
This buffering introduces a delay in log entry availability at the reader. To avoid too much copying and merging, which limits throughput, we need to make sure that we buffer a substantial amount of data before each flush. Currently we flush at 17 minute intervals (to match the timespan determined by the archival structure above), which means that to be reasonably sure that all the logs have arrived for a 17 minute period we wait 34 minutes.
We can easily tolerate an hour or two delay in data availability for our analysis purposes. For real-time monitoring, we consume the same log entries from a different stream that bypasses the repository archive — sacrificing consistency for much lower latency.
The goal of the transport layer is to reliably move log messages from the application servers through the logrepo servers in each datacenter and finally to the central log repository.
Each data center contains two logrepo servers, each of which receives all log entries for that data center in order to improve fault tolerance. We use syslog-ng to send log entries from our application servers to both logrepo servers. A third copy of every log entry is stored on the application server’s local drive for additional fault tolerance. One of the logrepo servers is designated as the primary and runs the logrepo builder, which converts the raw logs from syslog-ng into the archival format. The primary then uses rsync to push the files in the archive to the central log repository. After a delay, the primary also checks for missing entries by copying the raw logs from the backup logrepo server and adding any log entries not found in the archive. The uniqueness of the uid field guarantees that this process introduces no duplicates.
The redundant delivery of log messages to two servers gives us the ability to fail over from the primary to the secondary at any time. By always duplicating and then deduplicating we can suffer a total machine failure with no loss in availability or consistency.
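To illustrate why duplicating and then deduplicating is safe, here is a small sketch of the backfill step: entries from the backup server's raw log are added only when their uid is not already in the archive. Holding all uids in a set is a simplification for illustration, and the function names are hypothetical.

```python
def uid_of(line):
    """Extract the value of the uid field from a query-string style entry."""
    for pair in line.split("&"):
        key, _, value = pair.partition("=")
        if key == "uid":
            return value
    raise ValueError("entry has no uid field")


def backfill(archive, backup_raw):
    """Return the archive plus any backup entries it is missing, sorted by uid."""
    seen = {uid_of(line) for line in archive}
    recovered = []
    for line in backup_raw:
        uid = uid_of(line)
        if uid not in seen:      # unique uids make this check sufficient
            seen.add(uid)
            recovered.append(line)
    return sorted(archive + recovered, key=uid_of)


# The primary missed entry "b"; the backup received everything.
primary_archive = ["uid=a&type=t", "uid=c&type=t"]
backup_log = ["uid=a&type=t", "uid=b&type=t", "uid=c&type=t"]
print(backfill(primary_archive, backup_log))
```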
In contrast, Flume handles consistency through delivery acknowledgements. Once a message has been committed (acknowledged) downstream, its delivery is acknowledged to the upstream server. In cases where a downstream server fails, messages will buffer indefinitely until it recovers. If the failure is irrecoverable, consistency issues arise around whether the messages that were unacknowledged at the time of the failure have been committed further downstream. The same issues occur if Flume fails over to a spare collector that has no knowledge of what has and has not been committed. In either case, the upstream Flume server must decide whether to send the messages again, in which case they may be duplicated, or not to re-send, in which case they may be lost.
We access the events stored in logrepo through a reader daemon running on the central logrepo server or one of its replicas. The daemon supports querying for log entries by type and time range. This is where we see the benefits of our archival structure.
When serving results for a query, the server finds all files for the type and 25 bit prefix of the start timestamp and streams the log entries back to the client. It skips all events with timestamps less than the start and merges events from the files as it streams. It repeats this process for each subsequent time-based bucket until it reaches the bucket containing the end timestamp. Once it has seen a log entry beyond the end timestamp, it knows it is done and terminates the stream.
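The streaming loop described above might look roughly like the generator below. It is a sketch under stated assumptions: `files_for_bucket` and `timestamp_of` are hypothetical callbacks standing in for the file lookup and for timestamp extraction, and the end timestamp is treated as inclusive, which the text does not specify.

```python
import heapq

SUFFIX_BITS = 20  # low timestamp bits that vary within one bucket


def read_range(files_for_bucket, timestamp_of, start_ms, end_ms):
    """Yield entries with start_ms <= timestamp <= end_ms in timestamp order.

    files_for_bucket(bucket) returns a list of sorted iterables of entries.
    timestamp_of(entry) returns the entry's millisecond timestamp.
    """
    first = start_ms >> SUFFIX_BITS
    last = end_ms >> SUFFIX_BITS
    for bucket in range(first, last + 1):
        # files within a bucket are each sorted, so a streaming merge suffices
        for entry in heapq.merge(*files_for_bucket(bucket), key=timestamp_of):
            ts = timestamp_of(entry)
            if ts < start_ms:
                continue   # before the requested range, skip
            if ts > end_ms:
                return     # past the end timestamp, terminate the stream
            yield entry


# Toy data: entries are (timestamp_ms, label) tuples grouped by bucket.
files = {
    0: [[(500, "a"), (1500, "c")], [(1200, "b")]],
    1: [[((1 << 20) + 100, "d"), ((1 << 20) + 900, "e")]],
}
result = list(read_range(lambda b: files.get(b, []), lambda e: e[0],
                         1000, (1 << 20) + 500))
print(result)
```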
The reader is optimized for low latency for retrieving events of a single type in a time range. We did not optimize for interleaving events of different types. And since a typical analysis request includes millions or billions of events, we did not need to optimize for looking up a single event by uid. Optimizing for this simple retrieval case has served us well in the creation of our analysis tools, which process the log entry data streams into purpose-built data structures for ad-hoc analysis.
The interface to the reader daemon is extremely simple. To ask for event log data, we connect to the daemon on a TCP socket and send a one-line string, consisting of a start-time/end-time pair (in epoch milliseconds) and an event type. The matching data is streamed back instantly. For interactive peeking at raw log data we often simply use the Unix command nc (netcat):
~$ date --date "2012-04-01 05:24:00" +%s
1333275840
~$ date --date "2012-04-01 05:25:00" +%s
1333275900
~$ echo "1333275840000 1333275900000 orgClk" | nc logrepo 9999 | head -2
uid=16pmmtjgh14632ij&type=orgClk&v=0&tk=16pmmsulc146325g&jobId=11eaf231341a048f&onclick=1&url=http%3A%2F%2Fwww.indeed.co.uk%2Frc%2Fclk&href=...
uid=16pmmtjh2183j353&type=orgClk&v=0&tk=16pmmntjr183j1ti&jobId=029b8ec3ddeea5ae&onclick=1&url=http%3A%2F%2Fwww.indeed.com%2Frc%2Fclk&href=...
This simple interface to the reader daemon makes it easy to process logrepo data in any programming language. The core of a Python implementation is 13 lines of code.
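A client along these lines could be written as follows. This is a sketch rather than the original implementation: it assumes the query is newline-terminated (as `echo` produces in the nc example) and that the daemon closes the connection once the stream is complete. The function name is hypothetical.

```python
import socket


def stream_events(host, port, start_ms, end_ms, event_type):
    """Yield raw log lines of one event type between start_ms and end_ms."""
    # one-line query: start time, end time (epoch milliseconds), event type
    query = f"{start_ms} {end_ms} {event_type}\n"
    with socket.create_connection((host, port)) as sock:
        sock.sendall(query.encode("ascii"))
        # entries arrive one per line; read until the daemon closes the stream
        with sock.makefile("r", encoding="utf-8") as stream:
            for line in stream:
                yield line.rstrip("\n")


# Example call, using the host, port, and time range from the nc session above:
#
#   for line in stream_events("logrepo", 9999,
#                             1333275840000, 1333275900000, "orgClk"):
#       print(line)
```

Each yielded line is a query-string encoded entry, so it can be passed straight to a standard URL query string parser.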
Deciding With Data
We now capture 1 to 2 billion logrepo events every day across all Indeed properties. We feed those events into a diverse set of powerful analytics tools, like Ramses (about which we plan to share more in a future blog post). We use those tools to determine how to improve our services to better serve job seekers and employers.
Over the past 5 years, logrepo has provided significant value to Indeed. It serves as the foundation of our data-driven approach to decision making. Indeed’s products are vastly better thanks to its reliable, flexible, and scalable architecture. To keep this up for another 5 years, we will need to make it even more reliable, flexible, and scalable. If this sounds like the kind of problem you’d be interested in solving, take a look at some of our career opportunities.