At work, I found myself in apparent need of synchronizing access to a shared resource. The actors requesting access were two distributed services. Being fond of, and familiar with, the “assembly language of databases”, I knew I could implement a distributed mutex on top of Foundation DB.
Calling Foundation DB the “assembly language of databases” (something I once read on Hacker News) is an exaggeration. Nonetheless, Foundation DB does provide a set of primitives which makes accessing distributed memory safe and graceful.
Further investigation into the requirements revealed that synchronization was not necessary. Regardless, I implemented the distributed mutex in Go for fun, and in this post I’ll use that implementation to outline the features of Foundation DB.
Foundation DB is a distributed key-value database with automatic shard and replication management. Keys are byte strings (under 10 kB) pointing at values which are slightly larger byte strings (under 100 kB). Foundation DB is designed for highly concurrent workloads with many clients and many simultaneous, small transactions. Transactions must complete in less than 60 seconds and must read or write less than 10 MB of data.
The transactions follow an optimistic concurrency model. No key-values are ever locked from access. Instead, if any of the key-values read by the transaction are modified before the transaction commits, then the transaction is rolled back and retried. When this happens, we call it a transaction conflict.
These conflicts prevent inconsistent reads and allow Foundation DB to guarantee ACID properties. There’s a catch though: consistency (the ‘C’ in ACID) is enforced by the clients. Foundation DB does not ensure data obeys any schema or constraints. Rather, if the client is written properly, consistency emerges from transaction conflicts.
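To make this concrete, here is a minimal sketch of a transaction using the official Go bindings. The key and value are arbitrary placeholders; Transact automatically retries the closure whenever a conflict occurs.

package main

import (
	"fmt"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
)

func main() {
	// Every client must declare the API version before opening the database.
	fdb.MustAPIVersion(710)
	db := fdb.MustOpenDefault()

	// Transact commits the transaction and retries the closure on conflict.
	val, err := db.Transact(func(tr fdb.Transaction) (any, error) {
		tr.Set(fdb.Key("hello"), []byte("world"))
		// Reads within a transaction observe its own writes.
		return tr.Get(fdb.Key("hello")).MustGet(), nil
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(val.([]byte)))
}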
Foundation DB sorts its key-values lexicographically by the bytes of the key. Given a byte string prefix, adjacent key-values can be efficiently streamed to the client. A group of key-values sharing a common prefix is called a subspace. To add a new key-value to a subspace, you simply write a key-value with the appropriate prefix; thanks to the sorting, it will be placed beside its peers.
Foundation DB provides a special kind of subspace called a directory. Traditional directory paths are mapped to short byte string prefixes. For instance, the path /my/dir/path could be mapped to the prefix 0xfa3209. Key-values written into this directory would include this prefix.
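Here’s a sketch of opening that directory with the Go bindings’ directory package, assuming an open database handle db. Keys packed through the returned directory automatically include its short prefix.

// Open (or create) the directory /my/dir/path. The returned DirectorySubspace
// maps the path to a short prefix such as 0xfa3209.
dir, err := directory.CreateOrOpen(db, []string{"my", "dir", "path"}, nil)
if err != nil {
	log.Fatalf("failed to open directory: %v", err)
}

// Keys packed through the directory include its prefix, so this key-value
// lands inside the directory's subspace.
_, err = db.Transact(func(tr fdb.Transaction) (any, error) {
	tr.Set(dir.Pack(tuple.Tuple{"some", "key"}), []byte("value"))
	return nil, nil
})
if err != nil {
	log.Fatalf("transaction failed: %v", err)
}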
Foundation DB provides an encoding scheme for tuples. This allows clients to encode common data types (integer, float, boolean, string) with deterministic ordering. For instance, when used as a key, the tuple (23,"abc") will always appear before the tuple (52,"xyz").
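For example, with the Go bindings’ tuple package (a small sketch; bytes.Compare is only used here to show the ordering of the encoded byte strings):

// Pack the two tuples into their byte string encodings.
a := tuple.Tuple{23, "abc"}.Pack()
b := tuple.Tuple{52, "xyz"}.Pack()

// The encoding preserves tuple order, so a sorts before b.
fmt.Println(bytes.Compare(a, b) < 0) // true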
When a transaction is committed, it receives a unique 12-byte ID called a versionstamp. The first 10 bytes are a monotonically increasing integer assigned by the database and the last 2 bytes are chosen by the client of the transaction. The integer increments roughly once per microsecond. For a given database, you’ll never observe the same versionstamp twice, so versionstamps can be used as unique IDs or as a noncontinuous sequence.
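With the Go bindings, a versionstamped key is written by packing a tuple that contains an incomplete versionstamp and letting the cluster fill in the transaction version at commit time. A sketch, reusing db and dir from the sketches above; the tuple contents and value are arbitrary.

_, err := db.Transact(func(tr fdb.Transaction) (any, error) {
	// The incomplete versionstamp is a placeholder; the cluster fills in
	// the 10-byte transaction version when the transaction commits.
	key, err := tuple.Tuple{"events", tuple.IncompleteVersionstamp(0)}.
		PackWithVersionstamp(dir.Bytes())
	if err != nil {
		return nil, err
	}
	tr.SetVersionstampedKey(fdb.Key(key), []byte("some value"))
	return nil, nil
})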
The schema for our mutex is simple. We will have a key-value for identifying the owner of the mutex and a queue on which waiting clients may place themselves. When the owner of the mutex releases, it will grab the next client from the head of the queue and make them the owner. Clients can watch the key-value specifying the owner and unblock when they are given control of the mutex.
This is a distributed system, so there is always the chance that the owner of the mutex could die and deadlock the entire application. To avoid this, the owner will be required to heartbeat the mutex. A background service will watch for stale heartbeats and pass the mutex ownership on to the next client.
Using FQL (my query language), we can describe the schema as follows:
% There is always one owner key. When the mutex is
% unlocked, :name is an empty string. The owner of
% the mutex updates :heartbeat once a second. When
% the mutex is first claimed, the heartbeat starts
% as `nil`.
/path/to/mutex("owner",<name:str>)=<heartbeat:vstamp|nil>
% Whenever we write an owner KV, we clear the range of
% owner keys beforehand to ensure there's only one key
% in the owner subspace.
/path/to/mutex("owner",...)=clear
% There are zero or more queue keys.
/path/to/mutex("queue",<index:vstamp>)=<name:str>
% When the owner releases the mutex, they pop a new owner
% off the front of the queue. If the queue is empty then
% they write an empty string for :name.
/path/to/mutex("queue",<index:vstamp>)=<name:str>
/path/to/mutex("queue",:index)=clear
/path/to/mutex("owner",:name)=nil
This article uses features of FQL which I have yet to document:
vstamp is a new data type representing a versionstamp. clear works on ranges of keys. Variables may be named <name:int|bool|str> and their values may be referenced in subsequent queries as :name.
The :name strings are unique identifiers provided by the clients. Heartbeats and queue indexes are written using versionstamps. As long as the heartbeat value keeps changing to new versionstamps, we know the mutex owner is still alive. And because versionstamps are monotonically increasing, using them as the queue index means new clients are always placed at the back of the queue.
We can easily dump the total state of the mutex by reading its entire subspace:
% Read the entire mutex state.
/path/to/mutex(...)
/path/to/mutex("owner","roger")=0x52750b30ffbc7de3b3620000
/path/to/mutex("queue",0x52750b30ffbbdbb3348f0000)="daniel"
/path/to/mutex("queue",0x52750b30ffbb3982b5bc0000)="leo"
/path/to/mutex("queue",0x52750b30ffba975236e90000)="wilma"
As stated earlier, Foundation DB does nothing to enforce the schema: no authorizations, constraints, or structures are verified server side; the client must manage all of this.
In the client implementation, I isolate the marshaling of key-values to define the schema explicitly. I generally have a marshal and unmarshal function for every key and value, but for this application I didn’t need to unmarshal the owner value or the queue key. Some marshaling methods return a key range which is used for reading an entire subspace. Below, I’ll list the marshal function prototypes:
func packOwnerRange() (fdb.KeyRange, error)
func packOwnerKey(name string) fdb.Key
func unpackOwnerKey(key fdb.Key) (string, error)
func packOwnerValue() []byte
func packQueueRange() (fdb.KeyRange, error)
func packQueueKey() (fdb.Key, error)
func packQueueValue(name string) []byte
func unpackQueueValue(val []byte) string
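As an illustration, here is a hypothetical sketch of the owner-key helpers, assuming the mutex’s directory has been opened as a subspace named mutexDir. The real implementation may differ.

func packOwnerKey(name string) fdb.Key {
	// Encodes /path/to/mutex("owner", name).
	return mutexDir.Pack(tuple.Tuple{"owner", name})
}

func unpackOwnerKey(key fdb.Key) (string, error) {
	tup, err := mutexDir.Unpack(key)
	if err != nil {
		return "", err
	}
	if len(tup) != 2 {
		return "", fmt.Errorf("unexpected owner key: %v", tup)
	}
	name, ok := tup[1].(string)
	if !ok {
		return "", fmt.Errorf("owner name is not a string: %v", tup[1])
	}
	return name, nil
}

func packOwnerRange() (fdb.KeyRange, error) {
	// The range covering every key in the "owner" subspace.
	return fdb.PrefixRange(mutexDir.Pack(tuple.Tuple{"owner"}))
}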
Using these marshaling functions, I implement a function for each query. The queries accept an fdb.Transactor as an argument. This can be either the handle to the database or a running transaction, allowing the queries to be run in isolation or composed together into a larger transaction.
func setOwner(db fdb.Transactor, name string) error
func getOwner(db fdb.Transactor) (owner, error)
func watchOwner(ctx context.Context, db fdb.Transactor) <-chan error
func heartbeat(db fdb.Transactor, name string) error
func enqueue(db fdb.Transactor, name string) error
func dequeue(db fdb.Transactor) (string, error)
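For instance, getOwner might look something like the sketch below. It assumes an owner struct with a name field plus the packOwnerRange and unpackOwnerKey helpers; because it takes an fdb.Transactor, it behaves the same whether it is handed the database or an in-flight transaction.

func getOwner(db fdb.Transactor) (owner, error) {
	val, err := db.Transact(func(tr fdb.Transaction) (any, error) {
		rng, err := packOwnerRange()
		if err != nil {
			return nil, err
		}
		// The schema guarantees at most one key in the owner subspace.
		kvs, err := tr.GetRange(rng, fdb.RangeOptions{Limit: 1}).GetSliceWithError()
		if err != nil {
			return nil, err
		}
		if len(kvs) == 0 {
			return owner{}, nil
		}
		name, err := unpackOwnerKey(kvs[0].Key)
		if err != nil {
			return nil, err
		}
		return owner{name: name}, nil
	})
	if err != nil {
		return owner{}, err
	}
	return val.(owner), nil
}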
The <-chan error
returned by watchOwner acts like a
future. When the owner changes, the channel returns nil. If an error
occurs which causes the watch to fail, it returns an error.
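A hypothetical sketch of watchOwner is shown below. It reads the current owner, registers a watch on that owner key (the watch fires when the key is modified or cleared), and forwards the result onto the channel. The getOwner and packOwnerKey helpers from above and Go 1.21’s context.AfterFunc are assumed.

func watchOwner(ctx context.Context, db fdb.Transactor) <-chan error {
	out := make(chan error, 1)

	fut, err := db.Transact(func(tr fdb.Transaction) (any, error) {
		o, err := getOwner(tr)
		if err != nil {
			return nil, err
		}
		// The watch becomes active once the transaction commits and fires
		// when the owner key is modified or cleared.
		return tr.Watch(packOwnerKey(o.name)), nil
	})
	if err != nil {
		out <- err
		return out
	}

	watch := fut.(fdb.FutureNil)
	go func() {
		// Cancel the watch if the caller's context is canceled; this makes
		// watch.Get return an error instead of blocking forever.
		stop := context.AfterFunc(ctx, watch.Cancel)
		defer stop()
		out <- watch.Get()
	}()
	return out
}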
Finally, the public functions can string together several queries to
implement the desired behavior. Below is the implementation for
acquiring the mutex. This is a blocking call and depends on the
non-blocking TryAcquire which claims
the mutex if it’s unlocked or places the client onto the queue.
// Blocks until the current client is the owner of the mutex. The `ctx`
// argument includes a cancelation signal for aborting the operation.
func (x *Mutex) Acquire(ctx context.Context, db fdb.Database) error {
	// Attempts a non-blocking acquire. If it succeeds, we can return.
	acquired, err := x.TryAcquire(db)
	if err != nil {
		return fmt.Errorf("failed to try acquire: %w", err)
	}
	if acquired {
		return nil
	}

	// Check if we're the mutex owner. If not, then watch for writes on
	// the owner key-value. When the watch fires, check if we're the
	// owner again. Loop forever until either we are the owner or the
	// cancelation token fires.
	for {
		watch, err := db.Transact(func(tr fdb.Transaction) (any, error) {
			owner, err := x.getOwner(tr)
			if err != nil {
				return nil, fmt.Errorf("failed to get owner: %w", err)
			}
			// Return nil (instead of a channel) to signal that
			// we are now the owner of the mutex.
			if owner.name == x.name {
				return nil, nil
			}
			return x.watchOwner(ctx, tr), nil
		})
		if err != nil {
			return err
		}

		// If watch is nil then we are the owner. Otherwise, wait
		// for the watch to fire and check again.
		if watch == nil {
			// Start a background goroutine which heartbeats
			// the mutex once per second.
			x.startHeartbeat()
			return nil
		}

		// If the cancelation signal fires or a server-side error
		// occurs then the channel below will return an error.
		// Otherwise it simply returns nil when the watched
		// key-value is updated.
		if err := <-watch.(<-chan error); err != nil {
			return fmt.Errorf("failed to watch owner: %w", err)
		}
	}
}
If we take a look at the TryAcquire method we can get a better
understanding of Foundation DB’s concurrency model. TryAcquire executes a transaction that does
one of two things: acquires the mutex or places the client on the back
of the queue. There is a third case where we are already the owner, but
that one is trivial.
func (x *Mutex) TryAcquire(db fdb.Database) (bool, error) {
	// Read the owner key-value and either lock the mutex or place
	// ourselves onto the back of the queue.
	acquired, err := db.Transact(func(tr fdb.Transaction) (any, error) {
		owner, err := x.getOwner(tr)
		if err != nil {
			return false, fmt.Errorf("failed to get owner: %w", err)
		}

		switch owner.name {
		case x.name:
			// We're already the owner.
			return true, nil

		case "":
			// There is no owner (mutex is unlocked). Clear the
			// owner subspace and write a new owner key with our
			// name.
			err := x.setOwner(tr, x.name)
			if err != nil {
				return false, fmt.Errorf("failed to set owner: %w", err)
			}
			return true, nil

		default:
			// Someone else is the owner (mutex is locked). Place
			// our name on the queue within the same transaction.
			return false, x.enqueue(tr, x.name)
		}
	})
	if err != nil {
		return false, err
	}

	if acquired.(bool) {
		// Start a background goroutine which heartbeats the mutex
		// once per second.
		x.startHeartbeat()
	}
	return acquired.(bool), nil
}
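startHeartbeat isn’t shown in this post, but a hypothetical sketch might look like the following. It assumes the Mutex struct carries a database handle (x.db) and a stop channel closed on release, and it drives the heartbeat query from earlier once per second.

func (x *Mutex) startHeartbeat() {
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-x.stop:
				// Release closed the stop channel; stop heartbeating.
				return
			case <-ticker.C:
				// Write a fresh versionstamp into the heartbeat value
				// so the monitor knows we're still alive.
				if err := heartbeat(x.db, x.name); err != nil {
					log.Printf("mutex heartbeat failed: %v", err)
				}
			}
		}
	}()
}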
For correct behavior, Foundation DB will need to manage multiple clients attempting to do these writes simultaneously.
Let’s explore the case where two clients try to claim the same
unlocked mutex. Both clients will read the empty owner key /path/to/mutex("owner",""). They will both try to
clear this key and replace it with one containing their respective name:
/path/to/mutex("owner","client name")=nil.
Transactions automatically manage a conflict range which tracks the key-values written and read during the transaction. When it comes time to commit, Foundation DB checks if any of the key-values read were modified by another client during the transaction. If so, the transaction is rolled back and retried, preventing the client from acting on a stale read.
In the sequence diagram above, take a look at the events happening
between t1 and t2. Because
client-A reads the empty owner key, that key becomes a part
of its conflict range. client-B overwrites this key and
commits before client-A. This causes
client-A’s transaction to be rejected. Upon retrying the
transaction, client-A sees that client-B is
the owner of the mutex. Instead of claiming ownership,
client-A places itself onto the queue.
When testing client implementations, I always use integration testing. Tests perform reads and writes against a containerized Foundation DB cluster. I reimplemented this setup frequently enough that I decided to pull the logic out into a framework which can run locally or in GitHub Actions, caches build & test results, etc. A local installation of Foundation DB works fine as well.
For each test, I create a random subspace to isolate the reads and writes. This subspace is deleted after each test runs. For this isolation to work, you must ensure your client does all its reads and writes within the context of a configurable subspace.
type testFn func(t *testing.T, db fdb.Database, root subspace.Subspace)

func runTest(t *testing.T, test testFn) {
	// Connect to the database.
	fdb.MustAPIVersion(710)
	db := fdb.MustOpenDefault()

	// Generate a random directory name.
	randBytes := make([]byte, 8)
	if _, err := rand.Read(randBytes); err != nil {
		t.Fatalf("failed to generate random bytes: %v", err)
	}
	dirName := hex.EncodeToString(randBytes)

	// Open the directory. This process maps the directory
	// to a unique, short key prefix.
	root, err := directory.CreateOrOpen(db, []string{dirName}, nil)
	if err != nil {
		t.Fatalf("failed to create root directory: %v", err)
	}

	// Schedule a destructor which deletes the directory
	// (clearing all its key-values) after the test is
	// complete.
	defer func() {
		if _, err := directory.Root().Remove(db, []string{dirName}); err != nil {
			t.Errorf("failed to delete root directory: %v", err)
		}
	}()

	// Run the test.
	test(t, db, root)
}
To ensure the marshaling methods remain symmetric, I perform round trip tests for every key and value. Combined with tests for each query, it becomes easy to avoid schema drift in different parts of the implementation. Once again, Foundation DB doesn’t enforce the schema, so the extra checks are warranted.
func TestMarshaling(t *testing.T) {
	t.Run("owner key", func(t *testing.T) {
		in := "name"
		out, err := unpackOwnerKey(packOwnerKey(in))
		require.NoError(t, err)
		require.Equal(t, in, out)
	})

	t.Run("queue value", func(t *testing.T) {
		in := "name"
		out := unpackQueueValue(packQueueValue(in))
		require.Equal(t, in, out)
	})
}
As the name implies, Foundation DB operates at a lower level than other databases. It takes care of the foundational problems of distributed data storage but requires the developer to implement their model from a handful of primitives.
Foundation DB can partially replace common patterns like leader election or service discovery. It can be used to coordinate clusters of workers or index large amounts of blob data. It’s become the backbone of the distributed systems I’ve worked on.
There are many features which this article doesn’t cover, like server-side operations, manual conflict management, or transaction throttling. Take a look at the Foundation DB documentation for more information. If you have any questions, drop by the Foundation DB forums. If you see me around, please say “hi”. 😀