Redis队列——信息队列,延时队列

原创
小哥 3年前 (2022-11-09) 阅读数 5 #大杂烩

异步消息队列

  1. 使用的数据结构: list

  2. 主要实现: go实现简单消息队列

    package main

    import ( "encoding/json" "fmt" "github.com/gomodule/redigo/redis" "os" )

    type Producer struct { // 生产者 }

    func (p *Producer)publish(conn redis.Conn, listKey string, data string) (reply interface{}, err error){ return conn.Do("lpush", listKey, data) }

    type Customer struct { // 消费者 }

    func (c *Customer)putMessage(conn redis.Conn, listKey string) (interface{}, error) { return conn.Do("rpop", listKey) }

    func (c *Customer)getCount(conn redis.Conn, listKey string) (interface{}, error) { return conn.Do("llen", listKey) }

    func HandlecError(err error, when string) { if err != nil{ fmt.Println("error happen at", when) os.Exit(500) }else { fmt.Println("连接成功") } }

    func main() { // 连接redis操作 conn, err := redis.Dial("tcp","127.0.0.1:6379") HandlecError(err, "connect") defer func() { _ = conn.Close() }()

    producer := Producer{}
    personMap := make(map[string]interface{})
    personMap["name"]  = "hxh"
    personMap["work"] = "toDoSomething"
    bytes, _ := json.Marshal(personMap)
    _,_ = producer.publish(conn, "test_queue", string(bytes))
    
    customer := Customer{}
    num, _ := customer.getCount(conn, "test_queue")
    fmt.Println("队列数为", num)
    
    values, err := redis.String(customer.putMessage(conn,"test_queue"))
    dataMap := make(map[string]interface{}) // 准备好map来装
    _ = json.Unmarshal([]byte(values), &dataMap)
    fmt.Println(dataMap["work"])

    }

  3. 问题:

    • 空队列: 如果队列为空,客户将不断pop空轮询,这会拉高客户的cpu和服务器redis的qps

      可以sleep一下

    • 队列延迟问题: 使用阻塞读取blpop/brpop,当队列没有数据时,它将进入睡眠状态。

    • 空闲断开问题: 在线程阻塞的情况下,Redis 客户端连接变为空闲连接,空闲时间过长,服务器通常会主动断开连接,减少空闲资源的使用。这次 blpop/brpop 将引发异常。处理不当

  4. 缺点: 无法保证消息是可靠的。(没有 ack 保证)

延迟消息队列

  1. 使用的数据结构: zset
  2. 主要实现: 将消息序列化为字符串。 为 zset 的 value,此消息的过期处理时间。 score,然后使用多个线程轮询。 zset 获取到期 处理的任务.
  3. 问题
    • 确保可用性,以防一个线程挂起,而其他线程可以继续处理。
    • 并发并竞争任务,以确保任务不能多次执行。
  4. 实现: go实现简单延迟队列

    package main

    import ( "encoding/json" "fmt" "github.com/gomodule/redigo/redis" uuid "github.com/satori/go.uuid" "os" "sync" "time" )

    type delayQueue struct { // 延迟队列 }

    func (d *delayQueue) publish(conn redis.Conn, zSetKey string, dataMap map[string]interface{}, time int64) (reply interface{}, err error) { // 生成唯一id,保证zset的每一个value都不一样,time执行时间戳 dataMap["uuid"] = uuid.NewV4().String() bytes, _ := json.Marshal(dataMap) return conn.Do("zadd", zSetKey, time, string(bytes)) }

    func (d delayQueue) customer(conn redis.Conn, zSetKey string) { for true { now := time.Now().Unix() data, err := redis.Strings(conn.Do("zrangebyscore", zSetKey, 0, now, "limit", 0, 1)) if err == nil && len(data) > 0 { res, delErr := conn.Do("zrem", "test-delay-queue", data[0]) if res.(int64) >= 1 && delErr == nil { dataMap := make(map[string]interface{}) // 准备好map来装 _ = json.Unmarshal([]byte(data[0]), &dataMap) fmt.Println("任务是:", dataMap["work"]) }else { fmt.Println(delErr) } }else { time.Sleep(time.Second 10) continue } } }

    func HandlecError(err error, when string) { if err != nil { fmt.Println("error happen at", when) os.Exit(500) } else { fmt.Println("连接成功") } }

    func main() { var wg sync.WaitGroup

    // 连接redis操作
    conn, err := redis.Dial("tcp", "127.0.0.1:6379")
    HandlecError(err, "connect")
    defer func() {
        _ = conn.Close()
    }()
    
    delayQueue := delayQueue{}
    personMap := make(map[string]interface{})
    personMap["name"] = "hxh"
    personMap["work"] = "toDoSomething"
    _,_ = delayQueue.publish(conn, "test-delay-queue", personMap, time.Now().Unix())
    
    wg.Add(1)
    go func() {
        delayQueue.customer(conn, "test-delay-queue")
        wg.Done()
    }()
    wg.Wait()

    }

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除