当前位置: 萬仟网 > 移动技术>移动开发>Android > 详解用RxJava实现事件总线(Event Bus)

详解用RxJava实现事件总线(Event Bus)

2019年07月24日  | 萬仟网移动技术  | 我要评论
目前大多数开发者使用eventbus或者otto作为事件总线通信库,对于rxjava使用者来说,rxjava也可以轻松实现事件总线,因为它们都依据于观察者模式。 不多说,

目前大多数开发者使用eventbus或者otto作为事件总线通信库,对于rxjava使用者来说,rxjava也可以轻松实现事件总线,因为它们都依据于观察者模式。

不多说,上代码

/**
* rxbus
* created by yokeyword on 2015/6/17.
*/
public class rxbus {
  private static volatile rxbus defaultinstance;

  private final subject<object, object> bus;
  // publishsubject只会把在订阅发生的时间点之后来自原始observable的数据发射给观察者
  public rxbus() {
   bus = new serializedsubject<>(publishsubject.create());
  }
  // 单例rxbus
  public static rxbus getdefault() {
    if (defaultinstance == null) {
      synchronized (rxbus.class) {
        if (defaultinstance == null) {
          defaultinstance = new rxbus();
        }
      }
    }
    return defaultinstance ;
  }
  // 发送一个新的事件
  public void post (object o) {
    bus.onnext(o);
  }
  // 根据传递的 eventtype 类型返回特定类型(eventtype)的 被观察者
  public <t> observable<t> toobservable (class<t> eventtype) {
    return bus.oftype(eventtype);
//    这里感谢小鄧子的提醒: oftype = filter + cast
//    return bus.filter(new func1<object, boolean>() {
//      @override
//      public boolean call(object o) {
//        return eventtype.isinstance(o);
//      }
//    }) .cast(eventtype);
  }
}

注:

1、subject同时充当了observer和observable的角色,subject是非线程安全的,要避免该问题,需要将 subject转换为一个 serializedsubject ,上述rxbus类中把线程非安全的publishsubject包装成线程安全的subject。

2、publishsubject只会把在订阅发生的时间点之后来自原始observable的数据发射给观察者。

3、oftype操作符只发射指定类型的数据,其内部就是filter+cast(这里非常感谢@小鄧子  的提醒)

public final <r> observable<r> oftype(final class<r> klass) {
  return filter(new func1<t, boolean>() {
    @override
    public final boolean call(t t) {
      return klass.isinstance(t);
    }
  }).cast(klass);
}

filter操作符可以使你提供一个指定的测试数据项,只有通过测试的数据才会被“发射”。

cast操作符可以将一个observable转换成指定类型的observable。

分析:

rxbus工作流程图

1、首先创建一个可同时充当observer和observable的subject;

2、在需要接收事件的地方,订阅该subject(此时subject是作为observable),在这之后,一旦subject接收到事件,立即发射给该订阅者;

3、在我们需要发送事件的地方,将事件post至subject,此时subject作为observer接收到事件(onnext),然后会发射给所有订阅该subject的订阅者。

对于rxbus的使用,就和普通的rxjava订阅事件很相似了。

先看发送事件的代码:

rxbus.getdefault().post(new userevent (1, "yoyo"));

userevent是要发送的事件,如果你用过eventbus, 很容易理解,userevent的代码:

public class userevent {
  long id;
  string name;
  public userevent(long id,string name) {
    this.id= id;
    this.name= name;
  }
  public long getid() {
    return id;
  }
  public string getname() {
    return name;
  }
}

再看接收事件的代码:

// rxsubscription是一个subscription的全局变量,这段代码可以在oncreate/onstart等生命周期内
rxsubscription = rxbus.getdefault().toobserverable(userevent.class)
    .subscribe(new action1<userevent>() {
        @override
        public void call(userevent userevent) {
          long id = userevent.getid();
          string name = userevent.getname();
          ...
        }
      },
    new action1<throwable>() {
      @override
      public void call(throwable throwable) {
        // todo: 处理异常
      }    
    });

最后,一定要记得在生命周期结束的地方取消订阅事件,防止rxjava可能会引起的内存泄漏问题。

@override
protected void ondestroy() {
  super.ondestroy();
  if(!rxsubscription.isunsubscribed()) {
    rxsubscription.unsubscribe();
  }
}

这样,一个简单的event bus就实现了!如果你的项目已经开始使用rxjava,也许可以考虑替换掉eventbus或otto,减小项目体积。

rxbus、eventbus因为解耦太彻底,滥用的话,项目可维护性会越来越低;一些简单场景更推荐用回调、subject来代替事件总线。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持萬仟网。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
Copyright © 2017-2021  萬仟网 保留所有权利. 粤ICP备17035492号-1
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com