learn()
{ start }
build++
const tutorial

Modern Android App Architecture - Part 2: Reactive Patterns & Offline-First

Master reactive data streams with Kotlin Flow, implement offline-first architecture with Room, and build robust synchronization strategies for modern Android apps.

Cuppa Team · 8 min read
Tags: android · kotlin · flow · room · offline-first · state-management

Part 2 of our Modern Android App Architecture series focuses on reactive programming patterns, offline-first architecture, and advanced state management techniques.

Reactive Data Streams with Kotlin Flow

StateFlow vs SharedFlow vs Flow

/**
 * Side-by-side example of the three flow flavours used in this article:
 * a [StateFlow] for state, a [SharedFlow] for events, and a cold [Flow]
 * that is built anew for every collector.
 */
class DataManager {
    // Mutable backing holder; starts at UserState.Loading and is only exposed read-only.
    private val _userState = MutableStateFlow<UserState>(UserState.Loading)

    /** Hot, read-only view of the user state; new collectors receive the latest value. */
    val userState: StateFlow<UserState> = _userState.asStateFlow()

    // Mutable backing stream for events; unlike the state holder it has no initial value.
    private val _events = MutableSharedFlow<AppEvent>()

    /** Hot, read-only view of the event stream. */
    val events: SharedFlow<AppEvent> = _events.asSharedFlow()

    /**
     * Returns a cold polling flow: every collector runs its own endless loop that
     * emits the result of `fetchItems()` and then pauses for 5000 ms.
     */
    fun getItems(): Flow<List<Item>> = flow {
        while (true) {
            val latest = fetchItems()
            emit(latest)
            delay(5_000)
        }
    }
}

Advanced Flow Operators

/**
 * Showcases three operator chains: combining two flows into one UI state,
 * a debounced search pipeline, and a fetch that retries with exponential backoff.
 *
 * NOTE(review): `searchQuery`, `searchRepository` and `dataSource` are used below
 * but are not declared in this snippet — confirm where they are meant to come from.
 */
class FeedViewModel @Inject constructor(
    private val feedRepository: FeedRepository,
    private val userRepository: UserRepository
) : ViewModel() {

    /**
     * Feed items filtered by visibility to the current user, paired with that user.
     * Starts as [FeedUiState.Loading]; an upstream exception becomes [FeedUiState.Error].
     */
    val feedWithUser: StateFlow<FeedUiState> = combine(
        feedRepository.observeFeed(),
        userRepository.observeCurrentUser()
    ) { feed, user ->
        FeedUiState.Success(
            items = feed.filter { it.isVisibleTo(user) },
            user = user
        )
    }
    .catch { emit(FeedUiState.Error(it.message ?: "Unknown error")) }
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = FeedUiState.Loading
    )

    /**
     * Search results for the latest query. Blank queries short-circuit to an empty
     * list, and a failing search is replaced by an empty list instead of ending the stream.
     */
    val searchResults: StateFlow<List<SearchResult>> = searchQuery
        .debounce(300) // Wait 300ms after last keystroke
        .distinctUntilChanged() // Only emit when value changes
        .flatMapLatest { query ->
            if (query.isBlank()) {
                flowOf(emptyList())
            } else {
                searchRepository.search(query)
                    .catch { emit(emptyList()) }
            }
        }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )

    /**
     * Emits `Result.Loading`, then `Result.Success` with the fetched data.
     *
     * An [IOException] restarts the upstream flow up to three times, waiting
     * 1 s, 2 s and 4 s before the successive attempts; each restart emits
     * `Result.Loading` again. Any other exception, or an [IOException] after the
     * last retry, is emitted as `Result.Error`.
     */
    fun loadData(): Flow<Result<Data>> = flow {
        emit(Result.Loading)
        val data = dataSource.fetchData()
        emit(Result.Success(data))
    }
    // retryWhen hands us the zero-based attempt index per collection, so no shared
    // mutable counter is needed (the previous `retryCount` was never declared).
    .retryWhen { cause, attempt ->
        val shouldRetry = cause is IOException && attempt < 3
        if (shouldRetry) delay(1000L shl attempt.toInt()) // 2^attempt seconds
        shouldRetry
    }
    .catch { emit(Result.Error(it)) }
}

Flow Transformation Patterns

