Author: Nadav S. Samet

Repos: How we use MySQL as a key-value store

When we started TrueAccord in 2013, we used MySQL to store our data in a pretty traditional way. As business requirements came in, we found ourselves continuously migrating our table schemas to add more columns and more tables. Before MySQL 5.6, these schema changes would lock down the database for the entire duration of a change, causing brief downtime. When the company was smaller and just starting out, this was tolerable, but as we grew, the increasing schema complexity became harder to manage via SQL migration scripts.

We were looking for an alternative, something like Bigtable, the key-value store I used back at Google. Using a key-value store lets us store an entire document as a value, eliminating the need for migrations. We investigated several publicly available key-value stores, but none of them met our major requirements at the time. As a small engineering team, we wanted a hosted, fully managed database, so that backups and server migrations would be taken care of for us. Additionally, we wanted security features like encryption at rest. DynamoDB came the closest to matching our requirements, but was missing encryption at rest.

We came across an old post from FriendFeed that describes, at a high level, a design that meets our requirements, and it inspired our implementation. First, we chose MySQL (now Aurora) managed by Amazon RDS as our backing datastore. This satisfies the requirement for a hosted, managed, encrypted database, and it is a battle-tested database. Then, for the key-value interface (to avoid schema migrations), we built a thin library called Repos that provides a key-value interface on top of MySQL. Now we have something that allows us to move quickly on top of a reliable datastore.

Enter Repos

Each repo represents a map from a UUID (key) to an arbitrary array of bytes (value). Each repo is stored in MySQL using two tables. The first is the log table. Every time we want to insert or update an entity, we insert a new row into this table.

Column name   Type         Description
pk            bigint(20)   Auto-incremented primary key
uuid          binary(16)   Unique id for each entry
time_msec     bigint(20)   Time inserted, in milliseconds
format        char(1)      Describes the format of the entry_bin column
entry_bin     longblob     The value

We always append to this table, never updating an existing row. By doing so, we get the full history of every object. This has proven to be really handy for debugging why a change has occurred, and when.

The format column can take two possible values: ‘1’ means the value in entry_bin is a serialized protocol buffer, and ‘2’ means it is compressed using Snappy (a compression scheme that aims for high speed and reasonable compression).
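As an illustration, encoding a value before writing it to entry_bin might look like the following sketch, assuming ScalaPB-generated messages, the xerial Snappy bindings, and that format ‘2’ denotes a Snappy-compressed serialized message (the helper names are hypothetical):

import org.xerial.snappy.Snappy
import scalapb.GeneratedMessage

// Hypothetical helper: serialize a protobuf message and pick the format tag.
// '1' = plain serialized protocol buffer, '2' = Snappy-compressed.
def encodeValue(message: GeneratedMessage, compress: Boolean): (Char, Array[Byte]) = {
  val bytes = message.toByteArray
  if (compress) ('2', Snappy.compress(bytes))
  else ('1', bytes)
}

// Decoding inspects the format column to decide whether to decompress first.
def decodeBytes(format: Char, entryBin: Array[Byte]): Array[Byte] = format match {
  case '1' => entryBin
  case '2' => Snappy.uncompress(entryBin)
  case other => throw new IllegalArgumentException(s"Unknown format: $other")
}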

To optimize look-ups, we have another table, the “latest” table, with the following format:

Column name   Type         Description
parent_pk     bigint(20)   PK of this entry in the log table
uuid          binary(16)   The unique id of the entry (here it is the primary key)
format        char(1)      Describes the format of the entry_bin column
entry_bin     longblob     The value

Whenever we insert an element into the log table, we also upsert it into this table, so the latest table always holds the most recently inserted version of each entry. We do both writes in a single transaction to ensure the two tables stay in sync.
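A minimal sketch of that write path over plain JDBC might look like this (table names such as accounts_log and accounts_latest are hypothetical, and error handling is simplified):

import java.sql.Connection

// Append to the log table and upsert into the latest table inside a single
// transaction, so the two tables never drift apart.
def write(conn: Connection, uuid: Array[Byte], format: String, entryBin: Array[Byte]): Unit = {
  conn.setAutoCommit(false)
  try {
    // 1. Append a new row to the log table.
    val insertLog = conn.prepareStatement(
      "INSERT INTO accounts_log (uuid, time_msec, format, entry_bin) VALUES (?, ?, ?, ?)",
      java.sql.Statement.RETURN_GENERATED_KEYS)
    insertLog.setBytes(1, uuid)
    insertLog.setLong(2, System.currentTimeMillis())
    insertLog.setString(3, format)
    insertLog.setBytes(4, entryBin)
    insertLog.executeUpdate()
    val keys = insertLog.getGeneratedKeys
    keys.next()
    val logPk = keys.getLong(1)

    // 2. Upsert the same value into the latest table, keyed by uuid.
    val upsertLatest = conn.prepareStatement(
      """INSERT INTO accounts_latest (uuid, parent_pk, format, entry_bin)
        |VALUES (?, ?, ?, ?)
        |ON DUPLICATE KEY UPDATE parent_pk = VALUES(parent_pk),
        |  format = VALUES(format), entry_bin = VALUES(entry_bin)""".stripMargin)
    upsertLatest.setBytes(1, uuid)
    upsertLatest.setLong(2, logPk)
    upsertLatest.setString(3, format)
    upsertLatest.setBytes(4, entryBin)
    upsertLatest.executeUpdate()

    conn.commit()
  } catch {
    case e: Exception => conn.rollback(); throw e
  }
}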

Secondary Index Implementation

The first hurdle when going down this route is secondary indexes. For example, if your Repo maps a user id to their account information (email, hashed password, full name), how would you look up an account by email? To support this, we implemented index tables. An index table maps a primitive value that MySQL can index (such as an email address) back to entries in the key-value store. A single repo may have multiple indexes, and each one gets its own table. Index tables have the following layout:

Column name   Type         Description
parent_pk     bigint(20)   PK of this entry in the log table
uuid          binary(16)   The unique id of the entry
value         *            The indexed value (for example, the user's email address); its type depends on the index

We only ever insert into the secondary index tables. Therefore, over time, an index will contain stale values. To solve this, when querying we join on uuid and parent_pk against the latest table and return a result only if there is a match.

For example, if we have a person with id “idA” who changed their email, the log table would look like this:

pk    uuid   time_msec   value (format, entry_bin)
501   idA    t1          {“user”: “john”, “email”: “john@example.com”}
517   idA    t2          {“user”: “john”, “email”: “john@domain.com”}

The latest table would have only the updated row:

parent_pk   uuid   value (format, entry_bin)
517         idA    {“user”: “john”, “email”: “john@domain.com”}

The email index table would have the email value for each version of the object:

parent_pk   uuid   value
501         idA    john@example.com
517         idA    john@domain.com

Now, to find an account whose latest email value is “john@domain.com”, the Repos library would build a query similar to this:

SELECT l.uuid, l.format, l.entry_bin FROM latest AS l, email_index AS e
  WHERE e.value = "john@domain.com" AND
        e.uuid = l.uuid AND e.parent_pk = l.parent_pk

Our Repos library provides a nice Scala API for querying by index. For example,

accountsRepo.byEmail.all("john@domain.com")

would return all the accounts that have this email address.
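Under the hood, a lookup like byEmail.all could be implemented roughly as in the following hypothetical sketch over plain JDBC (the actual Repos internals may differ; table names are illustrative):

import java.sql.Connection
import scala.collection.mutable.ListBuffer

// Look up the latest versions of all entries whose indexed email equals the
// given value, joining the index table against "latest" to drop stale rows.
def findByEmail(conn: Connection, email: String): Seq[(Array[Byte], String, Array[Byte])] = {
  val stmt = conn.prepareStatement(
    """SELECT l.uuid, l.format, l.entry_bin
      |FROM accounts_latest AS l, accounts_email_index AS e
      |WHERE e.value = ? AND e.uuid = l.uuid AND e.parent_pk = l.parent_pk""".stripMargin)
  stmt.setString(1, email)
  val rs = stmt.executeQuery()
  val results = ListBuffer.empty[(Array[Byte], String, Array[Byte])]
  while (rs.next()) {
    results += ((rs.getBytes("uuid"), rs.getString("format"), rs.getBytes("entry_bin")))
  }
  results.toList
}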

Using Table Janitor to Manage Our Tables and Indexes

The table janitor is a process implemented as an Akka actor that runs on our JVMs. This actor is responsible for two main tasks:

  1. Ensuring that the underlying MySQL tables are created. It does this by reflecting over all of the Repos and indices defined in the code and then creating the corresponding MySQL tables. This makes adding a new repo or adding an index as simple as defining it in the code.
  2. Ensuring that the indices are up to date. This is necessary because when a new index is added, there may still be servers running an old version of the code that do not write into the new index. The table janitor regularly monitors the log tables and (re-)indexes every new record. Adding an index to an existing repo is easy – we just declare it in the code (see the sketch below).
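A heavily simplified, hypothetical sketch of such a janitor actor (names and interfaces are illustrative only):

import akka.actor.Actor
import scala.concurrent.duration._

// On a periodic tick, make sure every declared repo and index has its MySQL
// tables, then backfill any index rows missing for recently appended records.
class TableJanitor(repos: Seq[RepoDefinition]) extends Actor {
  import context.dispatcher

  case object Tick

  override def preStart(): Unit =
    context.system.scheduler.schedule(0.seconds, 5.minutes, self, Tick)

  def receive: Receive = {
    case Tick =>
      repos.foreach { repo =>
        repo.createTablesIfMissing()                // CREATE TABLE IF NOT EXISTS ...
        repo.indexes.foreach(_.backfillFromLog())   // (re-)index new log records
      }
  }
}

// Hypothetical interfaces the janitor relies on.
trait RepoDefinition {
  def createTablesIfMissing(): Unit
  def indexes: Seq[IndexDefinition]
}
trait IndexDefinition {
  def backfillFromLog(): Unit
}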

How we do Analytics

We use AWS Data Pipeline to incrementally dump our log tables into S3. We then use Spark (with ScalaPB) for big data processing. We also upload a snapshot of the data to Google's BigQuery. As all our repos use protocol buffers as their value type, we can automatically generate BigQuery schemas for each repo.
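For example, a schema generator can walk a message descriptor and map each protobuf field to a BigQuery column type, roughly like this (a sketch over the Java protobuf descriptor API; nested messages and type coverage are abbreviated):

import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
import scala.collection.JavaConverters._

// Map a protobuf message descriptor to (name, BigQuery type, mode) triples.
// Nested messages would map to RECORD columns; this sketch keeps it flat.
def bigQuerySchema(descriptor: Descriptor): Seq[(String, String, String)] =
  descriptor.getFields.asScala.map { field =>
    val bqType = field.getJavaType match {
      case FieldDescriptor.JavaType.INT | FieldDescriptor.JavaType.LONG => "INTEGER"
      case FieldDescriptor.JavaType.FLOAT | FieldDescriptor.JavaType.DOUBLE => "FLOAT"
      case FieldDescriptor.JavaType.BOOLEAN => "BOOLEAN"
      case FieldDescriptor.JavaType.STRING | FieldDescriptor.JavaType.ENUM => "STRING"
      case FieldDescriptor.JavaType.BYTE_STRING => "BYTES"
      case FieldDescriptor.JavaType.MESSAGE => "RECORD"
    }
    val mode = if (field.isRepeated) "REPEATED" else "NULLABLE"
    (field.getName, bqType, mode)
  }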

Pros and Cons of Our Approach

By writing Repos and having all our database access go through it, we get a lot of benefits:

  • Uniformity: having all our key-value maps be Repos means that every optimization and every improvement applies to all our tables. For example, when we build a view that shows an object's history, it works for all of our repos.
  • Schema evolution is free when using protocol buffers as values. We can add optional fields, rename existing fields, or convert an optional field to a repeated one, and it just works.
  • Security: storing data securely on RDS is a breeze. Encryption at rest? Click a checkbox. Require data encryption in transit? SSL is supported by default.
  • Reliability: We have never had the RDS MySQL (later Aurora) instances go down (aside from rare scheduled maintenance windows that require the instances to be rebooted), and we have never lost data. Additionally, with RDS we can recover the database to any point in time by replaying binary logs on top of a snapshot.
  • Ease of use: adding a Repo or an index is trivial. All of our ~60 Repos work in exactly the same way and are accessed through the same programmatic interface, so our engineers can easily work with any of them.
  • Optimization/monitoring/debugging: Since MySQL is a mature and well-understood technology, there is a plethora of documentation on how to tune it and how to debug problems. In addition, AWS provides a lot of metrics for monitoring how an RDS instance is doing.

However, there are also downsides:

  • Storing binary data in MySQL limits what can be done from the command-line MySQL client. We had to write a command-line tool (and a UI) to look up elements by key so we can debug. For more complex queries, we use Spark and BigQuery for visibility into our data.
  • Being a homegrown solution, we occasionally had to spend time tuning our SQL queries as our repos grew in size. On the positive side, scaling up due to business growth is a good problem to have, and fixing it for one repo improved all the others.
  • Multiple layers in the JDBC stack (JDBC, HikariCP, the MySQL connector) meant we had quite a few issues where it was tricky to pinpoint the source of the problem.

Alternatives: What the Future Looks Like

As much as we like our homegrown solution, we are continuously thinking about what our next storage solution will look like.

  • Current versions of both MySQL and Postgres come with built-in support for indexing JSON documents.
  • Google now offers a publicly hosted version of Bigtable.
  • We are moving towards having our data represented as a stream of events which may benefit from a different data store.

Success

The Repos implementation has enabled our engineering team to quickly develop a lot of new functionality and to iterate on the data schema. By building on top of RDS, we have the peace of mind that our data is safe and our servers are up to date with all the security patches. At the same time, having full control over the implementation details of Repos allowed us to quickly implement additional security measures to satisfy the stringent requirements of card issuers and other financial institutions, without sacrificing development speed.

Put your alerts in version control with DogPush

At TrueAccord, we take our service availability very seriously. To ensure our service is always up and running, we are tracking hundreds of system metrics (for example, how much heap is used by each web server), as well as many business metrics (how many payment plans have been charged in the past hour).

We set up monitors for each of these metrics on Datadog that, when triggered, page an on-call engineer. The trigger is usually based on some threshold for that metric.

As our team grew and more alerts were added, we noticed three problems with Datadog:

  1. Any member of our team can edit or delete alerts in Datadog’s UI. The changes may be intentional or accidental, though our team prefers to review changes before they hit production. In Datadog, the review stage is missing.
  2. Due to the previous problem, sometimes an engineer would add a new alert with uncalibrated thresholds to Datadog to get some initial monitoring for a newly written component. As Murphy's law would have it, the new alert would fire at 3am, waking up the on-call engineer, even though it may not indicate a real production issue, just a miscalibrated threshold. A review system could better enforce best practices for new alerts.
  3. Datadog also does not expose a way to indicate that an alert should only be sent during business hours. For example, it is okay if some of our batch jobs fail during the night, but we want an engineer to address the failure first thing in the morning.

To solve these problems, we made DogPush. It lets you manage your alerts as YAML files that you check in to your source control, so you can use your existing code review system to review them; once they're approved, they get automatically pushed to Datadog. Voila! In addition, it's straightforward to set up a cron job (or a Jenkins job) to automatically mute the relevant alerts outside business hours.
DogPush is completely free and open source – check it out here.

ScalaPB: TrueAccord’s Protocol Buffer code generator for Scala

When we started working on TrueAccord, we had a limited understanding of various technical aspects of the problem. Naturally, one of those unclear aspects was the data model: which data entities we would need to track, what their relationships would be (one-to-one, one-to-many, and so on), and how easy it would be to change the data model as business requirements became known and our domain expertise grew.

Using Protocol Buffers to model the data your service uses for storage or messaging is great for a fast-changing project:

  1. Adding and removing fields is trivial, as is turning an optional field into a repeated one, and so on. If we had modeled our data using SQL, we would be constantly migrating our database schema.
  2. The data schema (the proto file) serves as always up-to-date reference documentation for the service's data structures and messages. People from different teams can easily generate parsers for almost every programming language and access the same data.


A Reactive HTTP Reverse Proxy in Play!

At TrueAccord, we use Play to develop our backend. Given our development environment, we need to have part of our URL space routed to a different server written in Python. We initially thought of setting up a lightweight HTTP server like nginx that would act as a reverse proxy for both of our development servers, which is a reasonable solution. However, we also wanted to avoid having yet another moving part in our development environment and were curious if we could write something quick in Scala that could achieve this.

As it turns out, writing this little reverse proxy in Scala/Play is relatively straightforward. It's also pretty impressive that with so few lines of code we get a reactive proxy server that streams the content continuously to the end client while chunks of it are still arriving from the upstream server. A more traditional (and time-intensive) implementation would have buffered the entire upstream response until it was complete and only then sent it to the client.

So, without further ado, here is the code:
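(The snippet was originally embedded as a gist; below is a minimal sketch of such a streaming proxy action, assuming the Play 2.3-era iteratee-based WS API and a hypothetical Python upstream on localhost:8000.)

import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.ws.WS
import play.api.mvc._

object ReverseProxy extends Controller {

  // Play gives query parameters as Map[String, Seq[String]]; WS.url expects
  // flat (key, value) pairs.
  private def flattenMultiMap(m: Map[String, Seq[String]]): Seq[(String, String)] =
    for ((key, values) <- m.toSeq; value <- values) yield key -> value

  // Streams the upstream response to the client chunk by chunk.
  def proxy(path: String) = Action.async { request =>
    val proxyRequest = WS.url(s"http://localhost:8000/$path")
      .withMethod(request.method)
      .withHeaders(request.headers.toSimpleMap.toSeq: _*)
      .withQueryString(flattenMultiMap(request.queryString): _*)

    proxyRequest.stream().map { case (responseHeaders, body) =>
      // `body` is an Enumerator[Array[Byte]]; Play's Result can serve it
      // directly, so bytes flow to the client as they arrive upstream.
      Result(
        header = ResponseHeader(responseHeaders.status,
          responseHeaders.headers.mapValues(_.mkString(", "))),
        body = body
      )
    }
  }
}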

The call to proxyRequest.stream returns a Future[(WSResponseHeaders, Enumerator[Array[Byte]])]. This means that at some point in the future, the closure we pass to map will be called and supplied with two things: the headers returned from the upstream server (WSResponseHeaders) and an Enumerator[Array[Byte]], which is a producer of arrays of bytes. Each array of bytes it produces is a part of the response body from the upstream server. Conveniently, Play provides a Result constructor that takes producers like this and turns them into responses that can be served to the end client.

flattenMultiMap is a little helper function that converts the query string parameters from the collection type they are given by Play requests to the format expected by WS.url.

Pretty cool, eh?

Why did we use Scala as our main backend language?

Given what we needed to accomplish when we founded TrueAccord – and how we intend to grow – Scala quickly emerged as our ideal programming language. From its interoperability with Java to its emphasis on functional data structures and economy of code, Scala has delivered an incredibly versatile engine to power TrueAccord's Proactive Loss Management system for recovering debt portfolios worth tens of billions of dollars.

Of course, there are solid arguments for a range of programming languages. Java may seem the obvious choice, given its popularity and the breadth of its libraries, but many developers rate Java pretty low on the fun scale. There are the ever-popular Python and Ruby, which many feel are optimal for the front end, but we wanted to use a single language for the entire stack.

We actually started in Python, because our core team was very familiar with it. Python allows you to get off the ground really fast, but as the code grew and more people joined the team, Python’s lack of type safety started to slow us down. Despite our disciplined unit testing, things got messy very quickly. It took quite a bit of reasoning to be completely sure what each function expected as arguments and what it returned. Ultimately we realized we needed something that would make it easy for us to write robust, maintainable and type-safe code without hindering the team’s productivity. Scala was the answer.

Interoperability with Java

Scala allows us to access Java's huge ecosystem. Java has been around the enterprise for a long time, and there is a wide variety of high-quality libraries for any imaginable functionality. From essential libraries like Joda-Time to the handy utilities in Apache Commons, Java has it covered.

It's also very common to see third-party service providers like payment processors provide Java libraries that access their APIs, which saves us hours of integration work and makes it easy for us to quickly experiment with many platforms. This is how TrueAccord is able to personalize correspondence with individual debtors and make it easy for them to pay creditors via a number of different methods. On top of that, the JVM provides a robust environment to run our code. There's a variety of parameters we can tune and many tools available for monitoring, artifact deployment and so on. As Scala runs on top of the JVM, it enjoys this rich and mature ecosystem.

Functional data structures

When you are writing non-trivial computations, especially in a multithreaded environment, it's easy for things to go awry. In languages like C and C++, it's common to have state that is shared between different threads. When you go that route, you have to be careful about using locks or mutexes around each access to that state to ensure that no two threads modify it at the same time. It's very easy to make mistakes in this regard. Scala encourages writing pure functions that manipulate immutable data structures. When your data structures are immutable, you can freely share them between different execution contexts, and deadlocks and race conditions become things of the past.
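As a small illustration of the point, immutable values can be handed to concurrent computations without any locking (the types here are illustrative only):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// An immutable balance snapshot can be shared freely across threads.
case class Balance(accountId: String, cents: Long)

val balances: List[Balance] = List(Balance("a", 1000), Balance("b", 2500))

// Two concurrent computations read the same data; "updates" produce new
// values instead of mutating shared state, so no locks are needed.
val totalF: Future[Long] = Future(balances.map(_.cents).sum)
val discountedF: Future[List[Balance]] =
  Future(balances.map(b => b.copy(cents = b.cents - 100)))

// (Await is used here only to keep the example self-contained.)
println(Await.result(totalF, 5.seconds))       // 3500
println(Await.result(discountedF, 5.seconds))  // discounted copies; balances is unchanged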

Economy of code

Scala is a very expressive language: it lets you express your ideas in very few lines of code. When you define a case class, you are communicating a higher-level abstraction built from other types.

For example, a Person could be a case class that consists of a name, age and an address. The address could be another case class with fields like street, city and zip code. By defining types in this way, usually in one or two lines of code, Scala provides functionality that is equivalent to 20 to 30 lines of Java code. The savings come from not having to implement constructors, getters, equals() and hashCode() methods. There are many other examples like this that allow the Scala programmer to write fewer lines of code that say quite a lot. This leads to fewer bugs and less code to read and review.
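For instance, the Person and Address types described above fit in a couple of lines, and equality, hashing and copying come for free:

// Two domain types in two lines; equals, hashCode, toString and copy
// are generated by the compiler for case classes.
case class Address(street: String, city: String, zipCode: String)
case class Person(name: String, age: Int, address: Address)

val home = Address("1 Main St", "San Francisco", "94105")
val john = Person("John", 34, home)

// Structural equality and non-destructive updates, no boilerplate required.
assert(john == Person("John", 34, home))
val olderJohn = john.copy(age = 35)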

This is not to say there are no drawbacks. Like any technology, Scala is not perfect. For example, Scala code that uses functional programming idioms can be a little confusing to newcomers, but you get better at reading and writing functional code with practice; it's the same as lifting weights to build muscle. Another drawback is the compilation time: it can take a little while for the compiler to go through a full build. On the other hand, having immediate feedback from the compiler on mismatched types and other common human errors helps recover that lost time. At TrueAccord, this level of code correctness saves us invaluable time and money in the long run.

Come Work With Us!

We combine a social mission, a huge market, a proven team, and early traction and revenue. We're early enough for you to have a huge influence on the company's future (and upside), but late enough that we have already reached first funding and product-market fit: the perfect time to join. If you want to learn a lot while your actions have a positive impact on the world, come work with us.

