分布式ID生成研究转载

原创
小哥 3年前 (2022-10-27) 阅读数 46 #大杂烩

Snowflake算法

snowflake是Twitter开源分布式ID生成算法,保证业务集群中的所有机器都能在某个时间点生成一个。64 bits的唯一ID(long)。如下图所示,sign为固定1bit符号标识符,即生成的ID是一个正数,时间戳描述ID生成时间,工作节点用于区分集群中的不同机器,并发序列保证相同的时间和相同的机器。ID递增。

snowflake.png

本文主要介绍了《中国》中流行的两个开源发行版。ID生成方法:百度公司。UidGenerator和美团Leaf。

百度UidGenerator

UidGenerator提供了两种生成方案:DefaultUidGenerator、CachedUidGenerator。

DefaultUidGenerator

核心代码如下:

    protected synchronized long nextId() {
        long currentSecond = getCurrentSecond();

        // Clock moved backwards, refuse to generate uid
        if (currentSecond < lastSecond) {
            long refusedSeconds = lastSecond - currentSecond;
            throw new UidGenerateException("Clock moved backwards. Refusing for %d seconds", refusedSeconds);
        }

        // At the same second, increase sequence
        if (currentSecond == lastSecond) {
            sequence = (sequence + 1) & bitsAllocator.getMaxSequence();
            // Exceed the max sequence, we wait the next second to generate uid
            if (sequence == 0) {
                currentSecond = getNextSecond(lastSecond);
            }

        // At the different second, sequence restart from zero
        } else {
            sequence = 0L;
        }

        lastSecond = currentSecond;

        // Allocate bits for UID
        return bitsAllocator.allocate(currentSecond - epochSeconds, workerId, sequence);
    }

这种方法简单粗暴,向上就是。synchronized锁,确保多线程采集。id在只有一个线程可以进入此方法的情况下,其中的工作节点。workerId是每次业务启动时的数据库表。WORKER_NODE插入一条数据后获得的最新记录是自我增加的。ID。

    public long assignWorkerId() {
        // build worker node entity
        WorkerNodeEntity workerNodeEntity = buildWorkerNode();

        // add worker node for new (ignore the same IP + PORT)
        workerNodeDAO.addWorkerNode(workerNodeEntity);
        LOGGER.info("Add worker node:" + workerNodeEntity);

        return workerNodeEntity.getId();
    }

缺点是显而易见的:1,锁的粒度比较大,容易造成多线程并发等待;2,每次都要计算以获得最新信息ID。

CachedUidGenerator

CachedUidGenerator使用预取,使用两个循环队列,Uid-RingBuffer用于存储预取的Uid、Flag-RingBuffer用于存储Uid状态(它是否可以装满和消费)。

ringbuffer.png

循环队列是用数组实现的,因为数组元素在内存中连续分配,所以可以最大限度地使用数组。CPU cache以提高性能。但与此同时,它也会带来“伪共享”FalseSharing问题,出于这个目的,在Tail、Cursor指针、Flag-RingBuffer中采用了CacheLine 相辅相成。

cacheline_padding.png

RingBuffer填充时机
初始化预填充:RingBuffer在初始化时,整个RingBuffer。
即时填充:Take在消费时,立即检查剩余的可用。slot量(tail - cursor)如果小于设定的阈值,则完全空闲。slots。
句号填充:Schedule线程,计时以完成空闲slots。

核心代码如下:

    public long take() {
        // spin get next available cursor
        long currentCursor = cursor.get();
        long nextCursor = cursor.updateAndGet(old -> old == tail.get() ? old : old + 1);

        // check for safety consideration, it never occurs
        Assert.isTrue(nextCursor >= currentCursor, "Curosr cant move back");

        // trigger padding in an async-mode if reach the threshold
        long currentTail = tail.get();
        if (currentTail - nextCursor < paddingThreshold) {
            LOGGER.info("Reach the padding threshold:{}. tail:{}, cursor:{}, rest:{}", paddingThreshold, currentTail,
                    nextCursor, currentTail - nextCursor);
            bufferPaddingExecutor.asyncPadding();
        }

        // cursor catch the tail, means that there is no more available UID to take
        if (nextCursor == currentCursor) {
            rejectedTakeHandler.rejectTakeBuffer(this);
        }

        // 1. check next slot flag is CAN_TAKE_FLAG
        int nextCursorIndex = calSlotIndex(nextCursor);
        Assert.isTrue(flags[nextCursorIndex].get() == CAN_TAKE_FLAG, "Curosr not in can take status");

        // 2. get UID from next slot
        // 3. set next slot flag as CAN_PUT_FLAG.
        long uid = slots[nextCursorIndex];
        flags[nextCursorIndex].set(CAN_PUT_FLAG);

        // Note that: Step 2,3 can not swap. If we set flag before get value of slot, the producer may overwrite the
        // slot with a new UID, and this may cause the consumer take the UID twice after walk a round the ring
        return uid;
    }

CachedUidGenerator官方统计数据可以提供600万/s稳定的吞吐量。缺点是由于使用了预取,ID中的时间信息意义丢失了,无法表达。ID生成的真实时间。

github地址: https://github.com/baidu/uid-generator

美团Leaf

Leaf 两种生成的ID方式:数字段模式和。snowflake模式。

号段模式

业务一次只能分到一个细分市场。(step决定大小)价值。用完后,到数据库获取新的号码段。每项业务都需要发放不同的号码。biz_tag要区分的字段,每个biz-tag的ID相互隔离,互不影响。

5e4ff128.png

