Skip to content

Commit c07640e

Browse files
emit data into collected flow
1 parent 63928aa commit c07640e

File tree

5 files changed

+180
-30
lines changed

5 files changed

+180
-30
lines changed

store/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ kotlin {
6565
implementation(kotlin("test"))
6666
implementation(libs.junit)
6767
implementation(libs.kotlinx.coroutines.test)
68+
implementation(libs.turbine)
6869
}
6970
}
7071

store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,17 @@ package org.mobilenativefoundation.store.store5.impl
44

55
import co.touchlab.kermit.CommonWriter
66
import co.touchlab.kermit.Logger
7+
import kotlinx.coroutines.channels.Channel
78
import kotlinx.coroutines.flow.Flow
9+
import kotlinx.coroutines.flow.emitAll
10+
import kotlinx.coroutines.flow.filter
811
import kotlinx.coroutines.flow.first
912
import kotlinx.coroutines.flow.flow
1013
import kotlinx.coroutines.flow.flowOf
14+
import kotlinx.coroutines.flow.map
15+
import kotlinx.coroutines.flow.merge
1116
import kotlinx.coroutines.flow.onEach
17+
import kotlinx.coroutines.flow.receiveAsFlow
1218
import kotlinx.coroutines.sync.Mutex
1319
import kotlinx.coroutines.sync.withLock
1420
import org.mobilenativefoundation.store.store5.Bookkeeper
@@ -17,6 +23,7 @@ import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
1723
import org.mobilenativefoundation.store.store5.MutableStore
1824
import org.mobilenativefoundation.store.store5.StoreReadRequest
1925
import org.mobilenativefoundation.store.store5.StoreReadResponse
26+
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
2027
import org.mobilenativefoundation.store.store5.StoreWriteRequest
2128
import org.mobilenativefoundation.store.store5.StoreWriteResponse
2229
import org.mobilenativefoundation.store.store5.Updater
@@ -38,6 +45,8 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
3845
private val keyToWriteRequestQueue = mutableMapOf<Key, WriteRequestQueue<Key, Output, *>>()
3946
private val keyToThreadSafety = mutableMapOf<Key, ThreadSafety>()
4047

48+
private val writeRequestChannel = Channel<Pair<Key, Output>>()
49+
4150
override fun <Response : Any> stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<Output>> =
4251
flow {
4352
safeInitStore(request.key)
@@ -60,7 +69,14 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
6069
}
6170
}
6271

63-
delegate.stream(request).collect { storeReadResponse -> emit(storeReadResponse) }
72+
emitAll(
73+
merge(
74+
delegate.stream(request),
75+
writeRequestChannel.receiveAsFlow()
76+
.filter { it.first == request.key }
77+
.map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) },
78+
)
79+
)
6480
}
6581

6682
@ExperimentalStoreApi
@@ -74,6 +90,9 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
7490
.collect { writeRequest ->
7591
val storeWriteResponse = try {
7692
delegate.write(writeRequest.key, writeRequest.value)
93+
if (!delegate.hasSourceOfTruth()) {
94+
writeRequestChannel.trySend(writeRequest.key to writeRequest.value)
95+
}
7796
when (val updaterResult = tryUpdateServer(writeRequest)) {
7897
is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error)
7998
is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message)

store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ import co.touchlab.kermit.CommonWriter
1919
import co.touchlab.kermit.Logger
2020
import kotlinx.coroutines.CompletableDeferred
2121
import kotlinx.coroutines.CoroutineScope
22+
import kotlinx.coroutines.channels.Channel
2223
import kotlinx.coroutines.flow.Flow
2324
import kotlinx.coroutines.flow.emitAll
25+
import kotlinx.coroutines.flow.filter
2426
import kotlinx.coroutines.flow.first
2527
import kotlinx.coroutines.flow.flow
2628
import kotlinx.coroutines.flow.map
2729
import kotlinx.coroutines.flow.onEach
2830
import kotlinx.coroutines.flow.onStart
31+
import kotlinx.coroutines.flow.receiveAsFlow
2932
import kotlinx.coroutines.flow.transform
3033
import org.mobilenativefoundation.store.cache5.Cache
3134
import org.mobilenativefoundation.store.store5.CacheType
@@ -73,6 +76,8 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
7376
converter = converter
7477
)
7578

