Subclasses of RACSignal
RACReturnSignal
This subclass implements only two methods:
+ (RACSignal *)return:(id)value {
#ifndef DEBUG
// In release builds, use singletons for two very common cases.
if (value == RACUnit.defaultUnit) {
static RACReturnSignal *unitSingleton;
static dispatch_once_t unitPred;
dispatch_once(&unitPred, ^{
unitSingleton = [[self alloc] init];
unitSingleton->_value = RACUnit.defaultUnit;
});
return unitSingleton;
} else if (value == nil) {
static RACReturnSignal *nilSingleton;
static dispatch_once_t nilPred;
dispatch_once(&nilPred, ^{
nilSingleton = [[self alloc] init];
nilSingleton->_value = nil;
});
return nilSingleton;
}
#endif
RACReturnSignal *signal = [[self alloc] init];
signal->_value = value;
#ifdef DEBUG
[signal setNameWithFormat:@"+return: %@", value];
#endif
return signal;
}
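In release builds, when the value passed in is nil or RACUnit.defaultUnit, a shared singleton signal is returned for each of those two cases. For any other value, and always in DEBUG builds, a new signal is created.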
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendNext:self.value];
[subscriber sendCompleted];
}];
}
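This overrides the superclass's subscribe: method: it sends the stored value to the subscriber and then immediately sends completed.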
RACErrorSignal
+ (RACSignal *)error:(NSError *)error {
RACErrorSignal *signal = [[self alloc] init];
signal->_error = error;
#ifdef DEBUG
[signal setNameWithFormat:@"+error: %@", error];
#else
signal.name = @"+error:";
#endif
return signal;
}
Returns a signal that carries the given error.
#pragma mark Subscription
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendError:self.error];
}];
}
On subscription, the signal immediately sends the stored error to the subscriber.
RACEmptySignal
+ (RACSignal *)empty {
#ifdef DEBUG
// Create multiple instances of this class in DEBUG so users can set custom
// names on each.
return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
static id singleton;
static dispatch_once_t pred;
dispatch_once(&pred, ^{
singleton = [[self alloc] init];
});
return singleton;
#endif
}
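Returns a singleton in release builds. In DEBUG builds a new instance is created on every call so that each one can be given its own name.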
#pragma mark Subscription
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}
On subscription, the signal sends completed immediately, without sending any values.
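The three constant signals above can be compared side by side. This is a minimal sketch, assuming the ReactiveObjC framework is linked and imported through its umbrella header. The function name DemoConstantSignals and the error domain DemoErrorDomain are made up for illustration.

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper, not part of the library.
static void DemoConstantSignals(void) {
    // +return: delivers a single value followed by completed.
    [[RACSignal return:@42] subscribeNext:^(id value) {
        NSLog(@"return next: %@", value);
    } completed:^{
        NSLog(@"return completed");
    }];

    // +error: delivers only the error event.
    NSError *error = [NSError errorWithDomain:@"DemoErrorDomain" code:1 userInfo:nil];
    [[RACSignal error:error] subscribeError:^(NSError *received) {
        NSLog(@"error received: %@", received.domain);
    }];

    // +empty delivers only completed.
    [[RACSignal empty] subscribeCompleted:^{
        NSLog(@"empty completed");
    }];
}
```

In all three cases the events are delivered through RACScheduler.subscriptionScheduler, so whether the callbacks run before the subscribe call returns depends on the thread the subscription is made from.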
RACDynamicSignal
This can be regarded as the cold-signal class: the block passed to createSignal: is run again for every subscriber.
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
// Run the block that was passed to createSignal: and keep the disposable it returns.
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
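The method wraps the subscriber and the signal itself in a RACPassthroughSubscriber, which forwards events to the real subscriber. It then invokes the block that was supplied to createSignal:, either on the current thread (when a scheduler is already current) or on a background scheduler: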
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
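Because didSubscribe is invoked from inside subscribe:, the creation block runs once per subscription. A minimal sketch of that behaviour, assuming ReactiveObjC is available and the function is called from the main thread; DemoDynamicSignal is a hypothetical name.

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: shows that the creation block runs once per subscriber.
static void DemoDynamicSignal(void) {
    __block NSUInteger runCount = 0;

    RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // This is the didSubscribe block stored by the signal.
        runCount += 1;
        [subscriber sendNext:@(runCount)];
        [subscriber sendCompleted];
        // The returned disposable is the innerDisposable seen in subscribe:.
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"cleanup for one subscription");
        }];
    }];

    // Each subscription triggers its own run of the block above.
    [signal subscribeNext:^(id value) {
        NSLog(@"first subscriber: %@", value);
    }];
    [signal subscribeNext:^(id value) {
        NSLog(@"second subscriber: %@", value);
    }];
}
```

The second subscriber does not share the first subscriber's run; it sees the counter after a second increment.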
RACChannelTerminal
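Represents one end of a RACChannel and is used to implement two-way bindings. Its interface is declared in RACChannel.h and its implementation lives in RACChannel.m.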
RACChannelTerminal
Initialization
@interface RACChannelTerminal<ValueType> ()
/// The values for this terminal.
@property (nonatomic, strong, readonly) RACSignal<ValueType> *values;
/// A subscriber that will send values to the other terminal.
@property (nonatomic, strong, readonly) id<RACSubscriber> otherTerminal;
- (instancetype)initWithValues:(RACSignal<ValueType> *)values otherTerminal:(id<RACSubscriber>)otherTerminal;
@end
- (instancetype)initWithValues:(RACSignal *)values otherTerminal:(id<RACSubscriber>)otherTerminal {
NSCParameterAssert(values != nil);
NSCParameterAssert(otherTerminal != nil);
self = [super init];
_values = values;
_otherTerminal = otherTerminal;
return self;
}
Methods
#pragma mark RACSignal
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
return [self.values subscribe:subscriber];
}
#pragma mark <RACSubscriber>
- (void)sendNext:(id)value {
[self.otherTerminal sendNext:value];
}
- (void)sendError:(NSError *)error {
[self.otherTerminal sendError:error];
}
- (void)sendCompleted {
[self.otherTerminal sendCompleted];
}
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable {
[self.otherTerminal didSubscribeWithDisposable:disposable];
}
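In effect, an event sent to a terminal is forwarded to the other terminal's hot signal, while subscriptions to the terminal are served by the hot signal it holds itself (values), as shown in the figure.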
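The forwarding between the two terminals can be observed through the public RACChannel API. A minimal sketch, assuming ReactiveObjC is available; DemoChannelTerminals is a hypothetical name.

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: values sent to one terminal arrive at the other.
static void DemoChannelTerminals(void) {
    RACChannel *channel = [[RACChannel alloc] init];

    [channel.followingTerminal subscribeNext:^(id value) {
        NSLog(@"following terminal received: %@", value);
    }];
    [channel.leadingTerminal subscribeNext:^(id value) {
        NSLog(@"leading terminal received: %@", value);
    }];

    // Forwarded to the subscribers of followingTerminal.
    [channel.leadingTerminal sendNext:@"from leading"];
    // Forwarded to the subscribers of leadingTerminal.
    [channel.followingTerminal sendNext:@"from following"];
}
```

A terminal does not receive the values sent to itself, which is what makes the pair usable for two-way bindings without echoing.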
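RACSubject (hot signals)
RACSubject is the hot-signal class in RAC. It has three subclasses:
- RACBehaviorSubject (replays the latest value: on subscription it sends the subscriber the last value it received)
- RACReplaySubject (replays history: it stores the values it has sent and resends them to each new subscriber)
- RACGroupedSignal (a grouped signal, used to implement the grouping feature of RACSignal)
Let's look at how each of them is implemented: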
RACBehaviorSubject
Initializer
+ (instancetype)behaviorSubjectWithDefaultValue:(nullable ValueType)value
This subclass adds one property:
@property (nonatomic, strong) ValueType currentValue;
The initializer uses it to store the default value.
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber
On subscription, the subject immediately sends currentValue to the new subscriber.
- (void)sendNext:(id)value
- (void)sendNext:(id)value {
@synchronized (self) {
self.currentValue = value;
[super sendNext:value];
}
}
Each call stores the new value in currentValue and then forwards it to the current subscribers.
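A short sketch of how currentValue affects subscribers that arrive at different times, assuming ReactiveObjC is available and the function is called from the main thread; DemoBehaviorSubject is a hypothetical name.

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: late subscribers start from the most recent value.
static void DemoBehaviorSubject(void) {
    RACBehaviorSubject *subject = [RACBehaviorSubject behaviorSubjectWithDefaultValue:@"default"];

    // Subscriber A starts with the default value.
    [subject subscribeNext:^(id value) {
        NSLog(@"A: %@", value);
    }];

    // Updates currentValue and reaches subscriber A.
    [subject sendNext:@"one"];

    // Subscriber B starts with the latest value, not the default.
    [subject subscribeNext:^(id value) {
        NSLog(@"B: %@", value);
    }];

    // Reaches both subscribers.
    [subject sendNext:@"two"];
}
```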
RACReplaySubject
Initializer
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity;
Sets the number of values the subject will buffer for replay.
- (instancetype)initWithCapacity:(NSUInteger)capacity {
self = [super init];
_capacity = capacity;
_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
return self;
}
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
// Replay the buffered values, mapping the RACTupleNil placeholder back to nil.
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
// Still active: register as a normal subscriber for future events.
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}
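On subscription, the subject iterates over the buffered values and resends them to the new subscriber. If the subject has already completed, it then sends completed; if it has already failed, it sends the stored error; otherwise the subscriber is registered to receive future events.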
RACSubscriber
- (void)sendNext:(id)value {
@synchronized (self) {
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
[super sendNext:value];
if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
}
}
- (void)sendCompleted {
@synchronized (self) {
self.hasCompleted = YES;
[super sendCompleted];
}
}
- (void)sendError:(NSError *)e {
@synchronized (self) {
self.hasError = YES;
self.error = e;
[super sendError:e];
}
}
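These methods are straightforward. sendNext: appends the value to the buffer (storing RACTupleNil in place of nil) and trims the oldest entries once the capacity is exceeded; sendCompleted and sendError: record the terminal state so that it can be replayed to later subscribers.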
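The effect of the capacity and the recorded terminal state can be seen with a subscriber that arrives after everything has been sent. A minimal sketch, assuming ReactiveObjC is available; DemoReplaySubject is a hypothetical name.

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: a late subscriber gets the buffered tail and the completion.
static void DemoReplaySubject(void) {
    // Keep at most the two most recent values.
    RACReplaySubject *subject = [RACReplaySubject replaySubjectWithCapacity:2];

    [subject sendNext:@1];
    [subject sendNext:@2];
    [subject sendNext:@3]; // The buffer now holds 2 and 3; 1 was trimmed.
    [subject sendCompleted];

    // Subscribing after completion still replays the buffer, then completed.
    [subject subscribeNext:^(id value) {
        NSLog(@"replayed: %@", value);
    } completed:^{
        NSLog(@"replayed completed");
    }];
}
```

Passing RACReplaySubjectUnlimitedCapacity instead of 2 would keep every value, at the cost of unbounded memory growth for long-lived subjects.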
RACGroupedSignal
@interface RACGroupedSignal : RACSubject
/// The key shared by the group.
@property (nonatomic, readonly, copy) id<NSCopying> key;
+ (instancetype)signalWithKey:(id<NSCopying>)key;
@end
@interface RACGroupedSignal ()
@property (nonatomic, copy) id<NSCopying> key;
@end
@implementation RACGroupedSignal
#pragma mark API
+ (instancetype)signalWithKey:(id<NSCopying>)key {
RACGroupedSignal *subject = [self subject];
subject.key = key;
return subject;
}
@end
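RACGroupedSignal is normally not created directly; it is handed out by the grouping operator on RACSignal. The sketch below assumes ReactiveObjC is available and that the groupBy: operator is used for grouping; DemoGroupedSignal and the "even"/"odd" keys are made up for illustration.

```objc
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper: splits a stream of numbers into keyed groups.
static void DemoGroupedSignal(void) {
    RACSubject *numbers = [RACSubject subject];

    // The block returns the key that decides which group a value belongs to.
    RACSignal *groups = [numbers groupBy:^id<NSCopying>(NSNumber *number) {
        return (number.integerValue % 2 == 0) ? @"even" : @"odd";
    }];

    // Each value sent by `groups` is a RACGroupedSignal carrying its key.
    [groups subscribeNext:^(RACGroupedSignal *group) {
        [group subscribeNext:^(id value) {
            NSLog(@"%@: %@", group.key, value);
        }];
    }];

    [numbers sendNext:@1];
    [numbers sendNext:@2];
    [numbers sendNext:@3];
}
```

Since RACGroupedSignal is a plain RACSubject with a key attached, a group only delivers values to subscribers that are already attached when the values arrive.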