看别人实现一个Observable

最近在看Rx相关的东西,刚好看到别人实现的一个Observable,一开始看这个代码的时候还是有点费劲,这里就记录一下这个代码的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var observable = new Observable(function(observer){
observer.next(1);
observer.next(2);
observer.complete();
})

var observer = {
next: function (value) {
console.log(value)
},
complete: function () {
console.log('complete!')
}
}

observable.subscribe(observer)
//1
//2
//complete!

现在我们来做最简单的Observable的实现,通过上面的代码我们知道Observable有一个订阅的方法subscribe

1
2
3
4
5
6
7
8
class Observable {
constructor(callFn){
this.callFn = callFn
}
subscribe(observer){
this.callFn(observer)
}
}

好了,运行一下没问题。真的没问题了吗,我们看一下下面的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
var observable = new Observable(function(observer){
observer.next(1);
observer.next(2);
observer.complete();
observer.error();
observer.next(3);
})

var observer = {
next: function (value) {
console.log(value)
},
complete: function () {
console.log('complete!')
},
error: function () {
console.log('error!')
}
}

observable.subscribe(observer)
//1
//2
//complete!
//error!
//3

这个结果也是预料之中的了,那要怎么样才能在调用了observer的complete和error方法后不再继续执行下去呢。这里的observer完全是外部传入的,我们要实现上面的功能只能重新包装一下传入的observer,经过包装的observer有一个状态,调用complete跟error方法的时候就改变这个状态。然后每次调用next方法的时候就要先判断这个状态,看是否已经结束或者中间发生了错误。这样我们来重新定义一个新的observer,套用一下React的说法称这个新的observer高阶observer。233333

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class Observer {
constructor(oldObserver){
this.isStopped = false
this.oldObserver = oldObserver
}
stop() {
this.isStopped = true
}
next(value){
if(!this.isStopped) {
try {
this.oldObserver.next(value)
} catch(e){
this.stop()
}
}
}
complete(){

}
error(){

}
}