79+
private val localOnlyChannel = Channel<Pair<Key, Output>>()
80+
7681
@Suppress("UNCHECKED_CAST")
7782
override fun stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<Output>> =
7883
flow {
@@ -96,7 +101,16 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
96101
if (memCache == null) {
97102
logger.w("Local-only request made with no cache or source of truth configured")
98103
}
99-
emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache))
104+
if (cachedToEmit == null) {
105+
emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache))
106+
}
107+
emitAll(
108+
localOnlyChannel.receiveAsFlow()
109+
.filter { it.first == request.key }
110+
.map {
111+
StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache)
112+
}
113+
)
100114
return@flow
101115
}
102116

@@ -162,6 +176,9 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
162176
memCache?.put(request.key, data)
163177
}
164178
}
179+
if (sourceOfTruth == null && request.fetch && it is StoreReadResponse.Data) {
180+
localOnlyChannel.trySend(request.key to it.value)
181+
}
165182
}
166183

167184
override suspend fun clear(key: Key) {
@@ -331,6 +348,8 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
331348
internal suspend fun latestOrNull(key: Key): Output? =
332349
fromMemCache(key) ?: fromSourceOfTruth(key)
333350

351+
internal fun hasSourceOfTruth() = sourceOfTruth != null
352+
334353
private suspend fun fromSourceOfTruth(key: Key) =
335354
sourceOfTruth?.reader(key, CompletableDeferred(Unit))?.map { it.dataOrNull() }?.first()
336355

store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt

Lines changed: 63 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package org.mobilenativefoundation.store.store5
22

3+
import app.cash.turbine.test
34
import kotlinx.atomicfu.atomic
4-
import kotlinx.coroutines.flow.first
55
import kotlinx.coroutines.test.TestScope
66
import kotlinx.coroutines.test.runTest
77
import org.mobilenativefoundation.store.store5.impl.extensions.get
@@ -11,6 +11,7 @@ import kotlin.test.Test
1111
import kotlin.test.assertEquals
1212
import kotlin.test.assertTrue
1313
import kotlin.time.Duration
14+
import org.mobilenativefoundation.store.store5.impl.extensions.fresh
1415

1516
class LocalOnlyTests {
1617
private val testScope = TestScope()
@@ -25,8 +26,9 @@ class LocalOnlyTests {
2526
.build()
2627
)
2728
.build()
28-
val response = store.stream(StoreReadRequest.localOnly(0)).first()
29-
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response)
29+
store.stream(StoreReadRequest.localOnly(0)).test {
30+
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem())
31+
}
3032
}
3133

3234
@Test
@@ -48,9 +50,10 @@ class LocalOnlyTests {
4850
val a = store.get(0)
4951
assertEquals("result", a)
5052
assertEquals(1, fetcherHitCounter.value)
51-
val response = store.stream(StoreReadRequest.localOnly(0)).first()
52-
assertEquals("result", response.requireData())
53-
assertEquals(1, fetcherHitCounter.value)
53+
store.stream(StoreReadRequest.localOnly(0)).test {
54+
assertEquals("result", awaitItem().requireData())
55+
assertEquals(1, fetcherHitCounter.value)
56+
}
5457
}
5558

5659
@Test
@@ -73,9 +76,11 @@ class LocalOnlyTests {
7376
val a = store.get(0)
7477
assertEquals("result", a)
7578
assertEquals(1, fetcherHitCounter.value)
76-
val response = store.stream(StoreReadRequest.localOnly(0)).first()
77-
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response)
78-
assertEquals(1, fetcherHitCounter.value)
79+
store.stream(StoreReadRequest.localOnly(0)).test {
80+
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem())
81+
assertEquals(1, fetcherHitCounter.value)
82+
}
83+
7984
}
8085

