当前位置: 萬仟网 > IT编程>数据库>Redis > 关于使用Redisson订阅数问题

关于使用Redisson订阅数问题

2022年01月14日 Redis 我要评论
目录一、前提二、源码分析1、redissonlock#lock() 方法2、详细看下subscribe()方法3、回到subscribe()方法主要逻辑还是交给了 lockpubsub#subscri

一、前提

最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsperconnection or subscriptionconnectionpoolsize 的大小不够,需要提高配置才能解决。

二、源码分析

下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:

1、redissonlock#lock() 方法

private void lock(long leasetime, timeunit unit, boolean interruptibly) throws interruptedexception {
        long threadid = thread.currentthread().getid();
        // 尝试获取,如果ttl == null,则表示获取锁成功
        long ttl = tryacquire(leasetime, unit, threadid);
        // lock acquired
        if (ttl == null) {
            return;
        }

        // 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题
        rfuture<redissonlockentry> future = subscribe(threadid);
        if (interruptibly) {
            commandexecutor.syncsubscriptioninterrupted(future);
        } else {
            commandexecutor.syncsubscription(future);
        }

        // 后面代码忽略
        try {
            // 无限循环获取锁,直到获取锁成功
            // ...
        } finally {
            // 取消订阅锁释放事件
            unsubscribe(future, threadid);
        }
}

总结下主要逻辑:

  • 获取当前线程的线程id;
  • tryaquire尝试获取锁,并返回ttl
  • 如果ttl为空,则结束流程;否则进入后续逻辑;
  • this.subscribe(threadid)订阅当前线程,返回一个rfuture;
  • 如果在指定时间没有监听到,则会产生如上异常。
  • 订阅成功后, 通过while(true)循环,一直尝试获取锁
  • fially代码块,会解除订阅

所以上述这情况问题应该出现在subscribe()方法中

2、详细看下subscribe()方法

protected rfuture<redissonlockentry> subscribe(long threadid) {
    // entryname 格式:“id:name”;
    // channelname 格式:“redisson_lock__channel:name”;
    return pubsub.subscribe(getentryname(), getchannelname());
}

redissonlock#pubsub 是在redissonlock构造函数中初始化的:

public redissonlock(commandasyncexecutor commandexecutor, string name) {
    // ....
    this.pubsub = commandexecutor.getconnectionmanager().getsubscribeservice().getlockpubsub();
}

而subscribeservice在masterslaveconnectionmanager的实现中又是通过如下方式构造的

public masterslaveconnectionmanager(masterslaveserversconfig cfg, config config, uuid id) {
    this(config, id);
    this.config = cfg;

    // 初始化
    inittimer(cfg);
    initsingleentry();
}

protected void inittimer(masterslaveserversconfig config) {
    int[] timeouts = new int[]{config.getretryinterval(), config.gettimeout()};
    arrays.sort(timeouts);
    int mintimeout = timeouts[0];
    if (mintimeout % 100 != 0) {
        mintimeout = (mintimeout % 100) / 2;
    } else if (mintimeout == 100) {
        mintimeout = 50;
    } else {
        mintimeout = 100;
    }

    timer = new hashedwheeltimer(new defaultthreadfactory("redisson-timer"), mintimeout, timeunit.milliseconds, 1024, false);

    connectionwatcher = new idleconnectionwatcher(this, config);

    // 初始化:其中this就是masterslaveconnectionmanager实例,config则为masterslaveserversconfig实例:
    subscribeservice = new publishsubscribeservice(this, config);
}

publishsubscribeservice构造函数

private final semaphorepubsub semaphorepubsub = new semaphorepubsub(this);
public publishsubscribeservice(connectionmanager connectionmanager, masterslaveserversconfig config) {
    super();
    this.connectionmanager = connectionmanager;
    this.config = config;
    for (int i = 0; i < locks.length; i++) {
        // 这里初始化了一组信号量,每个信号量的初始值为1
        locks[i] = new asyncsemaphore(1);
    }
}

3、回到subscribe()方法主要逻辑还是交给了 lockpubsub#subscribe()里面

private final concurrentmap<string, e> entries = new concurrenthashmap<>();

