Distrito Telefónica. Hub de Innovación y Talento

Volver
Development

Gestión del contexto en aplicaciones asíncronas con Reactor y Kotlin

Tradicionalmente en aplicaciones JVM de un hilo por solicitud la solución es utilizar ThreadLocals, que permiten almacenar información solo visible para el hilo actual facilitando la codificación alrededor de la misma y haciéndola disponible a todo el código implícitamente sin necesidad de pasarla explícitamente como parámetros. 

Pero cuando se trata de no-bloqueo asíncrono como múltiples tareas pueden intercalar en el mismo hilo o una tarea puede ejecutar sin problemas en diferentes hilos a lo largo de su ciclo de vida de la solución ThreadLocal no funciona (al menos no fuera de la caja). 

Dos modelos asíncronos no bloqueantes cada vez más populares son Reactor que impulsa Spring WebFlux, y Corrutinas de Kotlin que ofrece una de las mejores abstracciones para trabajar con código asíncrono. Veamos cómo podemos tratar el contexto en estos dos modelos, pero antes introduzcamos cómo funcionan. 

Contextos asíncronos

Ambos modelos ofrecen su propio mecanismo para manejar el contexto, explorémoslos un poco: 

Proyecto Reactor

Reactor tiene un Contexto que es básicamente una interfaz similar a Map vinculada a un Flux o Mono de Reactor. Los principales puntos de acceso son deferContextual para leer y contextWrite escritura: 

val key = "someKey"  
val helloMono = Mono.just("Hello World! Welcome ")  
   .flatMap { s -> Mono.deferContextual { context -> Mono.just(s + context[key]) } }  
   .contextWrite { it.put(key, "user") }  
 
helloMono.block() // Will produce "Hello World! Welcome user" 
 
Mientras la cadena de editores no se interrumpa, el contexto se mantendrá y estará disponible para todos los operadores de la cadena. 

Corrutinas de Kotlin

De forma similar, en el mundo de las corrutinas tenemos el contexto de corrutina que está implícitamente disponible y se pasa a todas las funciones de suspensión como parte de la función Continuación. Los principales puntos de acceso son currentCoroutineContext para lectura y constructores como withContext para escribir, por ejemplo: 

data class CurrentUser(  
   val name: String  
) : AbstractCoroutineContextElement(CurrentUser) {  
   companion object Key : CoroutineContext.Key<CurrentUser>  
}  
 
suspend fun entry() {  
   withContext(CurrentUser("user")) {  
       println(doStuff()) // Will print "Hello World! Welcome user"  
   }  
}  
 
suspend fun doStuff(): String {  
   val user = currentCoroutineContext()[CurrentUser.Key]?.name ?: "unknown"  
   return "Hello World! Welcome $user"  
} 

Así, si fuéramos a escribir una nueva aplicación usando Reactor + Corrutinas la mejor forma de almacenar nuestro contexto sería el Reactor Context, de esa forma está disponible en los operadores Reactor y en todas las funciones de suspensión (indirectamente a través del CoroutineContext). 

Hasta ahora hemos visto cómo tratar en ambos modelos con sus contextos y cómo mezclarlos, pero ¿qué ocurre con el código preexistente que se basa en el almacenamiento basado en hilos? Comprobemos cómo lo hemos combinado todo. 

Trabajar con código heredado basado en hilos

Aunque muchas bibliotecas están migrando al mundo asíncrono todavía hay muchas que dependen de que el contexto esté siempre disponible para ellas, además podríamos tener mucho código heredado basado en el mismo, por lo que necesitamos tender un puente entre estos dos mundos. 

Siguiendo nuestro enfoque de hacer del Contexto Reactor el contexto principal accesible en Corrutinas de Kotlin, aplicaremos lo mismo para la base de hilos, haciéndola disponible como local de hilos. 

Las corrutinas de Kotlin nos ofrecen una solución para esto con ThreadContextElement un gancho que se llama cada vez que se reanuda una corrutina, para que pueda restaurar su contexto de hilo y guardarlo cuando se suspende. Kotlin llamará a updateThreadContext cuando la corrutina se reanude en el hilo y a restoreThreadContext cuando la corrutina se suspenda. Podemos usar esto para mover el Contexto del Reactor entre hilos tomándolo del Contexto de la corrutina. 

object ReactorContextHolder {  
   @JvmStatic  
   private val holder = ThreadLocal<ContextView>()  
 
   fun preserve(context: ContextView?) {  
       holder.set(context)  
   }  
 
   fun get(): ContextView? = holder.get()  
 
   fun clear() {  
       holder.remove()  
   }  
 
   fun asCoroutinesHook() = ReactorContextPropagator()  
 
   class ReactorContextPropagator : ThreadContextElement<ContextView?> {  
       companion object Key : CoroutineContext.Key<ReactorContextPropagator>  
 
       override val key = Key  
 