为了解决等待最新号码段和阻塞业务线程的问题,号码段模式采用了两个号码段缓存-Doublebuffer道路。当前号码段已下发10%如果下一个号码段没有更新,则启动另一个更新线程来更新下一个号码段。在当前号码段完全分配后,如果下一个号码段准备好,则切换到下一个号码段作为当前segment然后它将被发行,这个循环将继续下去。

f2625fac.png

核心代码如下:

    public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
        while (true) {
            try {
                // 获取buffer的读锁
                buffer.rLock().lock();
                // 获取当前编号段
                final Segment segment = buffer.getCurrent();

                if (    // nextReady is false (下一段未被初始化.)
                        !buffer.isNextReady()
                        // idle = max - currentValue (当前号段下发的值达到设定的阈值。 0.9 )
                        && (segment.getIdle() < 0.9 * segment.getStep())
                        // buffer 中的 threadRunning字段. 表示线程池是否已提交运行。.(是否有其他线程已经开始初始化另一个号码段.
                        // 使用 CAS 进行更新. buffer 在任何时候,只有一个线程会异步更新另一个数据段.
                        && buffer.getThreadRunning().compareAndSet(false, true)
                ) {
                    // 放入用于异步更新的线程池。.
                    service.execute(new Runnable() {
                        @Override
                        public void run() {
                            Segment next = buffer.getSegments()[buffer.nextPos()];
                            boolean updateOk = false;
                            try {
                                updateSegmentFromDb(buffer.getKey(), next);

                                // 更新成功,设置标记位true
                                updateOk = true;
                                logger.info("update segment {} from db {}", buffer.getKey(), next);
                            } catch (Exception e) {
                                logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                            } finally {
                                if (updateOk) {
                                    // 获取buffer 的写锁
                                    buffer.wLock().lock();
                                    // next准备完成
                                    buffer.setNextReady(true);
                                    // next设置运行标记位false
                                    buffer.getThreadRunning().set(false);
                                    buffer.wLock().unlock();
                                } else {
                                    buffer.getThreadRunning().set(false);
                                }
                            }
                        }
                    });
                }

                // 获取value
                long value = segment.getValue().getAndIncrement();

                // value < 当前编号段的最大值,然后返回更改值
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }
            } finally {
                buffer.rLock().unlock();
            }

            // 等待下一个数据段完成,执行代码在-> execute()
            // buffer.setNextReady(true);
            // buffer.getThreadRunning().set(false);
            waitAndSleep(buffer);

            try {
                // buffer 级别加写锁定.
                buffer.wLock().lock();
                final Segment segment = buffer.getCurrent();
                // 获取value -> 为什么要重复收购value, 当多线程执行时,在进行waitAndSleep() 后,
                // 当前Segment可能已经被替换了.直接一次性采办value的操作,可以提高id配送的速度(没有必要经历另一个周期。),并防止错误(在交换Segment在检查之前)
                long value = segment.getValue().getAndIncrement();
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }

                // 在这里执行, 其他线程未交换编号段,并且当前号码段中的所有号码都已下发.
                // 判断nextReady是否为true.
                if (buffer.isNextReady()) {
                    // 调换segment
                    buffer.switchPos();
                    // 转账完成后, 设置nextReady为false
                    buffer.setNextReady(false);
                } else {
                    // 在这里进入的条件
                    // 1. 当前号段获取的值较大maxValue
                    // 2. 另一段还没有准备好。
                    // 3. 在等待中成长waitAndSleep中的时间.
                    logger.error("Both two segments in {} are not ready!", buffer);
                    return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                }
            } finally {
                // finally释放代码块中的写锁定.
                buffer.wLock().unlock();
            }
        }
    }

这项计划的缺点是ID趋势是否在增加,而ID数字是可计算的,不适用于订单。ID生成场景,例如两天中午的竞争配对。12按顺序排序。id减去这个数字就可以粗略地计算出公司的日订单量,这是无法容忍的。

snowflake方案

核心代码如下:

 public synchronized Result get(String key) {
        long timestamp = timeGen();
        if (timestamp < lastTimestamp) {
            long offset = lastTimestamp - timestamp;
            if (offset <= 5) {
                try {
                    wait(offset << 1);
                    timestamp = timeGen();
                    if (timestamp < lastTimestamp) {
                        return new Result(-1, Status.EXCEPTION);
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("wait interrupted");
                    return new Result(-2, Status.EXCEPTION);
                }
            } else {
                return new Result(-3, Status.EXCEPTION);
            }
        }
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                //seq 为0该时间表示下一毫秒时间开始正确。seq做随机
                sequence = RANDOM.nextInt(100);
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            //如果是新的ms开始
            sequence = RANDOM.nextInt(100);
        }
        lastTimestamp = timestamp;
        long id = ((timestamp - twepoch) << timestampLeftShift) | (workerId << workerIdShift) | sequence;
        return new Result(id, Status.SUCCESS);
    }

代码可以在生成和百度公司的方式中找到。DefaultUidGenerator同样,使用不同的地方。Zookeeper永久顺序节点特征被自动配对。snowflake节点配置wokerID。

a3f985a8.png

业务将是周期性的ZooKeeper报告当前机器节点时间。每次服务重新启动时,它都会检查当前机器时间是否大于上次上传的机器时间。
这项计划的缺点是引入ZooKeeper,维护成本相对较高。为了减少对……的影响ZooKeeper依赖,除了每次都要去。ZooKeeper除了获取数据外,还会在本机文件系统上缓存数据。workerID文件,当ZooKeeper如果出现问题,当机器出现问题需要重启时,可以确保服务能够正常启动。

github地址: https://github.com/Meituan-Dianping/Leaf

https://www.jianshu.com/p/058829af0b0a

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除

热门