Bobby Encoded

© 2026 Bobby Jose

Mastering Kotlin Flow and Reactive Patterns

October 11, 2025 · 9 min read

Android, Kotlin, Flow, Coroutines, Reactive Programming, Interview Prep

Part 6 of the Android Deep Dive series

Kotlin Coroutines provide structured concurrency for async operations, while Flow builds reactive streams on top of them. This is one of the deepest technical topics in Android interviews: knowing when to use each Flow type and how to handle concurrent operations is what separates senior developers from juniors.

2025-2026 Trend

With the StateFlow vs LiveData debate largely settled in StateFlow's favor, interviewers now focus on advanced patterns: combine vs zip, debounce with flatMapLatest for search, and proper cancellation handling.

Coroutine Basics

Suspend Functions and Dispatchers

// Suspend function - Can be paused and resumed
suspend fun fetchMeal(id: String): Meal {
    return withContext(Dispatchers.IO) {
        api.getMeal(id)
    }
}

// Launching coroutines in ViewModel
class MealsViewModel : ViewModel() {
    fun loadData() {
        // viewModelScope - Cancelled when ViewModel cleared
        viewModelScope.launch {
            val meal = fetchMeal("123")
            _uiState.value = UiState.Success(meal)
        }
    }
}

// Async for parallel operations
suspend fun loadDashboard(): Dashboard {
    return coroutineScope {
        val mealsDeferred = async { mealRepository.getMeals() }
        val goalsDeferred = async { userRepository.getGoals() }
        val progressDeferred = async { progressRepository.getProgress() }

        Dashboard(
            meals = mealsDeferred.await(),
            goals = goalsDeferred.await(),
            progress = progressDeferred.await()
        )
    }
}

Dispatchers

// Dispatchers.Main - UI operations
withContext(Dispatchers.Main) {
    textView.text = "Updated"
}

// Dispatchers.IO - Network, disk, database
withContext(Dispatchers.IO) {
    val data = api.fetchData()
    database.save(data)
}

// Dispatchers.Default - CPU-intensive work
withContext(Dispatchers.Default) {
    val sorted = largeList.sorted()
    val processed = heavyComputation(sorted)
}

// Custom dispatcher injection for testing
@Module
@InstallIn(SingletonComponent::class)
object DispatcherModule {
    @Provides
    @IoDispatcher
    fun provideIoDispatcher(): CoroutineDispatcher = Dispatchers.IO
}

@Qualifier
@Retention(AnnotationRetention.BINARY)
annotation class IoDispatcher

Flow Basics

// Creating Flows
fun getNumbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

// flowOf for simple values
val simpleFlow = flowOf(1, 2, 3)

// asFlow for collections
val listFlow = listOf(1, 2, 3).asFlow()

// Flow from callback-based API
fun locationUpdates(): Flow<Location> = callbackFlow {
    val callback = object : LocationCallback() {
        override fun onLocationResult(result: LocationResult) {
            // lastLocation is nullable, so only send non-null fixes
            result.lastLocation?.let { trySend(it) }
        }
    }

    locationClient.requestLocationUpdates(request, callback, Looper.getMainLooper())

    awaitClose {
        locationClient.removeLocationUpdates(callback)
    }
}

Flow Operators

// Transform operators
flow.map { it * 2 }
flow.filter { it > 5 }
flow.take(3)
flow.drop(2)

// Combine operators
val combined = flow1.combine(flow2) { a, b -> a + b }
val zipped = flow1.zip(flow2) { a, b -> Pair(a, b) }
val merged = merge(flow1, flow2, flow3)

// Flatten operators
flow.flatMapConcat { getDetails(it) }  // Sequential
flow.flatMapMerge { getDetails(it) }   // Concurrent
flow.flatMapLatest { getDetails(it) }  // Cancel previous

// Terminal operators
flow.first()
flow.firstOrNull()
flow.toList()
flow.reduce { acc, value -> acc + value }
flow.fold(0) { acc, value -> acc + value }

// Error handling
flow.catch { e ->
    emit(defaultValue)
    // or: throw CustomException(e)
}
flow.retry(3) { e -> e is IOException }
flow.retryWhen { cause, attempt ->
    if (attempt < 3 && cause is IOException) {
        delay(1000 * attempt)
        true
    } else false
}

// Lifecycle
flow.onStart { emit(Loading) }
flow.onEach { logItem(it) }
flow.onCompletion { cause ->
    if (cause == null) println("Completed successfully")
}
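
The difference between combine and zip in the operator list above is easiest to see with two flows that emit at different rates. The following is a minimal sketch, assuming two made-up sample flows (numbers() and letters() are illustrative names, not part of any library) with arbitrary delays chosen so the interleaving is easy to follow.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical sample flows: a number every 100 ms, a letter every 150 ms.
fun numbers() = flowOf(1, 2).onEach { delay(100) }
fun letters() = flowOf("a", "b", "c").onEach { delay(150) }

// zip pairs the i-th number with the i-th letter and stops with the shorter flow.
fun zipped(): List<String> = runBlocking {
    numbers().zip(letters()) { n, s -> "$n$s" }.toList()
}

// combine waits until both sides have a value, then re-emits whenever
// either side produces a new one, always using the latest value of each.
fun combined(): List<String> = runBlocking {
    numbers().combine(letters()) { n, s -> "$n$s" }.toList()
}

fun main() {
    println(zipped())   // [1a, 2b]
    println(combined()) // starts with 1a and ends with 2c; the middle depends on timing
}
```

With these delays combine would typically produce 1a, 2a, 2b, 2c, but only the first and last elements are guaranteed by the operator's contract; the rest depends on how the two flows interleave.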

StateFlow and SharedFlow

StateFlow - Hot Flow with Current Value

class MealsViewModel @Inject constructor(
    private val repository: MealRepository
) : ViewModel() {

    private val _uiState = MutableStateFlow(MealsUiState())
    val uiState: StateFlow<MealsUiState> = _uiState.asStateFlow()

    fun loadMeals() {
        _uiState.update { it.copy(isLoading = true) }

        viewModelScope.launch {
            val meals = repository.getMeals()
            _uiState.update {
                it.copy(isLoading = false, meals = meals)
            }
        }
    }
}

SharedFlow - Hot Flow for Events

// SharedFlow for one-time events (navigation, snackbars)
class EventBus {
    private val _events = MutableSharedFlow<Event>()
    val events = _events.asSharedFlow()

    suspend fun emit(event: Event) {
        _events.emit(event)
    }
}

// SharedFlow with replay
private val _events = MutableSharedFlow<Event>(
    replay = 1,                    // Replay last N emissions to new subscribers
    extraBufferCapacity = 64,      // Buffer slots in addition to replay
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

Interview Tip

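Key difference: StateFlow requires an initial value, always holds a current value, and conflates equal values (a write that equals the current value is not emitted). SharedFlow needs no initial value and emits every value, including repeats. Use StateFlow for state, SharedFlow for events.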
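
The conflation difference can be checked directly by writing the same sequence (1, 1, 2) into each flow type and recording what a collector sees. This is a small sketch under simplifying assumptions: it runs in runBlocking rather than a ViewModel, and the function names are made up for the demo.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun stateFlowEmissions(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    // UNDISPATCHED starts collecting right away, so the initial value 0 is seen first.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it }
    }
    for (value in listOf(1, 1, 2)) {
        state.value = value
        delay(20) // give the collector time to observe each write
    }
    collector.cancel()
    seen
}

fun sharedFlowEmissions(): List<Int> = runBlocking {
    val events = MutableSharedFlow<Int>()
    val seen = mutableListOf<Int>()
    // Subscribe before emitting: with replay = 0, earlier events are not replayed.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        events.collect { seen += it }
    }
    for (value in listOf(1, 1, 2)) {
        events.emit(value)
        delay(20)
    }
    collector.cancel()
    seen
}

fun main() {
    println(stateFlowEmissions())  // [0, 1, 2] - the repeated 1 is conflated
    println(sharedFlowEmissions()) // [1, 1, 2] - every event is delivered
}
```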

Converting Cold Flow to Hot StateFlow

val meals: StateFlow<List<Meal>> = repository.observeMeals()
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = emptyList()
    )

Combining Multiple Flows

@HiltViewModel
class DashboardViewModel @Inject constructor(
    private val mealRepository: MealRepository,
    private val userRepository: UserRepository,
    private val preferencesRepository: PreferencesRepository
) : ViewModel() {

    private val selectedDate = MutableStateFlow(LocalDate.now())

    val uiState: StateFlow<DashboardUiState> = combine(
        selectedDate,
        mealRepository.observeMeals(),
        userRepository.observeGoals(),
        preferencesRepository.observeSettings()
    ) { date, meals, goals, settings ->
        val todaysMeals = meals.filter { it.date == date }
        val totals = calculateTotals(todaysMeals)

        DashboardUiState(
            date = date,
            meals = todaysMeals,
            goals = goals,
            totals = totals,
            settings = settings
        )
    }.stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = DashboardUiState()
    )

    // Derived state
    val nutritionProgress: StateFlow<Float> = uiState
        .map { state ->
            if (state.goals.calories > 0) {
                // toFloat() avoids integer division if calories are stored as Int
                (state.totals.calories.toFloat() / state.goals.calories).coerceIn(0f, 1f)
            } else 0f
        }
        .stateIn(viewModelScope, SharingStarted.Lazily, 0f)
}

Structured Concurrency

// coroutineScope - Waits for all children, fails if any fail
suspend fun processData(): ProcessedData = coroutineScope {
    val part1 = async { processPart1() }
    val part2 = async { processPart2() }

    ProcessedData(part1.await(), part2.await())
}

// supervisorScope - Failure of one child doesn't cancel siblings
suspend fun loadMultipleSources(): List<Result<Data>> = supervisorScope {
    val sources = listOf("source1", "source2", "source3")

    sources.map { source ->
        async {
            try {
                Result.success(fetchFromSource(source))
            } catch (e: CancellationException) {
                throw e  // Never wrap cancellation in a Result
            } catch (e: Exception) {
                Result.failure(e)
            }
        }
    }.awaitAll()
}

// Job hierarchy
viewModelScope.launch {  // Parent job
    val job1 = launch { task1() }  // Child job
    val job2 = launch { task2() }  // Child job

    // If viewModelScope cancelled, both children cancelled
    // If job1 fails, job2 also cancelled (unless supervisorScope)
}
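
The sibling-cancellation rule in the comments above can be observed with two child coroutines, one that fails early and one that finishes later. The sketch below is an illustration only: DemoFailure and siblingFinishes are hypothetical names, and the empty CoroutineExceptionHandler exists purely to keep the supervisor case from printing a stack trace.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Hypothetical failure type for the demo.
class DemoFailure : RuntimeException("boom")

fun siblingFinishes(useSupervisor: Boolean): Boolean = runBlocking {
    var siblingFinished = false
    // Only consulted under supervisorScope, where a failed child handles its own exception.
    val ignoreFailure = CoroutineExceptionHandler { _, _ -> }
    val children: suspend CoroutineScope.() -> Unit = {
        launch(ignoreFailure) { delay(50); throw DemoFailure() }
        launch { delay(200); siblingFinished = true }
    }
    try {
        if (useSupervisor) supervisorScope(children) else coroutineScope(children)
    } catch (e: DemoFailure) {
        // coroutineScope cancels the sibling, then rethrows the child's failure here
    }
    siblingFinished
}

fun main() {
    println(siblingFinishes(useSupervisor = false)) // false - sibling was cancelled
    println(siblingFinishes(useSupervisor = true))  // true  - sibling ran to completion
}
```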

Cancellation

// Cooperative cancellation
suspend fun processItems(items: List<Item>) {
    items.forEach { item ->
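        // A plain suspend function has no CoroutineScope receiver, so go through the context
        currentCoroutineContext().ensureActive()  // Check for cancellation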
        // or: yield()
        processItem(item)
    }
}

// withTimeout
suspend fun fetchWithTimeout(): Data {
    return withTimeout(5000) {
        api.fetchData()
    }
}

// withTimeoutOrNull - Returns null on timeout
suspend fun fetchOrNull(): Data? {
    return withTimeoutOrNull(5000) {
        api.fetchData()
    }
}

// NonCancellable for cleanup
suspend fun closeResources() {
    withContext(NonCancellable) {
        // This will complete even if parent cancelled
        database.close()
        file.delete()
    }
}

Interview Questions

Q: What's the difference between StateFlow and SharedFlow?

| Aspect | StateFlow | SharedFlow |
|---|---|---|
| Initial value | Required | Optional |
| Current value | .value property | No .value |
| Replay | Always 1 | Configurable (0+) |
| Equality | Conflates equal values | Emits all values |
| Use case | UI state | Events, broadcasts |

// StateFlow - Current state
private val _uiState = MutableStateFlow(UiState())
val uiState = _uiState.asStateFlow()

// SharedFlow - Events (no replay, no current value)
private val _events = MutableSharedFlow<Event>()
val events = _events.asSharedFlow()

Q: Explain the difference between launch and async.

  • launch: Returns Job, fire-and-forget; an uncaught exception propagates to the parent right away
  • async: Returns Deferred<T>, get the result with .await(); the exception is rethrown by .await(), and a failed child async still cancels its parent scope

// launch - Don't need result
launch { saveToDatabase(data) }

// async - Need result
val userDeferred = async { fetchUser() }
val postsDeferred = async { fetchPosts() }

val user = userDeferred.await()
val posts = postsDeferred.await()

Q: How do you handle back-pressure in Flow?

// buffer - Decouple producer/consumer speeds
flow.buffer()  // Default buffer
flow.buffer(64)  // Custom size
flow.buffer(Channel.CONFLATED)  // Keep latest only

// conflate - Drop intermediates, keep latest
flow.conflate()

// collectLatest - Cancel previous collection
flow.collectLatest { value ->
    // If new value arrives, this is cancelled
    processValue(value)
}

// debounce - Emit after quiet period
searchQuery.debounce(300)
    .collectLatest { query ->
        search(query)
    }

// sample - Emit at intervals
sensorData.sample(100)  // Every 100ms
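
For search, debounce is often paired with flatMapLatest so that a stale request is cancelled as soon as a newer query arrives. The following is a rough sketch with simulated input: search() and keystrokes() are hypothetical stand-ins for a real repository call and a text field, and the delays are arbitrary.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical search call: takes 50 ms, then emits one result string.
fun search(query: String): Flow<String> = flow {
    delay(50)
    emit("results for $query")
}

// Simulated typing: three quick keystrokes, a pause, then one more edit.
fun keystrokes(): Flow<String> = flow {
    emit("k"); delay(30)
    emit("ko"); delay(30)
    emit("kot"); delay(500)
    emit("kotlin")
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun searchResults(): List<String> = runBlocking {
    keystrokes()
        .debounce(200)                            // drop queries followed by another within 200 ms
        .flatMapLatest { query -> search(query) } // cancel the in-flight search on a new query
        .toList()
}

fun main() {
    println(searchResults()) // [results for kot, results for kotlin]
}
```

Only "kot" and "kotlin" reach search(): the first two keystrokes are replaced before the 200 ms quiet period elapses, and debounce always emits the final value when the upstream completes.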

Q: What is structured concurrency and why is it important?

Structured concurrency ensures:

  1. Lifetime bound to scope - When scope cancels, all children cancel
  2. Errors propagate - Child failure cancels siblings (unless supervisor)
  3. No leaked coroutines - A scope does not complete until all of its children have completed

// Structured - job tied to viewModelScope
viewModelScope.launch {
    // Cancelled when ViewModel cleared
}

// Unstructured - DANGEROUS - can leak
GlobalScope.launch {
    // Never cancelled! Runs until completion
}
// ViewModel cleared but GlobalScope keeps running = memory leak!

Common Mistakes

1. Not Handling CancellationException

// BAD - Swallows cancellation
try {
    delay(1000)
} catch (e: Exception) {
    // CancellationException caught, coroutine won't cancel!
}

// GOOD - Rethrow CancellationException
try {
    delay(1000)
} catch (e: CancellationException) {
    throw e
} catch (e: Exception) {
    handleError(e)
}
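
When this pattern repeats across a codebase, one option is to wrap it in a small helper. The sketch below assumes a helper named runCatchingCancellable, which is a made-up name and not part of kotlinx.coroutines; it behaves like runCatching except that cancellation is rethrown instead of being captured.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: wraps ordinary failures in a Result but lets cancellation through.
suspend fun <T> runCatchingCancellable(block: suspend () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e
    } catch (e: Exception) {
        Result.failure(e)
    }

// Ordinary exceptions are captured as Result.failure.
fun wrapsOrdinaryFailure(): Boolean = runBlocking {
    runCatchingCancellable<Unit> { error("boom") }.isFailure
}

// Cancellation escapes the helper, so code after it does not run.
fun continuesAfterCancel(): Boolean = runBlocking {
    var continued = false
    val job = launch {
        runCatchingCancellable { delay(1_000) }
        continued = true // not reached when the job is cancelled during delay
    }
    delay(50)
    job.cancelAndJoin()
    continued
}

fun main() {
    println(wrapsOrdinaryFailure()) // true
    println(continuesAfterCancel()) // false
}
```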

2. StateFlow Not Updating UI

// BAD - Same object reference, StateFlow won't emit
_uiState.value.meals.add(newMeal)  // Mutating same list!

// GOOD - New object (immutable update)
_uiState.update { current ->
    current.copy(meals = current.meals + newMeal)
}

3. Using GlobalScope

// BAD - Leaked coroutine
GlobalScope.launch {
    while (true) {
        pollServer()
        delay(5000)
    }
}

// GOOD - Scoped to component lifecycle
@Singleton
class PollingService @Inject constructor(
    @ApplicationScope private val scope: CoroutineScope
) {
    fun startPolling() {
        scope.launch {
            while (isActive) {
                pollServer()
                delay(5000)
            }
        }
    }
}

4. Collecting Flow on Wrong Thread

// BAD - Database operation on main thread
viewModelScope.launch {
    database.observeItems().collect { items ->
        // This might run on Main!
    }
}

// GOOD - flowOn moves upstream work to IO; the collect block still runs on Main
viewModelScope.launch {
    database.observeItems()
        .flowOn(Dispatchers.IO)
        .collect { items -> ... }
}

Common Mistake

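WhileSubscribed(5000) keeps the upstream active for 5 seconds after the last collector disappears, which is long enough to survive a configuration change. Without the timeout (WhileSubscribed() defaults to 0), rotating the device stops the upstream and restarts the entire Flow when the new UI subscribes.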
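
The effect of the stop timeout can be measured by counting how often a cold upstream is started while a collector goes away and a new one arrives shortly after, as happens on rotation. This is a simplified sketch outside Android: upstreamStarts is a hypothetical function, the 100 ms gaps stand in for the rotation, and a manually created scope plays the role of viewModelScope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun upstreamStarts(stopTimeoutMillis: Long): Int = runBlocking {
    var starts = 0
    val upstream = flow {
        starts++            // counts each (re)start of the cold upstream
        emit(starts)
        awaitCancellation() // stay active until sharing is stopped
    }
    // Stand-in for viewModelScope; same thread as runBlocking, but a separate Job.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val state = upstream.stateIn(
        scope = sharingScope,
        started = SharingStarted.WhileSubscribed(stopTimeoutMillis),
        initialValue = 0
    )

    val oldUi = launch { state.collect { } } // first screen subscribes
    delay(100)
    oldUi.cancel()                           // screen destroyed on rotation
    delay(100)                               // gap before the new screen subscribes
    val newUi = launch { state.collect { } }
    delay(100)
    newUi.cancel()
    sharingScope.cancel()
    starts
}

fun main() {
    println(upstreamStarts(stopTimeoutMillis = 5_000)) // 1 - upstream survived the gap
    println(upstreamStarts(stopTimeoutMillis = 0))     // 2 - upstream was restarted
}
```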

Summary Table

| Concept | Purpose | Key Points |
|---|---|---|
| StateFlow | Hot flow with value | Requires initial value, conflates |
| SharedFlow | Hot flow for events | No initial value, configurable replay |
| combine | Merge multiple flows | Emits when any flow emits, once every flow has emitted at least once |
| zip | Pair emissions | Emits only when both emit |
| flatMapLatest | Cancel previous | Perfect for search |
| stateIn | Convert to StateFlow | Use WhileSubscribed(5000) |
| supervisorScope | Isolated failures | Children don't cancel siblings |
| ensureActive() | Cooperative cancellation | Check in loops |

Next in series: Part 7 - Custom UI & Animations covers Canvas drawing, gesture handling, and Material 3 theming.