/**
 * Paginated data loading: wires a [Pager] that reads pages from the local
 * database and uses an [ItemRemoteMediator] built from the API and the database.
 *
 * NOTE(review): relies on `database.itemDao()`; confirm [AppDatabase] exposes it.
 */
class PaginatedRepository @Inject constructor(
    private val api: ApiService,
    private val database: AppDatabase
) {
    /**
     * Builds a new [Pager] with a page size of 20 and returns its flow of paging data.
     */
    fun getPaginatedItems(): Flow<PagingData<Item>> {
        val pagingConfig = PagingConfig(pageSize = 20)
        val mediator = ItemRemoteMediator(api, database)
        val pager = Pager(
            config = pagingConfig,
            remoteMediator = mediator,
            pagingSourceFactory = { database.itemDao().pagingSource() }
        )
        return pager.flow
    }
}

/**
 * Caching with replay: serves a cached value first (when one exists) and then
 * the freshly fetched one, keeping the cache in an in-memory map keyed by id.
 *
 * NOTE(review): `api` is used below but is not declared in this snippet.
 */
class CachedDataRepository {
    private val cache = MutableStateFlow<Map<String, Data>>(emptyMap())

    /**
     * Returns a cold flow for [id] that emits the cached entry, if any, followed by
     * the result of `api.fetchData(id)`. Because of `distinctUntilChanged()`, the
     * fresh value is not re-emitted when it equals the cached one.
     *
     * An exception from the fetch propagates to the collector after the cached
     * value (if any) has been emitted.
     */
    fun getData(id: String): Flow<Data> = flow {
        // Emit cached value immediately if available
        cache.value[id]?.let { emit(it) }

        // Then fetch fresh data
        val fresh = api.fetchData(id)
        // Atomic read-modify-write: a plain `cache.value = cache.value + …` can drop
        // an entry written by another collector between the read and the write.
        cache.update { entries -> entries + (id to fresh) }
        emit(fresh)
    }.distinctUntilChanged()
}

Offline-First Architecture

Room Database Setup

/**
 * Room database (schema version 2) holding users, posts and comments, with
 * [Converters] registered as type converters and schema export enabled.
 */
@Database(
    entities = [UserEntity::class, PostEntity::class, CommentEntity::class],
    version = 2,
    exportSchema = true
)
@TypeConverters(Converters::class)
abstract class AppDatabase : RoomDatabase() {
    abstract fun userDao(): UserDao

    abstract fun postDao(): PostDao

    abstract fun commentDao(): CommentDao

    companion object {
        // Statement executed by MIGRATION_1_2; existing rows default to 'SYNCED'.
        private const val ADD_POST_SYNC_STATUS_SQL =
            "ALTER TABLE posts ADD COLUMN sync_status TEXT NOT NULL DEFAULT 'SYNCED'"

        /**
         * Migration from schema version 1 to 2: adds the `sync_status` column to `posts`.
         *
         * NOTE(review): `PostEntity` also maps a `sync_attempts` column; confirm it
         * already exists in the version 1 schema, since this migration does not add it.
         */
        val MIGRATION_1_2: Migration = object : Migration(1, 2) {
            override fun migrate(database: SupportSQLiteDatabase) {
                database.execSQL(ADD_POST_SYNC_STATUS_SQL)
            }
        }
    }
}

Sync Status Management

/** Synchronization state of a locally stored record relative to the server. */
enum class SyncStatus {
    /** Data is synced with server. */
    SYNCED,

    /** Waiting to be synced. */
    PENDING,

    /** Currently syncing. */
    SYNCING,

    /** Sync failed, will retry. */
    FAILED
}

/**
 * Row of the `posts` table. Besides the post content it carries the
 * bookkeeping columns used for synchronization: `sync_status` (defaults to
 * [SyncStatus.SYNCED]) and `sync_attempts` (defaults to 0).
 */
@Entity(tableName = "posts")
data class PostEntity(
    @PrimaryKey
    val id: String,
    val title: String,
    val content: String,
    val authorId: String,
    @ColumnInfo(name = "created_at")
    val createdAt: Long,
    @ColumnInfo(name = "updated_at")
    val updatedAt: Long,
    @ColumnInfo(name = "sync_status")
    val syncStatus: SyncStatus = SyncStatus.SYNCED,
    @ColumnInfo(name = "sync_attempts")
    val syncAttempts: Int = 0
)