8186
@Test
@@ -88,8 +93,9 @@ class LocalOnlyTests {
8893
)
8994
.disableCache()
9095
.build()
91-
val response = store.stream(StoreReadRequest.localOnly(0)).first()
92-
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response)
96+
store.stream(StoreReadRequest.localOnly(0)).test {
97+
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem())
98+
}
9399
}
94100

95101
@Test
@@ -109,10 +115,12 @@ class LocalOnlyTests {
109115
val a = store.get(0)
110116
assertEquals("result", a)
111117
assertEquals(1, fetcherHitCounter.value)
112-
val response = store.stream(StoreReadRequest.localOnly(0)).first()
113-
assertEquals("result", response.requireData())
114-
assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin)
115-
assertEquals(1, fetcherHitCounter.value)
118+
store.stream(StoreReadRequest.localOnly(0)).test {
119+
val response = awaitItem()
120+
assertEquals("result", response.requireData())
121+
assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin)
122+
assertEquals(1, fetcherHitCounter.value)
123+
}
116124
}
117125

118126
@Test
@@ -134,9 +142,10 @@ class LocalOnlyTests {
134142
val a = store.get(0)
135143
assertEquals("result", a)
136144
assertEquals(1, fetcherHitCounter.value)
137-
val response = store.stream(StoreReadRequest.localOnly(0)).first()
138-
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response)
139-
assertEquals(1, fetcherHitCounter.value)
145+
store.stream(StoreReadRequest.localOnly(0)).test {
146+
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem())
147+
assertEquals(1, fetcherHitCounter.value)
148+
}
140149
}
141150

142151
@Test
@@ -145,8 +154,41 @@ class LocalOnlyTests {
145154
.from(Fetcher.of { _: Int -> throw RuntimeException("Fetcher shouldn't be hit") })
146155
.disableCache()
147156
.build()
148-
val response = store.stream(StoreReadRequest.localOnly(0)).first()
149-
assertTrue(response is StoreReadResponse.NoNewData)
150-
assertEquals(StoreReadResponseOrigin.Cache, response.origin)
157+
store.stream(StoreReadRequest.localOnly(0)).test {
158+
val response = awaitItem()
159+
assertTrue(response is StoreReadResponse.NoNewData)
160+
assertEquals(StoreReadResponseOrigin.Cache, response.origin)
161+
}
162+
}
163+
164+
@Test
165+
fun collectNewDataFromFetcher() = testScope.runTest {
166+
val fetcherHitCounter = atomic(0)
167+
val store = StoreBuilder
168+
.from(
169+
Fetcher.of { _: Int ->
170+
fetcherHitCounter += 1
171+
"result $fetcherHitCounter"
172+
}
173+
)
174+
.cachePolicy(
175+
MemoryPolicy
176+
.builder<Int, String>()
177+
.build()
178+
)
179+
.build()
180+
181+
store.stream(StoreReadRequest.localOnly(0)).test {
182+
assertTrue(awaitItem() is StoreReadResponse.NoNewData)
183+
184+
assertEquals("result 1", store.fresh(0))
185+
assertEquals("result 1", awaitItem().requireData())
186+
187+
assertEquals("result 2", store.fresh(0))
188+
assertEquals("result 2", awaitItem().requireData())
189+
190+
// different key, not collected
191+
assertEquals("result 3", store.fresh(1))
192+
}
151193
}
152194
}

store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
package org.mobilenativefoundation.store.store5
22