public rfuture<e> subscribe(string entryname, string channelname) {
      // 从publishsubscribeservice获取对应的信号量。 相同的channelname获取的是同一个信号量
     // public asyncsemaphore getsemaphore(channelname channelname) {
    //    return locks[math.abs(channelname.hashcode() % locks.length)];
    // }
    asyncsemaphore semaphore = service.getsemaphore(new channelname(channelname));

    atomicreference<runnable> listenerholder = new atomicreference<runnable>();    
    rpromise<e> newpromise = new redissonpromise<e>() {
        @override
        public boolean cancel(boolean mayinterruptifrunning) {
            return semaphore.remove(listenerholder.get());
        }
    };

    runnable listener = new runnable() {

        @override
        public void run() {
            //  如果存在redissonlockentry, 则直接利用已有的监听
            e entry = entries.get(entryname);
            if (entry != null) {
                entry.acquire();
                semaphore.release();
                entry.getpromise().oncomplete(new transferlistener<e>(newpromise));
                return;
            }

            e value = createentry(newpromise);
            value.acquire();

            e oldvalue = entries.putifabsent(entryname, value);
            if (oldvalue != null) {
                oldvalue.acquire();
                semaphore.release();
                oldvalue.getpromise().oncomplete(new transferlistener<e>(newpromise));
                return;
            }

            // 创建监听,
            redispubsublistener<object> listener = createlistener(channelname, value);
            // 订阅监听
            service.subscribe(longcodec.instance, channelname, semaphore, listener);
        }
    };

    // 最终会执行listener.run方法
    semaphore.acquire(listener);
    listenerholder.set(listener);

    return newpromise;
}

asyncsemaphore#acquire()方法

public void acquire(runnable listener) {
    acquire(listener, 1);
}

public void acquire(runnable listener, int permits) {
    boolean run = false;

    synchronized (this) {
        // counter初始化值为1
        if (counter < permits) {
            // 如果不是第一次执行,则将listener加入到listeners集合中
            listeners.add(new entry(listener, permits));
            return;
        } else {
            counter -= permits;
            run = true;
        }
    }

    // 第一次执行acquire, 才会执行listener.run()方法
    if (run) {
        listener.run();
    }
}

梳理上述逻辑:

1、从publishsubscribeservice获取对应的信号量, 相同的channelname获取的是同一个信号量
2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;
3、如果已经存在redissonlockentry, 则利用已经订阅就行
4、如果不存在redissonlockentry, 则会创建新的redissonlockentry,然后进行。

从上面代码看,主要逻辑是交给了publishsubscribeservice#subscribe方法

4、publishsubscribeservice#subscribe逻辑如下:

private final concurrentmap<channelname, pubsubconnectionentry> name2pubsubconnection = new concurrenthashmap<>();
private final queue<pubsubconnectionentry> freepubsubconnections = new concurrentlinkedqueue<>();

public rfuture<pubsubconnectionentry> subscribe(codec codec, string channelname, asyncsemaphore semaphore, redispubsublistener<?>... listeners) {
    rpromise<pubsubconnectionentry> promise = new redissonpromise<pubsubconnectionentry>();
    // 主要逻辑入口, 这里要主要channelname每次都是新对象, 但内部覆写hashcode+equals。
    subscribe(codec, new channelname(channelname), promise, pubsubtype.subscribe, semaphore, listeners);
    return promise;
}