Repository with Offline Support

/**
 * Offline-first access to posts: reads come from the local database, writes are
 * stored locally first and pushed to the server later.
 *
 * NOTE(review): `currentUserId` is used in [createPost] but is not declared in
 * this snippet — confirm where it comes from.
 */
class PostRepository @Inject constructor(
    private val postDao: PostDao,
    private val api: ApiService,
    private val syncManager: SyncManager,
    @IoDispatcher private val ioDispatcher: CoroutineDispatcher
) {
    /** Observes all locally stored posts, mapped to domain models on [ioDispatcher]. */
    fun observePosts(): Flow<List<Post>> {
        return postDao.observeAll()
            .map { entities -> entities.map { it.toDomainModel() } }
            .flowOn(ioDispatcher)
    }

    /**
     * Creates a post offline-first: inserts it locally with [SyncStatus.PENDING],
     * schedules a sync for it, and returns the domain model.
     *
     * @param title title of the new post
     * @param content body of the new post
     * @return success with the created post, or failure wrapping the exception thrown
     *   while inserting or scheduling; coroutine cancellation is rethrown, not wrapped
     */
    suspend fun createPost(title: String, content: String): Result<Post> = withContext(ioDispatcher) {
        try {
            // Read the clock once so createdAt and updatedAt are identical for a new post.
            val now = System.currentTimeMillis()
            val post = PostEntity(
                id = UUID.randomUUID().toString(),
                title = title,
                content = content,
                authorId = currentUserId,
                createdAt = now,
                updatedAt = now,
                syncStatus = SyncStatus.PENDING
            )

            // Save locally immediately
            postDao.insert(post)

            // Schedule sync
            syncManager.scheduleSyncPost(post.id)

            Result.success(post.toDomainModel())
        } catch (e: kotlin.coroutines.cancellation.CancellationException) {
            // Cancellation must propagate; wrapping it in a Result would hide it.
            throw e
        } catch (e: Exception) {
            Result.failure(e)
        }
    }

    /**
     * Synchronizes with the server: pushes pending local posts, then pulls the
     * server's posts into the local database.
     *
     * Failures are logged and not rethrown, so callers cannot observe them;
     * coroutine cancellation is rethrown.
     */
    suspend fun syncPosts() {
        withContext(ioDispatcher) {
            try {
                // Push local changes before pulling. Assuming upsertAll replaces rows
                // by primary key (TODO confirm against PostDao), pulling first could
                // overwrite a not-yet-synced local edit with the server copy.
                postDao.getPendingSync().forEach { pending -> syncPost(pending) }

                // Fetch latest from server and update local database
                val serverPosts = api.getPosts()
                postDao.upsertAll(serverPosts.map { it.toEntity() })
            } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                throw e
            } catch (e: Exception) {
                Log.e("PostRepository", "Sync failed", e)
            }
        }
    }

    /**
     * Pushes one post to the server and records the outcome locally: SYNCED with the
     * attempt counter reset on success, FAILED with the counter incremented on error.
     *
     * NOTE(review): create-vs-update is decided by `syncAttempts == 0`, so a post whose
     * first create failed is sent as an update next time — confirm the API tolerates that.
     * NOTE(review): if the coroutine is cancelled mid-sync the row stays in SYNCING;
     * confirm `getPendingSync()` picks such rows up again.
     */
    private suspend fun syncPost(post: PostEntity) {
        try {
            postDao.updateSyncStatus(post.id, SyncStatus.SYNCING)

            if (post.syncAttempts == 0) {
                api.createPost(post.toDto())
            } else {
                api.updatePost(post.id, post.toDto())
            }

            postDao.update(
                post.copy(
                    syncStatus = SyncStatus.SYNCED,
                    syncAttempts = 0
                )
            )
        } catch (e: kotlin.coroutines.cancellation.CancellationException) {
            throw e
        } catch (e: Exception) {
            postDao.update(
                post.copy(
                    syncStatus = SyncStatus.FAILED,
                    syncAttempts = post.syncAttempts + 1
                )
            )
        }
    }
}

WorkManager for Background Sync

/**
 * Background worker that syncs posts and then the user profile.
 *
 * Returns success when both calls complete. If either throws, the work is retried
 * while `runAttemptCount` is below [MAX_RUN_ATTEMPTS] and fails afterwards.
 * Coroutine cancellation is rethrown rather than turned into a retry or failure.
 *
 * NOTE(review): `PostRepository.syncPosts()` as shown in this file logs and swallows
 * its own failures, so the retry branch is only reached when another call throws.
 */