3-
import kotlinx.coroutines.ExperimentalCoroutinesApi
3+
import app.cash.turbine.test
4+
import kotlin.test.BeforeTest
5+
import kotlin.test.Test
6+
import kotlin.test.assertEquals
7+
import kotlin.test.assertIs
8+
import kotlin.test.assertNotNull
9+
import kotlin.time.Duration.Companion.minutes
410
import kotlinx.coroutines.flow.first
511
import kotlinx.coroutines.flow.flow
612
import kotlinx.coroutines.flow.last
713
import kotlinx.coroutines.flow.take
814
import kotlinx.coroutines.test.TestScope
915
import kotlinx.coroutines.test.runTest
1016
import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
17+
import org.mobilenativefoundation.store.store5.impl.extensions.asMutableStore
1118
import org.mobilenativefoundation.store.store5.impl.extensions.inHours
1219
import org.mobilenativefoundation.store.store5.util.assertEmitsExactly
1320
import org.mobilenativefoundation.store.store5.util.fake.Notes
@@ -23,13 +30,8 @@ import org.mobilenativefoundation.store.store5.util.model.NetworkNote
2330
import org.mobilenativefoundation.store.store5.util.model.NoteData
2431
import org.mobilenativefoundation.store.store5.util.model.NotesWriteResponse
2532
import org.mobilenativefoundation.store.store5.util.model.OutputNote
26-
import kotlin.test.BeforeTest
27-
import kotlin.test.Test
28-
import kotlin.test.assertEquals
29-
import kotlin.test.assertIs
30-
import kotlin.test.assertNotNull
3133

32-
@OptIn(ExperimentalCoroutinesApi::class, ExperimentalStoreApi::class)
34+
@OptIn(ExperimentalStoreApi::class)
3335
class UpdaterTests {
3436
private val testScope = TestScope()
3537
private lateinit var api: NotesApi
@@ -266,4 +268,71 @@ class UpdaterTests {
266268
)
267269
assertEquals(NetworkNote(NoteData.Single(newNote)), api.db[NotesKey.Single(Notes.One.id)])
268270
}
271+
272+
@Test
273+
fun collectResponseAfterWriting() = testScope.runTest {
274+
val ttl = inHours(1)
275+
276+
val store = StoreBuilder.from<NotesKey, NetworkNote>(
277+
fetcher = Fetcher.of { key -> api.get(key, ttl = ttl) },
278+
)
279+
.cachePolicy(MemoryPolicy.builder<NotesKey, NetworkNote>().setExpireAfterWrite(10.minutes).build())
280+
.build().asMutableStore<NotesKey, NetworkNote, NetworkNote, NetworkNote, NetworkNote>(
281+
Updater.by(
282+
{ _, v -> UpdaterResult.Success.Typed(v) },
283+
),
284+
null,
285+
)
286+
287+
val readRequest = StoreReadRequest.fresh(NotesKey.Single(Notes.One.id))
288+
289+
store.stream<NotesWriteResponse>(readRequest).test {
290+
assertEquals(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher()), awaitItem())
291+
assertEquals(
292+
StoreReadResponse.Data(
293+
NetworkNote(NoteData.Single(Notes.One), ttl = ttl),
294+
StoreReadResponseOrigin.Fetcher()
295+
),
296+
awaitItem()
297+
)
298+
299+
val newNote = Notes.One.copy(title = "New Title-1")
300+
val writeRequest = StoreWriteRequest.of<NotesKey, NetworkNote, NotesWriteResponse>(
301+
key = NotesKey.Single(Notes.One.id),
302+
value = NetworkNote(NoteData.Single(newNote), 0)
303+
)
304+
305+
val storeWriteResponse = store.write(writeRequest)
306+
307+
// Write is success
308+
assertEquals(
309+
StoreWriteResponse.Success.Typed(
310+
NetworkNote(NoteData.Single(newNote), 0)
311+
),
312+
storeWriteResponse
313+
)
314+
315+
// New data added by 'write' is collected
316+
317+
assertEquals(
318+
NetworkNote(NoteData.Single(newNote), 0),
319+
awaitItem().requireData()
320+
)
321+
322+
// different key, not collected
323+
store.write(StoreWriteRequest.of<NotesKey, NetworkNote, NotesWriteResponse>(
324+
key = NotesKey.Single(Notes.Five.id),
325+
value = NetworkNote(NoteData.Single(newNote), 0)
326+
))
327+
}
328+
329+
val cachedReadRequest =
330+
StoreReadRequest.cached(NotesKey.Single(Notes.One.id), refresh = false)
331+
val cachedStream = store.stream<NotesWriteResponse>(cachedReadRequest)
332+
333+
assertEquals(
334+
NetworkNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0),
335+
cachedStream.first().requireData()
336+
)
337+
}
269338
}

0 commit comments

Comments
 (0)