RxSwift 成长之路---基本概念和语法
更新时间: 2017-02-28 RxSwift + Moya + ObjectMapper + MVVM 的网络请求
说明
刚开始接触RxSwift
, 很多概念理不清; 试着阅读官方的playground
, 无奈英语水平有限, 看着英文文档,更加迷糊. 几经周折,找到这篇很好的教程, 作者整理的非常清晰, 感谢作者的分享精神.
请各位看官移步原文田腾飞的博客 【iOS开发】RxSwift入坑解读-你所需要知道的各种概念
此处整理的不够清晰, 仅仅是为了自己对这些概念的进一步的记忆.所以为了不耽误大家的时间, 请大家点击这里,阅读原文作者的博客沸沸腾(狒狒)
文件目录预览, 基本概念划分
内容拆分
基本概念
Observable
和Observer
Observable
是可被观察的,理解为事件源.Observer
是我们的观察者, 收到事件后, 事件的处理者 观察者需要订阅(subscribe
) 被观察者,才能收到Observable
的事件通知消息- 创建和订阅被观察者 创建被观察者其实就是创建一个
Observable
的sequence
,就是创建一个流, 然后就可以被订阅subscribe
, 这样被观察者发出事件, 我们就能做相应的处理- DisposeBag
DisposeBag
其实相当于iOS
中ARC
, 在适当的时候销毁观察者, 理解为内存管理者- subscribe
subscribe
是订阅sequence
发出的事件, 比如next
事件,error
事件等. 而subscribe(onNext:)
是监听sequence
发出的next
事件中的element
进行处理, 他会忽略error
和complete
事件. 相对应的还有subscribe(onError:)
和subscribe(onCompleted:)
never
是创建一个sequence
, 但是不发出任何事件信号
Observable<String>.never().subscribe { (_) in
print("不会打印这句话, 因为不会被执行")
}.addDisposableTo(DisposeBag())
empty
是创建一个空的sequence
, 只能发出一个complete
事件
Observable<Int>.empty().subscribe { event in print(event) }.addDisposableTo(disposeBag)
just
是创建一个sequence
能发出特定的事件, 能正常结束
Observable<String>.just("💗").subscribe { (string) in
print(string)
}.addDisposableTo(DisposeBag())
of
是创建一个sequence
能发出很多事件信号, subscribe只监听事件
Observable.of("1--", "2--", "3--", "4--").subscribe { (str) in
print(str)
}.addDisposableTo(DisposeBag())
Observable.of("1--", "2--", "3--", "4--").subscribe(onNext: { (str) in
print(str)
}, onError: nil, onCompleted: nil, onDisposed: nil).addDisposableTo(DisposeBag())
from
是从集合中创建sequence
, 例如: 字典, 数组, set
Observable.from(["😀", "😬", "😂", "😅"]).subscribe(onNext: { print($0)
}).addDisposableTo(DisposeBag())
create
我们可以自定义可观察的sequence
let myJust: (String) -> Observable<String> = { (element: String) -> Observable<String> in
return Observable.create({ (observer) -> Disposable in
observer.on(.next(element))
observer.on(.completed)
return Disposables.create()
})
}
myJust("自定义可观察的sequence").subscribe(onNext: { (str) in
print(str)
}).addDisposableTo(DisposeBag())
range
就是创建一个sequence
, 他可以发出这个范围从开始到结束的所有事件
Observable.range(start: 1, count: 10).subscribe { print($0)
}.addDisposableTo(DisposeBag())
repeatElement
创建一个sequence
, 发出特定事件n次
Observable.repeatElement("⚾️").take(3).subscribe { print($0)
}.addDisposableTo(DisposeBag())
generate
是创建一个可观察的sequence
, 当初始化的条件为true
时, 他就会发出所对应的事件
Observable.generate(initialState: 0, condition: { (element) -> Bool in
element < 5
}) { (element) -> Int in
element + 1
}.subscribe { (element) in
print(element)
}.addDisposableTo(DisposeBag())
deferred
(延期) 会为每一个订阅者observer
, 创建一个新的可观察序列
var count = 1
let deferredSequence = Observable<String>.deferred { () -> Observable<String> in
print("Creating \(count)")
count += 1
return Observable.create({ (observer) -> Disposable in
observer.onNext("🏄🏽♀️")
observer.onNext("🏄")
observer.onNext("⛷")
return Disposables.create()
})
}
deferredSequence.subscribe { print($0)
}.addDisposableTo(DisposeBag())
deferredSequence.subscribe { print($0)
}.addDisposableTo(DisposeBag())
error
创建一个可观察序列, 但不发出任何正常的事件, 只发出error
事件并结束
doOno doOnNext( :)
方法就是在subscribe(onNext:)
前调用,doOnCompleted(:)
就是在subscribe(onCompleted:)
前面调用的。
Observable.of("🚗", "🚕", "🚙", "🚌").do(onNext: { (str) in
print("doOn--\(str)")
}, onError: nil, onCompleted: nil, onSubscribe: nil, onDispose: nil).subscribe { (str) in
print(str)
}.addDisposableTo(DisposeBag())
Subject的使用
Subject
是Observable
和observer
之间的桥梁, 一个Subject
既是Observable
也是observer
, 他既可以发出事件,又可以监听事件PublishSubject
,ReplaySubject
和BehaviorSubject
是不会自动发出completed
事件的。
publishSubject
当你订阅publishSubject
的时候, 你只能接收到订阅他之后发生的事件, subject.onNext()
发出onNext
事件,对应的还有onError()
和onCompleted()
事件
let publishSubject = PublishSubject<String>()
let disposeBag = DisposeBag()
publishSubject.subscribe({ print("1---", $0)
}).addDisposableTo(disposeBag)
publishSubject.onNext("🐶")
publishSubject.onNext("🐱")
publishSubject.subscribe({ print("2---", $0)
}).addDisposableTo(disposeBag)
publishSubject.onNext("🅰️")
publishSubject.onNext("🅱️")
ReplaySubject
当你订阅ReplaySubject
的时候, 你可以接收到订阅他之后的事件, 单页可以接收订阅他之前发出的事件, 接收几个事件取决于bufferSize
的大小
let replaySubject = ReplaySubject<Any>.create(bufferSize: 3)
let replayDisposeBag = DisposeBag()
replaySubject.subscribe({ print("replaySubject--1--", $0)}).addDisposableTo(replayDisposeBag)
replaySubject.onNext("A")
replaySubject.onNext("B")
replaySubject.onNext("C")
replaySubject.onNext("D")
replaySubject.subscribe({ print("replaySubject--2--", $0)}).addDisposableTo(replayDisposeBag)
replaySubject.onNext("一")
replaySubject.onNext("二")
BehaviorSubject
当你订阅了BehaSubject
, 你会接收到订阅之前的最后一个事件
let behaviorSubject = BehaviorSubject(value: "🍎")
let behaviorDisposeBag = DisposeBag()
print("\n\n")
behaviorSubject.subscribe({ print("BehaviorSubject--1--", $0) }).addDisposableTo(behaviorDisposeBag)
behaviorSubject.onNext("🌝")
behaviorSubject.onNext("🌛")
behaviorSubject.onNext("🌟")
behaviorSubject.subscribe({ print("BehaviorSubject--2--", $0)} ).addDisposableTo(behaviorDisposeBag)
behaviorSubject.onNext("🌎")
behaviorSubject.onNext("🌕")
注:
PublishSubject
,ReplaySubject
和BehaviorSubject
是不会自动发出completed
事件的。
Variable
是behaviorSubject
的一个包装箱, 就像是一个箱子一样, 使用的时候需要调用asObservable
拆箱, 里面的value
是一个BehaviorSubject
, 他不会发出error
事件, 但是会自动发出complete
事件
let variable = Variable("🌖")
let variableDisposeBag = DisposeBag()
variable.asObservable().subscribe({ print("variableDisposeBag--1--", $0)} ).addDisposableTo(variableDisposeBag)
variable.value = "☀️"
variable.value = "🌤"
variable.asObservable().subscribe({ print("variableDisposeBag--2--", $0)}).addDisposableTo(variableDisposeBag)
variable.value = "🔴"
variable.value = "🔵"
联合操作
联合操作就是把多个
Observable
(被观察者) 合并成单个observable
流
startWith
在发出事件消息之前, 先发出某个特定的事件消息, 比如发出事件2, 3 ,startWith(1),后,会先发出1, 然后发出2, 3
Observable.of("2", "3").startWith("1").startWith( "-1", "0").subscribe({ print("startWith---",$0 )}).addDisposableTo(DisposeBag())
merge
把两个Observable
流合并成一个Observable
流, 根据时间轴发出对应的事件
let mergeDisposeBag = DisposeBag()
let mergeSubject1 = PublishSubject<String>()
let mergeSubject2 = PublishSubject<String>()
Observable.of(mergeSubject1, mergeSubject2).merge().subscribe(onNext: { print($0) }, onError: nil, onCompleted: nil, onDisposed: nil).addDisposableTo(mergeDisposeBag)
mergeSubject1.onNext("🌕")
mergeSubject1.onNext("🌖")
mergeSubject2.onNext("🌗")
mergeSubject1.onNext("🌘")
mergeSubject2.onNext("🌑")
zip
绑定小于等于8 个的Observable
流, 结合在一起办理, 注: zip 是一个事件对应一个事件, 不满足配对的将会被抛弃
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
let zipDisposeBag = DisposeBag()
Observable.zip(stringSubject, intSubject) { (stringElement, intElement) -> String in
return "\(stringElement) \(intElement)"
}.subscribe(onNext: { print($0)}).addDisposableTo(zipDisposeBag)
stringSubject.onNext("A")
stringSubject.onNext("B")
intSubject.onNext(1)
intSubject.onNext(2)
intSubject.onNext(3)
stringSubject.onNext("C")
intSubject.onNext(4)
combineLatest
绑定最多不超过8个的Obaservable
, 结合在一起处理. 和zip
不同的是, combineLast
是一个流的事件对应另一流的’最后’一个事件, 两个流的事件都是’最近’的事件
let stringSubject2 = PublishSubject<String>()
let intSubject2 = PublishSubject<Int>()
let zipDisposeBag2 = DisposeBag()
print("\n")
Observable.combineLatest(stringSubject2, intSubject2) { (stringElement, intElement) -> String in
return "\(stringElement) \(intElement)"
}.subscribe(onNext: { print($0)}).addDisposableTo(zipDisposeBag2)
stringSubject2.onNext("A")
stringSubject2.onNext("B")
intSubject2.onNext(1)
intSubject2.onNext(2)
intSubject2.onNext(3)
stringSubject2.onNext("C")
intSubject2.onNext(4)
switchLatest
可以对事件流进行转换, 本来监听的subject1
, 我们可以通过改变variable
里面的value
, 更换事件源, 变成监听subject2
let switchDisposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "😍")
let subject2 = BehaviorSubject(value: "😘")
let variable = Variable(subject1)
variable.asObservable().switchLatest().subscribe({ print($0)}).addDisposableTo(switchDisposeBag)
subject1.onNext("🍏")
subject1.onNext("🍎")
subject2.onNext("🍝")
variable.value = subject2
subject1.onNext("🍉")
subject2.onNext("🍲")
变换操作
变换操作
map
通过传入一个函数闭包把原来的sequence
转换为一个新的sequence
Observable.of(1, 2, 3).map { $0 * $0
}.subscribe({ print($0)}).addDisposableTo(DisposeBag())
flatMap
将一个sequence
转换为另一个sequence
, 当你接收一个sequence
事件, 你还想接收其他sequence
发出的事件的话,可以使用flatMap
, 他会将每一个sequence
事件进行处理后, 然后再以一个新的sequence
形式发出事件.
let flatMapDisposeBag = DisposeBag()
struct Player {
var score: Variable<Int>
}
let 😈 = Player(score: Variable(80))
let 👻 = Player(score: Variable(90))
let 💀 = Player(score: Variable(550))
let variablePlayer = Variable(😈)
variablePlayer.asObservable().flatMapLatest { $0.score.asObservable()
}.subscribe({ print($0)}).addDisposableTo(flatMapDisposeBag)
😈.score.value = 85
variablePlayer.value = 👻
😈.score.value = 95
😈.score.value = 222
variablePlayer.value = 💀
👻.score.value = 100
💀.score.value = 666
flatMapLatest 只会接收最新的value事件
scan
就是一个初始化的数, 然后不断的拿前一个结果和最新的值, 进行处理操作
Observable.range(start: 1, count: 100).scan(0) { (a, b) -> Int in
return a + b
}.subscribe({ print($0)}).addDisposableTo(DisposeBag())
Observable.of(10, 100, 1000).scan(0) { $0 + $1
}.subscribe({ print($0)}).addDisposableTo(DisposeBag())
过滤和约束
过滤和约束
filter
过滤掉某些不符合要求的事件
Observable.of("😀", "😙", "😀", "😀", "😜", "😍", "😇", "😗", "😘", "🙃", "😉").filter { (str) -> Bool in
return str == "😀"
}.subscribe({ print($0)}).addDisposableTo(DisposeBag())
Observable.range(start: 0, count: 100).filter { (num) -> Bool in
return num % 2 == 0
}.subscribe({ print($0)}).addDisposableTo(DisposeBag())
distinctUntilChanged
当下一个事件与前一个事件不是同一个事件才进行处理操作
Observable.of("⚽️", "🏈", "🏀", "⚾️", "⚾️", "⚾️", "🏀", "⚽️", "🏀", "⚽️", "⚽️", "🎾", "🏐", "⚽️", "🏐", "🏐").distinctUntilChanged().subscribe( { print($0)} ).addDisposableTo(DisposeBag())
elementAt 只处理在指定位置的事件
Observable.of("🎤", "🎼", "🎹", "🎧").single().subscribe( { print($0) } ).addDisposableTo(DisposeBag())
error(Sequence contains more than one element.)
Observable.of("🎹", "🎹", "🎧").single( {$0 == "🎧"} ).subscribe( { print($0) } ).addDisposableTo(DisposeBag())
completed
Observable.of("🎹", "🎹", "🎧").single( {$0 == "🎹"} ).subscribe( { print($0) } ).addDisposableTo(DisposeBag())
error(Sequence contains more than one element.)
Observable.of("🎹", "🎹", "🎧").single( {$0 == "🎤"} ).subscribe( { print($0) } ).addDisposableTo(DisposeBag())
error(Sequence doesn’t contain any elements.)
take
只处理前几个事件信号
Observable.of("☮️", "☪️", "🔯", "☯️", "♉️", "♒️").take(3).subscribe( { print($0) } ).addDisposableTo(DisposeBag())
takeLast
只处理后几个事件信号
Observable.of("☮️", "☪️", "🔯", "☯️", "♉️", "♒️").takeLast(3).subscribe( { print($0) } ).addDisposableTo(DisposeBag())
takeWhile
当条件满足时处理
Observable.range(start: 1, count: 5).takeWhile( { $0 <= 3} ).subscribe( { print($0) } ).addDisposableTo(DisposeBag())
takeUntil
接收事件消息, 直到另一个sequence 发出事件消息的时候
let sourceSequence = PublishSubject<String>()
let stopSequence = PublishSubject<String>()
let takeUntilDisposeBag = DisposeBag()
sourceSequence.takeUntil(stopSequence).subscribe( { print($0) } ).addDisposableTo(takeUntilDisposeBag)
sourceSequence.onNext("♻️")
sourceSequence.onNext("🈯️")
stopSequence.onNext("⛔️")
sourceSequence.onNext("💹")
skip
取消前几个事件
Observable.of("🈶", "🈷️", "🈺", "🈸", "🈚️").skip(2).subscribe( { print($0)} ).addDisposableTo(DisposeBag())
skipWhile
满足事件消息的都取消, 当遇到不满足条件的事件消息后,后面的就不考虑了
Observable.of("🈶", "🈷️", "🈺", "🈸", "🈚️").skipWhile( { $0 == "🈶"} ).subscribe( { print($0)} ).addDisposableTo(DisposeBag())
Observable.range(start: 0, count: 5).skipWhile( { $0 < 3} ).subscribe( { print($0)} ).addDisposableTo(DisposeBag())
skipWhileWithIndex
满足条件的都被取消, 可以根据元素和下表分别决定, 传入的闭包和skipWhile
有点区别而已, 当遇到不满足条件的事件消息后,后面的就不考虑了
let skipWithIndexDisposeBag = DisposeBag()
Observable.of("1🈶", "2😀", "1🈺", "🈸", "😅").skipWhileWithIndex { (element, index) -> Bool in
return index != 3
}.subscribe( { print("----", $0)} ).addDisposableTo(skipWithIndexDisposeBag)
注: 当遇到不满足条件的事件消息后,后面的就不考虑了
skipUntil
知道某个sequence
发出了事件消息, 才开始接收当前sequence
发出的事件消息
let skipUntilDisposeBag = DisposeBag()
let skipUntilStartSubject = PublishSubject<String>()
let skipUntilStopSubject = PublishSubject<Int>()
skipUntilStartSubject.skipUntil(skipUntilStopSubject).subscribe( { print($0) } ).addDisposableTo(skipUntilDisposeBag)
skipUntilStartSubject.onNext("🐠")
skipUntilStartSubject.onNext("🐟")
skipUntilStopSubject.onNext(0)// skipUntilStopSubject 接收消息后, skipUntilStartSubject才会开始接收消息
skipUntilStartSubject.onNext("🐡")
skipUntilStartSubject.onNext("🐬")
注:
skipUntilStopSubject
接收消息后,skipUntilStartSubject
才会开始接收消息
数学操作
数学操作
toArray
将sequence
转换成一个Array
, 并转换成单一事件信号, 然后结束
Observable.range(start: 1, count: 10).toArray().subscribe( { print($0) } ).addDisposableTo(DisposeBag())
reduce
用一个初始值, 对事件数据进行累计操作. reduce
接收一个初始值,和一个操作符
Observable.of(10, 100, 1000).reduce(1, accumulator: +).subscribe( { print($0) } ).addDisposableTo(DisposeBag())
concat
(合并) 把多个sequence
合并为 一个 sequence
, 并且当前面一个sequence
发出了complete
事件, 才会开始下一个sequence
事件
let concatDisposeBag = DisposeBag()
let concatBehaviorSubject1 = BehaviorSubject(value: "☀️")
let concatBehaviorSubject2 = BehaviorSubject(value: "🍎")
let concatVariable = Variable(concatBehaviorSubject1)
concatVariable.asObservable().concat().subscribe( { print($0) } ).addDisposableTo(concatDisposeBag)
concatBehaviorSubject1.onNext("🌧")
concatBehaviorSubject1.onNext("❄️")
concatVariable.value = concatBehaviorSubject2
concatBehaviorSubject2.onNext("🍓")
concatBehaviorSubject1.onCompleted()
concatBehaviorSubject2.onNext("🍒")
concatBehaviorSubject2.onNext("🍉")
concatBehaviorSubject1.onNext("⛅️")
next(☀️) next(🌧) next(❄️) next(🍓) next(🍒) next(🍉)
连接性操作
Connectable Observable 订阅时不开始发射事件消息, 而是仅当调用他们的connec() 方法时. 这样就可以等待我们所有想要的订阅者都订阅了以后再开始发事件消息, 这样能保证我们所有的订阅者都能够接收到事件消息. 简单说就是等大家都准备完成后, 才开始发消息
例: 每隔一秒 发送一个事件
let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
_ = interval.subscribe(onNext: { print("Subscription: 1, Event: \($0)") })
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 5) {
_ = interval.subscribe(onNext: { print("Subscription: 2, Event: \($0)") })
}
publish
将一个正常的sequence
转换为 connectable sequence
, 可以指定时间发送事件
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).publish()
_ = intSequence.subscribe(onNext: { print("Subscription: 1, Event: \($0)") })
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
_ = intSequence.connect()
}
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 4) {
_ = intSequence.subscribe(onNext: { print("Subscription: 2, Event: \($0)") })
}
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 6) {
_ = intSequence.subscribe(onNext: { print("Subscription: 3, Event: \($0)") })
}
replay
讲一个正常的sequence
转换成一个connectable sequence
, 然后和replaySubject
的`bufferSize相似, 能接收到订阅之前的事件消息
let replaySequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).replay(1) // 能接收到订阅之前的n条信息
_ = replaySequence.subscribe( { print("第1个订阅者--Event", $0)} )
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 1) {
_ = replaySequence.connect()
}
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 4) {
_ = replaySequence.subscribe( { print("第2个订阅者--Event", $0)} )
}
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 6) {
_ = replaySequence.subscribe( { print("第3个订阅者--Event", $0)} )
}
multicast
将一个正常的sequence
转换为 connectable sequence
, 并且通过特定的subject
发送出去, 例: publishSubject
, replaySubject
, behaviorSubject
, Variable
let subject = PublishSubject<Int>()
_ = subject.subscribe( { print("subject1--", $0) } )
let multicastSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance).multicast(subject)
_ = multicastSequence.subscribe( { print("subject2--", $0) } )
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
_ = multicastSequence.connect()
}
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 4) {
_ = multicastSequence.subscribe( { print("subject3--", $0) } )
}
错误处理
错误处理
catchErrorJustReturn
遇到’error’的时候就’return’一个值, 然后结束
let justReturnDisposeBag = DisposeBag()
let justReturnSubject = PublishSubject<String>()
justReturnSubject.catchErrorJustReturn("😱").subscribe( { print($0) } ).addDisposableTo(justReturnDisposeBag)
justReturnSubject.onNext("☺️")
justReturnSubject.onNext("😋")
justReturnSubject.onNext("😍")
justReturnSubject.onError(NSError(domain: "❎", code: 1000001, userInfo: nil) as Error)
catchError
捕获error
进行处理, 可以返回另一个sequence
进行订阅
let catchErrorDisposeBag = DisposeBag()
let catchErrorSubject1 = PublishSubject<String>()
let catchErrorSubject2 = PublishSubject<String>()
catchErrorSubject1.catchError { (error) -> Observable<String> in
print("errro: ---\(error)")
return catchErrorSubject2
}.subscribe( { print($0) } ).addDisposableTo(catchErrorDisposeBag)
catchErrorSubject1.onNext("😀")
catchErrorSubject1.onNext("😬")
catchErrorSubject2.onNext("😂") // 不会打印, 当subject1 遇到error后, subject2 才会被订阅
catchErrorSubject1.onError(NSError(domain: "❎", code: 1000001, userInfo: nil) as Error)
catchErrorSubject2.onNext("🤑") // 会打印, 因为已经被订阅了
retry
(重试) 遇见error
事件可以进行重试, 比如网络请求失败, 可以进行重新连接
let retryDisposeBag = DisposeBag()
var count = 1
let myRetrySequence = Observable<String>.create { (observer) -> Disposable in
observer.onNext("----1")
observer.onNext("----2")
observer.onNext("----3")
if count <= 2 {
let error = NSError(domain: "❎", code: 1000001, userInfo: nil) as Error
observer.onError(error)
print(error)
count += 1
}
observer.onNext("----4")
observer.onCompleted()
return Disposables.create()
}
myRetrySequence.retry(5).subscribe( { print($0) } ).addDisposableTo(retryDisposeBag)
retry()
无限重复— 直到成功
myRetrySequenceretry().subscribe( { print($0) } ).addDisposableTo(retryDisposeBag)
debug
debug
debug
打印所有的订阅, 事件, 和disposals
myRetrySequence.debug().retry(5).subscribe( { print($0) } ).addDisposableTo(retryDisposeBag)
RxSwift.Resources.total 查看RxSwift 所有资源的占用
print(RxSwift.Resources.total)
RxSwift.Resources.total 没能调用成功 😂