learn()
{ start }
build++
const tutorial
Modern Android App Architecture - Part 2: Reactive Patterns & Offline-First
Master reactive data streams with Kotlin Flow, implement offline-first architecture with Room, and build robust synchronization strategies for modern Android apps.
•Cuppa Team•8 min read
androidkotlinflowroomoffline-firststate-management
Part 2 of our Modern Android App Architecture series focuses on reactive programming patterns, offline-first architecture, and advanced state management techniques.
Reactive Data Streams with Kotlin Flow
StateFlow vs SharedFlow vs Flow
class DataManager {
// StateFlow: Hot flow with initial value, replays latest to new collectors
private val _userState = MutableStateFlow<UserState>(UserState.Loading)
val userState: StateFlow<UserState> = _userState.asStateFlow()
// SharedFlow: Hot flow for events, no initial value
private val _events = MutableSharedFlow<AppEvent>()
val events: SharedFlow<AppEvent> = _events.asSharedFlow()
// Cold Flow: Created fresh for each collector
fun getItems(): Flow<List<Item>> = flow {
while (true) {
emit(fetchItems())
delay(5000)
}
}
}
Advanced Flow Operators
class FeedViewModel @Inject constructor(
private val feedRepository: FeedRepository,
private val userRepository: UserRepository
) : ViewModel() {
// Combining multiple flows
val feedWithUser: StateFlow<FeedUiState> = combine(
feedRepository.observeFeed(),
userRepository.observeCurrentUser()
) { feed, user ->
FeedUiState.Success(
items = feed.filter { it.isVisibleTo(user) },
user = user
)
}
.catch { emit(FeedUiState.Error(it.message ?: "Unknown error")) }
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = FeedUiState.Loading
)
// Debouncing search queries
val searchResults: StateFlow<List<SearchResult>> = searchQuery
.debounce(300) // Wait 300ms after last keystroke
.distinctUntilChanged() // Only emit when value changes
.flatMapLatest { query ->
if (query.isBlank()) {
flowOf(emptyList())
} else {
searchRepository.search(query)
.catch { emit(emptyList()) }
}
}
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList()
)
// Retrying with exponential backoff
fun loadData(): Flow<Result<Data>> = flow {
emit(Result.Loading)
val data = dataSource.fetchData()
emit(Result.Success(data))
}
.retry(3) { e ->
(e is IOException).also { shouldRetry ->
if (shouldRetry) delay((2.0.pow(retryCount++) * 1000).toLong())
}
}
.catch { emit(Result.Error(it)) }
}
Flow Transformation Patterns
// Paginated data loading
class PaginatedRepository @Inject constructor(
private val api: ApiService,
private val database: AppDatabase
) {
fun getPaginatedItems(): Flow<PagingData<Item>> {
return Pager(
config = PagingConfig(pageSize = 20),
remoteMediator = ItemRemoteMediator(api, database),
pagingSourceFactory = { database.itemDao().pagingSource() }
).flow
}
}
// Caching with replay
class CachedDataRepository {
private val cache = MutableStateFlow<Map<String, Data>>(emptyMap())
fun getData(id: String): Flow<Data> = flow {
// Emit cached value immediately if available
cache.value[id]?.let { emit(it) }
// Then fetch fresh data
val fresh = api.fetchData(id)
cache.value = cache.value + (id to fresh)
emit(fresh)
}.distinctUntilChanged()
}
Offline-First Architecture
Room Database Setup
@Database(
entities = [
UserEntity::class,
PostEntity::class,
CommentEntity::class
],
version = 2,
exportSchema = true
)
@TypeConverters(Converters::class)
abstract class AppDatabase : RoomDatabase() {
abstract fun userDao(): UserDao
abstract fun postDao(): PostDao
abstract fun commentDao(): CommentDao
companion object {
val MIGRATION_1_2 = object : Migration(1, 2) {
override fun migrate(database: SupportSQLiteDatabase) {
database.execSQL(
"ALTER TABLE posts ADD COLUMN sync_status TEXT NOT NULL DEFAULT 'SYNCED'"
)
}
}
}
}
Sync Status Management
enum class SyncStatus {
SYNCED, // Data is synced with server
PENDING, // Waiting to be synced
SYNCING, // Currently syncing
FAILED // Sync failed, will retry
}
@Entity(tableName = "posts")
data class PostEntity(
@PrimaryKey val id: String,
val title: String,
val content: String,
val authorId: String,
@ColumnInfo(name = "created_at") val createdAt: Long,
@ColumnInfo(name = "updated_at") val updatedAt: Long,
@ColumnInfo(name = "sync_status") val syncStatus: SyncStatus = SyncStatus.SYNCED,
@ColumnInfo(name = "sync_attempts") val syncAttempts: Int = 0
)
Repository with Offline Support
class PostRepository @Inject constructor(
private val postDao: PostDao,
private val api: ApiService,
private val syncManager: SyncManager,
@IoDispatcher private val ioDispatcher: CoroutineDispatcher
) {
// Observe posts from local database
fun observePosts(): Flow<List<Post>> {
return postDao.observeAll()
.map { entities -> entities.map { it.toDomainModel() } }
.flowOn(ioDispatcher)
}
// Create post (offline-first)
suspend fun createPost(title: String, content: String): Result<Post> = withContext(ioDispatcher) {
try {
val post = PostEntity(
id = UUID.randomUUID().toString(),
title = title,
content = content,
authorId = currentUserId,
createdAt = System.currentTimeMillis(),
updatedAt = System.currentTimeMillis(),
syncStatus = SyncStatus.PENDING
)
// Save locally immediately
postDao.insert(post)
// Schedule sync
syncManager.scheduleSyncPost(post.id)
Result.success(post.toDomainModel())
} catch (e: Exception) {
Result.failure(e)
}
}
// Sync posts with server
suspend fun syncPosts() = withContext(ioDispatcher) {
try {
// Fetch latest from server
val serverPosts = api.getPosts()
// Update local database
postDao.upsertAll(serverPosts.map { it.toEntity() })
// Sync pending local changes
val pendingPosts = postDao.getPendingSync()
pendingPosts.forEach { post ->
syncPost(post)
}
} catch (e: Exception) {
Log.e("PostRepository", "Sync failed", e)
}
}
private suspend fun syncPost(post: PostEntity) {
try {
postDao.updateSyncStatus(post.id, SyncStatus.SYNCING)
val response = if (post.syncAttempts == 0) {
api.createPost(post.toDto())
} else {
api.updatePost(post.id, post.toDto())
}
postDao.update(post.copy(
syncStatus = SyncStatus.SYNCED,
syncAttempts = 0
))
} catch (e: Exception) {
postDao.update(post.copy(
syncStatus = SyncStatus.FAILED,
syncAttempts = post.syncAttempts + 1
))
}
}
}
WorkManager for Background Sync
@HiltWorker
class SyncWorker @AssistedInject constructor(
@Assisted context: Context,
@Assisted params: WorkerParameters,
private val postRepository: PostRepository,
private val userRepository: UserRepository
) : CoroutineWorker(context, params) {
override suspend fun doWork(): Result {
return try {
postRepository.syncPosts()
userRepository.syncProfile()
Result.success()
} catch (e: Exception) {
if (runAttemptCount < 3) {
Result.retry()
} else {
Result.failure()
}
}
}
}
// Schedule periodic sync
@Module
@InstallIn(SingletonComponent::class)
object SyncModule {
@Provides
@Singleton
fun provideSyncManager(
@ApplicationContext context: Context
): SyncManager {
val workManager = WorkManager.getInstance(context)
val syncRequest = PeriodicWorkRequestBuilder<SyncWorker>(
repeatInterval = 15,
repeatIntervalTimeUnit = TimeUnit.MINUTES
)
.setConstraints(
Constraints.Builder()
.setRequiredNetworkType(NetworkType.CONNECTED)
.build()
)
.setBackoffCriteria(
BackoffPolicy.EXPONENTIAL,
WorkRequest.MIN_BACKOFF_MILLIS,
TimeUnit.MILLISECONDS
)
.build()
workManager.enqueueUniquePeriodicWork(
"sync-work",
ExistingPeriodicWorkPolicy.KEEP,
syncRequest
)
return SyncManagerImpl(workManager)
}
}
Advanced State Management
Handling Complex UI States
data class FeedUiState(
val posts: List<Post> = emptyList(),
val isLoading: Boolean = false,
val isRefreshing: Boolean = false,
val error: String? = null,
val hasMore: Boolean = true,
val currentPage: Int = 0
) {
val isEmpty: Boolean
get() = posts.isEmpty() && !isLoading
val showEmptyState: Boolean
get() = isEmpty && error == null
}
@HiltViewModel
class FeedViewModel @Inject constructor(
private val postRepository: PostRepository
) : ViewModel() {
private val _uiState = MutableStateFlow(FeedUiState())
val uiState: StateFlow<FeedUiState> = _uiState.asStateFlow()
init {
observePosts()
loadPosts()
}
private fun observePosts() {
viewModelScope.launch {
postRepository.observePosts()
.collect { posts ->
_uiState.update { it.copy(posts = posts) }
}
}
}
fun loadPosts() {
if (_uiState.value.isLoading) return
viewModelScope.launch {
_uiState.update { it.copy(isLoading = true, error = null) }
postRepository.fetchPosts(page = _uiState.value.currentPage)
.onSuccess { newPosts ->
_uiState.update {
it.copy(
isLoading = false,
hasMore = newPosts.isNotEmpty(),
currentPage = it.currentPage + 1
)
}
}
.onFailure { error ->
_uiState.update {
it.copy(
isLoading = false,
error = error.message
)
}
}
}
}
fun refresh() {
viewModelScope.launch {
_uiState.update { it.copy(isRefreshing = true, error = null) }
postRepository.refreshPosts()
.onSuccess {
_uiState.update {
it.copy(
isRefreshing = false,
currentPage = 1,
hasMore = true
)
}
}
.onFailure { error ->
_uiState.update {
it.copy(
isRefreshing = false,
error = error.message
)
}
}
}
}
}
MVI Pattern with Sealed Classes
// Intent: User actions
sealed interface ProfileIntent {
data object LoadProfile : ProfileIntent
data class UpdateName(val name: String) : ProfileIntent
data class UpdateBio(val bio: String) : ProfileIntent
data object SaveProfile : ProfileIntent
}
// State: UI state
data class ProfileState(
val profile: UserProfile? = null,
val editedName: String = "",
val editedBio: String = "",
val isLoading: Boolean = false,
val isSaving: Boolean = false,
val error: String? = null
) {
val hasChanges: Boolean
get() = profile?.let {
it.name != editedName || it.bio != editedBio
} ?: false
}
// Effect: One-time events
sealed interface ProfileEffect {
data class ShowToast(val message: String) : ProfileEffect
data object NavigateBack : ProfileEffect
data class ShowError(val error: String) : ProfileEffect
}
@HiltViewModel
class ProfileViewModel @Inject constructor(
private val getUserProfileUseCase: GetUserProfileUseCase,
private val updateProfileUseCase: UpdateProfileUseCase
) : ViewModel() {
private val _state = MutableStateFlow(ProfileState())
val state: StateFlow<ProfileState> = _state.asStateFlow()
private val _effects = Channel<ProfileEffect>()
val effects: Flow<ProfileEffect> = _effects.receiveAsFlow()
fun processIntent(intent: ProfileIntent) {
when (intent) {
is ProfileIntent.LoadProfile -> loadProfile()
is ProfileIntent.UpdateName -> updateName(intent.name)
is ProfileIntent.UpdateBio -> updateBio(intent.bio)
is ProfileIntent.SaveProfile -> saveProfile()
}
}
private fun loadProfile() {
viewModelScope.launch {
_state.update { it.copy(isLoading = true) }
getUserProfileUseCase()
.onSuccess { profile ->
_state.update {
it.copy(
profile = profile,
editedName = profile.name,
editedBio = profile.bio,
isLoading = false
)
}
}
.onFailure { error ->
_state.update { it.copy(isLoading = false) }
_effects.send(ProfileEffect.ShowError(error.message ?: "Failed to load"))
}
}
}
private fun updateName(name: String) {
_state.update { it.copy(editedName = name) }
}
private fun updateBio(bio: String) {
_state.update { it.copy(editedBio = bio) }
}
private fun saveProfile() {
viewModelScope.launch {
_state.update { it.copy(isSaving = true) }
val state = _state.value
updateProfileUseCase(state.editedName, state.editedBio)
.onSuccess { profile ->
_state.update {
it.copy(
profile = profile,
isSaving = false
)
}
_effects.send(ProfileEffect.ShowToast("Profile updated"))
_effects.send(ProfileEffect.NavigateBack)
}
.onFailure { error ->
_state.update { it.copy(isSaving = false) }
_effects.send(ProfileEffect.ShowError(error.message ?: "Save failed"))
}
}
}
}
// In Composable
@Composable
fun ProfileScreen(
viewModel: ProfileViewModel = hiltViewModel()
) {
val state by viewModel.state.collectAsStateWithLifecycle()
LaunchedEffect(Unit) {
viewModel.effects.collect { effect ->
when (effect) {
is ProfileEffect.ShowToast -> {
// Show toast
}
is ProfileEffect.NavigateBack -> {
// Navigate back
}
is ProfileEffect.ShowError -> {
// Show error dialog
}
}
}
}
ProfileContent(
state = state,
onIntent = viewModel::processIntent
)
}
Performance Optimization
Selective Updates with StateFlow
// Only emit when specific fields change
val userName: StateFlow<String> = profileState
.map { it.profile?.name ?: "" }
.distinctUntilChanged()
.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), "")
// Combine only when necessary
val canSave: StateFlow<Boolean> = combine(
profileState.map { it.hasChanges },
profileState.map { it.isSaving }
) { hasChanges, isSaving ->
hasChanges && !isSaving
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), false)
Next Steps
In Part 3, we'll cover:
- Jetpack Compose advanced patterns
- Navigation architecture
- Testing strategies
- Performance monitoring
Resources
Continue to Part 3: Jetpack Compose & Advanced Patterns