       override fun updateThreadContext(context: CoroutineContext): ContextView? =  
           holder.get().also {  
               runCatching {  
                   val currentContext = context[ReactorContext]?.context?.readOnly()  
                   preserve(currentContext)  
               }  
           }  
 
       override fun restoreThreadContext(context: CoroutineContext, oldState: ContextView?) {  
           runCatching {  
               preserve(oldState)  
           }  
       }  
   }  
} 

Esto nos permite tener el Contexto del Reactor disponible para todo el código y puede ser usado para almacenar cualquier dato contextual de la solicitud que podamos necesitar. 

Por ejemplo, supongamos una función típica de Spring

override fun handle(request: ServerRequest): Mono<ServerResponse> {  
   return mono(Dispatchers.Unconfined) { // From kotlinx-coroutines-reactor  
       withContext(reactorContextHolder.asCoroutinesHook()) { // Installs our hook  
           // Suspend code here  
       }  
   }  
} 
 

Después de esto siempre tendremos el ContextView correcto disponible con "ReactorContextHolder.get()" incluso si no estamos en un operador Reactor o en una función de suspensión. 

Un enfoque similar se puede tomar en el lado del reactor utilizando onEachOperator hook se puede instalar un operador lift que restaure thread-locals a partir de valores del contexto del reactor. Nosotros no lo utilizamos, una diferencia principal con nuestro enfoque de corrutinas es que esto funciona en todas las acciones de los operadores (imponiendo una mayor sobrecarga de rendimiento), y nos transformamos al mundo de las corrutinas tan pronto como fue posible, haciendo solo un trabajo mínimo en los operadores de reactor, por lo que la disponibilidad thread-local durante esos momentos no era importante para nosotros. 

Además de reutilizar el código base de nuestra biblioteca, tener el contexto de Reactor disponible globalmente ofrece otra ventaja, la integración con algunas bibliotecas de terceros, especialmente en nuestro caso log4j2 y Retrofit

Integración de terceros: Log4J2

Una característica muy útil de Log4J2 (y la mayoría de los marcos de trabajo de registro de java) es la posibilidad de añadir información contextual adicional a todos los registros, algo parecido a: 

ThreadContext.put("request", request.context.requestId.toString()) 


Esto decoraría todos los registros posteriores con una etiqueta "solicitud" con algún traceId que generemos para cada solicitud. 

El problema viene cuando Log4J2, de forma predeterminada, utiliza thread-locals para almacenar esos valores, lo que significa que obtendríamos valores erróneos en los registros emitidos por nuestra aplicación asíncrona compartiendo hilos. 

Afortunadamente Log4J2 permite elegir una implementación personalizada para el almacenamiento de ThreadContext, por lo que nuestro enfoque es proporcionar una implementación que aproveche el contexto del reactor (que, como hemos visto, estamos moviendo entre hilos cuando sea necesario) para almacenar los datos. 

Un fragmento mínimo de cómo hacerlo sería (suponiendo que un ConcurrentMap se inicializa para cada solicitud en el contexto Reactor bajo la clave "threadContextMap") 

class ReactorBackedThreadContextMap : ThreadContextMap {  
   // Other methods removed for brevity  
 
override fun get(key: String?): String? {  
       return storage()[key]  
   }  
 
   override fun put(key: String, value: String?) {  
       storage()[key] = value  
   }  
 
   private fun storage(): ConcurrentMap<String, String?> {  
       return ReactorContextHolder.get().get("threadContextMap")  
   }  
} 

Integración de terceros: Retrofit

El caso de retrofit es un poco más interesante, nuestro código heredado usa Retrofit con OkHttp, queríamos seguir usándolo con nuestra pila de no-bloqueo, así que implementamos Spring WebClient como cliente subyacente. También hacemos un amplio uso de ExchangeFilterFunction para registrar/decorar/cachear y otras mejoras sobre las solicitudes. Así que necesitábamos que el contexto fuera correcto al ejecutar estos filtros. 

Y aunque retrofit es compatible con funciones de suspensión, no hay una buena manera de ver el Contexto corrutina en el otro lado del proxy (aunque eso puede cambiar algún día). Para superar esta limitación, utilizamos un Call.Factory personalizado para crear un Call que captura el contexto de nuestro ReactorContextHolder y lo establece correctamente en el reactor Producer al realizar la llamada utilizando el Spring WebClient, haciéndolo disponible para toda la cadena de reactores y el código de corrutinas de Kotlin que sigue, incluyendo las funciones de filtro. 

Como vemos, la gestión del contexto de solicitud cambia un poco cuando se ejecuta en motores asíncronos, pero gracias a la infraestructura de corrutinas podemos salvar la distancia y permitir que nuestro código heredado siga funcionando y nuestro enfoque nos permite ser compatibles con bibliotecas de terceros que no están completamente adaptadas a este modelo.