April 14, 2017
最近上完了 MIT 6.824 Distributed Systems,把论文读完了,Lab 做了 1,2,3,这里是一些笔记。
MapReduce
- 限制编程的模型以方便数据的分发和运算的并行执行,对应用程序隐藏容错、数据分发、分布式计算的细节。
- 把数据分为 M 个 partition,每个 partition 执行一个 map task,map task 按 key hash 输出 R 个文件到本地,M 个 map task 完成后启动 R 个 Reduce task,第 1 个 Reduce task 读 M 个 map task 的第 1 个输出文件(总共读 M 个),并输出一个文件到 GFS,其他 Reduce task 类似,最后总共生成 R 个结果文件。
- Map worker 挂了只需要重新运行该 partition 对应的 map task,已经读完该 map worker 输出的中间文件的 Reduce task 可以顺利完成,没有开始读的则读取新 map task 的结果,读到一半的则需要从头来一次。
GFS
Raft
- 选主:一开始所有 server 都是 follower,election timeout 之后是 candidate,谁先获得大多数选票就是主,主有版本号,即 Term,一个 server 在一个 Term 里只投一票,所以一个 Term 里至多只有一个主。当选后给 follower 发心跳,其他 server 在收到心跳后转为 follower ,并重启 election timer。
- 状态通过 commit log 进行 replicate。
- 主容错:主挂了,其他 server election 会 time out,从而启动下一轮选举,每个 server election timeout 是随机数,从而减少多 server 同时启动选举的可能性,而为了使得已经 commit 的 log 不丢掉,server 只给那些 log 比自己更新的 candidate 投票,更新是指最后一条 Log 跟自己最后一条 Log 比 Term 大或者 Term 相同,但 Index 更大(即更长)
- Log 只从 Master 同步到 follower,follower 发现冲突则以 master 为准,master 从不删除自己的 Log,follower 收到 AppendEntries 请求时检查 prevTerm, prevIndex 是否和自己 Log 的最后一条 log 对应的上,对应不上就拒绝该请求。Master 只 commit 当前 Term 的 log,并且只有大多数 server 都同步了该 log 时才提交。Log 更新方式,follower 的检查,master commit 规则三者保证了 State machine safety,也就保证了所有状态机的状态是一致的。
Dynamo
TODO
- why the influxdb cluster’s initial design failed?
- redis cluster 原理
Spanner
- 理解了第四节就理解了本文
- 其主要优点是 externally consistent 读写,以及全球一致的读,并且只读事务是无锁的。
- 通过 paxos 实现了 replicated state machine,一个 tablet 一个 paxos group,所以 replication 的单位是 tablet,由于 tablet 是存在 Colossus(GFS 后续)中,所以数据在数据中心内有 replica,跨数据中心有 replica。
- 数据是多版本的,每个版本的时间戳被赋值为其 commit time,因此 spanner 支持 snapshot read。
- Read transaction 的难点在于,可能 read 多条记录,而这些记录分布在不同机器上(甚至是不同 国家),而在 read 过程中可能有写,要保证读和写是 serializable (即读写不能交叉)的才行。
- TrueTime 的理解:每台机器的时钟偏差是有限的,TT.now() 表示所有机器的时钟都在 [earliest, latest] 之间,也就是说,在每一台机器,其物理时间都 <= latest,TT.after(t) 代表在每台机器,t 都过去了,即在物理上,t 已经过去,每台机器都一致这么认为,由于物理时间是严格有序的,spanner 通过把执行操作的时刻(如 commit transaction 的时刻)和物理时间关联,也就定义了操作的顺序,这个顺序每台机器都认同。TrueTime 减少了机器间信息的交换和协同。
- 举个例子,看下 4.1.1 小节的 paxos leader leases,如 4.1.2 小节所讲,spanner 需要保证一个 paxos group 给出的 timestamp 必须单调递增。如何保证?考虑切主的情况,假设 Leader A 第一次当选,调用 TT.now() ,把 earliest1 当成当前时间戳,lease 为 10s,那么给出的时间戳必须是 [earliest1,earliest1 + 10s),在任期内,可以给 max(last commit timestamp + 1, TT.now().earliest),切主为 B,切换时 smax = last commit timestamp,A 必须等待 TT.after(smax),所以 B 当选时必然有 TT.now().earlist > smax,这样,B 给出的时间戳,就一定比 A 大,从而保证了单调性
- External-consistency invariant :如果事务 T2 在 T1 的 commit 之后开始,那么 spanner 给 T2 的 commit 的时间戳一定大于 T1 的时间戳。其意义是什么?首先 spanner 给出的 commit 时间戳不代表物理时间,也就是 T2 不一定发生在 T1 之后,所以给出时间戳不能保证事务的 linearity(这点是通过锁来保证的),但是在物理上如果 T2 在 T1 的 commit 之后开始,那么 T2 的 commit 肯定在 T1 的 commit 之后(因为 T2.commit > T2.start > T1.commit),这个不变量保证了 spanner 给 transaction 时间戳和物理上 transaction 的发生顺序是一致的。
- 这点为什么重要?考虑 read transaction ,假设 spanner 给的时间戳是 sread,假设 sw1 <sread <sw2,那么这次读,必须读到 w1 写的内容而不能读到 w2 写的内容。由于 spanner 给的时间戳和物理上的顺序是一致的,这也就意味着在物理上,w1 commit 在 w2 之前,sread 读到 w1 写的内容跟物理上是保持一致的。
- 为了保证这个不变量,spanner 实现了
Start
,Commit Wait
协议,Start
限制了给 Transaction Ti 的时间戳,而 Commit Wait
则延迟了 Ti 的实际 commit 时间。
- 由于保留了每个时间点的数据,所以就允许 snapshot 读和读历史时间点的数据。
- 读写 Transaction 的基本原理:
- 写:定好写的 transaction 的时间戳,延迟写 commit 的时间点,把数据写到 (key, timestamp) -> value 里
- 读:定好读的时间戳 t,把读发往各 paxos group,如果 paxos group 数据 tsafe < t 则等待,直到 tsafe > t,然后 snapshot 读时间点 t 的数据。读到的应该是 timestamp < t 并且最大的那个时间戳,即小于 t 的最近一次写。
- Spanner 的算法大部分在确定如何给 transaction 赋时间戳
2PC
2PC 在这篇文章里讲的《Transaction Management in the R* Distributed Database Management System 》。
我们来考虑各种失败情况:
- Prepare 信息发送到一半 coordinator (设为 C) 挂了,设S1 已经收到 prepare 信息,S1 会一直等着,C 重启,为了使 S1 不至于一直卡主,S1 设一个超时时间,超时结束,S1 不能 Abort,因为这时 Transaction 可能成功了,只是给它的 Commit message 丢了,当然也可能失败,所以 S1 必须询问 C Transaction 状态,因为 C 中 Transaction 没有相关记录(没有 Commit record),那么 C 只需对自己不知道的 Transaction 回复 Abort 就行。
- Prepare 消息/Vote 消息丢失。对此可以这样处理,C 在一定时间内没有收到 Vote 结果则Abort Transaction,给各 Server 发 Abort消息。 那么已经 Prepare 的 Server 可以收到消息从而清除状态,Abort 消息丢没关系,处于 Prepare 状态的 Server 过时会主动询问 C,C 回复 Abort 就行。
- C 写入 Commit/Abort 记录后挂了。C 重启后恢复 Transaction 状态,知道 Transaction Commit 了,于是它向所有 server 发送 COMMIT/Abort,在收到所有 Server 的 Ack 后可以忘记这个 Transaction。其他 Server 可能之前已经收到了 COMMIT/Abort 信息,对于这些 server,只需忽略额外的 COMMIT/Abort 消息就行。
综上,2PC 能够容忍各种错误,并且 Transaction 有最终结果并且相关状态会得到合理的回收,所以它是正确可行的。
Bayou
《Managing Update Conflicts in Bayou, a Weakly Connected Replicated Storage System》。
- 为什么不用一个中心 Server?想在 iPhone 即使没网络连接时也能使用;利用 iPhone 之间间歇的可用性
- 如何让所有设备认同同一个顺序?给每个 Update 一个 Timestamp <T, I>,T 是当前设备的时钟,I 是设备号 a <b if a.T < b.T or (a.T= b.T and a.I < b.I)
- Lamport logical clock 用来实现 causal consistency:
- Tmax = highest timestamp seen from any device (including self)
T = max(Tmax + 1, wall-clock time) – to generate a timestamp
- 如何确定一个 Update 已经提交了?
- primary replica:Primary 给每个收到的 Commit 一个 Commit Sequence Number(CSN),这样 Timestamp 变为 <CSN,local-time, device-id>,未 commit 的 update 排在所有已提交的 updates 的后面
- 两个设备如何同步 log? 假设 A 同步 log 给 B,需要 B 告诉 A 已经有了那些 Log
- 已提交的 log,B 只需要告诉 A 自己 log 里最大的 CSN,这样 A 只需要发送大于 CSN 的 Log 就行
- 对于未提交的 Log
§ A has:
<-,10,X>
<-,20,Y>
<-,30,X>
<-,40,X>
B has:
<-,10,X>
<-,20,Y>
<-,30,X>
§ B 只需要告诉A <30,X>,<20,Y>(这是一个 vector clock)A 发送时,对于 X 只需发送 > 30 的 log,对于 Y 发送 >20 的 log
- 如何加入一个新节点?在 Z 未加入时 ,vector lock 是不带 Z 的版本号的,而 Z 加入之后再退出,vector clock 也是不带 Z 版本号,如何区分两种情况?(因为两种情况同步时需要发送的 log 数量不一样)
Z joins by contacting some server X
Z's ID is <Tz,X>
Tz is X's logical clock as of when Z joined
X issues <-,Tz,X>:"new server ID=<Tz,X>"
Suppose Z's ID is <20,X>
A syncs to B
A has log entry from Z <-,25,<20,X>>
B's VV has no Z entry -- has B never seen Z, or already seen Z's retirement?
One case:
B's VV: [X:10, ...]
10 < 20 implies B hasn't yet seen X's "new server Z" update
The other case:
B's VV: [X:30, ...]
20 < 30 implies B once knew about Z, but then saw a retirement update
Chubby
问题
- Db 实现是否可以用 leveldb ?
- Session 不持久化为什么有利于 proxy
TODO
为什么需要 Chubby
- 为客户端同步,达成一致提供高可用的服务。包括选主,集群包含的机器信息的维护,meta-data 的存储。
设计
- 提供一个锁服务而不是一个客户端库是因为:replication 和 primary election 是随着流量增大而逐渐加入的,使用客户端库,需要改更多的历史代码;除了选主,服务还需要发布 master 在哪等 meta 信息,把 lock 服务和这些 meta 信息的存储放在一起能减少服务依赖的 server 并且一致性协议是共享的(指选主和发布 meta 信息所需的一致性协议);开发者更熟悉 lock 接口
- 减少了服务为了实现高可用需要的服务器数。不再需要 quorum,单服务器也可以。
- event 通知机制用于通知 master 变更,防止轮询;cache file 以减少服务端压力,即使不需要轮询,开发者众多,难以一一教育;consistent hash 方便开发者认知;提供安全机制以防止财产损失和牢狱之灾
- 选择 Coarse-grained lock (即持有时间长的锁)是因为:锁获取率不与应用的 qps 成比例,锁服务不可用更少影响客户端;锁性能的重点在,锁在切主时不丢失,以避免客户端锁移交的巨大开销;
架构
- 读写都在 master,master 通过 raft 等一致性协议容错
- 通过 DNS 找到服务器列表,相当于 DNS 做服务发现
- 如果一个 replica 几小时都不恢复,会被 replace:master 通过 DNS 发现服务器列表变更,启动 membership change 协议。
API
- 用 content generation number 来模拟 compare-and-swap
- Sequencer: lock 带版本号,client 向 chubby 要版本号,然后在发往 server 的消息中带上版本号,server check lock 是否过期。
Cache
- Client 缓存文件内容和 meta 信息,master 维护客户端缓存的文件列表,当文件发生变更时,阻塞变更,通知受影响客户端再进行变更操作,在通知过程中读请求是不允许被缓存的,通知是通过客户端的 KeepAlive 请求的响应带过去的,失效时选择 invalidate 而不是 update 是因为可能存在很多无效 update
Session
- Client 在连上 chubby server 时创建 session,session 包含客户端文件 handle,锁,缓存内容,session 失效,这些东西都失效。Session 通过客户端周期性的 KeepAlive 请求保持,客户端和 chubby 对 session 是否失效的认识保持一致是通过 lease 实现,chubby 的 lease timeout 比 client 端的要长,这样就防止客户端认为自己还持有锁,但 chubby 已经把锁给其他客户端的情况。
- Chubby master 响应 KeepAlive 请求时会故意阻塞不返回,客户端在 KeepAlive 请求返回时发起下一个 KeepAlive 请求,这样一个客户端任何时刻只有一个 KeepAlive 请求在进行,但 master 要在客户端 lease 失效前返回,在这过程中,可能会有缓存失效的信息需求带给客户端,这时 KeepAlive 请求提前返回,这样,所有 RPC 都是从客户端发往服务端的,简化了客户端。
Fail-over
- 在客户端 lease 过期时,客户端清空缓存,启动 grace period (持续 45s),从而给 master 切换一些时间,在这过程中,所有请求都会阻塞。客户端会不停的尝试发送 KeepAlive 请求(发往一个 Server 的请求超时后应该会查询 master location 再继续发),当 master 选出来后 KeepAlive 成功返回,客户端获得新 lease 。
- 在新 master 选出之后,会设一个最长可能的 lease,在这过程中,不允许任何操作,只用来让客户端 session check-in,从而恢复 session(包括文件 hanle,锁,ephemeral file),master 会有版本号,用来防止接受旧 master 的请求。
经验
- Chubby 主要被用作名字服务。名字服务即 name -> ip,如 DNS。服务发现可以算是名字服务。
- Use as a name service。DNS TTL 不好选,太短 DNS 服务器压力大,太长则服务宕机时切换时间太长。即使是 60s 的 TTL,3000 客户端产生的 QPS 也有 150,000 (可能是互相查名字导致的这么 多),google dns load 是个大问题。通过把名字请求批量化进一步减少服务端压力(QPS 减少了,http 请求头解析等 overhead 就减少了,因此能减少压力);名字服务只需要及时的通知而不是完全一致,因此引入了协议转换 proxy;引入 chubby dns server,方便迁移,因为只需要改下名字,用标准的 DNS 客户端就行。
- Abusive clients。共用统一的 chubby 服务以减少维护成本,硬件成本。但要做好隔离。为减少勿用,review chubby usage。需要在软件手册里有性能建议。Aggressive caching(当 open 多次时,make open cheap 而不是让 open 增加指数延迟时间,通过减少引入新行为而减少教育成本),限制存储大小(迁移有成本,因为涉及排期,上下文切换)
- Lessions learned。RPC 使用影响协议选择。KeepAlive 选择了 UDP 是因为上层有 lease timeout,而 TCP 拥塞控制会导致不必要的延迟从而使得 session timeout。
Bigtable
问题
- Data model 和关系型数据库模型的区别是什么?
- Row key 用于 partition ,如何维护顺序?
- Tablet location 里是按 row key 排序,再加上一个 tablet 内是有序的
- 如何区分 server 永久挂和暂时挂?
- Tablet server 永久挂,如何迁移 tablet 数据?因为 tablet 通过 GFS 做 replication,那么 tablet server 挂,GFS 会自作多情地把上面的数据恢复到另一个 server,那么为了 locality,bigtable master 就要考虑到 GFS 相应的 replica 的位置,如果是,那么 tablet assignment 的策略是 GFS 控制的?一个 tablet 对应多个 SSTable 文件,正常情况下,因为 GFS 一定会放一个 replica 在 writer 所在的机器,那么一个 Tablet 对应的所有 SSTable 会放在一台机器上,保证了 locality,但是这些 SSTable 的 replica 不一定都在一台机器上,并且在 tablet server 挂了的情况下,如何保证一个 tablet 的所有 SSTable 放在一台机器上?
- Tablet 如何 split 如何 merge?6 提到 split 时 child tablet 会重用父亲的 SSTable
TODO
- 《implementation techniques for main memory database systems》
- Leveldb 源码
自己的理解
- 用户创建 schema,schema file 写到 chubby,master 应该会创建第一个 tablet, tablet 分裂由 tablet 发起。
- Client 读写流程:client 指定 row key ,通过 chubby 三层查找找到 tablet location (tablet location 有缓存),把请求发往对应 tablet server,tablet server 读(写) memtable 和 SSTable,返回。 GFS 中 SSTable 应该位于和 tablet server 同一台机器。
- Tablet location 万一是过期信息怎么办?Tablet server 会知道,这时不断查 chubby,最终会查到,因为 master 在 assign tablet 完成后会更新 chubby。这会造成 tablet 短期不可用么?会,因为 replicate 的是数据,而不是 tablet server,在 tablet server 恢复前,这些数据是不可用的,虽然数据没丢。
- Master 负责:tablet 的均衡放置;tablet server 检活和恢复;回收垃圾 SSTable
- 容错。Master 无状态,随便挂。Tablet server 挂了,master 负责把 tablet 迁移并维护 tablet location 中新 tablet 的位置。Tablet server 检活:tablet server 在 chubby 目录中创建文件
- 文中所有 process 由 cluster management system 管理。在 evaluation 小节用了 1786 台机器组成的 GFS,启动了最多 500 个 tablet server 用于测试,如果没有 cluster management system,是很难进行这样的测试的。
摘录
- (Row, column, time) -> value, row group 用于 data partition,Sorted map,说明 key -> value 是 O(1),同时允许遍历。读写 一个 Row key 下的数据是 serializable,作者说后悔没有提供 transaction,在 spanner 中提供了。选好 row key,能控制数据的 locality。
- column family 是权限、资源计量、压缩单位。Column family 属于 schema 而 column 不属于。
- Tablet location。客户端不通过 master 获取 tablet location 信息,而是通过 chubby file + tablet server 获得,客户端缓存 tablet 位置,失效时通过 cache miss 实现(猜测应该是访问对应的 tablet server 发现没有对应的 tablet),metadata 的 row 会变,但其 tablet 不会经常被移动,因为要使它tablet 分裂需要较多的 user table tablet。
- Tablet assignment。master 无状态,tablet location 通过 chubby file + tablet server 存,而 Server list 通过一个 chubby 一个目录存。Tablet server 通过 master + chubby lock 联合检活,万一挂了会把上面的 tablet 分配到别的 tablet server。
- Master 在 METADATA Table 维护的信息包括:SSTable 文件名列表(没被引用的将被 master 回收);Tablet -> (SSTables,commit logs, redo points)
- Schema 在 chubby 维护,一个表的 schema 对应一个 chubby file。利用其 atomic file change + client consistent cache。
- 用途:存 GA click 表,summary 表;Google Earth,用于存 图像以及 index GFS 中的 data;个性化搜索用来存 userid -> user action (比如 search query)
Chord
PNUTS
- Tablet split,merge 具体如何实现?
- insert 可能存在的一个问题:第一个 insert1 到 tablet master,tablet master 接受,然后 publish 更新消息 M1 到 message broker,第二个 Insert2,tablet master 为什么能拒绝它?假设数据更新总是从 MB -> Router -> SU,那么 M1 到达 SU 前,SU 本地数据库里就是 insert1 之前的状态
- 如果 SU 通过本地数据库判断是否有 primary key conflict 的情况,那么 insert2 依然会成功
- 如果 SU 在 insert1 操作之后在内存记录操作结果,那么问题就是:挂了之后如何恢复;如何保证内存不爆。
- 如果 SU publish M1 后更新本地数据库,那么就必须避免自己消费到 M1,但是从文中看,每个 tablet 一个 Topic,所以所有 Router 收到的消息应该是一样的(顺序可能不一样),那么 Router 通过 hostname 排除 M1?那 M1 就是白发了,有点浪费。这个方法还有个问题,万一 publish 之后 SU 挂了,那么重启之后还是需要消费 M1 才能恢复。或者 SU 挂了其数据就全被丢弃了?那就比较不稳定。
- Update 消息流到底是怎样的,是先到 master SU,还是到 master MB?
- 如何通知客户端写的结果?
Ceph
Ceph 中两个重要的底层技术是数据分布算法 CRUSH 和对象存储 RADOS.
CRUSH
- 原理
- 存储设备组成层级的 cluster map
- 一个 bucket 包含多个 item,计算得到当前 bucket 选哪个 item,计算方法跟 bucket type 有关,根据 item add, remove 的频率以及各 item 权重是否一样,其最优的副本放置策略不一样,因此计算方法不一样
- 把 object id 的 哈希值(hash 到 [0,1])和权重比较,从而每个 Item 被选中的概率和其权重占所有 item 的比例一致
- 对于 failed 的设备,保留在 cluster map 中以防止其他设备的数据的 reshuffle,并且 failed 设备的数据被重新分布到别的设备。Overload 和 collision 的处理类似。这样,failed 和 overload 设备的数据也有了归属, 并且是通过计算得到的。那么 failed/overload 的设备,如何恢复?在RADOS 论文里有一些描述。
- 优点
- Replica 放置策略可配置
- 数据分布均匀,和各设备权重一致
- 设备 fail 时数据移动量最优,增加或减少设备时数据移动量也还不错,并且可以通过选择不同 bucket type 进行调优
- Map 的性能还不错 (logn for hierarchy size, different according to bucket type for bucket size)
Metadata 量较少(cluster map,distribute rule),和 object 数量无关。
RADOS
- Device state: up, down, in, out. In + down = 暂时 down.
- 版本冲突怎么办
- 设备暂时 fail,新起副本,迁移数据,当该设备恢复时,其数据就没有用了,如何避免短暂的 fail 导致的数据迁移?Cassandra 的 hinted handoff 解决了这个问题。
- 需要考虑的基本问题:
- Data distribution
- Replication
- Falut torelance
- Data migration: peering + recovery。似乎是整个 PG 一起同步的,因此需要有算法确定 PG 的每个 OSD 具体缺哪个 objects,而只发送那些 objects。担心其不可用时间
○ Consistency:serializability。写需写所有副本,读只读一个副本。如果副本一段时间内没收到 Heartbeat(猜测,任何副本连接不上都会导致副本不可读),则该副本不可读,这种方法是 lease。