private void subscribe(codec codec, channelname channelname,  rpromise<pubsubconnectionentry> promise, pubsubtype type, asyncsemaphore lock, redispubsublistener<?>... listeners) {

    pubsubconnectionentry connentry = name2pubsubconnection.get(channelname);
    if (connentry != null) {
        // 从已有connection中取,如果存在直接把listeners加入到pubsubconnectionentry中
        addlisteners(channelname, promise, type, lock, connentry, listeners);
        return;
    }

    // 没有时,才是最重要的逻辑
    freepubsublock.acquire(new runnable() {

        @override
        public void run() {
            if (promise.isdone()) {
                lock.release();
                freepubsublock.release();
                return;
            }

            // 从队列中取头部元素
            pubsubconnectionentry freeentry = freepubsubconnections.peek();
            if (freeentry == null) {
                // 第一次肯定是没有的需要建立
                connect(codec, channelname, promise, type, lock, listeners);
                return;
            }

            // 如果存在则尝试获取,如果remainfreeamount小于0则抛出异常终止了。
            int remainfreeamount = freeentry.tryacquire();
            if (remainfreeamount == -1) {
                throw new illegalstateexception();
            }

            pubsubconnectionentry oldentry = name2pubsubconnection.putifabsent(channelname, freeentry);
            if (oldentry != null) {
                freeentry.release();
                freepubsublock.release();

                addlisteners(channelname, promise, type, lock, oldentry, listeners);
                return;
            }

            // 如果remainfreeamount=0, 则从队列中移除
            if (remainfreeamount == 0) {
                freepubsubconnections.poll();
            }
            freepubsublock.release();

            // 增加监听
            rfuture<void> subscribefuture = addlisteners(channelname, promise, type, lock, freeentry, listeners);

            channelfuture future;
            if (pubsubtype.psubscribe == type) {
                future = freeentry.psubscribe(codec, channelname);
            } else {
                future = freeentry.subscribe(codec, channelname);
            }

            future.addlistener(new channelfuturelistener() {
                @override
                public void operationcomplete(channelfuture future) throws exception {
                    if (!future.issuccess()) {
                        if (!promise.isdone()) {
                            subscribefuture.cancel(false);
                        }
                        return;
                    }

                    connectionmanager.newtimeout(new timertask() {
                        @override
                        public void run(timeout timeout) throws exception {
                            subscribefuture.cancel(false);
                        }
                    }, config.gettimeout(), timeunit.milliseconds);
                }
            });
        }

    });
}


private void connect(codec codec, channelname channelname, rpromise<pubsubconnectionentry> promise, pubsubtype type, asyncsemaphore lock, redispubsublistener<?>... listeners) {
    // 根据channelname计算出slot获取pubsubconnection
    int slot = connectionmanager.calcslot(channelname.getname());
    rfuture<redispubsubconnection> connfuture = nextpubsubconnection(slot);
    promise.oncomplete((res, e) -> {
        if (e != null) {
            ((rpromise<redispubsubconnection>) connfuture).tryfailure(e);
        }
    });


    connfuture.oncomplete((conn, e) -> {
        if (e != null) {
            freepubsublock.release();
            lock.release();
            promise.tryfailure(e);
            return;
        }

        // 这里会从配置中读取subscriptionsperconnection
        pubsubconnectionentry entry = new pubsubconnectionentry(conn, config.getsubscriptionsperconnection());
        // 每获取一次,subscriptionsperconnection就会减直到为0
        int remainfreeamount = entry.tryacquire();

        // 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldentry中
        pubsubconnectionentry oldentry = name2pubsubconnection.putifabsent(channelname, entry);
        if (oldentry != null) {
            releasesubscribeconnection(slot, entry);

            freepubsublock.release();

            addlisteners(channelname, promise, type, lock, oldentry, listeners);
            return;
        }


        if (remainfreeamount > 0) {
            // 加入到队列中
            freepubsubconnections.add(entry);
        }
        freepubsublock.release();

        rfuture<void> subscribefuture = addlisteners(channelname, promise, type, lock, entry, listeners);

        // 这里真正的进行订阅(底层与redis交互)
        channelfuture future;
        if (pubsubtype.psubscribe == type) {
            future = entry.psubscribe(codec, channelname);
        } else {
            future = entry.subscribe(codec, channelname);
        }

        future.addlistener(new channelfuturelistener() {
            @override
            public void operationcomplete(channelfuture future) throws exception {
                if (!future.issuccess()) {
                    if (!promise.isdone()) {
                        subscribefuture.cancel(false);
                    }
                    return;
                }

                connectionmanager.newtimeout(new timertask() {
                    @override
                    public void run(timeout timeout) throws exception {
                        subscribefuture.cancel(false);
                    }
                }, config.gettimeout(), timeunit.milliseconds);
            }
        });
    });
}

