-
概述
Proto DataStore 将数据作为自定义数据类型的实例进行存储。此实现要求您使用协议缓冲区来定义架构,但可以确保类型安全。
和Preferences DataStore不同的是,使用Proto DataStore会比较繁琐,需要使用proto语法预定义数据,优点是可以确保类型安全。
这里说的确保类型安全其实就是会按照.proto文件中定义的数据类型来读取写入对象,因为是事先定义,并且整个读取写入操作都被封装好,少了人为的读取写入,就达到了所谓的确保类型安全的目的。其实Preferences DataStore在使用过程中也是“确保”了类型安全的,不过这里的类型安全确保值得是人为的确保,通常对于一个属性或对象我们会定义一组特定类型的读取写入方法,使用的时候调取相关方法就好,很少情况下会出现类型不匹配错误。
-
配置
在使用Proto DataStore时,依赖于proto buffer的插件来根据proto文件自动生成相关实体类,只不过这些类只存在于编译运行期,并不属于用户代码,自动生成也是避免人为出错。
<font color="red">注意:下面的配置版本是适配的,使用的是Studio4.2.1,使用更高或更低版本需要选择适配版本,我这里根据google开发者中国文档和网上的博文一起整理的,没找到官方的完整配置。</font>
-
在项目build.gradle中
buildscript { ext.kotlin_version = "1.4.21" //这里的repositories是配置用于当前buildscript中的dependencies中依赖项的查找仓库 repositories { jcenter() google() mavenCentral() } dependencies { classpath "com.android.tools.build:gradle:4.0.2" classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.12' // NOTE: Do not place your application dependencies here; they belong // in the individual module build.gradle files } }
-
Module的build.gradle中
apply plugin: 'com.google.protobuf' protobuf { protoc { artifact = "com.google.protobuf:protoc:3.10.0" } // Generates the java Protobuf-lite code for the Protobufs in this project. See // https://github.com/google/protobuf-gradle-plugin#customizing-protobuf-compilation // for more information. generateProtoTasks { all().each { task -> task.builtins { java { option 'lite' } } } } } dependencies{ implementation "androidx.datastore:datastore:1.0.0" implementation "com.google.protobuf:protobuf-javalite:3.10.0" }
-
sync下载依赖项和插件,然后定义.proto文件
syntax = "proto3"; option java_package = "com.mph.review.bean.plain"; option java_multiple_files = true; message Demo2 { int32 aa = 1; string bb = 2; }
注意proto文件要放在src/main/proto文件夹下。
-
最后ReBuild项目,会在build>generated>source>proto>debug或者release>“java_package指定的包路径”中找到自动生成的类,比如上面的proto文件会生成:
-
-
代码使用
-
定义自己的androidx.datastore.core.Serializer
class MyDemo2Serializer(override val defaultValue : Demo2) : Serializer<Demo2> { override suspend fun readFrom(input : InputStream) : Demo2 { try { return Demo2.parseFrom(input) } catch(e : InvalidProtocolBufferException) { throw e } } override suspend fun writeTo(t : Demo2, output : OutputStream) { t.writeTo(output) } }
这里其实就是调用生成的Demo2类的相关方法进行实例化读取和写入。
-
创建proto的操作对象
val Context.demo2Proto by dataStore("demo2.proto",MyDemo2Serializer(Demo2.getDefaultInstance()))
我们使用kotlin的委托方法dataStore来生成,需要传入proto文件名,还有上面定义的Serializer。
-
读取写入
suspend fun incrementCounterByProto() { demo2Proto.updateData { currentSettings -> currentSettings.toBuilder() .setAa(currentSettings.aa + 1) .setBb(currentSettings.bb + "New") .build() } } private fun testProtoDataStore() { val aaFlow : Flow<Int> = demo2Proto.data.map { settings -> // The exampleCounter property is generated from the proto schema. settings.aa } }
-
-
源码分析
首先看一下构造:
public fun <T> dataStore( fileName: String, serializer: Serializer<T>, corruptionHandler: ReplaceFileCorruptionHandler<T>? = null, produceMigrations: (Context) -> List<DataMigration<T>> = { listOf() }, scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()) ): ReadOnlyProperty<Context, DataStore<T>> { return DataStoreSingletonDelegate( fileName, serializer, corruptionHandler, produceMigrations, scope ) }
返回的是DataStoreSingletonDelegate,它继承自ReadOnlyProperty:
public fun interface ReadOnlyProperty<in T, out V> { /** * Returns the value of the property for the given object. * @param thisRef the object for which the value is requested. * @param property the metadata for the property. * @return the property value. */ public operator fun getValue(thisRef: T, property: KProperty<*>): V }
可以看到,这里需要operator重写getValue方法,即:
@GuardedBy("lock") @Volatile private var INSTANCE: DataStore<T>? = null /** * Gets the instance of the DataStore. * * @param thisRef must be an instance of [Context] * @param property not used */ override fun getValue(thisRef: Context, property: KProperty<*>): DataStore<T> { return INSTANCE ?: synchronized(lock) { if (INSTANCE == null) { val applicationContext = thisRef.applicationContext INSTANCE = DataStoreFactory.create( serializer = serializer, produceFile = { applicationContext.dataStoreFile(fileName) }, corruptionHandler = corruptionHandler, migrations = produceMigrations(applicationContext), scope = scope ) } INSTANCE!! } }
当使用DataStoreSingletonDelegate的时候其实就是使用INSTANCE,也就是DataStore,DataStore接口有一个属性和一个方法,分别用于读取和写入操作,具体的读取和写入逻辑交给不同的子类实现:
public interface DataStore<T> { /** * Provides efficient, cached (when possible) access to the latest durably persisted state. * The flow will always either emit a value or throw an exception encountered when attempting * to read from disk. If an exception is encountered, collecting again will attempt to read the * data again. * * Do not layer a cache on top of this API: it will be be impossible to guarantee consistency. * Instead, use data.first() to access a single snapshot. * * @return a flow representing the current state of the data * @throws IOException when an exception is encountered when reading data */ public val data: Flow<T> /** * Updates the data transactionally in an atomic read-modify-write operation. All operations * are serialized, and the transform itself is a coroutine so it can perform heavy work * such as RPCs. * * The coroutine completes when the data has been persisted durably to disk (after which * [data] will reflect the update). If the transform or write to disk fails, the * transaction is aborted and an exception is thrown. * * @return the snapshot returned by the transform * @throws IOException when an exception is encountered when writing data to disk * @throws Exception when thrown by the transform function */ public suspend fun updateData(transform: suspend (t: T) -> T): T }
applicationContext.dataStoreFile(fileName)会创建一个File实例用于保存数据:
public fun Context.dataStoreFile(fileName: String): File = File(applicationContext.filesDir, "datastore/$fileName")
DataStoreFactory.create会返回一个SingleProcessDataStore实例:
public fun <T> create( serializer: Serializer<T>, corruptionHandler: ReplaceFileCorruptionHandler<T>? = null, migrations: List<DataMigration<T>> = listOf(), scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()), produceFile: () -> File ): DataStore<T> = SingleProcessDataStore( produceFile = produceFile, serializer = serializer, corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(), initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)), scope = scope )
接下来我们先看读取操作,上面的读取操作会返回一个Flow对象,看一下SingleProcessDataStore的data属性重写:
override val data: Flow<T> = flow { val currentDownStreamFlowState = downstreamFlow.value if (currentDownStreamFlowState !is Data) { // We need to send a read request because we don't have data yet. actor.offer(Message.Read(currentDownStreamFlowState)) } emitAll( downstreamFlow.dropWhile { if (currentDownStreamFlowState is Data<T> || currentDownStreamFlowState is Final<T> ) { // We don't need to drop any Data or Final values. false } else { // we need to drop the last seen state since it was either an exception or // wasn't yet initialized. Since we sent a message to actor, we *will* see a // new value. it === currentDownStreamFlowState } }.map { when (it) { is ReadException<T> -> throw it.readException is Final<T> -> throw it.finalException is Data<T> -> it.value is UnInitialized -> error( "This is a bug in DataStore. Please file a bug at: " + "https://issuetracker.google.com/issues/new?" + "component=907884&template=1466542" ) } } ) }
downstreamFlow.value是最新操作的数据,读取和写入都会重新更新downstreamFlow.value的值,这里判断如果不是Data(即可读取数据),则调用actor.offer方法来读取数据,看一下actor是什么:
private val actor = SimpleActor<Message<T>>( scope = scope, onComplete = { it?.let { downstreamFlow.value = Final(it) } // We expect it to always be non-null but we will leave the alternative as a no-op // just in case. synchronized(activeFilesLock) { activeFiles.remove(file.absolutePath) } }, onUndeliveredElement = { msg, ex -> if (msg is Message.Update) { // TODO(rohitsat): should we instead use scope.ensureActive() to get the original // cancellation cause? Should we instead have something like // UndeliveredElementException? msg.ack.completeExceptionally( ex ?: CancellationException( "DataStore scope was cancelled before updateData could complete" ) ) } } ) { msg -> when (msg) { is Message.Read -> { handleRead(msg) } is Message.Update -> { handleUpdate(msg) } } }
SimpleActor的构造方法如下:
internal class SimpleActor<T>( /** * The scope in which to consume messages. */ private val scope: CoroutineScope, /** * Function that will be called when scope is cancelled. Should *not* throw exceptions. */ onComplete: (Throwable?) -> Unit, /** * Function that will be called for each element when the scope is cancelled. Should *not* * throw exceptions. */ onUndeliveredElement: (T, Throwable?) -> Unit, /** * Function that will be called once for each message. * * Must *not* throw an exception (other than CancellationException if scope is cancelled). */ private val consumeMessage: suspend (T) -> Unit ) { ... ...
可见,这里最后的函数体(msg->...部分)赋值给了SimpleActor的consumeMessage属性。现在来看一下它的offer方法:
fun offer(msg: T) { // should never return false bc the channel capacity is unlimited check( messageQueue.trySend(msg) .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") } .isSuccess ) // If the number of remaining messages was 0, there is no active consumer, since it quits // consuming once remaining messages hits 0. We must kick off a new consumer. if (remainingMessages.getAndIncrement() == 0) { scope.launch { // We shouldn't have started a new consumer unless there are remaining messages... check(remainingMessages.get() > 0) do { // We don't want to try to consume a new message unless we are still active. // If ensureActive throws, the scope is no longer active, so it doesn't // matter that we have remaining messages. scope.ensureActive() consumeMessage(messageQueue.receive()) } while (remainingMessages.decrementAndGet() != 0) } } }
我们看到,在这里调用了consumeMessage函数,传入的参数是从messageQueue中获取的,messageQueue在前面的check时通过trySend把actor.offer传入的Message.Read(currentDownStreamFlowState)添加的,还要注意的一点是这一切都是在scope.launch协程中发生的,scope是CoroutineScope(Dispatchers.IO + SupervisorJob()),这就保证了数据读取是一个不影响主线程的异步操作。
那么现在回到actor的构造处,因为它是Message.Read,就知道它会走handleRead方法:
private suspend fun handleRead(read: Message.Read<T>) { when (val currentState = downstreamFlow.value) { is Data -> { // We already have data so just return... } is ReadException -> { if (currentState === read.lastState) { readAndInitOrPropagateFailure() } // Someone else beat us but also failed. The collector has already // been signalled so we don't need to do anything. } UnInitialized -> { readAndInitOrPropagateFailure() } is Final -> error("Can't read in final state.") // won't happen } }
这里的if判断其实就是确保之前的一系列操作结束后并没有产生异常,因为这两个东西正常来说是同一个东西。那么只要不是Data或者不是Final(Final就是无数据),就会执行到readAndInitOrPropagateFailure方法:
private suspend fun readAndInitOrPropagateFailure() { try { readAndInit() } catch (throwable: Throwable) { downstreamFlow.value = ReadException(throwable) } }
这里又调用readAndInit方法:
private suspend fun readAndInit() { // This should only be called if we don't already have cached data. check(downstreamFlow.value == UnInitialized || downstreamFlow.value is ReadException) val updateLock = Mutex() var initData = readDataOrHandleCorruption() var initializationComplete: Boolean = false // TODO(b/151635324): Consider using Context Element to throw an error on re-entrance. val api = object : InitializerApi<T> { override suspend fun updateData(transform: suspend (t: T) -> T): T { return updateLock.withLock() { if (initializationComplete) { throw IllegalStateException( "InitializerApi.updateData should not be " + "called after initialization is complete." ) } val newData = transform(initData) if (newData != initData) { writeData(newData) initData = newData } initData } } } initTasks?.forEach { it(api) } initTasks = null // Init tasks have run successfully, we don't need them anymore. updateLock.withLock { initializationComplete = true } downstreamFlow.value = Data(initData, initData.hashCode()) }
readDataOrHandleCorruption用来读取文件中数据并转化成相关的实体类实例:
private suspend fun readDataOrHandleCorruption(): T { try { return readData() } catch (ex: CorruptionException) { val newData: T = corruptionHandler.handleCorruption(ex) try { writeData(newData) } catch (writeEx: IOException) { // If we fail to write the handled data, add the new exception as a suppressed // exception. ex.addSuppressed(writeEx) throw ex } // If we reach this point, we've successfully replaced the data on disk with newData. return newData } }
private suspend fun readData(): T { try { FileInputStream(file).use { stream -> return serializer.readFrom(stream) } } catch (ex: FileNotFoundException) { if (file.exists()) { throw ex } return serializer.defaultValue } }
可见,这里根据文件构造文件输入流,再把它传给我们之前传入的Serializer,也就是MyDemo2Serializer,调用它的readFrom方法,还记得我们在那里调用了自动生成的Demo2的parseFrom方法:
public static com.mph.review.bean.plain.Demo2 parseFrom(java.io.InputStream input) throws java.io.IOException { return com.google.protobuf.GeneratedMessageLite.parseFrom( DEFAULT_INSTANCE, input); }
DEFAULT_INSTANCE就是Demo2对象,是在静态块中实例化的:
private static final com.mph.review.bean.plain.Demo2 DEFAULT_INSTANCE; static { Demo2 defaultInstance = new Demo2(); // New instances are implicitly immutable so no need to make // immutable. DEFAULT_INSTANCE = defaultInstance; com.google.protobuf.GeneratedMessageLite.registerDefaultInstance( Demo2.class, defaultInstance); }
来看一下GeneratedMessageLite的parseFrom方法:
protected static <T extends GeneratedMessageLite<T, ?>> T parseFrom( T defaultInstance, InputStream input) throws InvalidProtocolBufferException { return checkMessageInitialized( parsePartialFrom( defaultInstance, CodedInputStream.newInstance(input), ExtensionRegistryLite.getEmptyRegistry())); }
parsePartialFrom方法如下:
static <T extends GeneratedMessageLite<T, ?>> T parsePartialFrom( T instance, CodedInputStream input, ExtensionRegistryLite extensionRegistry) throws InvalidProtocolBufferException { @SuppressWarnings("unchecked") // Guaranteed by protoc T result = (T) instance.dynamicMethod(MethodToInvoke.NEW_MUTABLE_INSTANCE); try { // TODO(yilunchong): Try to make input with type CodedInpuStream.ArrayDecoder use // fast path. Schema<T> schema = Protobuf.getInstance().schemaFor(result); schema.mergeFrom(result, CodedInputStreamReader.forCodedInput(input), extensionRegistry); schema.makeImmutable(result); } catch (IOException e) { if (e.getCause() instanceof InvalidProtocolBufferException) { throw (InvalidProtocolBufferException) e.getCause(); } throw new InvalidProtocolBufferException(e.getMessage()).setUnfinishedMessage(result); } catch (RuntimeException e) { if (e.getCause() instanceof InvalidProtocolBufferException) { throw (InvalidProtocolBufferException) e.getCause(); } throw e; } return result; }
因为instance是Demo2,所以看一下Demo2的dynamicMethod方法:
protected final java.lang.Object dynamicMethod( com.google.protobuf.GeneratedMessageLite.MethodToInvoke method, java.lang.Object arg0, java.lang.Object arg1) { switch (method) { case NEW_MUTABLE_INSTANCE: { return new com.mph.review.bean.plain.Demo2(); } case NEW_BUILDER: { return new Builder(); } ... ... } }
这里还是会返回一个新的Demo2对象,DEFAULT_INSTANCE已经是对象了,这里看起来好像有些多余,其实DEFAULT_INSTANCE是静态的、共用的,有可能其他地方也在用,我们不能直接修改这个实例避免影响其他地方。拿到result之后,parsePartialFrom的后续操作就是从文件输入流中读取保存值然后对result进行赋值。
到这里,readDataOrHandleCorruption的流程就走完了,我们读取到了保存的数据,回到readAndInit方法,接下来就是一些非必要的处理操作,我们来看一下。
首先会构造一个InitializerApi对象叫api,initTasks中的每个函数都要处理一次它,注意,这里的initTasks只会使用一次,只用于第一次读取的时候,这是为什么呢?因为这些操作是为了兼容旧数据,相当于对旧数据提供一个可以更新的入口,为什么这么说,我们往下看。
initTasks就是initTasksList,initTasksList来自于DataStoreFactory构造SingleProcessDataStore时传入的:
initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
migrations是DataStoreFactory的create方法传入的,这可以在我们调用dataStore时通过produceMigrations参数传入,是一个集合,里面存的是DataMigration,是一个接口:
public interface DataMigration<T> { public suspend fun shouldMigrate(currentData: T): Boolean public suspend fun migrate(currentData: T): T public suspend fun cleanUp() }
现在看一下DataMigrationInitializer.getInitializer方法返回的:
fun <T> getInitializer(migrations: List<DataMigration<T>>): suspend (api: InitializerApi<T>) -> Unit = { api -> runMigrations(migrations, api) }
是一个suspend函数,所以前面的initTasks的forEach中的it就是这个函数体,可见这里调用了runMigrations方法:
private suspend fun <T> runMigrations( migrations: List<DataMigration<T>>, api: InitializerApi<T> ) { val cleanUps = mutableListOf<suspend () -> Unit>() api.updateData { startingData -> migrations.fold(startingData) { data, migration -> if (migration.shouldMigrate(data)) { cleanUps.add { migration.cleanUp() } migration.migrate(data) } else { data } } } var cleanUpFailure: Throwable? = null cleanUps.forEach { cleanUp -> try { cleanUp() } catch (exception: Throwable) { if (cleanUpFailure == null) { cleanUpFailure = exception } else { cleanUpFailure!!.addSuppressed(exception) } } } // If we encountered a failure on cleanup, throw it. cleanUpFailure?.let { throw it } }
这里的api就是前面构造的InitializerApi,调用它的updateData方法,参数就是这里的startingData部分,赋值给transform,然后传入前面读取的initData并调用它,那现在我们再来读上面的代码就好理解了,startingData就是initData,也就是读取到的Demo2,migrations.fold方法会循环migrations中的所有的DataMigration,依次执行后面的代码块,每一个DataMigration都会判断shouldMigrate,为true则进行migrate迁移操作并返回处理后的数据,为false则返回原数据,fold操作使得后面的DataMigration会在前者处理后的基础上继续操作。迁移之后统一调用所有DataMigration的cleanUp方法,如果有需要可以重写这个方法执行一些逻辑。
回到readAndInit方法,我们已经完成旧数据的按需迁移工作,最后给downstreamFlow.value赋值Data,里面含有我们读取的Demo2和它的hashCode。
此时我们应该回到SingleProcessDataStore的data赋值的地方,也就是flow代码块处,继续往下走该执行emitAll方法了,这部分操作主要是去除重复数据和排错。现在我们只是拿到了一个包含数据的Flow对象,关于取数据我们后面再说。
下面来看一下写入。
写入是调用DataStore的updateData方法,看一下SingleProcessDataStore的实现:
override suspend fun updateData(transform: suspend (t: T) -> T): T { /** * The states here are the same as the states for reads. Additionally we send an ack that * the actor *must* respond to (even if it is cancelled). */ val ack = CompletableDeferred<T>() val currentDownStreamFlowState = downstreamFlow.value val updateMsg = Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext) actor.offer(updateMsg) return ack.await() }
可以看到也是调用了actor的offer方法,只不过参数换成了Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext),transform是前面设置数据的用户代码:
{ currentSettings -> currentSettings.toBuilder() .setAa(currentSettings.aa + 1) .setBb(currentSettings.bb + "New") .build() }
根据前面的经验,这次会走handleUpdate()->transformAndWrite()->writeData():
internal suspend fun writeData(newData: T) { file.createParentDirectories() val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX) try { FileOutputStream(scratchFile).use { stream -> serializer.writeTo(newData, UncloseableOutputStream(stream)) stream.fd.sync() // TODO(b/151635324): fsync the directory, otherwise a badly timed crash could // result in reverting to a previous state. } if (!scratchFile.renameTo(file)) { throw IOException( "Unable to rename $scratchFile." + "This likely means that there are multiple instances of DataStore " + "for this file. Ensure that you are only creating a single instance of " + "datastore for this file." ) } } catch (ex: IOException) { if (scratchFile.exists()) { scratchFile.delete() // Swallow failure to delete } throw ex } }
和读取同理,也会调用serializer的writeTo方法:
override suspend fun writeTo(t : Demo2, output : OutputStream) { t.writeTo(output) }
DataStore之Proto
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 概述Preferences DataStore 使用键存储和访问数据。因为同是键值对存储,所以优点应该和Share...
- /** * 泛型:泛指任意类型 * 修饰函数:泛型函数 * 类:泛型类 */ /** * 使用out修饰的集合,只...