信号类还提供了一列方法,可以对信号传递的数据进行操作。
1.map:方法
map:方法可以将信号传递的数据进行映射和转换,demo如下:
RACSignal *signal = [RACSignal create:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"test"]; //传递"test"
}];
RACSignal *mappedSignal = [signal map:^id(NSString *value) {
return @(value.length); //映射成"test"字符串的长度
}];
[mappedSignal subscribeNext:^(id x) {
NSInteger len = [(NSNumber *)x integerValue];
NSLog(@"%@",@(len)); //输出结果,结果是4
} completed:^{
NSLog(@"completed");
}];
上一篇已经分析了create:方法,创建了一个RACDynamicSignal类型的信号signal,当执行订阅subscriber时会触发didSubscribe回调,调用[subscriber sendNext:@"test"]。本例不是由signal直接订阅subscriber,而是map方法后新的mappedSignal对象订阅。map:方法如下:
- (RACSignal *)map:(id (^)(id value))block {
return [super map:block]; //调用父类(RACStrem)的map:方法
}
- (instancetype)map:(id (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
//调用flattenMap:方法
return [[self flattenMap:^(id value) {
return [class return:block(value)];
}] setNameWithFormat:@"[%@] -map:", self.name];
}
首先调用父类RACStrem的map:方法,接着调用flattenMap:方法,该方法默认由RACStrem类实现,如下:
- (instancetype)flattenMap:(RACStream * (^)(id value))block {
Class class = self.class;
return [[self bind:^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
实质是通过bind方法创建一个新的信号。但是由于子类RACSignal重现了flattenMap:方法,故不会调用bind:方法,而是调用如下方法:
- (RACSignal *)flattenMap:(RACSignal * (^)(id value))signalBlock {
//创建一个RACDynamicSignal,订阅subscriber时会触发didSubscribe回调
return [[RACSignal
create:^(id<RACSubscriber> subscriber) {
__block volatile int32_t subscriptions = 0;
//1.定义一个回调subscribeSignal,传入signal和nextBlock作为参数
void (^subscribeSignal)(RACSignal *, void (^)(id)) = ^(RACSignal *signal, void (^nextBlock)(id)) {
__block RACDisposable *savedDisposable;
//订阅信号
[signal subscribeSavingDisposable:^(RACDisposable *disposable) {
savedDisposable = disposable;
OSAtomicIncrement32(&subscriptions);
[subscriber.disposable addDisposable:savedDisposable];
} next:nextBlock error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[subscriber.disposable removeDisposable:savedDisposable];
if (OSAtomicDecrement32(&subscriptions) == 0) [subscriber sendCompleted];
}];
};
//2.触发subscribeSignal回调,传入信号和nextBlock
subscribeSignal(self, ^(id x) {
//创建一个新的signal
RACSignal *innerSignal = signalBlock(x);
if (innerSignal == nil) return;
NSCAssert([innerSignal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", innerSignal);
//3.再次调用subscribeSignal回调,传入新的innerSignal和nextBlock
subscribeSignal(innerSignal, ^(id x) {
[subscriber sendNext:x]; //4.初始订阅者的next事件
});
});
}]
setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
这是一个比较长的方法,根据注释,我们将它分成4个部分分析。
1.首先我们将demo中调用flattenMap:方法的信号对象记为signal,调用create:方法返回一个新的RACDynamicSignal,记为mappedSignal。
2.mappedSignal在demo中调用subscribeNext:completed:方法时,创建一个subscriber,记为subscriber0,同时触发mappedSignal的didSubscribe回调,进入步骤1。
3.首先定义一个subscribeSignal方法,传入两个参数,分别是要订阅的信号和订阅时执行的nextBlock,然后进行signal的订阅。
4.调用subscribeSignal方法,传入signal,触发signal的didSubscribe逻辑,在demo中是传递触发next事件,传入"test",如下:
RACSignal *signal = [RACSignal create:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"test"]; //传递"test"
}];
5.传回的参数x为"test",然后执行signalBlock(x),signalBlock由上一级map:方法传入,返回一个RACReturnSignal对象,内部value值是外层转化后的值,即字符串"test"的长度4,如下:
- (instancetype)map:(id (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^(id value) {
return [class return:block(value)]; //返回一个新的RACReturnSignal对象,value是block转化后的值,即为4
}] setNameWithFormat:@"[%@] -map:", self.name];
}
6.RACReturnSignal对象就是innerSignal,然后再次调用subscribeSignal方法,传入innerSignal和相应的nextBlock。前一篇文章分析过,对RACReturnSignal类型的对象innerSignal订阅,attachSubscriber:方法会直接触发nextBlock,传递value值,即4。最后在nextBlock中触发初始订阅者subscriber0的next事件,传递4,并输出。
2.filter:方法
filter:方法可以过滤信号的值,来决定后续数据是否继续传递,demo如下:
RACSignal *signal = [RACSignal create:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"test"];
}];
RACSignal *filteredSignal = [signal filter:^BOOL(NSString *value) {
if (value.length>=4) { //字符串字数大于等于4,信号继续传递
return YES;
} else { //字符串字数小于4,信号终止传递
return NO;
}
}];
[filteredSignal subscribeNext:^(id x) {
NSString *str = (NSString *)x;
NSLog(str); //输出"test"
} completed:^{
NSLog(@"completed");
}];
在filter:方法里面通过返回YES或者NO来决定信号数据是否继续传递,demo中根据字符串的个数来判断。如果返回YES,则输出字符串,否则不输出。filter:方法如下:
- (RACSignal *)filter:(BOOL (^)(id value))block {
return [super filter:block]; //调用基类RACStream的filter:方法
}
- (instancetype)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
//RACSignal的flatternMap:方法
return [[self flattenMap:^ id (id value) {
if (block(value)) {
return [class return:value];
} else {
return class.empty;
}
}] setNameWithFormat:@"[%@] -filter:", self.name];
}
调用基类RACStream的filter:方法,和map:方法类似,最终也会调用flattenMap:方法,不同的是,filter:方法传递的block逻辑不一样,如果外层block返回YES,则返回一个RACReturnSignal,内部value仍然是原值,如果外层返回NO,则返回一个RACEmptySignal。
回顾一下flattenMap:方法内部处理逻辑,如下:
- (RACSignal *)flattenMap:(RACSignal * (^)(id value))signalBlock {
return [[RACSignal
create:^(id<RACSubscriber> subscriber) {
__block volatile int32_t subscriptions = 0;
void (^subscribeSignal)(RACSignal *, void (^)(id)) = ^(RACSignal *signal, void (^nextBlock)(id)) {
__block RACDisposable *savedDisposable;
[signal subscribeSavingDisposable:^(RACDisposable *disposable) {
savedDisposable = disposable;
OSAtomicIncrement32(&subscriptions);
[subscriber.disposable addDisposable:savedDisposable];
} next:nextBlock error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
//RACReturnSignal和RACEmptySignal在attachSubscriber都会触发completed事件
[subscriber.disposable removeDisposable:savedDisposable];
if (OSAtomicDecrement32(&subscriptions) == 0) {
[subscriber sendCompleted];
}
}];
};
subscribeSignal(self, ^(id x) {
RACSignal *innerSignal = signalBlock(x);
if (innerSignal == nil) return;
NSCAssert([innerSignal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", innerSignal);
subscribeSignal(innerSignal, ^(id x) {
[subscriber sendNext:x];
});
});
}]
setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
如果innerSignal是RACReturnSignal对象,则再次调用subscribeSignal方法,逻辑和map:方法一样。如果innerSignal是RACEmptySignal对象,则再次调用subscribeSignal方法,信号订阅时,会触发RACEmptySignal的attachSubscriber:方法,如下:
- (void)attachSubscriber:(RACLiveSubscriber *)subscriber {
NSCParameterAssert(subscriber != nil);
[subscriber sendCompleted]; //completed事件
}
只会触发completed事件,不会触发subscriber的next事件,也就不会触发初始订阅者subscriber0的next事件,不会输出。从而实现filter过滤信号数据的作用。
ignore:方法是在filter:方法的基础上封装了一下,实现指定信号数据的忽略。
- (instancetype)ignore:(id)value {
return [[self filter:^ BOOL (id innerValue) {
return innerValue != value && ![innerValue isEqual:value];
}] setNameWithFormat:@"[%@] -ignore: %@", self.name, [value rac_description]];
}
3.concat:方法
concat:方法用于将若干个signal按顺序连接,signal按照先后顺序依次订阅subscriber。demo如下:
RACSignal *signal1 = [RACSignal create:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"one"];//发送next事件
[subscriber sendCompleted];//发送completed事件
}];
RACSignal *signal2 = [RACSignal create:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"two"];//发送next事件
[subscriber sendCompleted];//发送completed事件
}];
[[signal1 concat:signal2] subscribeNext:^(id x) {
NSString *str = (NSString *)x;
NSLog(str); //输出字符串
} completed:^{
NSLog(@"completed");//输出"completed"
}];
执行demo,输出的顺序是"one","two","completed"。分析concat:方法,如下:
- (RACSignal *)concat:(RACSignal *)signal {
return [[RACSignal create:^(id<RACSubscriber> subscriber) {
[self subscribeSavingDisposable:^(RACDisposable *disposable) {
[subscriber.disposable addDisposable:disposable];
} next:^(id x) {
[subscriber sendNext:x]; //触发next事件
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[signal subscribe:subscriber]; //在completed事件回调中触发下一个signal的订阅逻辑
}];
}] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}
在demo中,首先signal1被订阅,sendNext:方法触发next事件,将数据传递给subscriber,输出"one"。然后sendCompleted:方法,触发completed事件,引起下一个信号对象signal2的订阅逻辑。如果signal1不调用sendCompleted方法,则结束流程。subscriber订阅signal2后,sendNext:方法触发next事件,输出字符串"two",sendCompleted方法触发completed事件,输出"completed"。
4.merge:方法
merge:方法将多个信号连接起来,实现多个信号依次被订阅。demo如下:
RACSignal *signal1 = [RACSignal create:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"test1"];
}];
RACSignal *signal2 = [RACSignal create:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"test2"];
}];
RACSignal *mergeSignal = [signal1 merge:signal2];
[mergeSignal subscribeNext:^(id x) {
NSLog(@"%@",x); //依次输出"test1"、"test2"
}];
mergerSignal订阅subscriber时,依次执行signal1和signal2的didSubscribe回调,输出"test1","test2"。
merge:方法实现如下:
- (RACSignal *)merge:(RACSignal *)signal {
return [[RACSignal
merge:@[ self, signal ]]
setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
}
+ (RACSignal *)merge:(id<NSFastEnumeration>)signals {
NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
for (RACSignal *signal in signals) {
[copiedSignals addObject:signal];
}
return [[copiedSignals.rac_signal flatten] setNameWithFormat:@"+merge: %@", copiedSignals];
}
首先将signal1和signal2存入一个数组中,然后执行NSArray的rac_signal方法,创建一个RACDynamicSignal,订阅时,依次触发next事件,并且将signal1和signal2作为参数传递。
- (RACSignal *)rac_signal {
NSArray *collection = [self copy];
return [[RACSignal create:^(id<RACSubscriber> subscriber) {
for (id obj in collection) {
[subscriber sendNext:obj];
if (subscriber.disposable.disposed) return;
}
[subscriber sendCompleted];
}] setNameWithFormat:@"%@ -rac_signal", self.rac_description];
}
flatten:方法调用flattenMap:方法,根据上文对flattenMap:方法的分析,首先依次触发next事件,传递signal1和signal2,然后调用signalBlock,返回value。然后调用subscribeSignal()方法,触发next事件,依次输出test1和test2。
- (instancetype)flatten {
__weak RACStream *stream __attribute__((unused)) = self;
return [[self flattenMap:^(id value) {
return value;
}] setNameWithFormat:@"[%@] -flatten", self.name];
}
5.zipWith:方法
zipWith:方法的作用是将两个信号发出的数据压缩成一个传递,即subscriber调用一次sendNext,demo如下:
RACSignal *signal1 = [RACSignal create:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"test1"];
[subscriber sendCompleted];
}];
RACSignal *signal2 = [RACSignal create:^(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"test2"];
[subscriber sendCompleted];
}];
RACSignal *zipSignal = [signal1 zipWith:signal2];
[zipSignal subscribeNext:^(id x) {
RACTuple *tuple = (RACTuple *)x;
for (NSString *value in tuple.array) {
NSLog(@"%@",value);
}
} completed:^{
NSLog(@"completed");
}];
signal1和signal2被zipWith:方法压缩成一个信号zipSignal,输出的内容是一个RACTuple对象,包含signal1和signal2发送的数据"test1"和"test2",依次输出。下面分析一下zipWith:方法:
- (RACSignal *)zipWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);
//创建一个RACDynamic信号
return [[RACSignal create:^(id<RACSubscriber> subscriber) {
//管理signal1的数据集合
__block BOOL selfCompleted = NO;
NSMutableArray *selfValues = [NSMutableArray array];
//管理signal2的数据集合
__block BOOL otherCompleted = NO;
NSMutableArray *otherValues = [NSMutableArray array];
//是否触发subscriber的completed事件
void (^sendCompletedIfNecessary)(void) = ^{
@synchronized (selfValues) {
BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
if (selfEmpty || otherEmpty) [subscriber sendCompleted];
}
};
//是否触发subscriber的next事件
void (^sendNext)(void) = ^{
@synchronized (selfValues) {
if (selfValues.count == 0) return;
if (otherValues.count == 0) return;
RACTuple *tuple = [RACTuple tupleWithObjects:selfValues[0], otherValues[0], nil];
[selfValues removeObjectAtIndex:0];
[otherValues removeObjectAtIndex:0];
[subscriber sendNext:tuple];
sendCompletedIfNecessary();
}
};
//signal1被订阅
[self subscribeSavingDisposable:^(RACDisposable *disposable) {
[subscriber.disposable addDisposable:disposable];
} next:^(id x) {
@synchronized (selfValues) {
[selfValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
selfCompleted = YES;
sendCompletedIfNecessary();
}
}];
//signal2被订阅
[signal subscribeSavingDisposable:^(RACDisposable *disposable) {
[subscriber.disposable addDisposable:disposable];
} next:^(id x) {
@synchronized (selfValues) {
[otherValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
otherCompleted = YES;
sendCompletedIfNecessary();
}
}];
}] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
}
该方法包含如下步骤:
1、维护selfValues和otherValues两个数组,里面分别包含signal1和signal2发出的信号数据,以及selfCompleted和otherCompleted两个变量,表示信号响应是否完成。
2、signal1订阅,触发next事件,将"test1"加入selfValues,触发sendNext()方法,由于此时otherValues尚没有数据,直接return。signal1的触发subscriber的completed事件,selfCompleted变为YES。
3、signal2订阅,触发next事件,将"test2"加入otherValues,触发sendNext()方法,将selfValues和otherValues数组中的第一个元素去除,拼成一个RACTuple对象,作为信号数据传递给subscriber的next事件,输出tuple中的各个元素。
4、signal2触发completed事件,otherCompleted变为YES。调用sendCompletedIfNecessary(),由于selfCompleted或者otherCompleted变量置为Yes,且selfValues和otherValues数组中元素为空,触发subscriber的completed事件,输出"completed"。