System Design: Learning from Open Source


Cassandra - AP
- no single point of failure: every node/replica is equal
- easy incremental scalability
- read repair (e.g., 10% read repair chance)
- wide column model: schema-less, dynamically create fields
- specify the consistency level of each query
- support multi-datacenter replica
- fast writes, fast negative lookups
- gossip protocol - p2p
- well suited for workloads with a high write rate and a relatively low read rate

SSTable: Sorted String Table
- sorted, immutable, fast random read
- create a standalone key:offset index for fast access; indexes are loaded into memory
- read access
- a tombstone record is appended for deletes
- SSTables are periodically compacted/merged together - old data is overwritten/removed, and TTL-expired data is deleted during compaction

- Data can be compressed
Bloom Filter
Index

MemTable - SSTable in memory
- fast random writes
- all writes go to the MemTable and are also logged sequentially to the CommitLog (HLog in HBase)
- reads check the MemTable first and then SSTable indexes from newest to oldest; a read returns the latest value without touching older ones

- periodically flushed to disk(SSTable)


Tombstone - mark for delete
Commit log - marked as free after MemTable is flushed
Sloppy quorums and hinted handoff
Merkle trees
- Anti-entropy
the root node contains the hash of all children
- if the root hash is not the same, compare the next child nodes and find out the actual discrepancy

- each node maintains a Merkle tree for every key range
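
A minimal sketch of the anti-entropy comparison (not Cassandra's actual implementation, and the class names are made up): leaves hash one key range each, inner nodes hash their children, and the diff only descends where hashes differ.

```java
import java.security.MessageDigest;
import java.util.*;

// Toy Merkle tree: mismatching roots are narrowed down to mismatching leaf key ranges.
class MerkleNode {
    byte[] hash;
    MerkleNode left, right;
    String keyRange;          // set on leaves only

    static MerkleNode leaf(String keyRange, byte[] dataDigest) {
        MerkleNode n = new MerkleNode();
        n.keyRange = keyRange;
        n.hash = dataDigest;
        return n;
    }

    static MerkleNode inner(MerkleNode left, MerkleNode right) throws Exception {
        MerkleNode n = new MerkleNode();
        n.left = left; n.right = right;
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        md.update(left.hash);
        md.update(right.hash);
        n.hash = md.digest();                           // root contains the hash of all children
        return n;
    }

    // Collect key ranges whose hashes differ between two replicas' trees.
    static void diff(MerkleNode a, MerkleNode b, List<String> outOfSync) {
        if (Arrays.equals(a.hash, b.hash)) return;      // identical subtree, stop
        if (a.left == null) {                           // leaf: actual discrepancy found
            outOfSync.add(a.keyRange);
            return;
        }
        diff(a.left, b.left, outOfSync);                // compare children to locate the mismatch
        diff(a.right, b.right, outOfSync);
    }
}
```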

Quorum reads and writes
w + r > n
w=n, r=1 if few writes, many reads

version is used to determine which is newer
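
A tiny check of the overlap condition: with n=3, both w=2/r=2 and w=3/r=1 satisfy w + r > n, so every read set overlaps every write set.

```java
// Strict quorum condition: read and write sets always overlap when w + r > n.
final class QuorumCheck {
    static boolean isStrictQuorum(int n, int w, int r) {
        return w + r > n;
    }

    public static void main(String[] args) {
        System.out.println(isStrictQuorum(3, 2, 2)); // true  - typical QUORUM writes + QUORUM reads
        System.out.println(isStrictQuorum(3, 3, 1)); // true  - few writes, many reads (w=n, r=1)
        System.out.println(isStrictQuorum(3, 1, 1)); // false - reads may miss the latest write
    }
}
```
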
Seed nodes
primary key
partition keys
- first part of primary key
- randomly distributed via the hash algorithm
clustering columns

- determine the ordering of the data on disk
Compound primary key
- the first part of that key is hashed
- other columns are used to sort
Avoid skewed workloads and relieve hot spots - e.g., a celebrity user
- add a 2-digit random number at the end of the key (see the sketch below)
- need to keep track of which keys are being split
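
A rough sketch of the suffixing idea, assuming an application-side helper (the class and method names are made up): the 2-digit suffix spreads writes for a tracked hot key over 100 sub-keys, and reads fan out over all of them.

```java
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

// Spread a hot (celebrity) key across 100 sub-keys on write; read them all back and merge.
class HotKeySplitter {
    private final Set<String> splitKeys;      // keys known to be split must be tracked somewhere

    HotKeySplitter(Set<String> splitKeys) { this.splitKeys = splitKeys; }

    String writeKey(String key) {
        if (!splitKeys.contains(key)) return key;
        int suffix = ThreadLocalRandom.current().nextInt(100);      // 2-digit random suffix
        return key + String.format("%02d", suffix);
    }

    List<String> readKeys(String key) {
        if (!splitKeys.contains(key)) return List.of(key);
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 100; i++) keys.add(key + String.format("%02d", i));
        return keys;                                                 // caller reads and merges all of them
    }
}
```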

favor materialized views over secondary indexes
local index
- another table, where the key is indexed, the columns contain the row keys
- all nodes are involved, slow


SSTable Attached Secondary Index (SASI)

Run repair at least once every gc_grace_seconds, otherwise you may resurrect the dead
Time-window compaction
- good for time series data, with TTLs
Size-tiered compaction
Leveled compaction

Denormalizing using collections
- sets, lists, and maps
Denormalizing with materialized views
- read same data using an alternate key entirely
Snitches
- GossipingPropertyFileSnitch
Consistency levels
Any, ONE, TWO, THREE, QUORUM, ALL
Cross data centers
LOCAL_ONE, LOCAL_QUORUM, EACH_QUORUM

Handling slow nodes
- prefer lower latency nodes when requesting the entire record
- asks for checksums from other nodes
rapid read protection
- proactively make the request to another node while waiting for the original node to respond

failure detector - phi
- statistical distribution

SimpleStrategy
NetworkTopologyStrategy
- Rack awareness
- replicas are placed in different racks

PRIMARY KEY ((sensorID, time_bucket), timestamp)
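
A small helper sketch for deriving the time_bucket value in that composite partition key; bucketing per day in UTC is an assumption, the point is just to keep one sensor's rows from piling into a single unbounded partition.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Compute the time_bucket column (one bucket per day, UTC) so a single sensor's
// rows go into bounded partitions keyed by (sensorID, time_bucket).
final class TimeBucket {
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    static String bucketFor(Instant ts) {
        return DAY.format(ts);   // e.g. "20240101"
    }
}
```
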
geospatial data

- PRIMARY KEY (geo_key, geohash)

LSM - Log Structured Merge Trees
- append-only
- underlying data structure behind SSTable and Memtable
- small in-memory tree and a large (complete) data tree
- guarantee consistent insert rate - transform random writes into sequential ones in log file and MemTable
- reads are independent from writes

- SSTable is immutable, always in an optimized layout
Write optimized; deletes are expensive.
Writes go to CommitLog first, then to SSTable which is flushed to disk later.
Compact Periodically
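
A toy sketch of that write/read path (commit log + MemTable, flush to immutable sorted tables, read newest to oldest); compaction, tombstones, and bloom filters are omitted, and the class is only illustrative.

```java
import java.util.*;

// Toy LSM store: append-only commit log + sorted in-memory MemTable,
// flushed to immutable sorted "SSTables"; reads go newest -> oldest.
class TinyLsm {
    private final List<String> commitLog = new ArrayList<>();             // sequential writes
    private NavigableMap<String, String> memTable = new TreeMap<>();      // sorted in memory
    private final Deque<NavigableMap<String, String>> ssTables = new ArrayDeque<>(); // newest first
    private static final int FLUSH_THRESHOLD = 1024;

    void put(String key, String value) {
        commitLog.add(key + "=" + value);         // durability first (sequential append)
        memTable.put(key, value);                 // then the in-memory tree
        if (memTable.size() >= FLUSH_THRESHOLD) flush();
    }

    String get(String key) {
        String v = memTable.get(key);
        if (v != null) return v;                  // latest value wins
        for (NavigableMap<String, String> sst : ssTables) {
            v = sst.get(key);
            if (v != null) return v;              // stop at the newest SSTable that has it
        }
        return null;
    }

    private void flush() {
        ssTables.addFirst(memTable);              // becomes an immutable, sorted SSTable
        memTable = new TreeMap<>();
        commitLog.clear();                        // commit log can be freed after the flush
    }
}
```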

B Tree
- all leaves are at the same level
B+ tree
- log(N) read
- all keys are stored in leaves
- leaf nodes are linked, fast range queries
- leaf nodes are not necessarily next to each other - OPTIMIZE TABLE to put leaves in order
- locality at the page level
- not good for lots of random inserts

- updates and deletes are done in place on disk

Session Coordinator
-- also in SolrCloud
-- the client talks to any node, which hashes the key, finds the shard, does the work, and returns the result to the client

Partitioned - consistent hashing
Replicated - configurable, work across data centers

Partitioned by hash of key and sorted by secondary keys
(user_id, update_timestamp)
The first part of that key is hashed to determine the partition, but the other columns are used as a concatenated index for sorting the data in Cassandra’s SSTables.

Dynamodb
- decentralization, p2p
- High availability for writes
- Vector Clock: list of (node, counter) pairs (see the sketch after this list)
- [Sx,1] -> [Sx,2]
  -> [Sx,2],[Sy,1]
  -> [Sx,2],[Sz,1]
     -> reconciled on read into a single version
- conflicting versions: last write wins or the client decides
- W+R>N
- N: replica copies

- LastEvaluatedKey: pagination
- always writable
- replicas updated asynchronously by a background process
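
A sketch of the vector-clock bookkeeping behind the [Sx,n] notation above; node names are arbitrary strings and the class is only illustrative.

```java
import java.util.*;

// Vector clock: one counter per node, e.g. {Sx=2, Sy=1}.
class VectorClock {
    private final Map<String, Integer> counters = new HashMap<>();

    // A node increments its own counter when it coordinates a write.
    void increment(String node) {
        counters.merge(node, 1, Integer::sum);
    }

    // true if this clock has seen everything 'other' has (other happened-before this).
    boolean descendsFrom(VectorClock other) {
        for (Map.Entry<String, Integer> e : other.counters.entrySet()) {
            if (counters.getOrDefault(e.getKey(), 0) < e.getValue()) return false;
        }
        return true;
    }

    // Neither descends from the other -> concurrent versions, reconcile on read.
    boolean concurrentWith(VectorClock other) {
        return !descendsFrom(other) && !other.descendsFrom(this);
    }

    // Reconciliation keeps the max counter per node.
    static VectorClock merge(VectorClock a, VectorClock b) {
        VectorClock m = new VectorClock();
        m.counters.putAll(a.counters);
        b.counters.forEach((node, c) -> m.counters.merge(node, c, Integer::max));
        return m;
    }
}
```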


Netflix Dynomite
Dapper, zipkin

- sampling, async logging

Local Index (vs Global Index)
local secondary indexes - indexes by document
each partition maintains its own secondary indexes, covering only the documents in that partition
scatter/gather: query every partition, gather the responses, then rank/merge
- tail latency amplification
- Cassandra, Solr, MongoDB

global secondary index - indexes by term (fast read, slow write)
- partitioned differently from the primary key index
- only needs to make a request to the partition containing the term
- writes are slower and more complicated - often asynchronous


- a write to a single document may affect multiple partitions of the index
- Dynamodb

Masterless: peer-to-peer, de-centralization
Gossip protocol 
- membership changes and failure detection are performed by propagating the changes to a set of randomly chosen neighbors, who in turn propagate to another set of neighbors

- Gossip is a peer-to-peer communication protocol in which nodes periodically exchange state information about themselves and about other nodes they know about. The gossip process runs every second and exchanges state messages with up to three other nodes in the cluster. The nodes exchange information about themselves and about the other nodes that they have gossiped about, so all nodes quickly learn about all other nodes in the cluster.

Separate failure detection from membership updates

- Do not rely on a single peer for failure detection
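
A toy model of one gossip round as described above: pick up to three random peers and exchange (node -> heartbeat) maps, keeping the higher heartbeat per node on both sides. The class is only a sketch, not any real implementation.

```java
import java.util.*;

// Toy push-pull gossip: exchange heartbeat maps with up to 3 random peers per round.
class GossipNode {
    final String name;
    final Map<String, Long> heartbeats = new HashMap<>();   // membership + liveness state

    GossipNode(String name) {
        this.name = name;
        heartbeats.put(name, 0L);
    }

    void tick() { heartbeats.merge(name, 1L, Long::sum); }  // bump own heartbeat each period

    void gossipWith(List<GossipNode> peers) {
        List<GossipNode> shuffled = new ArrayList<>(peers);
        Collections.shuffle(shuffled);                      // randomly chosen neighbors
        for (GossipNode peer : shuffled.subList(0, Math.min(3, shuffled.size()))) {
            exchange(this, peer);
        }
    }

    private static void exchange(GossipNode a, GossipNode b) {
        // both sides end up with the max heartbeat seen for every node they know about
        a.heartbeats.forEach((n, hb) -> b.heartbeats.merge(n, hb, Long::max));
        b.heartbeats.forEach((n, hb) -> a.heartbeats.merge(n, hb, Long::max));
    }
}
```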

Automation, no downtime
- automatic partitioning and recovery
- add/replace servers

Wide Column
Dynamic schema

Kafka
Why it is fast
Sequentially read/write vs random read/write
Memory Mapped File
Zero copy

Batch data(compressed)

Broker
- one broker serves as the controller, which is responsible for choosing partition leaders, etc.
Producers write to brokers, consumers read from brokers
Topics are split into partitions
Partition: ordered, immutable, replicated
- redundancy and scalability
All replicas have the exact same log with the same offsets.
All reads and writes go to the leader of the partition

Node liveness - alive and not falling too far behind the leader
In sync replicas(ISR)
- dynamic
- eligible for leader election
-- A message is considered "committed" when all in sync replicas have applied it to their log
availability vs consistency

consumer_id: hostname:uuid
message in the partition - offset
- the consumer keeps track of message offsets in ZooKeeper or a Kafka topic
Consumer group
- Every message is consumed by only one consumer within the same consumer group.
One consumer can consume data from multiple partitions.
Every partition is consumed by at most one consumer in the same consumer group (see the consumer sketch below).

Consumers track (offset, partition, topic)
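
A minimal consumer-group member using the standard kafka-clients Java API; the broker address, group id, and topic name are placeholders. Each member of the group is assigned a disjoint subset of the topic's partitions, and offsets are tracked per (topic, partition).

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group");   // members of this group share the partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");       // offsets committed back to Kafka

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // ordering is guaranteed only within a single partition
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```
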
Zookeeper maintains states: leader, replicas
Batching - avoid small messages
Zero copying
Batch Compression

- written in batches
- compressed
- use data format such as avro
- no guarantee of time-ordering of messages across the entire topic, just within a single partition.
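
A hedged sketch of producer settings for the batching and compression notes above; the property names are standard producer configs, but the values are only illustrative.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Producer tuned for batched, compressed writes.
public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.type", "snappy");   // compress whole batches, not single messages
        props.put("batch.size", "65536");          // accumulate up to 64 KB per partition batch
        props.put("linger.ms", "20");              // wait a little to fill batches
        props.put("acks", "all");                  // wait for all in-sync replicas

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}
```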

Kafka provides message ordering inside partitions
- 200k messages/sec for writes and 3M messages/sec for reads
- Each broker can handle terabytes of messages

Kafka - Usage
- Separation between producers and consumers, loose coupling
- decouple read and write
Consumers: - real-time and batch
- not suitable as a long-term data store
- persist Kafka logs to S3 or other stores, e.g., Pinterest Secor

BigTable
- the master assigns tablets to tablet servers, tracks where tablets are located, and redistributes tasks as needed
- tablet servers split tablets when they exceed size limits
- distributed Lock servers, chubby
- self-managing systems

- Darwinian infrastructure: perform time-consuming operations in parallel and take the winner

HBase
- Master/Slave
One Store for every column family = one MemStore + HFiles

HFile index -> index level expansion, multi-level index

Push vs Pull
- hard to handle diverse consumers with different capabilities in push mode

Delivery: at-least-once
Delivery mode: at least once, at most once, exactly once

Quorum
Majority - tolerate f failures, require 2f+1 nodes
- used by Zookeeper, shared cluster configuration

Page Cache
Native memory

Message Queue
- Asynchronous Processing
Separation of producers and consumers
Scaled separately
Evening out traffic spikes
-- Monitoring queue lengths

Publish/Subscribe Mode
Private queue for each consumer.
Messages are cloned to each private queue.

Competing consumers

Storm
- use ZK to manage its cluster state such as message acknowledgements, processing status etc
- Nimbus Service on Master Node (Thrift service)
- Supervisor Service on Worker Node
- Topology, Spout(source of streams, no process), Bolt, Tuple
Shuffle grouping
Fields grouping

- tuples with the same value for a particular field name are always emitted to the same instance of a bolt

Consistent Hashing
- data is usually not uniformly distributed - caches become unbalanced
- virtual replicas
- each cache is mapped (hashed) to multiple points on the ring
- each cache is associated with multiple segments of the ring
- when the hash table is resized (e.g., a new cache host is added to the system), only k/n keys need to be remapped on average (sketch below)
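
A minimal sketch of a hash ring with virtual replicas using a TreeMap; the class and method names are made up, and MD5 is just one convenient hash choice here.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Consistent hash ring: each cache host is hashed onto the ring many times (virtual replicas),
// and a key goes to the first host clockwise from its hash.
class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualReplicas;

    HashRing(int virtualReplicas) { this.virtualReplicas = virtualReplicas; }

    void addNode(String node) {
        for (int i = 0; i < virtualReplicas; i++) {
            ring.put(hash(node + "#" + i), node);    // node owns many small ring segments
        }
    }

    void removeNode(String node) {
        for (int i = 0; i < virtualReplicas; i++) {
            ring.remove(hash(node + "#" + i));       // only this node's keys get remapped
        }
    }

    String nodeFor(String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        if (entry == null) entry = ring.firstEntry();   // wrap around the ring
        return entry.getValue();
    }

    private static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24) | ((d[2] & 0xFF) << 16)
                 | ((d[1] & 0xFF) << 8) | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```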

Hash - Monotonicity

Redis
Use memory as main storage and disk only for persistence
Cache
Write-through cache
Write-back cache
Sideline cache
- expensive as it uses ram
- How much data, it may be too large/cost to keep all in memory
TTL key
PERSIST key
Publish Subscribe Model
- decouple publishers and subscribers
- a subscriber will never receive messages that were sent when not connected.
- messages were sent to all online subscribers

Handling failing nodes
A guesses B is failing, as the latest PING request timed out.
A will not take any action without any other hint.
C sends a PONG to A, with the gossip section containing information about B: C also thinks B is failing.

At this point A marks B as failed, and notifies the information to all the other nodes in the cluster, that will mark the node as failing.

Redis - Usage
- AP, may lose data
- Auto_increment ID: INCR
- List(as queue) - LPUSH, RPOP
- DelayedQueue

  - use the current time (seconds, no milliseconds) as the score, then query zrangeByScore(KEY, 0, now) - see the sketch below
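
A rough sketch of that delayed queue, assuming the Jedis client; the key name and job format are hypothetical.

```java
import java.time.Instant;
import redis.clients.jedis.Jedis;

// Delayed queue on a Redis sorted set: score = due time in epoch seconds.
public class DelayedQueue {
    private static final String KEY = "delayed:jobs";   // hypothetical key name

    public static void enqueue(Jedis jedis, String job, long delaySeconds) {
        long dueAt = Instant.now().getEpochSecond() + delaySeconds;
        jedis.zadd(KEY, dueAt, job);                     // score decides when the job becomes ready
    }

    public static void pollOnce(Jedis jedis) {
        long now = Instant.now().getEpochSecond();
        var dueJobs = jedis.zrangeByScore(KEY, 0, now);  // everything whose due time has passed
        for (String job : dueJobs) {
            if (jedis.zrem(KEY, job) > 0) {              // only the successful remover processes it
                System.out.println("processing " + job);
            }
        }
    }
}
```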

Memcached
Slab allocation
- memory is partitioned into pages (1MB)
- a page is assigned to a slab class, which defines how large the objects stored in that page can be
- each slab-class has its own LRU.

Memcached lease
look-aside cache problems - stale sets, thundering herds
incast congestion
- clients queue requests: rolling window
extract memcached keys from the database transaction log and broadcast the invalidations to all clusters
memcached routers between DB and cache servers - aggregate deletes
Remote markers
- set the marker before a cross-region update (a write from a replica region) and delete it after replication to this replica is done

- fetch directly from remote master if the marker exists

Microsoft’s Orleans 
- build stateful service
- Gossip Protocol + Consistent Hashing + Distributed Hash Table

MongoDB
- CP
- replica sets

Hadoop
- Bringing computation to data
HDFS
- good for large files
DataNode
chunk/block - big (64MB), replicated
NameNode

- stores only the metadata for chunks and keeps it in memory
Secondary NameNode

YARN
a global ResourceManager (RM) and a per-application ApplicationMaster

Nginx
- event driven design
- requires far fewer threads
- have one worker process for each CPU
- each worker is single-threaded, can handle thousands of concurrent connections

Apache Http Server
- can be process-based, thread-based or event-based as determined by the MPM used

HTTP+JSON vs RPC(Thrift etc)



Avoid SPOF - Be Redundant
Be Robust - hide errors as much as possible
Be conservative in what you send, be liberal in what you accept
Logging

Test and Monitor Performance


Backup Requests with Cross Server Cancellation
A request is sent to Service B(1) along with the information that the request is also being sent to Service B(2). Then some delay later the request is sent to Service B(2). When B(1) completes the request it cancels the request on B(2). 

Speculative execution
- Make a call to the primary cluster to retrieve data
- If the call fails or doesn’t return within certain time, make another call to the standby cluster
- Return the data from the cluster which returns first
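
A hedged sketch of the same idea with CompletableFuture; the fetch functions are supplied by the caller, and handling for the case where both calls fail is omitted.

```java
import java.util.concurrent.*;
import java.util.function.Supplier;

// Hedged/backup request: fire the primary call, and only if it is still pending after
// hedgeAfterMs, fire the standby call too; whichever finishes first wins.
public class SpeculativeCall {
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor();

    public static <T> CompletableFuture<T> hedged(Supplier<T> primary,
                                                  Supplier<T> standby,
                                                  long hedgeAfterMs) {
        CompletableFuture<T> result = new CompletableFuture<>();

        CompletableFuture.supplyAsync(primary)
                .whenComplete((v, err) -> { if (err == null) result.complete(v); });

        SCHEDULER.schedule(() -> {
            if (!result.isDone()) {                  // primary is slow or failed: try the standby
                CompletableFuture.supplyAsync(standby)
                        .whenComplete((v, err) -> { if (err == null) result.complete(v); });
            }
        }, hedgeAfterMs, TimeUnit.MILLISECONDS);

        return result;                               // completes with the first successful answer
    }
}
```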

Offload work to client
- Uber: use driver's phone

Policy to expire/archive data
Monitor visually

Automate all the things

Throttling - nginx
Design to Be Disabled - feature toggle

Design for Rollback
Data Schema - Code Version Compatibility

Sharding
Horizontal partitioning
Vertical partitioning - multiple entities - partition by functionality
Partition Key
Entries with the same partition key are stored in the same node.
Single Master/Multiple Slaves
Multi-master replication - lose single-source-of-truth

Algorithmic Sharding - hash(key) % NUM
Dynamic Sharding - external locator service
HDFS - Name Node
HBase - range server
MongoDB - ConfigServer
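
The algorithmic variant in a couple of lines of Java; note that changing numShards remaps most keys, which is why the dynamic/locator-service approach exists.

```java
// Algorithmic sharding: no locator service needed, but re-sharding moves most keys.
final class AlgorithmicSharding {
    static int shardFor(String key, int numShards) {
        return Math.floorMod(key.hashCode(), numShards);  // floorMod keeps the result non-negative
    }
}
```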

Active/Active

LogStash
Input -> Filter -> Output
Shipper(Collector) -> Kafka(Redis) -> Solr(ElasticSearch)

DataBus
Source of truth DB, Derived DB
Dual-Writes, log mining

Rest.li

Asynchronous APIs
S2 Geometry Library

Timeline(Twitter/Pinterest)
Push - Fan-out-on-write
Pull -  Fan-out-on-load
High priority for online/active users

Zookeeper - CP
- a highly available data store for small amount of data
namespace is like a Unix filesystem
- fail fast
Sequential consistency
- updates from clients are always applied in FIFO order
Atomicity, Single system image, Reliability,
Timeliness

- be up-to-date within a certain time bound, eventual consistency
znodes
-  persistent and ephemeral

- ephemerals cannot have children
A sequential znode is assigned a sequence number (monotonically increasing) as part of its name during its creation (see the sketch below).
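
A small sketch using the standard ZooKeeper Java client: create an ephemeral sequential znode. The connect string and path are placeholders, and the parent znode /election is assumed to already exist.

```java
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// ZooKeeper appends a monotonically increasing sequence number to the znode name,
// and the node disappears when the session dies (ephemeral).
public class SequentialZnode {
    public static void main(String[] args) throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("localhost:2181", 15000, event -> connected.countDown());
        connected.await();

        String path = zk.create("/election/candidate-",
                new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("created " + path);   // e.g. /election/candidate-0000000042
        // Recipes (locks, leader election) compare these sequence numbers: lowest wins.
    }
}
```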

ZooKeeper Watches - Push Model
- one-time trigger, need to set another watch

- watches are always ordered/dispatched in FIFO manner

Read requests 
- handled locally

- may not be the up-to-date
Write requests - forwarded to leader, acked by majority nodes
ZooKeeper Atomic Broadcast (ZAB)
- herd effect

leader election
leader activation
ZooKeeper Atomic Broadcast (ZAB)
- server's identifier (sid) and transaction ID (zxid)
- server that has the most recent transaction ID (zxid) wins the leader election

leaders
followers, observers
- both commit proposals from the leader in its two-phase commit process
- observers do not participate in the voting process
- observers aid the scalability of read requests and help in propagating updates that span multiple data centers
Access Control List (ACL) policy

ZooKeeper recipes
Barrier
Lock
Leader election
Group membership
Two-phase commit

- Hadoop ResourceManager (RM)
-- leader election among multiple NameNodes (NN)

Chubby
- master lease, periodically renewed
- interface much like a distributed file system with advisory locks

Client
Session lease, Session lease timeout

KeepAlives

HBase:
- only one HMaster
- RegionServer(ephemeral znode)

Two-phase commit - 2PC
In the first phase, the coordinator node asks all the transaction's participating processes to prepare and vote to either commit or abort the transaction.
In the second phase, the coordinator decides whether to commit or abort the transaction, depending on the result of the voting in the first phase. If all participants voted for commit, it commits the transaction; otherwise, it aborts it. It finally notifies the result to all the participants.
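
A sketch of the coordinator's logic for the two phases above; the Participant interface here is hypothetical.

```java
import java.util.List;

// Hypothetical participant API for the two phases.
interface Participant {
    boolean prepare(String txId);   // phase 1: vote commit (true) or abort (false)
    void commit(String txId);       // phase 2
    void abort(String txId);
}

class TwoPhaseCommitCoordinator {
    boolean run(String txId, List<Participant> participants) {
        boolean allVotedCommit = true;
        for (Participant p : participants) {            // phase 1: collect votes
            if (!p.prepare(txId)) { allVotedCommit = false; break; }
        }
        for (Participant p : participants) {            // phase 2: notify the decision
            if (allVotedCommit) p.commit(txId); else p.abort(txId);
        }
        return allVotedCommit;
        // Note: participants block if the coordinator crashes between the phases -
        // the classic 2PC weakness.
    }
}
```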

Logical Clock
- happened-before
- every node maintains local clock - a counter
- vector clock
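
A minimal Lamport (scalar) clock sketch for the counter-per-node idea above: tick on local events, and on receive take max(local, remote) + 1. Vector clocks extend this with one counter per node (see the Dynamo sketch earlier).

```java
import java.util.concurrent.atomic.AtomicLong;

// Lamport clock: a single counter per node that respects happened-before.
class LamportClock {
    private final AtomicLong time = new AtomicLong();

    long tickLocal() {                        // local event or message send
        return time.incrementAndGet();
    }

    long onReceive(long remoteTime) {         // merge the sender's timestamp
        return time.updateAndGet(local -> Math.max(local, remoteTime) + 1);
    }
}
```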

Distributed Hash Table (DHT)
- Chord
Chord - peer-to-peer
- decentralization
- consistent hashing
- get to the exact node in the smallest number of hops by storing a small amount of local data at each node
- maintains a finger table (log N routing entries), reaches the destination node in O(log N) hops

Scalability Rules: 50 Principles for Scaling Web Sites
Don’t Check Your Work
Relax Temporal Constraints

Communicate Asynchronously As Much As Possible
Ensure You Can Wire On and Off Functions

Scale Out - horizontal scaling
Axes of Scale
X Axis - Clone Things
Y Axis - Split Different Things - by function, services

Z Axis - Split Similar Things - sharding

Isolate Faults - circuit breaker

Use CDN, Cloud
Don’t Overengineer


Architecting for Scale
Build systems keeping availability in mind.
- Dependencies, Customers, Limit Scope
Always Think About Scaling.
Mitigate risk.
Monitor availability.
Respond to availability issues in a predictable and defined way.

Define Service SLAs

Make your apps do something reasonable even if not all is right
– Better to give users limited functionality than an error page
Use higher priorities for interactive requests

Things to Consider:
Scalability, Reliability(Data consistency), Availability, Manageability
Performance, Cost

Read Heavy vs Write Heavy
Eventual Consistency vs Strong Consistency
Database sharding vs replication

Replication
- Scale reads not writes
- Statement-based
- Write-ahead log (WAL)

- Logical log - MySQL binlog

Micro Services
Choose the right database for the right service

Raft
- Leader, Follower, Candidate
- majority (quorum) voting
- reject another node's vote request if the current node's log is newer (commit log version)
- random election timeout

- heartbeat


Paxos

How to Leader Election
- each server waits for a random amount of time and then makes an announcement with a timestamp: "I'm the leader"

Avro
- provides a compact serialization format and schemas that are separate from the message payloads, requiring no generated code when they change
- provides strong data typing and schema evolution, with both backward and forward compatibility

Ways to rebalancing partitions
Fixed number of partitions - good for hash partitioning
- Cassandra
- create many more partitions than there are nodes, and assign several partitions to each node
- assign more partitions to powerful nodes
- Only entire partitions are moved between nodes.
The number of partitions does not change, nor does the assignment of keys to partitions change. The only thing that changes is the assignment of partitions to nodes.

Dynamic partitioning
- create partitions dynamically: HBase

Ways to request routing
1. send to any node, which may forward the request to the appropriate node
2. send all requests to a routing tier
3. client is aware of assignment of partitions, connect directly to the appropriate node

Use a separate coordination service such as Zookeeper to keep track of this cluster metadata
Cassandra and Riak - use gossip
HBase, SolrCloud and Kafka also use Zookeeper to track partition assignment.


Couchbase - a routing tier called moxi

Avro vs Protocol Buffers vs Thrift
code generation
versioning, backward compatibility

Thrift
Transport

Server