pubsubconnectionentry#tryacquire方法, subscriptionsperconnection代表了每个连接的最大订阅数。当tryacqcurie的时候会减少这个数量:

 public int tryacquire() {
    while (true) {
        int value = subscribedchannelsamount.get();
        if (value == 0) {
            return -1;
        }

        if (subscribedchannelsamount.compareandset(value, value - 1)) {
            return value - 1;
        }
    }
}

梳理上述逻辑:

1、还是进行重复判断, 根据channelname从name2pubsubconnection中获取,看是否存在已经订阅:pubsubconnectionentry; 如果存在直接把新的listener加入到pubsubconnectionentry。
2、从队列freepubsubconnections中取公用的pubsubconnectionentry, 如果没有就进入connect()方法

2.1 会根据subscriptionsperconnection创建pubsubconnectionentry, 然后调用其tryacquire()方法 - 每调用一次就会减1
2.2 将新的pubsubconnectionentry放入全局的name2pubsubconnection, 方便后续重复使用;
2.3 同时也将pubsubconnectionentry放入队列freepubsubconnections中。- remainfreeamount > 0
2.4 后面就是进行底层的subscribe和addlistener

3、如果已经存在pubsubconnectionentry,则利用已有的pubsubconnectionentry进行tryacquire;
4、如果remainfreeamount < 0 会抛出illegalstateexception异常;如果remainfreeamount=0,则会将其从队列中移除, 那么后续请求会重新获取一个可用的连接
5、最后也是进行底层的subscribe和addlistener;

三 总结

根因: 从上面代码分析, 导致问题的根因是因为publishsubscribeservice 会使用公共队列中的freepubsubconnections, 如果同一个key一次性请求超过subscriptionsperconnection它的默认值5时,remainfreeamount就可能出现-1的情况, 那么就会导致commandexecutor.syncsubscription(future)中等待超时,也就抛出如上异常subscribe timeout: (7500ms). increase 'subscriptionsperconnection' and/or 'subscriptionconnectionpoolsize' parameters.

解决方法: 在初始化redisson可以可指定这个配置项的值。

相关参数的解释以及默认值请参考官网:https://github.com/redisson/redisson/wiki/2.-configuration#23-common-settings

到此这篇关于关于使用redisson订阅数问题的文章就介绍到这了,更多相关redisson 订阅数 内容请搜索萬仟网以前的文章或继续浏览下面的相关文章希望大家以后多多支持萬仟网!

(0)
打赏 微信扫一扫 微信扫一扫

相关文章:

  • scratch走迷宫游戏脚本怎么编程? Scratch编写走迷宫游戏的技巧

    在scratch里,通过将光标控制键和面向方向命令结合,可以有效控制对象的移动,实现人机交互,从而完成走迷宫游戏的创建,今天我们就来看看scratch2.0中创建走迷宫游戏的技巧,…

    2022年01月14日 办公
  • python Tkinter是什么

    python Tkinter是什么

    tkinter(即 tk interface,简称“tk”)本质上是对 tcl/tk 软件包的 python 接口封装,它是 pytho... [阅读全文]
  • Python双端队列实现回文检测

    Python双端队列实现回文检测

    目录一、双端队列二、回文检测补充一、双端队列双端队列 deque 是一种有次序的数据集,跟队列相似,其两端可以称作"首" 和 "尾... [阅读全文]
  • 真我GT2Pro怎么拍好看的月亮?真我GT2Pro月亮模式设置教程

    真我gt2pro是一款功能全面可以给用户们带来非常不错的手机体验,那么真我gt2pro如何拍月亮呢?下面小编为大家带来了真我gt2pro拍月亮好看的方法,有需要的小伙伴可以来了解下…

    2022年01月14日 手机
  • Python特效之文字成像方法详解

    Python特效之文字成像方法详解

    目录一、特效预览二、程序原理三、程序源码一、特效预览处理前处理后细节放大后二、程序原理1.输入你想隐藏的文字2.然后写到另一张跟照片同等大小的空白纸张上3.将相... [阅读全文]

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2022  萬仟网 保留所有权利. 粤ICP备17035492号-1
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com