The last chapter covered the things that can go wrong in distributed systems; this chapter covers algorithms and protocols for building systems that tolerate those problems
Best way of building fault-tolerant systems is to find some general-purpose abstractions with useful guarantees, implement them once, and then let applications rely on those guarantees.
- This is also what happens with transactions
- The application can pretend there are no crashes (atomicity)
- That nobody is concurrently accessing the database (isolation)
- That storage devices are perfectly reliable (durability)
- Even though crashes, races, and disk failures do occur, the transaction abstraction hides them so the application doesn’t have to worry about them
Consensus, getting all nodes to agree on something, is one of the most important abstractions
First we’ll explore the range of guarantees and abstractions that can be provided in a distributed system.
Need to understand what can be done and what can’t be done
Researchers in this field have been studying these topics for decades, so this is really just the surface. There are literature references if you’d like to read more
Consistency Guarantees
Two replicas often have different states at any given moment, because write requests arrive at different nodes at different times.
Most replicated databases provide at least eventual consistency, which means that if you stop writing to the database and wait for some (unspecified) length of time, eventually all read requests will return the same value
The inconsistency is temporary
Kleppmann says a better name for this would be convergence, as we expect all replicas to eventually converge to the same value
This is a very weak guarantee: it says nothing about when the replicas will converge. That is hard for application developers, because a read is not guaranteed to reflect a write you just made
- hard to test because app will work well most of the time
- Usually only seen during system fault (like network interruption) or high concurrency
This chapter will go into stronger consistency models that data systems use
- Start with strongest consistency model, Linearizability
- Go into issues with ordering events in distributed systems
- Explore how to atomically commit a distributed transaction
Linearizability
Also called strong consistency or atomic consistency
Gives the illusion that there is only one copy of the data
Reads must return the most recently written value, never a value from a stale cache or replica
What makes a System Linearizable
Once a write has completed, all later reads must return the value of that write (or of a later write)
- As soon as one client has read the new value, all other clients must also see the new value on subsequent reads
In the timing diagram below, the lines joining reads to their values must always move forward in time, never backward.
![[Screenshot 2024-05-07 at 9.45.44 AM.png]]
Linearizability vs. Serializability
Both sound like they mean “operations can be arranged in a sequential order,” but they are different guarantees
Serializability is an isolation property of transactions.
Linearizability is a guarantee on reads and writes of a register / individual objects
A database may provide both serializability and linearizability; this combination is known as strict serializability or strong one-copy serializability (strong 1SR)
Serializable snapshot isolation is not linearizable
Relying on Linearizability
Where is this useful?
Locking and Leader Election
Single Leader replication needs to ensure there’s only one leader
- One way is to use a lock
- This lock must be linearizable
Coordination services like ZooKeeper and etcd are often used to implement distributed locks and leader election. They use consensus algorithms to implement linearizable operations in a fault-tolerant way.
There are many subtle details to implementing locks and leader election correctly; libraries help by providing higher-level recipes on top of ZooKeeper, as in the sketch below
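For example, a hedged sketch of acquiring such a lock from Python via the kazoo client library (the ZooKeeper address, lock path, and identifier here are made up):

```python
from kazoo.client import KazooClient

# Connect to a ZooKeeper ensemble (address is hypothetical)
zk = KazooClient(hosts="zk1.example.com:2181")
zk.start()

# kazoo's Lock recipe builds a distributed lock on top of ZooKeeper's
# consensus-backed, totally ordered znode operations
lock = zk.Lock("/app/leader-lock", identifier="node-1")

with lock:  # blocks until this node holds the lock
    print("this node now holds the lock; do leader-only work here")

zk.stop()
```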
Distributed locking is also used at a much more granular level in some distributed databases, such as Oracle Real Application Clusters (RAC)
Constraints and uniqueness guarantees
Usernames and email addresses must be unique
We need to enforce this constraint across all the nodes of a distributed system. The situation is similar to a lock:
- Someone who successfully registers a username effectively acquires a “lock” on that name
- It can also be seen as an atomic compare-and-set: set the username only if it is not already taken (see the sketch below)
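A minimal sketch of that idea (an in-memory stand-in, not a real database): the uniqueness check boils down to a linearizable compare-and-set on a register for that username.

```python
import threading

class LinearizableRegister:
    """Toy stand-in for a single-copy, linearizable register."""
    def __init__(self):
        self._value = None
        self._mutex = threading.Lock()

    def compare_and_set(self, expected, new):
        """Atomically set the value to `new` only if it currently equals `expected`."""
        with self._mutex:
            if self._value == expected:
                self._value = new
                return True
            return False

# Two attempts to register the same username: only the first
# compare-and-set from None to a user id can succeed.
alice = LinearizableRegister()
print(alice.compare_and_set(None, "user-1"))  # True: username claimed
print(alice.compare_and_set(None, "user-2"))  # False: already taken
```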
Cross-Channel Timing Dependencies
In the inconsistent football score example, it was only a problem because there was another communication channel (Two people sitting in the same room talking about the game)
Similar things can happen for computer systems, like uploading a photo and having a process that resizes the photo
Without linearizability, there can be a race condition where the image resizer receives the resize job before the uploaded file is visible to it, so the file appears not to exist.
The problem arises because there are two different communication channels: the file storage upload and the message queue sending the resize request to the resizer.
Linearizability is not the only way to avoid it, but it’s the simplest to understand
Additional communication channels make the problem harder to avoid unless you control them or use alternative approaches; a small simulation of the race follows below
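A toy simulation of the race under assumed conditions (the dictionaries and queue stand in for the file store and the message broker; all names are made up):

```python
import queue, threading, time

# Two channels: the web server writes the photo to storage and then enqueues
# a resize job, but the replica the resizer reads from lags behind the leader.
leader, replica = {}, {}
resize_jobs = queue.Queue()

def replicate(photo_id):
    time.sleep(0.2)                      # replication lag
    replica[photo_id] = leader[photo_id]

def upload(photo_id, data):
    leader[photo_id] = data
    threading.Thread(target=replicate, args=(photo_id,)).start()
    resize_jobs.put(photo_id)            # second channel: the job message

def resizer():
    photo_id = resize_jobs.get()
    print("resizer sees:", replica.get(photo_id))  # likely None: the race

threading.Thread(target=resizer).start()
upload("photo-123", b"raw image bytes")
```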
Implementing Linearizable Systems
Single-Leader Replication (Potentially Linearizable)
- Has the potential to be linearizable, but not every single-leader database actually is
- It may fall short by design (e.g. because it uses snapshot isolation) or due to concurrency bugs
- You also need to be sure you know who the leader is; it is easy for a node to incorrectly believe it is the leader
Consensus algorithms (linearizable)
- Some of these algorithms resemble single-leader replication, but they additionally include measures to prevent split brain and stale replicas
- ZooKeeper and etcd use this
Multi-Leader Replication (not linearizable)
- Generally not linearizable, because writes are processed concurrently on different nodes and replicated asynchronously
Leaderless Replication (probably not)
- Some people claim it’s possible using quorum reads and writes
- “Last write wins” conflict resolution based on time-of-day clocks (as in Cassandra) is almost certainly not linearizable
- This is because clock timestamps are not guaranteed to be consistent with the actual ordering of events, due to clock skew
Linearizability and quorums
Strict quorums (w + r > n) in a Dynamo-style model seem like they should be linearizable, but with variable network delays race conditions are still possible
It is possible to make such quorums linearizable (for example by performing read repair synchronously before returning a read), but this costs performance
Cost of Linearizability
In a multi-leader database, if a network interruption happens between datacenters, each datacenter can keep operating normally: writes are processed locally and replicated asynchronously once the network recovers
With single-leader replication, the leader must be in one of the datacenters. All writes and all linearizable reads must be sent to the leader.
- If followers cannot contact the leader, they cannot update their data
- Clients that can only reach a follower datacenter cannot perform writes or up-to-date reads, so for them the application is effectively unavailable
CAP Theorem
This trade-off is captured by the CAP theorem
You sacrifice availability for stronger consistency (or vice versa)
The unhelpful CAP theorem
- Sometimes represented as Consistency, Availability, Partition Tolerance, pick two out of three
- Network Partitions are a fault, and we don’t have any choice about it
- So they will always happen
- When network is working correctly, system can provide both strong consistency and total availability
- A better phrasing of CAP would be “either Consistent or Available when Partitioned”
- A more reliable network needs to make this choice less often, but at some point the choice is unavoidable
CAP theorem doesn’t mention anything about network delays, dead nodes, or other trade-offs
- It’s been influential but it has little practical value for designing systems according to Kleppmann
- Much of CAP has been superseded by more precise results, so today it is mostly of historical interest
Linearizability and Network Delays
Very few systems are actually linearizable in practice. RAM on a modern multi-core CPU isn’t.
- If a thread on one CPU core writes to a memory address, and a thread on another core reads the same address shortly afterwards, it is not guaranteed to read the value the first thread wrote (unless a memory barrier or fence is used)
Each CPU core has its own memory cache and store buffer; reads are served from the cache by default because that is much faster
- Here consistency is dropped to gain performance, not fault tolerance or availability
Most non-linearizable distributed databases do this for performance, not availability
Ordering Guarantees
Ordering has been a recurring theme in this book, which suggests it might be an important fundamental idea.
- Main purpose of the leader is to determine the order of writes in replication log
- Serializability is about ensuring that transactions behave as if they were executed in some sequential order
- Timestamps and clocks (Chapter 8) are another attempt to introduce order into a disorderly world
There is a deep connection between ordering, linearizability, and consensus. Although this idea is more theoretical, Kleppmann says it’s worth understanding what systems can and can’t do
Ordering and Causality
Ordering is important to preserve causality.
We don’t want to be given an answer before the question was asked
- Message must be sent before it’s received
Like in real life, one thing leads to another
If a system obeys the ordering imposed by causality, it is called causally consistent
The Causal Order is not Total Order
A total order allows any two elements to be compared (the natural numbers are totally ordered, for example). Mathematical sets are not: {a, b} and {b, c} are incomparable, so sets are only partially ordered.
Linearizability -> total order: the system behaves as if there is only one copy of the data and every operation is atomic, so for any two operations we can always say which one happened first
Causality -> Partial Order
- Two events can happen, one before the other, but they are incomparable if they’re concurrent
Linearizability is stronger than causal consistency
Linearizability implies causality: any system that is linearizable will preserve causality correctly.
This is great and simple, but of course, comes at the cost of performance and availability. Some distributed systems abandon it for this reason.
Good news is that a middle ground is possible
Causal consistency is the strongest consistency model that does not slow down due to network delays and remains available in the face of network failures, and it does not carry the performance penalty of strong consistency
In many cases, systems that seem to require strong consistency really only need causal consistency
Based on this observation, researchers are exploring new kinds of databases that preserve causality, with performance and availability characteristics similar to those of eventually consistent systems
This research is still recent, and not much of it has made its way into production systems yet
Capturing causal dependencies
We won’t go into all the details of how non-linearizable systems can maintain causal consistency; just the key ideas
To maintain causality, you need to know which operation happened before which other operation. This is a partial order.
concurrent operations may be processed in any order, but if one happens before another, then they must be processed in that order (in every replica)
Each replica must make sure it processes operations in that causal order
We need a way of describing the “knowledge” of a node in the system: if node 1 had already seen the value X when it issued the write Y, then X and Y may be causally related.
Similar to detecting concurrent writes section, where in a leaderless datastore, we need to detect concurrent writes to same key in order to prevent lost updates.
- Causal consistency goes further
- It needs to track causal dependencies across the entire database, not just for a single key.
- Version vectors can be generalized to do this
The database needs to know which version of the data was read by the application; that version number is passed back to the database as part of the write (see the sketch below)
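A minimal sketch of that idea, under assumed simplifications (a made-up CausalStore class with a single global version counter instead of full version vectors):

```python
# The database tags each value with a version number; a write carries the
# version the client last read, so the database knows the write causally
# depends on (at least) that version.
class CausalStore:
    def __init__(self):
        self.version = 0
        self.data = {}           # key -> (version, value)

    def read(self, key):
        ver, value = self.data.get(key, (0, None))
        return ver, value        # the client must remember `ver`

    def write(self, key, value, depends_on):
        # The write is ordered after the version the client read; a real
        # replica would wait until `depends_on` has been applied locally
        # before accepting the write, rather than asserting.
        assert depends_on <= self.version, "causal dependency not yet seen"
        self.version += 1
        self.data[key] = (self.version, value)
        return self.version

store = CausalStore()
ver, _ = store.read("x")                   # client reads x (version 0)
store.write("y", "reply", depends_on=ver)  # y causally depends on what was read
```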
Sequence Number Ordering
Causality is an important theoretical concept, but keeping track of all causal dependencies can become impractical.
- In many apps, clients read lots of data before writing something, and then it is not clear whether the write is causally dependent on all of that data or only some of it
There is a better way
Sequence numbers or timestamps to order events
- Very small, only take a couple of bytes
- Can give total order
- Every operation gets a unique sequence number, and you can always compare two sequence numbers to determine which is greater
- Can create sequence numbers in a total order that is consistent with causality
In single-leader replication, the leader can simply increment a counter and attach it to each operation in the replication log; a follower that applies the writes in that order is always causally consistent.
Non-causal Sequence Number Generators
It is less clear how to generate sequence numbers for operations in leaderless or multi-leader databases.
Various methods are used in practice:
- Each node can generate its own independent set of sequence numbers
- For example, with two nodes, one generates only odd numbers and the other only even numbers; more generally, reserve some bits for a unique node identifier so that two different nodes never generate the same sequence number (see the sketch after this list)
- Attach a timestamp from a time-of-day clock; if it has sufficient resolution, it could be enough to totally order operations
- Preallocate blocks of sequence numbers: node 1 gets numbers 1 to 1,000, node 2 gets 1,001 to 2,000, and so on.
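A minimal sketch of the “reserve bits for a node ID” option (the bit layout and class are made up for illustration):

```python
# Per-node sequence numbers that can never collide: the low bits hold the
# node id, the high bits hold a local counter.
NODE_BITS = 10                      # supports up to 1024 nodes

class SeqGenerator:
    def __init__(self, node_id):
        self.node_id = node_id
        self.counter = 0

    def next(self):
        self.counter += 1
        return (self.counter << NODE_BITS) | self.node_id

a, b = SeqGenerator(node_id=1), SeqGenerator(node_id=2)
print(a.next(), a.next(), b.next())  # unique numbers, but comparing them
                                     # across nodes says nothing about causality
```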
These three options perform better and are more scalable than pushing all operations through a single leader that increments a counter. They all have a problem though.
The numbers they generate are not consistent with causality
Causality problems occur because these generators do not correctly capture the ordering of operations across nodes.
- One node may process many more operations than another, so its counter races ahead; the causal ordering between their operations gets lost in the numbers
- Timestamps from physical clocks are subject to clock skew, so they can disagree with the actual order of events
- A block allocator can assign a causally later operation a sequence number from a lower block
Lamport Timestamps
The solution is the Lamport timestamp, described in one of the most-cited papers in the field of distributed systems
Each node is given unique identifier. Each node keeps counter of the number of operations processed.
Lamport timestamp is the pair (counter, NodeID)
- Similar to the even/odd scheme, BUT every node and every client keeps track of the maximum counter value it has seen so far and includes that maximum on every request
- When a node sees a maximum greater than its own counter, it immediately increases its counter to that maximum
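A minimal sketch of the scheme, assuming counters are piggybacked on requests (the class and method names are made up):

```python
class LamportClock:
    """Sketch of a Lamport clock: a timestamp is the pair (counter, node_id)."""
    def __init__(self, node_id):
        self.node_id = node_id
        self.counter = 0

    def tick(self):
        """Local event or message send: increment and return the timestamp."""
        self.counter += 1
        return (self.counter, self.node_id)

    def observe(self, counter_seen):
        """On any request/response, fast-forward to the maximum counter seen."""
        self.counter = max(self.counter, counter_seen)

a, b = LamportClock(node_id=1), LamportClock(node_id=2)
t1 = a.tick()            # (1, 1)
b.observe(t1[0])         # b learns of a's counter
t2 = b.tick()            # (2, 2): ordered after t1, consistent with causality
print(sorted([t2, t1]))  # total order: compare counter first, then node id
```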
Timestamp order is not sufficient
If we want usernames to be unique and two users concurrently try to claim the same name, a problem arises: a node can only determine the winner after the fact, once all the timestamps are known, and it cannot know whether some other node is concurrently claiming the same name
We need total order broadcast
Total Order Broadcast
Kleppmann showed how sequence numbers can order operations, but that alone cannot enforce something like a uniqueness constraint.
Single-leader replication determines a total order of operations, but the open questions are how to scale beyond the throughput of a single leader and how to handle failover when the leader fails. In the distributed systems literature this problem is known as total order broadcast or atomic broadcast
Usually described as protocol for exchanging messages between nodes.
- It must satisfy two safety properties
- Reliable delivery: no messages are lost; if a message is delivered to one node, it is delivered to all nodes
- Totally ordered delivery: messages are delivered to every node in the same order
Using total order broadcast
Coordination services like ZooKeeper and etcd actually implement total order broadcast. There is a strong connection between total order broadcast and consensus
Can do a lot with total order broadcast
- Implement serializable transactions: if every message represents a deterministic transaction (e.g. a stored procedure) and every node processes the messages in the same order, the replicas stay consistent
Total order broadcast is a way of creating a log: delivering a message is like appending to the log, and all nodes read the log and see the same sequence of messages
It is also useful for implementing a lock service that hands out fencing tokens
Implementing Linearizable storage using total order broadcast
If you have total order broadcast, you can build linearizable storage on top of it; take claiming a unique username as an example
- Model each username as a register supporting an atomic compare-and-set: of several concurrent claims, only one compare-and-set can win
To implement this with an append-only log:
- Append a message to the log tentatively claiming the username, then read the log and wait for your own message to be delivered back to you
- If the first claim for that username in the log is your own, you win; otherwise you abort. Since every node sees the log entries in the same order, every node agrees on the winner (see the sketch below)
Going the other way, you can implement total order broadcast using linearizable storage:
- Use a linearizable register with an atomic increment-and-get; for every message you want to send, increment-and-get the register and attach the value to the message as its sequence number, then send the message to all nodes, and recipients deliver messages consecutively by sequence number
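A minimal sketch of the first direction (claiming a username on top of a totally ordered log); a plain Python list stands in for the replicated log, which is of course not how a real total order broadcast protocol works:

```python
# Linearizable "claim a username" on top of a total order broadcast log.
log = []                      # messages are delivered to everyone in this order

def append(msg):
    log.append(msg)           # a real TOB protocol replicates this reliably
    return len(log) - 1       # position at which the message was delivered

def claim_username(username, user_id):
    my_pos = append(("claim", username, user_id))
    # Read the log up to our own message; the *first* claim for this
    # username wins, and every node reaches the same verdict.
    for msg in log[:my_pos]:
        if msg[0] == "claim" and msg[1] == username:
            return False      # someone else's claim was delivered first
    return True

print(claim_username("alice", "user-1"))   # True
print(claim_username("alice", "user-2"))   # False
```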
Distributed Transactions and Consensus
Consensus is one of the most important and fundamental problems in distributed computing
On the surface it seems simple: just get all nodes to agree on something. Unfortunately, many broken systems have been built in the mistaken belief that this problem is easy to solve.
Although consensus is important, Kleppmann put it at the end because the topic is very subtle and requires prerequisite knowledge. Now that he’s gone over replication, transactions, and system models, it’s time to go over the consensus problem.
Situations where it’s important for nodes to agree
- Leader election - in single-leader, multiple nodes can think they’re leader. That’s bad
- Atomic Commit - Transaction spanning several nodes or partitions may fail on some nodes and succeed on others
- All nodes must agree on the outcome
Impossibility of Consensus
FLP result -> proves that there is no algorithm that is always able to reach consensus if there is a risk that a node may crash
- This is only proved in the asynchronous system model, a very restrictive model in which a deterministic algorithm cannot use any clocks or timeouts
- The problem becomes solvable once the algorithm is allowed to use clocks or timeouts to suspect crashed nodes
Distributed systems can usually achieve consensus in practice. FLP is a good theoretical problem though
Back to chapter
In the following sections, Kleppmann goes over the atomic commit problem in more detail and discusses two-phase commit (2PC), the most common way of solving the atomic commit problem.
By learning from 2PC we will work our way toward better consensus algorithms, such as those used in ZooKeeper (Zab) and etcd (Raft)
Atomic Commit and Two Phase Commit (2PC)
In Chapter 7, we learned that the purpose of transaction atomicity is to provide simple semantics in the case where something goes wrong in the middle of making several writes. Outcome is either successful commit or an abort to roll back.
Atomicity prevents failed transactions filling the DB with half-finished results / state
From Single Node to Distributed Atomic Commit
Single Database node
- Atomicity implemented by the storage engine
- Database makes the write durable (stores in write-ahead-log), then appends commit record to the log on disk
- If database crashes, recover from the log
- This crucially depends on the order of writes to the log: the data must be durable before the commit record is appended (see the sketch below)
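A minimal sketch of that single-node pattern (an illustrative log format, not any particular storage engine’s):

```python
import os

# Single-node atomic commit: data records are written and fsynced to a
# write-ahead log, then a commit record is appended. On recovery, a
# transaction counts as committed only if its commit record reached disk.
def commit(log_path, txid, writes):
    with open(log_path, "ab") as log:
        for key, value in writes:
            log.write(f"DATA {txid} {key}={value}\n".encode())
        log.flush()
        os.fsync(log.fileno())                  # data must be durable first
        log.write(f"COMMIT {txid}\n".encode())  # the point of no return
        log.flush()
        os.fsync(log.fileno())

commit("wal.log", txid=1, writes=[("x", "42"), ("y", "7")])
```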
What if we try this on multiple nodes
- Like a multi-object transactions in a partitions database
- Simply sending a commit request to all nodes and letting each commit the transaction independently does not work:
- Some nodes may find a constraint violation and abort
- some commit requests can be lost due to network
- some nodes may crash before the commit record is fully written and roll back on recovery, while others successfully commit
- This leads to inconsistencies
Introduction to Two-Phase Commit
Algorithm for achieving atomic transaction commit across multiple nodes
2PC and 2PL are completely different things
2PC uses a coordinator (also called a transaction manager). The coordinator is often implemented as a library within the same application process that is requesting the transaction (e.g. embedded in a Java EE container)
A 2PC transaction begins with the application reading and writing data on multiple database nodes as normal. These database nodes are called participants in the transaction.
When application is ready to commit, Coordinator begins phase 1: it sends a prepare request to each of the nodes asking them whether they are able to commit.
Coordinator tracks responses from participants.
- If all say yes, then coordinator sends out commit request in phase 2
- If any say no, sends out an abort request to all nodes in phase 2
Process is similar to a traditional marriage ceremony in Western cultures.
A System of Promises
Going more into detail
When the application wants to begin a distributed transaction, it requests a transaction ID from the coordinator; this ID is globally unique.
The application begins a single-node transaction on each of the participants and attaches the globally unique ID to each of them. All reads and writes happen in these single-node transactions; if anything goes wrong at this stage, the coordinator or any participant can abort
When the application is ready to commit, the coordinator sends a prepare request (tagged with the transaction ID) to all participants; if any of them cannot commit, the coordinator aborts the transaction on all of them
When a participant receives the prepare request, it makes sure that it can definitely commit the transaction under all circumstances. Replying “yes” is a promise to commit without error if asked, no matter what.
When coordinator receives all responses, it will write the decision on disk so it has a log in case of a crash
Once coordinator’s decision has been written to disk, commit or abort request is sent to all participants. If the request fails or times out, the coordinator must retry forever until it succeeds. There is no more going back if the decision was to commit.
The protocol contains two crucial points of no return: a participant’s “yes” vote, and the coordinator’s decision (see the sketch below).
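A minimal sketch of just the decision logic, under big simplifications (no real network, retries, or durable log; the classes are made up):

```python
# 2PC decision logic: phase 1 collects prepare votes, phase 2 broadcasts
# the decision to every participant.
class Participant:
    def __init__(self, name, will_commit=True):
        self.name, self.will_commit = name, will_commit
        self.state = "working"

    def prepare(self, txid):
        # Voting "yes" is a promise to commit later if asked, no matter what.
        self.state = "prepared" if self.will_commit else "aborted"
        return self.will_commit

    def finish(self, txid, decision):
        self.state = decision                 # "commit" or "abort"

def two_phase_commit(txid, participants, coordinator_log):
    votes = [p.prepare(txid) for p in participants]   # phase 1
    decision = "commit" if all(votes) else "abort"
    coordinator_log.append((txid, decision))          # point of no return
    for p in participants:                            # phase 2
        p.finish(txid, decision)
    return decision

log = []
nodes = [Participant("db1"), Participant("db2", will_commit=False)]
print(two_phase_commit(txid=7, participants=nodes, coordinator_log=log))  # "abort"
```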
Coordinator Failure
What happens if the coordinator crashes?
- My guess is that it looks at its logs
If the coordinator fails before sending the prepare requests, a participant can safely abort the transaction. But once a participant has received a prepare request and voted “yes”, it can no longer abort unilaterally; it must wait to hear back from the coordinator
A participant cannot simply time out and abort on its own: the coordinator may already have told another participant to commit, so a unilateral abort on node 1 could be inconsistent with node 2
Only way is to wait for coordinator to recover
- The coordinator must write its commit or abort decision to a transaction log on disk before sending commit or abort requests to participants. On recovery, any transaction that has no commit record in the coordinator’s log is aborted
Three Phase Commit
Two-phase commit is called a blocking atomic commit protocol because it can become stuck waiting for the coordinator to recover.
An alternative to two phase commit has been proposed, Three Phase Commit (3PC)
3PC is nonblocking, but it assumes a network with bounded delay and nodes with bounded response times
Nonblocking atomic commit requires a perfect failure detector
- A mechanism that can tell reliably whether a node has crashed or not
- A timeout in a network with unbounded delay is not a reliable failure detector
2PC continues to be used for this reason
Distributed Transactions in Practice
Distributed transactions, especially 2PC ones, have a mixed reputation
- Provide an important safety guarantee that is hard to achieve otherwise
- Criticized for killing performance and promising more than they deliver
Distributed transactions in MySQL are reported to be 10x slower than single-node transactions
- Most of performance costs come from 2 phase commit due to additional disk forcing (fsync) that is required for crash recovery and additional network round trips.
Instead of automatically dismissing distributed transactions, let’s examine them in more detail
There are important lessons to be learned from them
Two Quite different types of Distributed Transactions are often conflated:
- Database-internal distributed transactions
    - Some distributed databases support transactions internally among that database’s own nodes
    - VoltDB and MySQL Cluster’s NDB storage engine have this support
- Heterogeneous distributed transactions
    - The participants are two or more different technologies
    - e.g. two databases from different vendors, or even non-database systems such as message brokers
Exactly-Once Message Processing
Heterogeneous distributed transactions allow diverse systems to be integrated in powerful ways. Imagine a message from a message queue being acknowledged as processed if and only if the database transaction for processing it was successfully committed.
This gives exactly-once message processing, because the message acknowledgment and the database write either both commit or both abort
This is only possible if all the systems involved support the same atomic commit protocol, which in practice means something like 2PC
Kleppmann will return to this in Chapter 11
XA Transactions
XA (eXtended Architecture) is a standard for implementing 2PC across heterogeneous technologies
- Supported by many traditional relational DBs (PostgreSQL, MySQL, DB2, SQL Server, and Oracle)
- Supported by message brokers (ActiveMQ, MSMQ, HornetQ, IBM MQ)
XA is not a network protocol; it is a C API for interfacing with a transaction coordinator
Check whether your database drivers support XA; the coordinator is loaded as a library into the same process as the application
If the application process crashes, the coordinator goes down with it; after a restart, the database drivers contact the coordinator, which reads its log to resolve any in-doubt transactions
Holding locks while in doubt
Being in doubt is bad because a participant must hold locks on the affected rows while it waits, blocking other transactions
If the coordinator’s log is lost, those rows stay locked and held forever (or until resolved manually)
Recovering from Coordinator Failure
Rebooting the database won’t fix this problem, since the locks of in-doubt transactions must be preserved even across restarts
An administrator has to examine and resolve the in-doubt transactions manually
XA also allows heuristic decisions (a participant unilaterally commits or aborts), but use this sparingly, since it can violate atomicity
Limitations of Distributed Transactions
The coordinator must not run on only a single machine, or it becomes a single point of failure for the entire system
- Yet many coordinator implementations are not highly available by default
The coordinator’s logs become part of the application server’s state, so the servers are no longer stateless
XA has to be compatible with a wide range of data systems, so it is necessarily a lowest common denominator
Fault Tolerant Consensus
Improving this model
VSR (Viewstamped Replication), Paxos, Raft, and Zab take a different approach that avoids the problems above
They decide on a sequence of values, which makes them total order broadcast algorithms
Skipping….
Limitations of Consensus
- Rely on timeouts to detect failed nodes
- Require a strict majority to operate
- Assume a fixed set of nodes that participate in voting
- Are sometimes particularly sensitive to network problems
- e.g. Raft has been shown to bounce leadership back and forth indefinitely when one particular network link is unreliable
Membership and Coordination Services
Hadoop YARN, OpenStack Nova, and Kafka all rely on ZooKeeper running in the background. What is it that these projects get from it?
- Strong consistency (linearizable atomic operations)
- Total ordering of operations
- Failure detection
- Change notifications
Allocating work to nodes
- If a leader fails, another node should take over
Service Discovery
Finding out what IP address to connect to in order to reach a service
Probably doesn’t require consensus. Leader election does though
- If your system already knows who the leader is, the other nodes just need a way to find out who that is, which doesn’t itself require consensus
Membership Services
All of this comes from research into membership services going back to the 1980s, which was important for highly reliable systems such as air traffic control