数据流的传输过程, 就是订阅者角色的交接过程 ,数据由新的交接者进行传递。
就像是接驳, 一条流水线上, 数据从 一个一个工人手上经过。 数据每转换一次,就传递给新的工人,由新工人加工处理。 角色的转变。
当转变到最后一个工人手中, 此时源数据产生了, 整个流水线被激活。 数据没有产生时, 整个流水线上的工人全部待命。
数据的发送,会接力给新的订阅者。
在给新订阅者之前, 源信号的didsubscribe必定先执行。
必定需要订阅源信号,创建原信号的订阅者。 这样才能是老订阅者-》新订阅者这么转换
bind 正式用来处理两个信号的。
什么是信号
创建订阅者/订阅信号
subscribe信号的
subscribenext信号的
发送信号
sendnext订阅者的
subscribenextblock。。。信号是用来创建订阅者的。
subscribe
创建订阅者并且保存订阅者
如果不能保存订阅者,就直接立即给创建的这个订阅者发送消息。
如果保存了订阅者,则以后还是可以给这个订阅者发送消息。
订阅过程就是数据流发送过程。
什么是订阅者
因为他有3个block/next/completed/error
订阅者是用来发送数据的。
订阅者是持有disposble的,来决定是否发送数据。
RACSubject是信号也是订阅者。
他能保存订阅者,订阅者可以再次被发送消息。
也能保存数据。数据可以再次被发送
RACSignal subscribeNext: 创建订阅者
RACSignal subscribe: 直接传递订阅者进来
订阅的时候,保存订阅者。 把以前发送过的消息发给新的订阅者。
RACSubscriber(订阅者/或是继承了此订阅者协议的信号) sendNext:
无非是区别于下面2种需求:
热信号:
订阅和发送是相互独立的。
1、订阅的时候,保存订阅者。 把以前发送过的消息发给新的订阅者。
2、发送消息的时候,保存消息。并且给以前订阅过的订阅者发送新的消息。
冷信号:
3、订阅的时候,调用didsubscribe去动态的创建消息并发送消息; 订阅和发送是连贯的。只有订阅才会发送。订阅一次就didsubscribe一次。没有订阅,那个信号就是懒惰版的存在。没有买卖就没有杀戮。
(1)、 RACSignal subscribeNext:
1,调用subscriberWithNext,创建订阅者RACSubscriber
2,调用订阅者的subscribe
(2)、 RACSignal subscribe:
1,RACDynamicSignal。创建订阅者RACPassthroughSubscriber, 调用didSubscribe
2,RACSubject。 创建订阅者RACPassthroughSubscriber,保存订阅者
3,RACBehaviorSubject。
创建订阅者RACPassthroughSubscriber,保存订阅者
给当前的订阅者sendNext(发送已经发送过的消息。因为RACBehaviorSubject实质上就是RACSignal)
4,RACReplaySubject。
遍历当前保存的消息,给当前订阅者 sendNext (发送已经发送过的消息。因为RACBehaviorSubject实质上就是RACSignal)
如果已经hanCOmpleted,给当前订阅者 sendCompleted
如果已经hasError,给当前订阅者 sendError
否则:创建订阅者RACPassthroughSubscriber,保存订阅者
5,RACReturnSignal。调用订阅者的sendNext, sendCompleted
6,RACEmptySignal。调用订阅者的sendCompleted
7,RACErrorSignal。调用订阅者的sendError
8,RACChannelTerminal。给所有的Signal调用subscribe
(3)、 RACSubscriber sendNext:
RACPassthroughSubscriber主要作用是 控制是否dispose了
如果dispose了,就不sendnext;
如果没有dipose,就调用RACSubscriber的sendnext即nextBlock
1、给订阅者发送消息 ;
(
情况1: 保存消息,只保存一条; 给所有订阅者发送消息
情况2: 保存消息, 保存所有条数; 给所有订阅者发送消息
情况3: 给所有订阅者发送消息
)
RACStream
->RACSignal
->RACDynamicSignal
->RACEmptySignal
->RACErrorSignal
->RACReturnSignal
->RACSubject <RACSubscriber>
->RACBehaviorSubject
->RACReplaySubject
->RACGroupedSignal 通过继承RACSubject来增加属性key
->RACChannelTerminal <RACSubscriber>
[signal subscribe:subscriber];
[self.sourceSignal subscribe:_signal];
[self.values subscribe:subscriber];
[subscriber sendNext:signal]
[subscriber sendNext:x];
-
(RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
-
(RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}[disposable addDisposable:[RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];if (index != NSNotFound) [subscribers removeObjectAtIndex:index]; }}]];
return disposable;
}
-
(RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];[disposable addDisposable:schedulingDisposable];}
return disposable;
}