@HiltWorker
class SyncWorker @AssistedInject constructor(
    @Assisted context: Context,
    @Assisted params: WorkerParameters,
    private val postRepository: PostRepository,
    private val userRepository: UserRepository
) : CoroutineWorker(context, params) {

    override suspend fun doWork(): Result =
        try {
            postRepository.syncPosts()
            userRepository.syncProfile()
            Result.success()
        } catch (e: kotlin.coroutines.cancellation.CancellationException) {
            // Stopped work must stay cancelled instead of being reported as retry/failure.
            throw e
        } catch (e: Exception) {
            if (runAttemptCount < MAX_RUN_ATTEMPTS) Result.retry() else Result.failure()
        }

    private companion object {
        /** Run attempts after which a failing sync is reported as failed. */
        const val MAX_RUN_ATTEMPTS = 3
    }
}

/**
 * Hilt module that provides the singleton [SyncManager] and, while doing so,
 * schedules the periodic sync work.
 */
@Module
@InstallIn(SingletonComponent::class)
object SyncModule {

    /**
     * Enqueues [SyncWorker] as unique periodic work named "sync-work" (every
     * 15 minutes, network required, exponential backoff starting at the minimum
     * delay) and returns a [SyncManagerImpl] backed by the same [WorkManager].
     * With the KEEP policy an already enqueued "sync-work" is left untouched.
     */
    @Provides
    @Singleton
    fun provideSyncManager(
        @ApplicationContext context: Context
    ): SyncManager {
        val workManager = WorkManager.getInstance(context)

        val networkRequired = Constraints.Builder()
            .setRequiredNetworkType(NetworkType.CONNECTED)
            .build()

        val periodicSync = PeriodicWorkRequestBuilder<SyncWorker>(
            repeatInterval = 15,
            repeatIntervalTimeUnit = TimeUnit.MINUTES
        )
            .setConstraints(networkRequired)
            .setBackoffCriteria(
                BackoffPolicy.EXPONENTIAL,
                WorkRequest.MIN_BACKOFF_MILLIS,
                TimeUnit.MILLISECONDS
            )
            .build()

        workManager.enqueueUniquePeriodicWork(
            "sync-work",
            ExistingPeriodicWorkPolicy.KEEP,
            periodicSync
        )

        return SyncManagerImpl(workManager)
    }
}

Advanced State Management

Handling Complex UI States

/**
 * Immutable snapshot of everything the feed screen renders: the loaded posts,
 * the loading/refreshing flags, the last error message and paging bookkeeping.
 */
data class FeedUiState(
    val posts: List<Post> = emptyList(),
    val isLoading: Boolean = false,
    val isRefreshing: Boolean = false,
    val error: String? = null,
    val hasMore: Boolean = true,
    val currentPage: Int = 0
) {
    /** True when no posts are held and no load is in progress. */
    val isEmpty: Boolean get() = !isLoading && posts.isEmpty()

    /** True when the state is [isEmpty] and there is no error to display instead. */
    val showEmptyState: Boolean get() = error == null && isEmpty
}

/**
 * Holds the feed screen state in a single [StateFlow]: posts come from observing
 * the repository, while [loadPosts] and [refresh] drive the loading flags, paging
 * counters and error message.
 *
 * NOTE(review): the guards in [loadPosts] and [refresh] are check-then-act on the
 * current state; they assume both are called from a single thread — confirm.
 */
