Distrito Telefónica. Innovation & Talent Hub

Context Management in Asynchronous Applications with Reactor and Kotlin

Traditionally, in thread-per-request JVM applications, request context is kept in ThreadLocals. They store information that is visible only to the current thread, which makes the context implicitly available to all the code handling the request without passing it explicitly as parameters.
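
As a minimal sketch of that thread-per-request pattern, the snippet below keeps a user name in a ThreadLocal and reads it deep in the call stack without passing it as a parameter. RequestContext, greet and handleRequest are hypothetical names chosen for illustration; they are not part of any framework.

```kotlin
import kotlin.concurrent.thread

// Hypothetical holder for the current request's user (illustrative name).
object RequestContext {
    private val currentUser = ThreadLocal<String?>()

    fun set(user: String) = currentUser.set(user)
    fun get(): String? = currentUser.get()
    fun clear() = currentUser.remove()
}

// Deep in the call stack: no user parameter, the value is read implicitly.
fun greet(): String = "Hello World! Welcome ${RequestContext.get() ?: "unknown"}"

// Simulates one request handled from start to finish on its own thread.
fun handleRequest(user: String): String {
    var result = ""
    thread(name = "request-$user") {
        RequestContext.set(user)
        try {
            result = greet()
        } finally {
            RequestContext.clear() // Avoid leaking the value if the thread is reused
        }
    }.join()
    return result
}
```

Calling handleRequest("alice") returns "Hello World! Welcome alice", while calling greet() from a thread that never set the value falls back to "unknown".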

With non-blocking asynchronous code this breaks down: multiple tasks can interleave on the same thread, and a single task can run on different threads over its lifecycle, so the ThreadLocal solution doesn’t work (at least not out of the box).
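
The interleaving problem can be reproduced with plain JDK classes. In this sketch, two hypothetical "requests" share a single worker thread, the way tasks share an event-loop thread, and the first request ends up reading the value written by the second. The function and variable names are made up for the example.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

private val sharedThreadUser = ThreadLocal<String?>()

// Runs two "requests" as interleaved steps on one shared worker thread and
// returns what request A sees after request B has run in between.
fun userSeenByFirstRequestAfterInterleaving(): String? {
    val worker = Executors.newSingleThreadExecutor()
    try {
        // Request A, step 1: stores its user in the thread-local
        worker.submit(Runnable { sharedThreadUser.set("alice") }).get()
        // Request B runs on the same thread and overwrites the value
        worker.submit(Runnable { sharedThreadUser.set("bob") }).get()
        // Request A, step 2: reads the thread-local again
        return worker.submit(Callable { sharedThreadUser.get() }).get()
    } finally {
        worker.shutdown()
    }
}
```

The function returns "bob": request A now sees request B's user, which is exactly the kind of leak that makes a bare ThreadLocal unsafe here.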

Two non-blocking asynchronous models that are increasingly popular are Reactor, which powers Spring WebFlux, and Kotlin coroutines, which offer one of the best abstractions for working with asynchronous code. Let’s see how we can deal with context in these two models, starting with how each of them works.

Asynchronous contexts

Both models offer their own mechanism for handling context. Let’s explore them a bit:

Project Reactor

Reactor has a Context, which is basically a Map-like interface tied to a Reactor Flux or Mono. The main access points are deferContextual for reading and contextWrite for writing:

val key = "someKey"  
val helloMono = Mono.just("Hello World! Welcome ")  
   .flatMap { s -> Mono.deferContextual { context -> Mono.just(s + context[key]) } }  
   .contextWrite { it.put(key, "user") }  
 
helloMono.block() // Will produce "Hello World! Welcome user" 

As long as the Publisher chain is uninterrupted, the context is kept and available to all the operators in the chain.
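
One detail worth a small sketch: the Context travels from the subscriber up the chain, so contextWrite only affects the operators declared above it. The example below, which assumes reactor-core 3.4 or later on the classpath, uses the same key idea as the snippet above; welcome, writtenDownstream and writtenUpstream are illustrative names.

```kotlin
import reactor.core.publisher.Mono

val userKey = "someKey"

// Reads the key from the Context, falling back to "unknown" when it is absent.
fun welcome(prefix: String): Mono<String> =
    Mono.deferContextual { ctx -> Mono.just(prefix + ctx.getOrDefault(userKey, "unknown")) }

// contextWrite sits below the reader, so the value is visible to it.
fun writtenDownstream(): String? =
    Mono.just("Welcome ")
        .flatMap { welcome(it) }
        .contextWrite { it.put(userKey, "user") }
        .block() // "Welcome user"

// contextWrite sits above the reader, so the reader does not see the value.
fun writtenUpstream(): String? =
    Mono.just("Welcome ")
        .contextWrite { it.put(userKey, "user") }
        .flatMap { welcome(it) }
        .block() // "Welcome unknown"
```

In practice this means the context should be written as close to the subscriber as possible, for example at the very end of the chain returned by a handler.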

Kotlin Coroutines

Similarly, in the world of coroutines we have the coroutine context, which is implicitly available and passed to all suspend functions as part of the Continuation. The main access points are currentCoroutineContext for reading and builders like withContext for writing, for example:

data class CurrentUser(  
   val name: String  
) : AbstractCoroutineContextElement(CurrentUser) {  
   companion object Key : CoroutineContext.Key<CurrentUser>  
}  
 
suspend fun entry() {  
   withContext(CurrentUser("user")) {  
       println(doStuff()) // Will print "Hello World! Welcome user"  
   }  
}  
 
suspend fun doStuff(): String {  
   val user = currentCoroutineContext()[CurrentUser.Key]?.name ?: "unknown"  
   return "Hello World! Welcome $user"  
} 

So, if we were to write a new application using Reactor and coroutines, the best place to store our context would be the Reactor Context. That way it is available in the Reactor operators and in all suspend functions (indirectly, through the ReactorContext element that kotlinx-coroutines-reactor places in the CoroutineContext).
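
To illustrate the "indirectly through the CoroutineContext" part, here is a small sketch assuming kotlinx-coroutines-reactor and reactor-core 3.4 or later are on the classpath. The mono builder copies the subscriber's Reactor Context into a ReactorContext element, which any suspend function can then read. The function names and the "someKey" key are illustrative.

```kotlin
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.Mono

// Plain suspend function: reads the Reactor Context through the coroutine context.
suspend fun greetFromReactorContext(): String {
    val reactorContext = currentCoroutineContext()[ReactorContext]?.context
    val user = reactorContext?.getOrDefault("someKey", "unknown") ?: "unknown"
    return "Hello World! Welcome $user"
}

// The value is written on the Reactor side and read inside the coroutine.
fun greetingMono(): Mono<String> =
    mono { greetFromReactorContext() }
        .contextWrite { it.put("someKey", "user") }
```

Blocking on greetingMono() yields "Hello World! Welcome user", even though the suspend function never received the value as a parameter.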

So far we have seen how each model handles its context and how to mix them, but what happens with pre-existing code that relies on thread-based storage? Let’s see how we brought it all together.

Working with legacy thread-based code

Even though many libraries are migrating to the async world, there are still a lot of libraries that depend on the context always being available in the current thread. We may also have a lot of legacy code built on that assumption, so we need to bridge these two worlds.

Following our approach of making the Reactor Context the main context, already accessible from Kotlin coroutines, we will do the same for thread-based code by exposing it as a thread-local.

Kotlin coroutines offer a solution for this with ThreadContextElement, a hook that is called every time a coroutine is resumed or suspended so that thread state can be set up and restored. Kotlin calls updateThreadContext when the coroutine is resumed on a thread and restoreThreadContext when it is suspended. We can use this to move the Reactor Context between threads, taking it from the coroutine context.

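import kotlin.coroutines.CoroutineContext  
import kotlinx.coroutines.ThreadContextElement  
import kotlinx.coroutines.reactor.ReactorContext  
import reactor.util.context.ContextView  
 
object ReactorContextHolder {  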
   @JvmStatic  
   private val holder = ThreadLocal<ContextView>()  
 
   fun preserve(context: ContextView?) {  
       holder.set(context)  
   }  
 
   fun get(): ContextView? = holder.get()  
 
   fun clear() {  
       holder.remove()  
   }  
 
   fun asCoroutinesHook() = ReactorContextPropagator()  
 
   class ReactorContextPropagator : ThreadContextElement<ContextView?> {  
       companion object Key : CoroutineContext.Key<ReactorContextPropagator>  
 
       override val key = Key  
 
       override fun updateThreadContext(context: CoroutineContext): ContextView? =  
           holder.get().also {  
               runCatching {  
                   val currentContext = context[ReactorContext]?.context?.readOnly()  
                   preserve(currentContext)  
               }  
           }  
 
       override fun restoreThreadContext(context: CoroutineContext, oldState: ContextView?) {  
           runCatching {  
               preserve(oldState)  
           }  
       }  
   }  
} 

This makes the Reactor Context available to all code, so it can be used to store any request-scoped data we may need.

For instance, let’s assume a typical Spring handler function:

override fun handle(request: ServerRequest): Mono<ServerResponse> {  
   return mono(Dispatchers.Unconfined) { // From kotlinx-coroutines-reactor  
       withContext(ReactorContextHolder.asCoroutinesHook()) { // Installs our hook  
           // Suspend code here  
       }  
   }  
} 

From then on, code running inside that block can always get the correct ContextView through ReactorContextHolder.get(), even when it is not in a Reactor operator or a suspend function.

A similar approach can be taken on the Reactor side: using the onEachOperator hook, you can install a lift operator that restores thread-locals from values in the Reactor Context. We didn’t use it. The main difference from our coroutines approach is that it runs on every operator signal, imposing a greater performance overhead. Since we move to the coroutines world as early as possible and do only minimal work within Reactor operators, thread-local availability during those moments wasn’t important for us.
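
For reference, the Reactor-side alternative could look roughly like the sketch below. It is not what we run in production, only an illustration of the hook under stated assumptions: reactor-core 3.4 or later, a hypothetical legacyRequestId thread-local, and a "requestId" key assumed to be present in the Reactor Context.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.CoreSubscriber
import reactor.core.publisher.Hooks
import reactor.core.publisher.Operators
import reactor.util.context.Context

// Hypothetical thread-local that legacy code would read.
val legacyRequestId = ThreadLocal<String?>()

// Wraps a subscriber and copies "requestId" from the Reactor Context into the
// thread-local around every signal, restoring the previous value afterwards.
class ThreadLocalRestoringSubscriber<T>(
    private val delegate: CoreSubscriber<in T>
) : CoreSubscriber<T> {
    override fun currentContext(): Context = delegate.currentContext()
    override fun onSubscribe(s: Subscription) = withThreadLocal { delegate.onSubscribe(s) }
    override fun onNext(t: T) = withThreadLocal { delegate.onNext(t) }
    override fun onError(t: Throwable) = withThreadLocal { delegate.onError(t) }
    override fun onComplete() = withThreadLocal { delegate.onComplete() }

    private inline fun withThreadLocal(signal: () -> Unit) {
        val previous = legacyRequestId.get()
        legacyRequestId.set(currentContext().getOrDefault<String?>("requestId", null))
        try {
            signal()
        } finally {
            legacyRequestId.set(previous)
        }
    }
}

// Registers the wrapper for every operator assembled after this call.
fun installThreadLocalHook() {
    Hooks.onEachOperator(
        "threadLocalRestore", // hypothetical hook key, reused to reset the hook
        Operators.lift<Any, Any> { _, subscriber -> ThreadLocalRestoringSubscriber(subscriber) }
    )
}
```

Because the wrapper runs around every signal of every operator, the cost grows with the length of each chain, which is the overhead mentioned above. The hook can be removed again with Hooks.resetOnEachOperator("threadLocalRestore").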

Besides letting us reuse our library codebase, having the Reactor Context globally available offers another advantage: integration with some third-party libraries, notably, in our case, Log4J2 and Retrofit.

Third-party integration: Log4J2

A very useful feature of Log4J2 (and most Java logging frameworks) is the ability to add extra contextual information to all logs. It looks something like:

ThreadContext.put("request", request.context.requestId.toString()) 


This decorates all subsequent logs with a “request” tag holding the trace ID we generate for each request.

The problem is that Log4J2, by default, stores those values in thread-locals, so an asynchronous application that shares threads between requests would emit logs with the wrong values.

Thankfully, Log4J2 allows plugging in a custom implementation for the ThreadContext storage, so our approach is to provide one that stores the data in the Reactor Context (which, as we have seen, we move between threads when needed).

A minimal snippet of how to do this would be (assuming a ConcurrentMap is initialized for each request in the Reactor Context under the “threadContextMap” key):

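class ReactorBackedThreadContextMap : ThreadContextMap {  
   // Other methods removed for brevity  
 
   override fun get(key: String?): String? {  
       return key?.let { storage()?.get(it) }  
   }  
 
   override fun put(key: String, value: String?) {  
       // ConcurrentMap implementations such as ConcurrentHashMap reject null values  
       if (value == null) storage()?.remove(key) else storage()?.put(key, value)  
   }  
 
   // Null when no Reactor Context has been published to the current thread;  
   // in that case reads return null and writes are dropped (an assumption of this sketch)  
   private fun storage(): ConcurrentMap<String, String>? {  
       return ReactorContextHolder.get()  
           ?.getOrDefault<ConcurrentMap<String, String>?>("threadContextMap", null)  
   }  
} 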
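
Log4J2 still has to be told to use the custom map. One way to do that, sketched here under assumptions, is the log4j2.threadContextMap system property, which takes the fully qualified class name of a ThreadContextMap implementation. The package name below is hypothetical, and the property must be set before Log4J2 initializes (for example at the very start of main, or as a -D JVM argument).

```kotlin
// Hypothetical fully qualified name; replace with the real package of the class.
const val REACTOR_BACKED_MAP_CLASS = "com.example.logging.ReactorBackedThreadContextMap"

// Must run before the first Logger or ThreadContext access, otherwise Log4J2
// will already have picked its default thread-local based map.
fun registerReactorBackedThreadContextMap() {
    System.setProperty("log4j2.threadContextMap", REACTOR_BACKED_MAP_CLASS)
}
```

The same property can also be placed in a log4j2.component.properties file on the classpath, which avoids any ordering concerns in application code.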

Third-party integration: Retrofit

The Retrofit case is a bit more interesting. Our legacy code uses Retrofit with OkHttp, and we wanted to keep using Retrofit with our non-blocking stack, so we plugged in Spring WebClient as the underlying HTTP client. We also make extensive use of ExchangeFilterFunction to log, decorate, cache and otherwise enhance requests, so we needed the context to be correct when running those filters.

Even though Retrofit supports suspend functions, there is no good way to see the coroutine context on the other side of the proxy (although that may change some day). To overcome this limitation, we use a custom Retrofit Call.Factory that creates a Call which captures the context from our ReactorContextHolder and sets it on the Reactor Publisher when making the call through Spring WebClient. This makes it available to the whole Reactor chain and to the Kotlin coroutines code that follows, including the filter functions.
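
The full Call implementation is too long to show here, but the context hand-off at its core can be sketched in a few lines. The extension function below is a hypothetical helper, not our actual code: it takes a ContextView captured on the calling thread (in our setup that would be the value returned by ReactorContextHolder.get()) and writes it into the Mono that performs the request. It assumes reactor-core 3.4 or later.

```kotlin
import reactor.core.publisher.Mono
import reactor.util.context.ContextView

// Re-applies a previously captured ContextView to a Mono so that everything
// upstream of this point (for example WebClient filter functions) can see it.
fun <T> Mono<T>.withCapturedContext(captured: ContextView?): Mono<T> =
    if (captured == null) this else contextWrite { ctx -> ctx.putAll(captured) }
```

Inside the custom Call, the Mono returned by the WebClient exchange would be wrapped with withCapturedContext(capturedContext) before subscribing, where capturedContext is the value read when the Call was created.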

As we have seen, request context management changes a bit when running on asynchronous engines. Thanks to the coroutines infrastructure we can bridge the gap and keep our legacy code working, and our approach stays compatible with third-party libraries that are not completely adapted to that model.