@IndeedEng: A Technical Speaker Series

We’re excited to host @IndeedEng, a technical speaker series. Building successful large-scale consumer applications takes smart, passionate people with a variety of backgrounds and expertise. Our goal is to bring together the tech community to discuss the challenges of developing great products.

In this ongoing series, Indeed will share our real-world experience building a site that supports 250 million job seekers a month across 60+ countries, and will host compelling tech talks by outside speakers.

Boxcar: A self-balancing distributed services protocol

At Indeed, we have an unrelenting obsession with speed. We believe site performance is critical to providing the best experience for job seekers. Since Indeed launched in 2004, traffic growth has been swift and steady. Today, we have over 85 million unique visitors and over 2 billion searches per month. Traffic to Indeed continues to rapidly increase, so ensuring our systems scale with this traffic really matters. In order for our site to be fast and reliable, we developed Boxcar, an RPC framework and load distribution strategy for interacting with remote services using a generic layer to transport raw bytes.

Train car used with permission of O Scale Trains Magazine (oscalemag.com), photo credit Don McFall of Old Line Graphics

Considerations

Before we developed Boxcar in 2010, Indeed’s main search site was powered by a single web application. In 2009, we split this application into two separate but highly-coupled server processes to address some bottlenecks, but this was only a stop-gap solution. The two processes were always deployed on the same machine to simplify configuration and minimize potential protocol compatibility issues. This solution unfortunately prevented us from maximizing hardware utilization, increased vulnerability to certain kinds of failures, and required a carefully managed deploy process during our weekly releases.

We decided that we needed to implement a more general service-oriented architecture to improve system scalability. We considered various architectures and network topologies that were in common use. Some required expensive, custom hardware or larger numbers of commodity machines. Others introduced latency or additional points of failure. Dissatisfied with readily available options, we decided to develop something new.

The resulting solution includes an elegant algorithm that reliably routes requests from front-end applications to back-end services without a single point of failure or intermediate hops. This layer is itself protocol-agnostic, but we standardized on Google Protocol Buffers to provide structured remote service invocation with excellent forward and backward compatibility. When a service instance becomes unavailable, the core algorithm adapts nearly instantaneously and routes requests only to the remaining available instances, so there is no disruption to our site. It also transparently diverts traffic towards servers with faster response times.

Implementation

The basic Boxcar algorithm is simple. Each server maintains an ordered list of connection slots. Each client sends a given request to a server based on the lowest slot number available across the whole pool. This provides a lightweight mechanism for clients to balance requests without the need to directly measure server load.

client 1 has slot 0 on server 1

Figure 1: each client’s pool contains the lowest server slot number available

Servers give out their lowest available slot when a connection is established. The same slot number is never simultaneously associated with multiple active connections. The slot associated with a connection does not affect how the server treats requests on that connection; it is only on the client that this affects request distribution.
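
The slot bookkeeping on the server side can be captured in a few lines. Below is a minimal sketch (not Indeed's actual implementation) of a server handing out its lowest free slot when a connection is established and reclaiming it when the connection closes, using a min-heap of free slot numbers; the class and method names are illustrative.

import heapq

class SlotAllocator:
    """Hands out the lowest available slot number. Illustrative sketch only."""

    def __init__(self, max_slots):
        # All slots start out free; a min-heap keeps the lowest slot on top.
        self.free_slots = list(range(max_slots))
        heapq.heapify(self.free_slots)

    def acquire(self):
        # Called when a client connection is established.
        if not self.free_slots:
            raise RuntimeError("no free slots available")
        return heapq.heappop(self.free_slots)

    def release(self, slot):
        # Called when a client connection closes; the slot becomes available
        # again and will be handed to the next incoming connection.
        heapq.heappush(self.free_slots, slot)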

Each Boxcar client maintains a persistent, fixed-size connection pool that it constantly adjusts to maintain balance. When making a request, a client uses the connection from its pool with the lowest slot number. In the background, the client optimizes this connection pool. It establishes new connections to available servers even while servicing requests using already established connections. If a new connection has a lower slot number than any connection already in the pool, the client kicks out the existing connection and replaces it with the new one. Otherwise, it discards the new connection and sleeps briefly before trying again. The pool is managed by a specialized data structure that handles the non-I/O operations in a matter of microseconds.
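
As a rough illustration of that background loop (with a hypothetical connect() helper that picks an available server, opens a connection, and returns its slot number, and with locking and error handling omitted), the optimization step amounts to keeping the pool_size connections with the lowest slot numbers:

import time

def optimize_pool(pool, connect, pool_size, pause_seconds=0.1):
    # pool is a dict mapping slot number -> connection; sketch only.
    while True:
        slot, conn = connect()
        if len(pool) < pool_size:
            pool[slot] = conn              # room in the pool: keep the connection
        elif slot < max(pool):
            worst_slot = max(pool)         # new slot beats our worst slot:
            pool.pop(worst_slot).close()   # kick out the highest-numbered one
            pool[slot] = conn
        else:
            conn.close()                   # no improvement: discard and back off
            time.sleep(pause_seconds)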

When a client goes away, the server reclaims the slot numbers associated with its connections. If these are low slot numbers, they are quickly allocated to the remaining clients. These clients then discard the connections they had associated with higher slot numbers in order to make room in their pools (see Figures 2 and 3).

client 1 has slot 0 on servers 1 and 2; client 2 has slot 1 on servers 1 and 2; client 1 vanishes

Figure 2: client with a low slot disappears

client 2 takes slot 0 on servers 1 and 2

Figure 3: low slots are reclaimed and allocated

The net effect is that each client has pre-computed which connection will be used for the Nth concurrent request. Our clients are typically web applications that service simultaneous requests themselves. If a client only services one request at a time, every request will use the same connection — the one in the pool that has the lowest slot number. All other connections will sit idle. If the client services two concurrent requests, each request will use one of the two connections in the pool with the lowest slot numbers. The other connections will remain idle. This generalizes to as many concurrent requests as there are connections in the pool.

If there are more concurrent requests than there are connections, additional requests are rejected. We reject requests in this extreme condition in order to fail fast and prevent excessive load on the services. We size our connection pools based on expected peak traffic levels, so this rejection should only happen during extraordinary conditions.
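
Putting the selection rule and the fail-fast rejection together, a sketch of the dispatch path might look like the following: the lowest-numbered connection that is not already busy serves the request, and the request is rejected if every connection is in use. The names here are illustrative, not part of the real framework.

class RejectedError(Exception):
    """Raised when every connection in the pool is already busy."""

def dispatch(pool, busy, request):
    # pool maps slot number -> connection; busy is the set of slots
    # currently serving a request. Sketch only, no synchronization shown.
    idle_slots = [slot for slot in pool if slot not in busy]
    if not idle_slots:
        # More concurrent requests than connections: fail fast rather than
        # queue up and put excessive load on the remote services.
        raise RejectedError("connection pool exhausted")
    slot = min(idle_slots)
    busy.add(slot)
    try:
        return pool[slot].send(request)
    finally:
        busy.remove(slot)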

If a remote service fails to serve a request, the client disposes of the associated connection and retries the request on the next best connection. The number of retries that can happen is bounded in configuration. Disposing of the connection frees up an additional space in the pool for the greedy connection acquisition threads to fill, which quickly restores the pool to capacity.

Given the random distribution of connections, the connection selected for retry could be to the same failing server. Failures due to unavailable servers tend to be fast, usually sub-millisecond in our networks, so the client will incrementally purge its pool of these connections. This continues until the client finds a connection to a live server, reaches its retry limit, or purges all connections in its pool. The latter two conditions both result in the request failing outright.
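
A sketch of that retry behavior, assuming a hypothetical RemoteFailure exception raised by a connection whose request fails: the client drops the failed connection (freeing space for the background acquisition threads to refill), moves on to the next-best connection, and gives up once it hits the retry limit or runs out of connections.

class RemoteFailure(Exception):
    """Raised by the (hypothetical) connection when a request fails."""

def send_with_retries(pool, request, max_retries):
    # pool maps slot number -> connection; sketch only.
    retries = 0
    while pool:
        slot = min(pool)                   # next-best connection by slot number
        try:
            return pool[slot].send(request)
        except RemoteFailure:
            # Dispose of the failed connection; the freed pool space will be
            # refilled by the background connection-acquisition threads.
            pool.pop(slot).close()
            retries += 1
            if retries > max_retries:
                break
    raise RemoteFailure("retry limit reached or no connections remaining")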

client 1 has slot 0 on servers 1 and 2; client 2 has slot 2 on server 1 and slot 1 on server 2; client 3 has slot 1 on server 1 and slot 2 on server 2

Figure 4: three clients seeking the best slots on two servers

server 2 goes away

Figure 5: server loss

after server 2 goes away: client 1 has slots 0 and 5 on server 1; client 2 has slots 2 and 3 on server 1; client 3 has slots 1 and 4 on server 1

Figure 6: following server loss, clients continue to find the best slots

a new server (server 2) is added

Figure 7: a new server is added

client 1 has slot 0 on servers 1 and 2; client 2 has slot 2 on server 1 and slot 1 on server 2; client 3 has slot 1 on server 1 and slot 2 on server 2

Figure 8: clients connect to the best slots available

If any single server is slow, connections to that server are more likely to be busy. Additional requests from that client will need to be serviced using different connections. The client preference for lower slot numbers means that those are likely to be connected to different servers, thus avoiding the bottleneck. The overall effect is that slower servers will service fewer requests, meaning more load is distributed to the remaining servers. Servers that are running at half speed will receive half as many requests, on average.

Insights

Load balancing implies a strictly equal distribution of load across all instances. That degree of consistency is a difficult problem to solve, and we do not claim to have solved it. The key realization we had when designing Boxcar was that we could achieve our goals by solving a much simpler problem.

Boxcar balances requests using an elegant strategy for allocating persistent connections to incoming requests. Our approach does not guarantee a uniform distribution of requests or load. Instead, we try to avoid sending too many requests to a single server. This is a considerably simpler goal to achieve and was based on two insights.

The first insight was that we don’t need to directly balance individual requests. Instead, we balance connections probabilistically and impose a stable connection selection heuristic. This indirectly results in good balance for individual requests. The balance is not perfect in every instance at every moment, but in aggregate, the results are well-balanced, and the outliers are brief and rare. In our use cases, the number of requests is a reasonable proxy for server load, so a relatively uniform distribution of requests results in a relatively uniform distribution of load.

The second insight was that we do not need to have uniform distribution of load across instances. Suppose there are 10 servers and 3 servers worth of load. The implied goal with load balancing is to have each server at 30% utilization. However, for our goals, it’s acceptable if at some moment we have 3 servers at 100% utilization and 7 servers at 0% utilization, or 5 servers at 60% utilization and 5 servers at 0%. Our goal is to stably and efficiently serve our site traffic. As long as no server exceeds its capacity to serve requests stably and in a timely fashion, we really don’t care how uniform the load is.

Results

We’ve now been using Boxcar for nearly three years. We use it in production, testing, and development environments. Today, well over a dozen different services across nearly all of our products use this framework, carrying many hundreds of millions of requests and many gigabytes of data each day.

With Boxcar, Indeed is able to scale well with low cost commodity hardware while maintaining high availability. Our networks are set up with a simple topology without any added latency or network hops. Failures of individual systems have almost no impact on our ability to serve our users. Our operational costs are lower because our systems are less fragile.

Over the years, Boxcar has delivered all of the benefits we hoped for and has proven durable and versatile, bringing valuable scaling capabilities to a broad range of products that collectively generate many hundreds of millions of requests per day. The core of Boxcar combines a few simple concepts into a sophisticated, resilient system for load distribution. We will cover other interesting aspects of Boxcar in future posts.

Logrepo: Enabling Data-Driven Decisions

Why Event Logging Matters

Data-driven decision making is a core part of our culture at Indeed. When we test new changes to our applications and services, whether we’re changing our user interface or our backend algorithms, we want to measure how those changes affect job seekers. For example, when we test a ranking algorithm change, we look at search result click-through rate as a measure of relevance. To compute click-through rate, we need to know which jobs were shown as search results and which of those were clicked.

To that end, we have built a system called logrepo (short for “log repository”) that allows us to track all potentially interesting events that occur on our site. Logrepo is a distributed event logging system that aggregates events from our data centers around the world into one place so we can analyze them.

Once logrepo aggregates events into its central repository, purpose-built analysis tools can process events and deliver important insights that enable us to make decisions based on real usage data. For example, we built a powerful analysis tool called Ramses that provides a way to quickly visualize the results of querying multi-field, multi-dimensional indexes built on time series data from logrepo. This allows us to easily answer questions about how job seekers are interacting with our site and compare metrics across a variety of data segments.

The Ramses graph in Figure 1 shows how an A/B test performed outside the US in terms of search result clicks by new versus returning visitors. This is just a taste of the power of Ramses, a tool that could not exist without a reliable, uniform approach to event log data collection.

example Ramses graph

Figure 1: A sample graph from Ramses, our dynamic event data query tool

As powerful as it is, Ramses is far from the only consumer of logrepo data. We have built numerous business metric dashboards that process logrepo event data, allowing us to choose arbitrary dimensions for the organization of the data presented. We also process logrepo data in map-reduce jobs for analysis and creation of machine learning models that power important features like search result ranking and job recommendations.

What We Log

We log over 500 event types, each with its own particular set of fields. A log event might be anything from a user action to a system performance metric. Figure 2 contains examples of some of the event data that we capture in logrepo. Each application is free to log whatever data it chooses, but there is a set of common fields we collect with most events.

all events
  Description: data common to almost all log events
  Example log data: uid, type, user tracking cookie, active A/B test groups, user agent, referer, and many more

jobsearch
  Description: logged for every search result page
  Example log data: search terms (what and where), country, # results, page number, # jobs on page, time spent in different backend services, + 60 more

orgClk
  Description: logged when a user clicks on a job in search results
  Example log data: uid of source jobsearch event, job id

resContactEmail
  Description: logged when an employer contacts a jobseeker via their Indeed Resume
  Example log data: uid of source resume search event, employer id, email message id, amount charged, country of resume, whether message was sent or blocked (and why), + 30 more
Figure 2: example log event data

Goals

We had several goals in mind when designing logrepo. We focused on how the system could provide the best possible support for analysis tools that empower our decision-making process.

We want to prevent loss or duplication of log events. While our log data would still be useful even with minor loss or duplication, any degradation in consistency limits the questions that can be answered with the accuracy required to make informed decisions. We require exactly-once delivery semantics for our log events to avoid these limitations.

To complicate matters, we also require logrepo to be able to tolerate failure of a single logrepo server at any time without any loss of consistency or availability. We are willing to give up real-time processing to achieve this consistency.

We also want logrepo to be able to deliver streams of data for a particular event type, in a parseable format, for consumption by log analysis tools like Ramses. We want it to be very simple to implement such tools in any programming language.

Why not use Flume?

Building our own log collection system might seem unnecessary given the number of open source options available today, of which Flume is probably the most popular. Logrepo has been in production at Indeed since 2007, so it predates Flume and several other options. However, even today Flume does not provide the delivery guarantees and consistency characteristics of logrepo. We discuss those differences when explaining our transport mechanism below.

Flume has a few notable advantages. Its strength lies in its modular architecture, which makes it easy to add new sources, collectors, and sinks — such as a sink for HDFS. Replicating logrepo data into HDFS was not an original goal of the system, and maintaining consistency during HDFS replication is a problem we have not yet solved completely.

Another advantage of Flume is that it stores its logs in arrival order, so once you’ve processed the log stream up to a point, you can be sure that all events in the stream before that point have been processed. With logrepo, everything is accessed in timestamp order. Due to the asynchronous nature of the system, you must wait a certain amount of time to ensure that all log events from a time range have arrived. As mentioned above, we are willing to trade off real-time processing for consistency. Our primary use case is analysis rather than monitoring, so several minutes of latency is acceptable.

System Architecture

To understand logrepo, we need to consider five parts of the system:

  1. Entry Format: how event log entries are structured
  2. Archival Structure: how log entries are organized into files on disk
  3. Builder: process that collects entries and converts them into the archival structure
  4. Transport: process that moves log entries between servers and data centers
  5. Reader Daemon: process that streams event log entry data for processing

transport diagram

Figure 3: how event log data is transported and stored

Entry Format

We wanted a standardized entry format to make it easy to build general purpose analysis tools. We chose to encode each entry as key-value pairs using the familiar URL query string format. This format is relatively human readable, handles escaping properly, and offers good tool support.

All log entries have two common fields: uid and type. The uid field is a unique identifier for the log entry that also contains a timestamp, and the type field indicates the specific action and source application that provide the data contained in the entry.

uid=14us0t7sm064g2uc&type=orgClk&v=0&tk=14us0soeo064g2m3&jobId=8005d47e09b124f4&onclick=1&url=http%3A%2F%2Faq.indeed.com%2Frc%2Fclk&href=http%3A%2F%2Faq.indeed.com%2FPlant-Manager-jobs&agent=...

Figure 4: a partial log file entry
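
Because each entry is an ordinary URL query string, any language's standard URL utilities can parse it. For example, in Python (a sketch using a shortened version of the entry above):

from urllib.parse import parse_qs

entry = "uid=14us0t7sm064g2uc&type=orgClk&v=0&jobId=8005d47e09b124f4&onclick=1"
fields = {key: values[0] for key, values in parse_qs(entry).items()}
print(fields["type"])    # orgClk
print(fields["jobId"])   # 8005d47e09b124f4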

Archival Structure

The structure used for storing the data on disk is critical for being able to efficiently deduplicate, store, and later read the log entries. Log entries are stored in such a way that all disk IO is sequential for both writes and reads. This gives logrepo a big scalability advantage for sequential access (our primary use case) over a general-purpose datastore implemented with B+ trees, at the cost of poor random access performance for single entries (not our primary use case).

In the central repository, log entries are stored one per line in text files, sorted by uid (and therefore timestamp). The path at which each file is stored is determined by the data center, event type, and timestamp of its log entries. The timestamp used to determine the path is the top 25 bits of a 45-bit millisecond timestamp. This leaves 20 bits of the timestamp remaining within each file, so each file corresponds to a timespan of about 17 minutes. There can be multiple files corresponding to each prefix; since the entries are sorted, these files can be easily merged when read.
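
To illustrate the arithmetic: taking the top 25 bits of a 45-bit millisecond timestamp is the same as shifting it right by 20 bits, which groups entries into buckets of 2^20 milliseconds, or roughly 17.5 minutes. The sketch below shows only the bucket computation; the base-32-style path encoding shown in Figure 5 is omitted.

def bucket_prefix(timestamp_ms):
    # Keep the low 45 bits of the millisecond timestamp, then take the top 25.
    return (timestamp_ms & ((1 << 45) - 1)) >> 20

print((1 << 20) / 60000.0)           # ~17.5 minutes spanned by each bucket
print(bucket_prefix(1333275840000))  # bucket index for an example timestamp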

logrepo archival structure diagram

Figure 5: example log file path with path components explained

dc1/orgClk/15mt/0.log4181.seg.gz:
     uid=15mt000000k1j548&type=orgClk&...
     uid=15mt000010k2j72n&type=orgClk&...
     uid=15mt000050k3j7bd&type=orgClk&...
     uid=15mt000060k2j6lu&type=orgClk&...
     uid=15mt0000b0k430co&type=orgClk&...
     uid=15mt0000d0k2i1ed&type=orgClk&...

Figure 6: example of partial log entries showing uid and type

Builder

The builder converts the raw log entry data into the archival structure. It takes potentially new log entries, determines whether they are new, and if so adds them to the archive. Because the logs are accessed in time-sorted order, the builder must also keep its output sorted. To maintain performance, it is critical that all disk IO done by the builder is sequential.

The logrepo builder buffers incoming log entries and writes them to files. It first sorts the log entries into the appropriate buckets based on the first 25 bits of their timestamps. The builder sorts its entries and flushes them to a file when one of the buckets reaches the limit of the buffer size or a maximum wait time between flushes is exceeded. If the builder finds existing files for the bucket, it compares and merges entries in the buffer and the files, skipping any duplicates. If there are too many files in the bucket, they may be merged into a single larger file to reduce future merge overhead.
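
Because both the buffered entries and the existing files are sorted by uid, the merge-and-skip-duplicates step is straightforward. Here is a sketch that ignores the file handling, compression, and bucketing details and assumes the uid is the leading field of each line:

import heapq

def merge_without_duplicates(*sorted_entry_streams):
    # Merge sorted streams of log entry lines, dropping repeated uids.
    last_uid = None
    for entry in heapq.merge(*sorted_entry_streams):
        uid = entry.split("&", 1)[0]     # the "uid=..." prefix of the line
        if uid != last_uid:              # duplicates arrive adjacent once merged
            yield entry
            last_uid = uid

existing = ["uid=15mt000000k1j548&type=orgClk", "uid=15mt000050k3j7bd&type=orgClk"]
incoming = ["uid=15mt000000k1j548&type=orgClk", "uid=15mt000010k2j72n&type=orgClk"]
for line in merge_without_duplicates(existing, incoming):
    print(line)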

This buffering introduces a delay in log entry availability at the reader. To avoid too much copying and merging, which limits throughput, we need to make sure that we buffer a substantial amount of data before each flush. Currently we flush at 17 minute intervals (to match the timespan determined by the archival structure above), which means that to be reasonably sure that all the logs have arrived for a 17 minute period we wait 34 minutes.

We can easily tolerate an hour or two delay in data availability for our analysis purposes. For real-time monitoring, we consume the same log entries from a different stream that bypasses the repository archive — sacrificing consistency for much lower latency.

Transport

The goal of the transport layer is to reliably move log messages from the application servers through the logrepo servers in each datacenter and finally to the central log repository.

Each data center contains two logrepo servers, each of which receives all log entries for that data center in order to improve fault tolerance. We use syslog-ng to send log entries from our application servers to both logrepo servers. A third copy of every log entry is stored on the application server’s local drive for additional fault tolerance. One of the logrepo servers is designated as the primary and runs the logrepo builder, which converts the raw logs from syslog-ng into the archival format. The primary then uses rsync to push the files in the archive to the central log repository. After a delay, the primary also checks for missing entries by copying the raw logs from the backup logrepo server and adding any log entries not found in the archive. The uniqueness of the uid field guarantees that this process introduces no duplicates.

The redundant delivery of log messages to two servers gives us the ability to fail over from the primary to the secondary at any time. By always duplicating and then deduplicating we can suffer a total machine failure with no loss in availability or consistency.

In contrast, Flume handles consistency through delivery acknowledgements. Once a message has been committed (acknowledged) downstream, its delivery is acknowledged to the upstream server. In cases where a downstream server fails, messages will buffer indefinitely until it recovers. If the failure is irrecoverable, consistency issues arise around whether the messages that were unacknowledged at the time of the failure have been committed further downstream. The same issues occur if Flume fails over to a spare collector that has no knowledge of what has and has not been committed. In either case, the upstream Flume server must decide whether to send the messages again, in which case they may be duplicated, or not to re-send, in which case they may be lost.

Reader Daemon

We access the events stored in the logrepo through a reader daemon running on the central logrepo server or one of its replicas. The server supports querying for log entries by type and time range. This is where we see the benefits of our archival structure.

When serving results for a query, the server finds all files for the type and 25 bit prefix of the start timestamp and streams the log entries back to the client. It skips all events with timestamps less than the start and merges events from the files as it streams. It repeats this process for each subsequent time-based bucket until it reaches the bucket containing the end timestamp. Once it has seen a log entry beyond the end timestamp, it knows it is done and terminates the stream.
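
In outline, the serving loop walks the time-based buckets from the start timestamp to the end timestamp, merging the sorted files within each bucket as it streams. The sketch below assumes two hypothetical helpers: files_for_bucket, which returns the sorted segment files for a type and bucket, and entry_timestamp, which decodes the millisecond timestamp embedded in an entry's uid.

import heapq

def stream_entries(event_type, start_ms, end_ms, files_for_bucket, entry_timestamp):
    # Yield entries of one type with timestamps in [start_ms, end_ms); sketch only.
    for bucket in range(start_ms >> 20, (end_ms >> 20) + 1):
        # A bucket may have several files; merge them, since each is sorted.
        for entry in heapq.merge(*files_for_bucket(event_type, bucket)):
            timestamp = entry_timestamp(entry)
            if timestamp < start_ms:
                continue                  # before the requested range
            if timestamp >= end_ms:
                return                    # past the end: the stream is complete
            yield entry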

The reader is optimized for low latency for retrieving events of a single type in a time range. We did not optimize for interleaving events of different types. And since a typical analysis request includes millions or billions of events, we did not need to optimize for looking up a single event by uid. Optimizing for this simple retrieval case has served us well in the creation of our analysis tools, which process the log entry data streams into purpose-built data structures for ad-hoc analysis.

The interface to the reader daemon is extremely simple. To ask for event log data, we connect to the daemon on a TCP socket and send a one-line string, consisting of a start-time/end-time pair (in epoch milliseconds) and an event type. The matching data is streamed back instantly. For interactive peeking at raw log data we often simply use the Unix command nc (netcat):

~$ date --date "2012-04-01 05:24:00" +%s
1333275840
~$ date --date "2012-04-01 05:25:00" +%s
1333275900
~$ echo "1333275840000 1333275900000 orgClk" | nc logrepo 9999 | head -2
uid=16pmmtjgh14632ij&type=orgClk&v=0&tk=16pmmsulc146325g&jobId=11eaf231341a048f&onclick=1&url=http%3A%2F%2Fwww.indeed.co.uk%2Frc%2Fclk&href=...
uid=16pmmtjh2183j353&type=orgClk&v=0&tk=16pmmntjr183j1ti&jobId=029b8ec3ddeea5ae&onclick=1&url=http%3A%2F%2Fwww.indeed.com%2Frc%2Fclk&href=...

Figure 7: using netcat to query the logrepo reader daemon

This simple interface to the reader daemon makes it easy to process logrepo data in any programming language. The core of a Python implementation is 13 lines of code.
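
As an illustration (not the exact internal client), a minimal version of such a reader client in Python looks roughly like this:

import socket

def read_events(event_type, start_ms, end_ms, host="logrepo", port=9999):
    # Stream log entry lines for one event type and time range; sketch only.
    with socket.create_connection((host, port)) as sock:
        query = "%d %d %s\n" % (start_ms, end_ms, event_type)
        sock.sendall(query.encode("ascii"))
        with sock.makefile("r", encoding="utf-8") as stream:
            for line in stream:
                yield line.rstrip("\n")

# The same query issued with netcat above:
for entry in read_events("orgClk", 1333275840000, 1333275900000):
    print(entry)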

Deciding With Data

We now capture 1 to 2 billion logrepo events every day across all Indeed properties. We feed those events into a diverse set of powerful analytics tools, like Ramses (about which we plan to share more in a future blog post). We use those tools to determine how to improve our services to better serve job seekers and employers.

Over the past 5 years, logrepo has provided significant value to Indeed. It serves as the foundation of our data-driven approach to decision making. Indeed’s products are vastly better thanks to its reliable, flexible, and scalable architecture. To keep this up for another 5 years, we will need to make it even more reliable, flexible, and scalable. If this sounds like the kind of problem you’d be interested in solving, take a look at some of our career opportunities.