@HiltViewModel
class FeedViewModel @Inject constructor(
    private val postRepository: PostRepository
) : ViewModel() {

    private val _uiState = MutableStateFlow(FeedUiState())
    val uiState: StateFlow<FeedUiState> = _uiState.asStateFlow()

    init {
        observePosts()
        loadPosts()
    }

    /** Mirrors every list emitted by the repository into the UI state. */
    private fun observePosts() {
        viewModelScope.launch {
            postRepository.observePosts()
                .collect { posts ->
                    _uiState.update { it.copy(posts = posts) }
                }
        }
    }

    /**
     * Fetches the page at `currentPage`. Does nothing while a load is already in
     * progress. On success the page counter advances and `hasMore` reflects whether
     * the page was non-empty; on failure the error message is stored.
     */
    fun loadPosts() {
        if (_uiState.value.isLoading) return

        viewModelScope.launch {
            _uiState.update { it.copy(isLoading = true, error = null) }

            postRepository.fetchPosts(page = _uiState.value.currentPage)
                .onSuccess { newPosts ->
                    _uiState.update {
                        it.copy(
                            isLoading = false,
                            hasMore = newPosts.isNotEmpty(),
                            currentPage = it.currentPage + 1
                        )
                    }
                }
                .onFailure { cause ->
                    _uiState.update {
                        it.copy(
                            isLoading = false,
                            // A null message must still surface as an error; otherwise
                            // the state is indistinguishable from a successful load.
                            error = cause.message ?: UNKNOWN_ERROR
                        )
                    }
                }
        }
    }

    /**
     * Refreshes the feed. Does nothing while a refresh is already in progress,
     * mirroring the guard in [loadPosts]. On success paging restarts at page 1 with
     * `hasMore = true`; on failure the error message is stored.
     */
    fun refresh() {
        if (_uiState.value.isRefreshing) return

        viewModelScope.launch {
            _uiState.update { it.copy(isRefreshing = true, error = null) }

            postRepository.refreshPosts()
                .onSuccess {
                    _uiState.update {
                        it.copy(
                            isRefreshing = false,
                            currentPage = 1,
                            hasMore = true
                        )
                    }
                }
                .onFailure { cause ->
                    _uiState.update {
                        it.copy(
                            isRefreshing = false,
                            error = cause.message ?: UNKNOWN_ERROR
                        )
                    }
                }
        }
    }

    private companion object {
        /** Fallback shown when a failure carries no message. */
        const val UNKNOWN_ERROR = "Unknown error"
    }
}

MVI Pattern with Sealed Classes

/** Intent: the closed set of user actions the profile screen can dispatch. */
sealed interface ProfileIntent {
    /** Ask for the profile to be loaded. */
    data object LoadProfile : ProfileIntent

    /** The name field was edited; [name] is the new text. */
    data class UpdateName(val name: String) : ProfileIntent

    /** The bio field was edited; [bio] is the new text. */
    data class UpdateBio(val bio: String) : ProfileIntent

    /** Ask for the edited profile to be saved. */
    data object SaveProfile : ProfileIntent
}

/**
 * State: everything the profile screen renders — the loaded profile, the
 * in-progress edits, the loading/saving flags and an optional error message.
 */
data class ProfileState(
    val profile: UserProfile? = null,
    val editedName: String = "",
    val editedBio: String = "",
    val isLoading: Boolean = false,
    val isSaving: Boolean = false,
    val error: String? = null
) {
    /**
     * True when a profile is loaded and the edited name or bio differs from it;
     * false while no profile is loaded.
     */
    val hasChanges: Boolean
        get() {
            val loaded = profile ?: return false
            return loaded.name != editedName || loaded.bio != editedBio
        }
}

/** Effect: one-time events sent from the ViewModel to the UI, kept out of the state. */
sealed interface ProfileEffect {
    /** Show a toast with [message]. */
    data class ShowToast(val message: String) : ProfileEffect

    /** Leave the screen. */
    data object NavigateBack : ProfileEffect

    /** Present [error] to the user. */
    data class ShowError(val error: String) : ProfileEffect
}

/**
 * MVI-style ViewModel for the profile screen: intents go in through
 * [processIntent], renderable state comes out through [state], and one-time
 * events come out through [effects].
 *
 * NOTE(review): the in-flight guards in `loadProfile`/`saveProfile` are
 * check-then-act on the current state; they assume intents arrive on a single
 * thread — confirm.
 */
