In this article we will cover how to implement a Redux library with Kotlin and Rx. But first, a short introduction to Redux.
In essence, Redux is an architecture that rests on three pillars:

  1. A single source of truth for your application, called the state.
  2. Immutability: the state cannot be modified, only replaced by a new one.
  3. Purity: a single function without side effects (called the reducer) generates each new state from the current state and an action.

 


Normally the state is encapsulated in what is usually called the “Store”. When creating the store we specify the initial state of the application and the reducing function.
From then on, we can subscribe to state changes and send actions to modify the state of the application.

Small example of a counter:
The example for this article will be a simple counter, but the same architecture scales to larger applications.

There will be three actions:

sealed class Action {
    object Increment : Action()
    object Decrement : Action()
    class Set(val value: Int) : Action()
}

Thanks to Kotlin’s sealed classes, we get an exhaustive when expression, with the added benefit that each action can carry different parameters.

And the reducer function would be:

fun reducer(state: Int, action: Action) : Int = when(action) {
    is Action.Increment -> state+1
    is Action.Decrement -> state-1
    is Action.Set -> action.value
}
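Because the reducer is a pure function, it can be exercised directly, without any store or Rx machinery. The following minimal sketch repeats the definitions above so that it runs on its own; the values passed in main are arbitrary.

```kotlin
sealed class Action {
    object Increment : Action()
    object Decrement : Action()
    class Set(val value: Int) : Action()
}

fun reducer(state: Int, action: Action): Int = when (action) {
    is Action.Increment -> state + 1
    is Action.Decrement -> state - 1
    is Action.Set -> action.value
}

fun main() {
    // Same inputs always give the same output, and nothing outside is touched.
    println(reducer(0, Action.Increment)) // prints 1
    println(reducer(1, Action.Decrement)) // prints 0
    println(reducer(7, Action.Set(42)))   // prints 42
}
```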

 

Basic implementation with Rx:
If we look at the functions available in Rx, we quickly notice one that takes a seed and a reduce function as parameters, so it seems that all the work is already done.
We create a subject to pass the actions:

val actions: Subject<Action> = PublishSubject.create()

And we apply the reduce function to have our states:

val states = actions.reduce(0, ::reducer)

That’s it, now to perform an action we simply pass it to the subject:

actions.onNext(Action.Increment)


The problem is that the Rx reduce method returns a Single, not an Observable, so we only get our final state once we signal that the actions have finished:

actions.onComplete()

But that is not what we want. We want our application to react to every action as it happens, not only when the stream completes.
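The same distinction exists in the Kotlin standard library, which gives an Rx-free analogy: fold, like reduce, yields only the final value, while runningFold (available from Kotlin 1.4) also yields every intermediate state. The snippet only illustrates the behaviour we are after and is not part of the library; it repeats the counter definitions so that it runs on its own.

```kotlin
sealed class Action {
    object Increment : Action()
    object Decrement : Action()
    class Set(val value: Int) : Action()
}

fun reducer(state: Int, action: Action): Int = when (action) {
    is Action.Increment -> state + 1
    is Action.Decrement -> state - 1
    is Action.Set -> action.value
}

fun main() {
    val actions = listOf<Action>(Action.Increment, Action.Increment, Action.Decrement)

    // Like Rx reduce: one result, available only once all the actions are known.
    println(actions.fold(0, ::reducer))        // prints 1

    // What we want instead: one state per action, preceded by the initial state.
    println(actions.runningFold(0, ::reducer)) // prints [0, 1, 2, 1]
}
```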


Creating the Store:
The solution to this problem becomes evident if we really encapsulate this logic in a class that maintains the current state at all times:

class Store<S, A>(initialState: S, private val reducer: (S, A) -> S){

    var currentState = initialState
        private set

    val actions = PublishSubject.create<A>()

    val states = actions.map {
        currentState = reducer(currentState, it)
        currentState
    }
}

 

In addition, this step gives us new functionality: we can check the state at a specific moment without having to subscribe to all the state changes.

But if we perform the following test we will see that we have not finished yet:

val store = Store(0, ::reducer)
store.states.subscribe { println("state $it") }
store.actions.onNext(Action.Increment)
store.actions.onNext(Action.Increment)
store.states.subscribe { println("sorry I am late $it") }
store.actions.onNext(Action.Increment)
println("final state: ${store.currentState}")

The output of this code is not what we would expect. We have only sent three increment actions, but once the second subscription is added the output is the following:
state 1
state 2
state 3
sorry I am late 4
final state: 4

Each time a new action arrives, the map function is executed once for each subscriber of the observable, so the reducer runs twice for the third action. It also means that if nobody is subscribed, our actions are lost.
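This per-consumer execution is not specific to Rx. Purely as an analogy, a lazy Kotlin Sequence behaves the same way: the map lambda below runs again for every consumer, so state mutated inside it advances once per consumer. The actions are plain Int increments to keep the sketch short, and consumeTwice is a name invented for the example.

```kotlin
// Returns what two successive consumers of the same lazy pipeline observe.
fun consumeTwice(): Pair<List<Int>, List<Int>> {
    var currentState = 0

    // Nothing runs until somebody consumes the sequence,
    // much like an observable nobody has subscribed to.
    val states = sequenceOf(1, 1).map { delta ->
        currentState += delta
        currentState
    }

    val first = states.toList()  // runs the lambda for both actions
    val second = states.toList() // runs it again on the already modified state
    return first to second
}

fun main() {
    val (first, second) = consumeTwice()
    println(first)  // prints [1, 2]
    println(second) // prints [3, 4]
}
```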
Luckily this problem can be solved in a short line of code:

val states: Observable<S> = actions.map {
    currentState = reducer(currentState, it)
    currentState
}.publish().apply { disposable = connect() }


With publish() we create a connectable observable that waits for a connection before it starts emitting states.
With ‘.apply { disposable = connect() }’ we connect as soon as it is created, so all the actions are executed even if we do not have any subscriber. We also keep the disposable of this connection in case we need to implement some kind of life cycle for the Store.
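The effect of publish() and connect() can be checked in isolation. The following sketch assumes RxJava 2 on the classpath (with RxJava 3 the import would start with io.reactivex.rxjava3 instead) and uses plain Int increments as actions; sharedStatesDemo and its local names are made up for the example.

```kotlin
import io.reactivex.subjects.PublishSubject

// Collects the lines that would be printed, so the behaviour is easy to inspect.
fun sharedStatesDemo(): List<String> {
    val log = mutableListOf<String>()
    var currentState = 0
    val actions = PublishSubject.create<Int>()

    // publish() turns the mapped stream into a ConnectableObservable
    // whose single upstream subscription is shared by all subscribers.
    val states = actions.map { delta ->
        currentState += delta
        currentState
    }.publish()
    val connection = states.connect() // start processing actions right away

    actions.onNext(1) // nobody is subscribed, yet the action is applied
    states.subscribe { log += "state $it" }
    actions.onNext(1)
    states.subscribe { log += "sorry I am late $it" }
    actions.onNext(1)
    log += "final state: $currentState"

    connection.dispose() // a possible end of the store's life cycle
    return log
}

fun main() = sharedStatesDemo().forEach(::println)
// prints:
// state 2
// state 3
// sorry I am late 3
// final state: 3
```

Each action now goes through the map lambda exactly once, regardless of how many subscribers are listening.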

Too much purity:

With a good reducer function, without any side effects and always returning valid states, we still have a problem:

Modifying the state in response to user actions is simple, but what happens when we want to persist data, write logs or fetch data from a backend?
Our reducer function cannot take care of these tasks, especially if they are asynchronous. For this we have to introduce a new concept that lets us perform side effects while keeping the reducer a pure function and controlling which side effects are carried out.

Middlewares:
A Middleware is a software component that will allow us to introduce the desired side effects without breaking the architecture as planned.

interface Middleware<S, A> {
    fun dispatch(store: Store<S, A>, next: (A) -> Unit, action: A)
}

A middleware consists of a single function that receives as parameters the store in which it is installed, a function to continue with the execution flow and the action to be performed.
In this way the middleware has all the information it may need as well as a mechanism to continue the execution.

The following example would be a middleware that does not perform any action:

val doNothing = object : Middleware<Int, Action> {
    override fun dispatch(store: Store<Int, Action>, next: (Action) -> Unit, action: Action) {
        next(action)
    }
}

And this one prints the current state before and after executing an action:

val printLogger = object : Middleware<Int, Action> {
    override fun dispatch(store: Store<Int, Action>, next: (Action) -> Unit, action: Action) {
        println("State is ${store.currentState} before action: $action")
        next(action)
        println("State is ${store.currentState} after action: $action")
    }
}

Since the middleware itself decides at what point in its code the action is executed, middlewares have the flexibility to perform all kinds of tasks, including asynchronous calls: a middleware can capture the action that indicates a request has to be sent, and send a new action when the data arrives.
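As a rough sketch of that asynchronous pattern, the middleware below swallows a request action and sends a new action once a callback delivers the data. Several things here are assumptions made for the example: the Load action, the LoadMiddleware class, its callback-based fetch parameter, and a simplified Rx-free Store with a single middleware that stands in for the real one.

```kotlin
sealed class Action {
    object Increment : Action()
    class Set(val value: Int) : Action()
    object Load : Action() // hypothetical action: "please fetch the counter"
}

fun reducer(state: Int, action: Action): Int = when (action) {
    is Action.Increment -> state + 1
    is Action.Set -> action.value
    is Action.Load -> state // intercepted by the middleware, never changes the state
}

interface Middleware<S, A> {
    fun dispatch(store: Store<S, A>, next: (A) -> Unit, action: A)
}

// Rx-free stand-in for the Store, with exactly one middleware installed.
class Store<S, A>(
    initialState: S,
    private val reducer: (S, A) -> S,
    private val middleware: Middleware<S, A>
) {
    var currentState = initialState
        private set

    fun dispatch(action: A) {
        middleware.dispatch(this, { currentState = reducer(currentState, it) }, action)
    }
}

// `fetch` is a hypothetical callback-based data source, for example a backend call.
class LoadMiddleware(private val fetch: (onResult: (Int) -> Unit) -> Unit) : Middleware<Int, Action> {
    override fun dispatch(store: Store<Int, Action>, next: (Action) -> Unit, action: Action) {
        if (action is Action.Load) {
            // Do not forward the request; send a new action when the data arrives.
            fetch { value -> store.dispatch(Action.Set(value)) }
        } else {
            next(action)
        }
    }
}

fun main() {
    var pending: ((Int) -> Unit)? = null // simulates a response that has not arrived yet
    val store = Store(0, ::reducer, LoadMiddleware { onResult -> pending = onResult })

    store.dispatch(Action.Load)
    store.dispatch(Action.Increment)
    println(store.currentState) // prints 1: the request is still in flight

    pending?.invoke(42)         // the "backend" answers
    println(store.currentState) // prints 42
}
```

The reducer stays pure: the side effect lives entirely in the middleware, and the result re-enters the store as an ordinary action.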

To install several middlewares at once we need a way to chain them. We start with a helper:

fun <S, A> Middleware<S, A>.decompose() =
        { s: Store<S, A> ->
            { next: (A) -> Unit ->
                { a: A ->
                    dispatch(s, next, a)
                }
            }
        }

This function decomposes (curries) the middleware so that we can first pass the store, then the continuation function and finally the action.

Now we will define a function that allows us to compose Middlewares:

private fun <S, A> Middleware<S, A>.combine(other: Middleware<S, A>): Middleware<S, A> = object : Middleware<S, A> {
    override fun dispatch(store: Store<S, A>, next: (A) -> Unit, action: A) =
        this@combine.dispatch(store, other.decompose()(store)(next), action)
}

Because we can decompose the second middleware, we can supply just the store and the continuation, and pass the resulting function as next to the first one.
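To see in which order two combined middlewares run, the sketch below records a trace. It is self-contained under a few assumptions: the Store is a simplified Rx-free stand-in, actions are plain Int increments, decompose has its curried type written out explicitly, and tagged and traceCombined are hypothetical helpers.

```kotlin
interface Middleware<S, A> {
    fun dispatch(store: Store<S, A>, next: (A) -> Unit, action: A)
}

// Rx-free stand-in for the Store: just enough to hold and update the state.
class Store<S, A>(initialState: S, private val reducer: (S, A) -> S) {
    var currentState = initialState
        private set

    fun reduce(action: A) {
        currentState = reducer(currentState, action)
    }
}

// Same idea as decompose() above, with the curried function type spelled out.
fun <S, A> Middleware<S, A>.decompose(): (Store<S, A>) -> ((A) -> Unit) -> (A) -> Unit =
    { s -> { next -> { a -> dispatch(s, next, a) } } }

fun <S, A> Middleware<S, A>.combine(other: Middleware<S, A>): Middleware<S, A> =
    object : Middleware<S, A> {
        override fun dispatch(store: Store<S, A>, next: (A) -> Unit, action: A) =
            this@combine.dispatch(store, other.decompose()(store)(next), action)
    }

// Hypothetical middleware that records when it runs relative to `next`.
fun tagged(tag: String, log: MutableList<String>) = object : Middleware<Int, Int> {
    override fun dispatch(store: Store<Int, Int>, next: (Int) -> Unit, action: Int) {
        log += "$tag before"
        next(action)
        log += "$tag after"
    }
}

fun traceCombined(): List<String> {
    val log = mutableListOf<String>()
    val store = Store<Int, Int>(0) { state, delta -> state + delta }
    val chain = tagged("first", log).combine(tagged("second", log))

    // The last continuation is where the reducer finally runs.
    chain.dispatch(store, { action ->
        store.reduce(action)
        log += "reduced to ${store.currentState}"
    }, 1)
    return log
}

fun main() = traceCombined().forEach(::println)
// prints:
// first before
// second before
// reduced to 1
// second after
// first after
```

The receiver of combine wraps the middleware passed as argument, so the first middleware in the chain is the outermost one.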

private inline fun <R, T : R> Array<out T>.reduceOrNull(operation: (acc: R, T) -> R): R? =
    if (this.isEmpty()) null else this.reduce(operation)

private fun <S, A> Array<out Middleware<S, A>>.combineAll(): Middleware<S, A>? =
    this.reduceOrNull { acc, middleware -> acc.combine(middleware) }

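With these two functions we can reduce an entire Array of middlewares into a single one. (Recent Kotlin versions, 1.4 and later, ship a reduceOrNull in the standard library, so the first helper is only needed on older versions.) With this in place, we can add the last piece to finish the Store.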

private val actions = PublishSubject.create<A>()

fun dispatch(action: A) =
    middlewares.combineAll()?.dispatch(this, actions::onNext, action) ?: actions.onNext(action)

Instead of directly exposing the actions, we add a function that combines all the middlewares and dispatches through the result or, if there are no middlewares, sends the action directly to the actions Subject.

Final result:
We have managed to write a library of about 40 lines that implements the most important Redux functionality, with built-in support for Rx out of the box:

interface Middleware<S, A> {
    fun dispatch(store: Store<S, A>, next: (A) -> Unit, action: A)

    fun combine(other: Middleware<S, A>) : Middleware<S,A> = object : Middleware<S, A> {
        override fun dispatch(store: Store<S, A>, next: (A) -> Unit, action: A) =
            this@Middleware.dispatch(store, other.decompose()(store)(next), action)
    }

    fun decompose() =
        { s: Store<S, A> ->
            { next: (A) -> Unit ->
                { a: A ->
                    dispatch(s, next, a)
                }
            }
        }
}

inline fun <R, T : R> Array<out T>.reduceOrNull(operation: (acc: R, T) -> R): R? =
    if (this.isEmpty()) null else this.reduce(operation)

fun <S, A> Array<out Middleware<S, A>>.combineAll(): Middleware<S, A>? =
    this.reduceOrNull { acc, middleware -> acc.combine(middleware) }

class Store<S, A>(initialState: S, private val reducer: (S, A) -> S, private vararg val middlewares: Middleware<S, A>) {

    var currentState = initialState
        private set

    private val disposable: Disposable

    private val actions = PublishSubject.create<A>()

    val states: Observable<S> = actions.map {
        currentState = reducer(currentState, it)
        currentState
    }.publish().apply { disposable = connect() }


    fun dispatch(action: A) =
        middlewares.combineAll()?.dispatch(this, actions::onNext, action) ?: actions.onNext(action)
}

 
