分布式ID生成研究转载
原创Snowflake算法
snowflake是Twitter开源分布式ID生成算法,保证业务集群中的所有机器都能在某个时间点生成一个。64 bits的唯一ID(long)。如下图所示,sign为固定1bit符号标识符,即生成的ID是一个正数,时间戳描述ID生成时间,工作节点用于区分集群中的不同机器,并发序列保证相同的时间和相同的机器。ID递增。
snowflake.png
本文主要介绍了《中国》中流行的两个开源发行版。ID生成方法:百度公司。UidGenerator和美团Leaf。
百度UidGenerator
UidGenerator提供了两种生成方案:DefaultUidGenerator、CachedUidGenerator。
DefaultUidGenerator
核心代码如下:
protected synchronized long nextId() {
long currentSecond = getCurrentSecond();
// Clock moved backwards, refuse to generate uid
if (currentSecond < lastSecond) {
long refusedSeconds = lastSecond - currentSecond;
throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
}
// At the same second, increase sequence
if (currentSecond == lastSecond) {
sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
// Exceed the max sequence, we wait the next second to generate uid
if (sequence == 0) {
currentSecond = getNextSecond(lastSecond);
}
// At the different second, sequence restart from zero
} else {
sequence = 0L;
}
lastSecond = currentSecond;
// Allocate bits for UID
return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
}
这种方法简单粗暴,向上就是。synchronized锁,确保多线程采集。id在只有一个线程可以进入此方法的情况下,其中的工作节点。workerId是每次业务启动时的数据库表。WORKER_NODE插入一条数据后获得的最新记录是自我增加的。ID。
public long assignWorkerId() {
// build worker node entity
WorkerNodeEntity workerNodeEntity = buildWorkerNode();
// add worker node for new (ignore the same IP + PORT)
workerNodeDAO.addWorkerNode(workerNodeEntity);
LOGGER.info("Add worker node:" + workerNodeEntity);
return workerNodeEntity.getId();
}
缺点是显而易见的:1,锁的粒度比较大,容易造成多线程并发等待;2,每次都要计算以获得最新信息ID。
CachedUidGenerator
CachedUidGenerator使用预取,使用两个循环队列,Uid-RingBuffer用于存储预取的Uid、Flag-RingBuffer用于存储Uid状态(它是否可以装满和消费)。
ringbuffer.png
循环队列是用数组实现的,因为数组元素在内存中连续分配,所以可以最大限度地使用数组。CPU cache以提高性能。但与此同时,它也会带来“伪共享”FalseSharing问题,出于这个目的,在Tail、Cursor指针、Flag-RingBuffer中采用了CacheLine 相辅相成。
cacheline_padding.png
RingBuffer填充时机
初始化预填充:RingBuffer在初始化时,整个RingBuffer。
即时填充:Take在消费时,立即检查剩余的可用。slot量(tail - cursor)如果小于设定的阈值,则完全空闲。slots。
句号填充:Schedule线程,计时以完成空闲slots。
核心代码如下:
public long take() {
// spin get next available cursor
long currentCursor = cursor.get();
long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);
// check for safety consideration, it never occurs
Assert.isTrue(nextCursor >= currentCursor, "Curosr cant move back");
// trigger padding in an async-mode if reach the threshold
long currentTail = tail.get();
if (currentTail - nextCursor < paddingThreshold) {
LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
nextCursor, currentTail - nextCursor);
bufferPaddingExecutor.asyncPadding();
}
// cursor catch the tail, means that there is no more available UID to take
if (nextCursor == currentCursor) {
rejectedTakeHandler.rejectTakeBuffer(this);
}
// 1. check next slot flag is CAN_TAKE_FLAG
int nextCursorIndex = calSlotIndex(nextCursor);
Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");
// 2. get UID from next slot
// 3. set next slot flag as CAN_PUT_FLAG.
long uid = slots[nextCursorIndex];
flags[nextCursorIndex].set(CAN_PUT_FLAG);
// Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the
// slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring
return uid;
}
CachedUidGenerator官方统计数据可以提供600万/s稳定的吞吐量。缺点是由于使用了预取,ID中的时间信息意义丢失了,无法表达。ID生成的真实时间。
github地址: https://github.com/baidu/uid-generator
美团Leaf
Leaf 两种生成的ID方式:数字段模式和。snowflake模式。
号段模式
业务一次只能分到一个细分市场。(step决定大小)价值。用完后,到数据库获取新的号码段。每项业务都需要发放不同的号码。biz_tag要区分的字段,每个biz-tag的ID相互隔离,互不影响。
5e4ff128.png
为了解决等待最新号码段和阻塞业务线程的问题,号码段模式采用了两个号码段缓存-Doublebuffer道路。当前号码段已下发10%如果下一个号码段没有更新,则启动另一个更新线程来更新下一个号码段。在当前号码段完全分配后,如果下一个号码段准备好,则切换到下一个号码段作为当前segment然后它将被发行,这个循环将继续下去。
f2625fac.png
核心代码如下:
public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
while (true) {
try {
// 获取buffer的读锁
buffer.rLock().lock();
// 获取当前编号段
final Segment segment = buffer.getCurrent();
if ( // nextReady is false (下一段未被初始化.)
!buffer.isNextReady()
// idle = max - currentValue (当前号段下发的值达到设定的阈值。 0.9 )
&& (segment.getIdle() < 0.9 * segment.getStep())
// buffer 中的 threadRunning字段. 表示线程池是否已提交运行。.(是否有其他线程已经开始初始化另一个号码段.
// 使用 CAS 进行更新. buffer 在任何时候,只有一个线程会异步更新另一个数据段.
&& buffer.getThreadRunning().compareAndSet(false, true)
) {
// 放入用于异步更新的线程池。.
service.execute(new Runnable() {
@Override
public void run() {
Segment next = buffer.getSegments()[buffer.nextPos()];
boolean updateOk = false;
try {
updateSegmentFromDb(buffer.getKey(), next);
// 更新成功,设置标记位true
updateOk = true;
logger.info("update segment {} from db {}", buffer.getKey(), next);
} catch (Exception e) {
logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
} finally {
if (updateOk) {
// 获取buffer 的写锁
buffer.wLock().lock();
// next准备完成
buffer.setNextReady(true);
// next设置运行标记位false
buffer.getThreadRunning().set(false);
buffer.wLock().unlock();
} else {
buffer.getThreadRunning().set(false);
}
}
}
});
}
// 获取value
long value = segment.getValue().getAndIncrement();
// value < 当前编号段的最大值,然后返回更改值
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
} finally {
buffer.rLock().unlock();
}
// 等待下一个数据段完成,执行代码在-> execute()
// buffer.setNextReady(true);
// buffer.getThreadRunning().set(false);
waitAndSleep(buffer);
try {
// buffer 级别加写锁定.
buffer.wLock().lock();
final Segment segment = buffer.getCurrent();
// 获取value -> 为什么要重复收购value, 当多线程执行时,在进行waitAndSleep() 后,
// 当前Segment可能已经被替换了.直接一次性采办value的操作,可以提高id配送的速度(没有必要经历另一个周期。),并防止错误(在交换Segment在检查之前)
long value = segment.getValue().getAndIncrement();
if (value < segment.getMax()) {
return new Result(value, Status.SUCCESS);
}
// 在这里执行, 其他线程未交换编号段,并且当前号码段中的所有号码都已下发.
// 判断nextReady是否为true.
if (buffer.isNextReady()) {
// 调换segment
buffer.switchPos();
// 转账完成后, 设置nextReady为false
buffer.setNextReady(false);
} else {
// 在这里进入的条件
// 1. 当前号段获取的值较大maxValue
// 2. 另一段还没有准备好。
// 3. 在等待中成长waitAndSleep中的时间.
logger.error("Both two segments in {} are not ready!", buffer);
return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
}
} finally {
// finally释放代码块中的写锁定.
buffer.wLock().unlock();
}
}
}
这项计划的缺点是ID趋势是否在增加,而ID数字是可计算的,不适用于订单。ID生成场景,例如两天中午的竞争配对。12按顺序排序。id减去这个数字就可以粗略地计算出公司的日订单量,这是无法容忍的。
snowflake方案
核心代码如下:
public synchronized Result get(String key) {
long timestamp = timeGen();
if (timestamp < lastTimestamp) {
long offset = lastTimestamp - timestamp;
if (offset <= 5) {
try {
wait(offset << 1);
timestamp = timeGen();
if (timestamp < lastTimestamp) {
return new Result(-1, Status.EXCEPTION);
}
} catch (InterruptedException e) {
LOGGER.error("wait interrupted");
return new Result(-2, Status.EXCEPTION);
}
} else {
return new Result(-3, Status.EXCEPTION);
}
}
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
//seq 为0该时间表示下一毫秒时间开始正确。seq做随机
sequence = RANDOM.nextInt(100);
timestamp = tilNextMillis(lastTimestamp);
}
} else {
//如果是新的ms开始
sequence = RANDOM.nextInt(100);
}
lastTimestamp = timestamp;
long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
return new Result(id, Status.SUCCESS);
}
代码可以在生成和百度公司的方式中找到。DefaultUidGenerator同样,使用不同的地方。Zookeeper永久顺序节点特征被自动配对。snowflake节点配置wokerID。
a3f985a8.png
业务将是周期性的ZooKeeper报告当前机器节点时间。每次服务重新启动时,它都会检查当前机器时间是否大于上次上传的机器时间。
这项计划的缺点是引入ZooKeeper,维护成本相对较高。为了减少对……的影响ZooKeeper依赖,除了每次都要去。ZooKeeper除了获取数据外,还会在本机文件系统上缓存数据。workerID文件,当ZooKeeper如果出现问题,当机器出现问题需要重启时,可以确保服务能够正常启动。
github地址: https://github.com/Meituan-Dianping/Leaf
https://www.jianshu.com/p/058829af0b0a
版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除