@HiltViewModel
class ProfileViewModel @Inject constructor(
    private val getUserProfileUseCase: GetUserProfileUseCase,
    private val updateProfileUseCase: UpdateProfileUseCase
) : ViewModel() {

    private val _state = MutableStateFlow(ProfileState())
    val state: StateFlow<ProfileState> = _state.asStateFlow()

    // Default-capacity channel: `send` suspends until the UI receives the effect.
    private val _effects = Channel<ProfileEffect>()
    val effects: Flow<ProfileEffect> = _effects.receiveAsFlow()

    /** Routes each [intent] to its handler. */
    fun processIntent(intent: ProfileIntent) {
        when (intent) {
            is ProfileIntent.LoadProfile -> loadProfile()
            is ProfileIntent.UpdateName -> updateName(intent.name)
            is ProfileIntent.UpdateBio -> updateBio(intent.bio)
            is ProfileIntent.SaveProfile -> saveProfile()
        }
    }

    /**
     * Loads the profile and seeds the edit fields from it. Ignored while a load is
     * already in progress. A failure is reported as a [ProfileEffect.ShowError].
     */
    private fun loadProfile() {
        // Skip re-entrant loads so a repeated intent cannot start a second fetch
        // whose result would overwrite the edit fields again.
        if (_state.value.isLoading) return

        viewModelScope.launch {
            _state.update { it.copy(isLoading = true) }

            getUserProfileUseCase()
                .onSuccess { profile ->
                    _state.update {
                        it.copy(
                            profile = profile,
                            editedName = profile.name,
                            editedBio = profile.bio,
                            isLoading = false
                        )
                    }
                }
                .onFailure { error ->
                    _state.update { it.copy(isLoading = false) }
                    _effects.send(ProfileEffect.ShowError(error.message ?: "Failed to load"))
                }
        }
    }

    private fun updateName(name: String) {
        _state.update { it.copy(editedName = name) }
    }

    private fun updateBio(bio: String) {
        _state.update { it.copy(editedBio = bio) }
    }

    /**
     * Saves the edited name and bio. Ignored while a save is already in progress.
     * On success the stored profile is replaced and a toast plus a navigate-back
     * effect are sent; on failure a [ProfileEffect.ShowError] is sent.
     */
    private fun saveProfile() {
        // Skip re-entrant saves so a double tap cannot submit the same edit twice.
        if (_state.value.isSaving) return

        viewModelScope.launch {
            _state.update { it.copy(isSaving = true) }

            val snapshot = _state.value
            updateProfileUseCase(snapshot.editedName, snapshot.editedBio)
                .onSuccess { profile ->
                    _state.update {
                        it.copy(
                            profile = profile,
                            isSaving = false
                        )
                    }
                    _effects.send(ProfileEffect.ShowToast("Profile updated"))
                    _effects.send(ProfileEffect.NavigateBack)
                }
                .onFailure { error ->
                    _state.update { it.copy(isSaving = false) }
                    _effects.send(ProfileEffect.ShowError(error.message ?: "Save failed"))
                }
        }
    }
}

/**
 * In Composable: collects the ViewModel's state for rendering, listens for
 * one-time effects in a [LaunchedEffect], and forwards intents from
 * [ProfileContent] back to the ViewModel.
 */
@Composable
fun ProfileScreen(
    viewModel: ProfileViewModel = hiltViewModel()
) {
    val state by viewModel.state.collectAsStateWithLifecycle()

    // Keyed on Unit: the collector is started once when the screen enters composition.
    LaunchedEffect(Unit) {
        viewModel.effects.collect { effect ->
            // Each branch is a placeholder for the real UI reaction.
            when (effect) {
                is ProfileEffect.ShowToast -> Unit // Show toast
                is ProfileEffect.NavigateBack -> Unit // Navigate back
                is ProfileEffect.ShowError -> Unit // Show error dialog
            }
        }
    }

    ProfileContent(
        state = state,
        onIntent = viewModel::processIntent
    )
}

Performance Optimization

Selective Updates with StateFlow

// Only emit when specific fields change: project the profile name (empty string
// while no profile is loaded) and share it as state with "" as the initial value.
val userName: StateFlow<String> = profileState
    .map { state -> state.profile?.name.orEmpty() }
    .distinctUntilChanged()
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = ""
    )

// Combine only when necessary: both inputs are fields of the same state object,
// so a single map reads them from one consistent snapshot. Combining two
// projections of `profileState` would subscribe to it twice and could briefly
// pair a new `hasChanges` with a stale `isSaving` (or vice versa).
val canSave: StateFlow<Boolean> = profileState
    .map { state -> state.hasChanges && !state.isSaving }
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), false)

Next Steps

In Part 3, we'll cover:

  • Jetpack Compose advanced patterns
  • Navigation architecture
  • Testing strategies
  • Performance monitoring

Resources

Continue to Part 3: Jetpack Compose & Advanced Patterns