在阅读源码之前容我抛出个小问题,看看下面的代码👇
//发送验证码
-(void)sendCodeRequesSignal{
RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[self.accountServiceAssembly.authService sendResetPWDOTPCoder:self.phone withSuccessCallBack:^(id data) {
[subscriber sendNext:data];
[subscriber sendCompleted];
} withFailedCallBack:^(NSError *error) {
error = [HttpErrorCodeHandler errorWithCode:error.code userinfo:nil];
[subscriber sendError:error];
}];
return nil;
}];
[signal subscribeNext:^(id data) {
NSLog(@"%@", data);
}];
}
这是公司项目中对RAC的一段使用,生成的RACSignal实例自始至终都没有被引用,而这里是有一个异步请求的。作为一个热信号,请求结束后signal是应该已经释放掉了,又是如何做出响应的???(聪明的你可能已经猜到了与subscriber有关😊)。
好了,上面的问题先放一下。本文不会详细的讲述ReactiveCocoa各API的使用,着重说说一下几点 :
1、RACSignal源码
2、RACCommond源码
3、bind方法
4、踩坑
<b>一、RACSignal</b>
下面是RACSignal最基本的一个使用👇
RACSignal *signal = [RACDynamicSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
//1、发送信号
[subscriber sendNext:@"窈窕淑女"];
// //理论上这里可以对整个信息通道做任意修改,直接属性调用nextBlock发送信号
RACPassthroughSubscriber *passSubscriber = (RACPassthroughSubscriber *)subscriber;
RACSubscribeNextBlock nextBlock = [[passSubscriber valueForKeyPath:@"innerSubscriber"] valueForKeyPath:@"next"];
nextBlock(@"直接属性调用");
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"清理工作");
}];
}];
//第一次订阅信号
RACDisposable *disposeable1 = [signal subscribeNext:^(id x1) {
NSLog(@"君子好逑1");
}];
//第二次订阅信号
RACDisposable *disposeable2 = [signal subscribeNext:^(id x2) {
NSLog(@"君子好逑2");
}];
//生成静态(block不执行)信息->订购(触发信号)->订购事件做出响应
这里初始化了一个信号,并做了两次订阅。先看下信号是怎么生成的, RACSignal有一个RACDynamicSignal的子类只有create和subscribe两个接口
+ (RACSignal *)createSignal:(RACDidSubscribeBlock)didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}
这里只生成了一个signal对象,并存储了入参didSubscribe block,从字面量不难猜测是在订阅之后执行的。
接着我们来看下subscribeNext:的实现
- (RACDisposable *)subscribeNext:(RACSubscribeNextBlock)nextBlock {
NSCParameterAssert(nextBlock != NULL);
//传入nextBlock后生成一个记录此任务的订购者(RACSubscriber);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
//生成的"订购者"又如何与信号(self)关联上呢?-> RACDynamicSignal
return [self subscribe:o];
}
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}
从订阅者RACSubscriber的初始化api可知,一个订阅者是可以包含next、error、complete。而且如下两种实现方式并不是等效的,虽然效果可能是一样的(后面会讲到):
//订阅方法1
[signal subscribeNext:^(id x) {
}];
[signal subscribeError:^(NSError *error) {
}];
[signal subscribeCompleted:^{
}];
//订阅方法2
[signal subscribeNext:^(id x) {
} error:^(NSError *error) {
} completed:^{
}];
这里我们已经生成了一个完整的订购者和信号,但两者又是如何关联起来的呢,继续看最后一句的实现,RACDynamicSignal中最重要的一个接口
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
//相当于init
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
//1
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
//此时的subscriber对象包含了"信号源(signal)"和"订购者"的全部信息,形成一个完整的信息通道对象
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
//关键:在这里触发了didSubscribe任务块,并传入一个"包含了全量信息"的"订购者"。
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
//这里注册的任务是直接执行的,schedulingDisposable为全局属性做一些清理工作。
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
我们先看第一步,这里生成了一个RACPassthroughSubscriber类型的对象,我们看其入参subscriber、self、disposable包含了信号、订阅者的全部信息。再看self.didSubscribe(subscriber)这句,signal存储的block在此时才执行,并传入一个持有了全量信息的的subscriber。
这里我们可以回头看下篇首的那个问题,在ReactiveCocoa 的实现过程中RACSignal并没有持有Subscriber, 而是生成的RACPassthroughSubscriber持有了这两者。这样一来只要subscriber没被释放,异步请求返回后RACSignal自然就不会被释放了。我们随便打个断点看看就能知道了👇
到这里我们的信号终于触发了,但怎样告知订阅者呢?从上图innerSubscriber的4个属性可以很轻松地看出来。直接调用subscriber的三个接口就是了。理论上通过subscriber你可以对整个信号通道的任意属性进行修改,但那是不符合游戏规则的😄
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];
if (errorBlock == nil) return;
errorBlock(e);
}
}
- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];
if (completedBlock == nil) return;
completedBlock();
}
}
上面这三个订阅方法,没什么好说的了。需要注意下sendNext是没有调用dispose方法的,所以信号发送完了不要忘记sendCompleted方法的调用哦。也正因如此我们可以多次调用sendNext:哦。
通过源码的分析,我们很容易看出RACSignal是个不折不扣的热信号。当然,我们有木有办法不这样用呢,当然可以。篇首的问题就是抛出的砖,我们拿到subscriber不立即发消息出去,先存起来,等想要发出信号的时候再通知订阅者。事实上我们的项目中有不少这样的用法。如果觉得这样用很诡异,那我们接着往下看真正的冷信号RACCommand👇
<b>二、RACCommand</b>
因为RACCommand中有对RACSubject原理的应用,所以先简单的说下RACSubject,不废话,就抠两段代码看看你们就懂了
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}
#pragma mark RACSubscriber
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:value];
}];
}
和上面的RACSignal一个最明显的区别是:RACSignal每次订阅都会立即得到反馈(简单点说每订阅一次会有一个RACPassthroughSubscriber生成),是一一对应的。而RACSubject会收集多个订阅者(RACPassthroughSubscriber)后,一次信息的发送多个订阅者响应。
看上图,RACCommand其实就比RACSubject多了个Worker而已。接下来先看个command简单的使用
一样的先上个简单的🌰
//1
RACCommand *command = [[RACCommand alloc] initWithSignalBlock:^RACSignal *(id input) {
return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:input];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
}];
}];
}];
//executionSignals: RACSignal
//2
[command.executionSignals subscribeNext:^(RACSignal *signal) {
NSLog(@"订阅的信号执行"); //x 是signal
}];
//3
[command.executionSignals.switchToLatest subscribeNext:^(id x) {
NSLog(@"%@", x); //x 是data
}];
//4、开始执行命令
[command execute:@"全军撤退"];
[command execute:@"回防高地"];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(3 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[command execute:@"发起进攻"];
});
相对来说command就要复杂很多了,第一步我们创建了一个command命令,第二步和第三步又有什么区别,为什么会有这样的区别呢?待会再说。第四步我们的command能这样连续发送执行命令么?好了,晕已。还是慢慢看吧。
- (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock {
NSCParameterAssert(signalBlock != nil);
self = [super init];
if (self == nil) return nil;
//1、创建储存管理的容器(signal生成block)
_activeExecutionSignals = [[NSMutableArray alloc] init];
_signalBlock = [signalBlock copy];
//2、监听容器中block元素的改变。
RACSignal *newActiveExecutionSignals = [[[[[self
rac_valuesAndChangesForKeyPath:@keypath(self.activeExecutionSignals) options:NSKeyValueObservingOptionNew observer:nil]
reduceEach:^(id _, NSDictionary *change) {
NSArray *signals = change[NSKeyValueChangeNewKey];
if (signals == nil) return [RACSignal empty];
return [signals.rac_sequence signalWithScheduler:RACScheduler.immediateScheduler];
}]
concat]
publish]
autoconnect];
//2、这里还是处理第二步生成的signal
_executionSignals = [[[newActiveExecutionSignals
map:^(RACSignal *signal) {
return [signal catchTo:[RACSignal empty]];
}]
deliverOn:RACScheduler.mainThreadScheduler]
setNameWithFormat:@"%@ -executionSignals", self];
// `errors` needs to be multicasted so that it picks up all
// `activeExecutionSignals` that are added.
//
// In other words, if someone subscribes to `errors` _after_ an execution
// has started, it should still receive any error from that execution.
RACMulticastConnection *errorsConnection = [[[newActiveExecutionSignals
flattenMap:^(RACSignal *signal) {
return [[signal
ignoreValues]
catch:^(NSError *error) {
return [RACSignal return:error];
}];
}]
deliverOn:RACScheduler.mainThreadScheduler]
publish];
_errors = [errorsConnection.signal setNameWithFormat:@"%@ -errors", self];
[errorsConnection connect];
RACSignal *immediateExecuting = [RACObserve(self, activeExecutionSignals) map:^(NSArray *activeSignals) {
return @(activeSignals.count > 0);
}];
_executing = [[[[[immediateExecuting
deliverOn:RACScheduler.mainThreadScheduler]
// This is useful before the first value arrives on the main thread.
startWith:@NO]
distinctUntilChanged]
replayLast]
setNameWithFormat:@"%@ -executing", self];
RACSignal *moreExecutionsAllowed = [RACSignal
if:RACObserve(self, allowsConcurrentExecution)
then:[RACSignal return:@YES]
else:[immediateExecuting not]];
if (enabledSignal == nil) {
enabledSignal = [RACSignal return:@YES];
} else {
enabledSignal = [[[enabledSignal
startWith:@YES]
takeUntil:self.rac_willDeallocSignal]
replayLast];
}
_immediateEnabled = [[RACSignal
combineLatest:@[ enabledSignal, moreExecutionsAllowed ]]
and];
_enabled = [[[[[self.immediateEnabled
take:1]
concat:[[self.immediateEnabled skip:1] deliverOn:RACScheduler.mainThreadScheduler]]
distinctUntilChanged]
replayLast]
setNameWithFormat:@"%@ -enabled", self];
return self;
}
这并不是一段好理解的代码,需要绑定的东西太多了。你只需要明白这里只做了一件事,生成一个数组,并监听这个数组的任意改变事件。着重理解下executionSignals,这是一个包含信号的信号。这样一来,下面这两句代码就不难理解了👇
[command.executionSignals subscribeNext:^(RACSignal *signal) {
NSLog(@"订阅的信号执行"); //x 是signal
}];
[command.executionSignals.switchToLatest subscribeNext:^(id x) {
NSLog(@"%@", x); //x 是data
}];
前者是command中管理者抛出来的信号,后者才是订阅了我们最后生成的那个signal。 command中虽然只存了一个生成信号的block:_signalBlock = [signalBlock copy],调用signalBlock生成signal对象便交由executionSignals管理了。好了,signalBlock是什么时候调用的呢?
- (RACSignal *)execute:(id)input {
// `immediateEnabled` is guaranteed to send a value upon subscription, so
// -first is acceptable here.
BOOL enabled = [[self.immediateEnabled first] boolValue];
if (!enabled) {
NSError *error = [NSError errorWithDomain:RACCommandErrorDomain code:RACCommandErrorNotEnabled userInfo:@{
NSLocalizedDescriptionKey: NSLocalizedString(@"The command is disabled and cannot be executed", nil),
RACUnderlyingCommandErrorKey: self
}];
return [RACSignal error:error];
}
//1
RACSignal *signal = self.signalBlock(input);
NSCAssert(signal != nil, @"nil signal returned from signal block for value: %@", input);
// We subscribe to the signal on the main thread so that it occurs _after_
// -addActiveExecutionSignal: completes below.
//
// This means that `executing` and `enabled` will send updated values before
// the signal actually starts performing work.
RACMulticastConnection *connection = [[signal
subscribeOn:RACScheduler.mainThreadScheduler]
multicast:[RACReplaySubject subject]];
@weakify(self);
[self addActiveExecutionSignal:connection.signal];
[connection.signal subscribeError:^(NSError *error) {
@strongify(self);
[self removeActiveExecutionSignal:connection.signal];
} completed:^{
@strongify(self);
[self removeActiveExecutionSignal:connection.signal];
}];
[connection connect];
return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, [input rac_description]];
}
当我们调用execute后,会生成signal,此时我们给executionSignals(RACSignal)订阅的事件自然会做出相应。但RACSignal中的didSubscribe并未执行。我们可以通过以下两种方式订阅这个包含在信号中的信号
[command.executionSignals subscribeNext:^(RACSignal *signal1) {
NSLog(@"订阅的信号执行1___%p", signal1); //signal p = 0x608000025160
[signal1 subscribeNext:^(id x) {
NSLog(@"%@", x);
}];
}];
[command.executionSignals.switchToLatest subscribeNext:^(id x) {
NSLog(@"%@", x); //x = 1
}];
从源码中我们只需要看出来RACCommand是一个包含了信号的信号,通过对signal的包装实现了信息的双向传递。
<b>三、bind方法</b>
前面我们分析了信息的一对一传递(RACSignal),一对多传递(RACSubject),双向传递(RACCommand)。接下来我们来看看多个信号的的绑定
RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"bindSignal___a"];
[subscriber sendCompleted];
return nil;
}];
RACSignal *testSignal = [signal1 bind:^RACStreamBindBlock{
//RACStreamBindBlock (^block)(void) 类型的入参
return ^RACSignal* (id value, BOOL *stop){
return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:[NSString stringWithFormat:@"%@+bindSignal___b", value]];
[subscriber sendCompleted];
return nil;
}];
};
}];
[testSignal subscribeNext:^(id x) {
NSLog(@"%@", x);
}];
bind api我们一般不直接使用,但会有很多基于bind所生成的接口。在这段示例代码中,我们创建了signal1并创建了signal2与signal1绑定,生成testSignal后订阅该signal。
- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
NSCParameterAssert(block != NULL);
/*
* -bind: should:
*
* 1. Subscribe to the original signal of values.
* 2. Any time the original signal sends a value, transform it using the binding block.
* 3. If the binding block returns a signal, subscribe to it, and pass all of its values through to the subscriber as they're received.
* 4. If the binding block asks the bind to terminate, complete the _original_ signal.
* 5. When _all_ signals complete, send completed to the subscriber.
*
* If any signal sends an error at any point, send that to the subscriber.
*/
//1、这里我们只需要知道传入的block是一个可以生成signal2的块。返回的是合并后的signal。这里和command对热信号的封装很相似。只是command是以对象的形式存储,这里是以block的形式存储
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACStreamBindBlock bindingBlock = block();
//2、bindingBlock 是一个执行即能生成一个signal的block
NSMutableArray *signals = [NSMutableArray arrayWithObject:self];
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
//结束处理
void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
BOOL removeDisposable = NO;
@synchronized (signals) {
[signals removeObject:signal];
if (signals.count == 0) {
[subscriber sendCompleted];
[compoundDisposable dispose];
} else {
removeDisposable = YES;
}
}
if (removeDisposable) [compoundDisposable removeDisposable:finishedDisposable];
};
//添加处理
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
@synchronized (signals) {
[signals addObject:signal];
}
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
//6、signal2被订阅
RACDisposable *disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(signal, selfDisposable);
}
}];
selfDisposable.disposable = disposable;
};
@autoreleasepool {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
//3、订阅合并后的signal后,开始订阅第一个signal。
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
// Manually check disposal to handle synchronous errors.
if (compoundDisposable.disposed) return;
BOOL stop = NO;
//4、接收到第一个signal后,调用传入的block生成第二个signal。
id signal = bindingBlock(x, &stop);
@autoreleasepool {
//5、到这里还有signal2没有被订阅了,只能在addSignal(signal)里了
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(self, selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(self, selfDisposable);
}
}];
selfDisposable.disposable = bindingDisposable;
}
return compoundDisposable;
}] setNameWithFormat:@"[%@] -bind:", self.name];
}
我们来看下bind的执行顺序,首先调用signal1的bind方法,传入的并不是signal,而是一个调用即可生成一个signal的block。上面的代码中直接返回的是合并后的testSignal。当testSignal被订阅后,上面的这段didSubscribe代码块才开始执行,跳过预处理代码直接看第3步[self subscribeNext:], 这里的self其实是signal1,直接订阅了signal1,获取到signal1传递来的消息(id x)后,将其当作“signal2生成block”的入参,对应的就是前面示例中的value了。
至此signal2只是生成了,而并未被订阅。再看第5、6步了,在addSignal 里面直接订阅了signal2,并通过[subscriber sendNext:x];把消息发送给了testSignal的订阅者。
这样一来总结下就成了如下顺序了:
生成一个包含[signal1,signal2生成block]处理逻辑的signal->订阅signal->订阅signal1->生成signal2->订阅signal2。
如果多次订阅testSignal, signal1对象一直是那个signal1,而signal2已不再是那个signal2了。
<b>四、踩坑</b>
1、RACObserver
RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) { //1
MTModel *model = [[MTModel alloc] init]; // MTModel有一个名为的title的属性
[subscriber sendNext:model];
[subscriber sendCompleted];
return nil;
}];
self.flattenMapSignal = [signal flattenMap:^RACStream *(MTModel *model) { //2
return RACObserve(model, title);
}];
[self.flattenMapSignal subscribeNext:^(id x) { //3
NSLog(@"subscribeNext - %@", x);
}];
这里就不绕圈子了,RACObserver是会造成循环引用的,直接看RACObserver源码👇
#define RACObserve(TARGET, KEYPATH) \
({ \
_Pragma("clang diagnostic push") \
_Pragma("clang diagnostic ignored \"-Wreceiver-is-weak\"") \
__weak id target_ = (TARGET); \
[target_ rac_valuesForKeyPath:@keypath(TARGET, KEYPATH) observer:self]; \
_Pragma("clang diagnostic pop") \
})
好了,找到self就懂,不废话。
2、sendComplete的调用问题(或者说RACSignal与RACSubject的持有问题)
//不会有任何问题
RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"next"];
return nil;
}];
[signal subscribeNext:^(id x) {
NSLog(@"%@", x);
}];
//会导致subject的不释放
RACSubject *subject = [RACSubject subject];
[subject subscribeNext:^(id x) {
NSLog(@"%@", x);
}];
[subject sendNext:@"next"];
subscriber(: RACPassthroughSubscriber)持有property:@[signal, subscriber, disposable];
signal仅持有didSubscribe,并没有持有任何subscriber
subject(:RACSignal)持有订阅者数组:@[subscriber1(: RACPassthroughSubscriber),
subscriber2(: RACPassthroughSubscriber),
subscriber3(: RACPassthroughSubscriber)]。
而数组中的每一个订阅者都是持有signal, subscriber, disposable三项的。
所以其实这里的循环引用一直都在,不要说用weak,这里是必须用强引用的。好了,ReactiveCocoa只能自己处理内存问题了(也必须是这样)。由于sendNext:事件是可以多次触发的,唯一的解决方案只有用sendComplelte了。如果还不懂的话,可以回头看前面对RACSignal和RACSubject源码的分析就通透了。