
Kotlin Coroutinen gehen über die reine Nebenläufigkeit hinaus
Jedes Mal, wenn Sie von Kotlin Coroutinen hören, denken Sie wahrscheinlich an eine einfache, prägnante und leistungsstarke Lösung für die Bearbeitung asynchroner Aufgaben, wie zum Beispiel Netzwerkanfragen. Aber ist das ihr einziger Zweck? Betrachten wir die Verwendung von Kotlin Coroutinen jenseits der Nebenläufigkeit.
Kotlin Coroutinen Primitive
Beginnen wir damit, zu verstehen, wie der zugrunde liegende Mechanismus von Coroutinen funktioniert. Wenn wir uns die Kotlin Coroutinen Primitive ansehen, handelt es sich nur um ein paar Klassen und Funktionen:
ContinuationCoroutineContextsuspendCoroutinecreateCoroutinestartCoroutine
Das ist alles, was wir in unserem kotlin-stdlib haben. Aber wofür werden sie verwendet? Tauchen wir tiefer in die Funktionsweise von Kotlin Coroutinen ein.
Continuation
Continuation ist lediglich ein Interface mit zwei Mitgliedern: context und resumeWith:
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
Was tut es und welchen Zweck hat es? Continuation – ist wörtlich die primäre Coroutine. Es ist wie ein verbesserter Callback mit der Fähigkeit, zusätzliche Informationen zu verbreiten und ein nützliches Ausführungsergebnis an die Coroutine zu liefern.
Wir haben noch nicht über CoroutineContext gesprochen, betrachten wir vorerst nur resumeWith.
resumeWith
Wie jeder andere Callback ist dies eine Funktion, die aufgerufen wird, wenn die Coroutine ihre Arbeit beendet. Sie verwendet kotlin.Result, um alle in der Coroutine auftretenden Ausnahmen sicher weiterzuleiten.
Wir können also unsere Nebenläufigkeitslogik mithilfe von Continuation auf folgende Weise erstellen:
suspend fun executeNetworkRequest(): String =
suspendCoroutine { continuation ->
thread {
continuation.resumeWith(someBlockingRequest())
}
}
suspendCoroutine– ist eine Brücke zwischen Kotlin Coroutinen-Code und nicht-Coroutinen-basiertem Code.
Oder verwenden Sie eine bestehende asynchrone (Pseudocode):
suspend fun executeNetworkRequest(): String =
suspendCoroutine { continuation ->
apiService.getSomething()
// onSuccess & onFailure sind Callbacks
.onSuccess(continuation::resume)
.onFailure(continuation::resumeWithException)
.executeAsync()
}
Continuation<in T>.resume(..)ist die Erweiterung, um die Übergabe vonkotlin.Resultjedes Mal zu vermeiden.
Wir können also nicht nur unsere Nebenläufigkeitslogik implementieren, sondern auch bestehende nutzen und sie mit Kotlin Coroutinen zum Laufen bringen.
startCoroutine
Wir können auch Suspend-Funktionen aus nicht-Suspend-Kontexten mit startCoroutine starten. In Kotlin wird dies immer am Ende verwendet, wenn Ihre main-Funktion suspend ist.
kotlinx.coroutinesverwendet es auch, um Coroutinen auszuführen, aber der Mechanismus dort ist natürlich viel komplexer.
import kotlin.coroutines.*
fun main() {
val operation: suspend () -> Unit = ::a
operation.startCoroutine(
object : Continuation<Unit> {
// Wir werden später darüber sprechen
override val coroutineContext = EmptyCoroutineContext
// Wird aufgerufen, wenn die Coroutine ihre Arbeit beendet hat
override fun resumeWith(result: Result<Unit>) {
println(
if(result.isSuccess)
"Erfolgreich abgeschlossen"
else "Fehler aufgetreten: ${result.exceptionOrNull()}"
)
}
}
)
}
suspend fun a() {...}
Aber natürlich können Sie Suspend-Funktionen aus
kotlinx.coroutinesnicht einfach auf diese Weise aufrufen, wenn Sie Coroutinen ausführen.
CoroutineContext
Jetzt kommen wir zu einem weiteren Mitglied von Continuation – CoroutineContext. Wozu dient es?
CoroutineContext ist ein Provider, der die benötigten Daten an die Coroutine weitergibt. In der realen Welt geht es dabei normalerweise um die Übergabe von Parametern über eine komplexe Kette von Coroutinen hinweg.
Um es klarer auszudrücken, CoroutineContext in
kotlinx.coroutinessteht für strukturierte Nebenläufigkeit.
Einfaches Beispiel
Erstellen wir aus unserem vorherigen Beispiel für startCoroutine Code mit der Fähigkeit, Werte aus CoroutineContext abzurufen:
import kotlin.coroutines.*
// definieren wir unseren Container für Daten, die wir innerhalb der Coroutine benötigen
// er sollte immer von 'CoroutineContext.Element' erben
data class ExecutionContext(val someInt: Int) : CoroutineContext.Element {
override val key = EXECUTION_CONTEXT_KEY
}
// definieren wir einen typensicheren Schlüssel, mit dem wir unseren Wert erhalten
val EXECUTION_CONTEXT_KEY = object : CoroutineContext.Key<ExecutionContext> {}
// definieren wir den Coroutine-Kontext, den wir an Continuation übergeben werden
private val myCoroutineContext = object : CoroutineContext {
private val values = mapOf<CoroutineContext.Key<*>, CoroutineContext.Element>(
EXECUTION_CONTEXT_KEY to ExecutionContext(10000)
)
override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
return values[key] as? E
}
// .. wir lassen andere Funktionen der Einfachheit halber weg
}
suspend fun a() {
// hier rufen wir den Wert aus dem Coroutine-Kontext ab
// coroutineContext ist Compiler-intrinsisch, kann nur
// aus der Suspend-Welt aufgerufen werden
val executionContext = coroutineContext[EXECUTION_CONTEXT_KEY]!!
println(executionContext.someInt!!)
}
fun main() {
val operation: suspend () -> Unit = ::a
operation.startCoroutine(
object : Continuation<Unit> {
override val context: CoroutineContext = myCoroutineContext
override fun resumeWith(result: Result<Unit>) {
println(
if(result.isSuccess)
"Erfolgreich abgeschlossen"
else "Fehler aufgetreten: ${result.exceptionOrNull()}"
)
}
}
)
}
CoroutineContext.Element– ist der Abstrakt, der zum Speichern von Elementen inCoroutineContextverwendet wird.
CoroutineContext.Key– ist der Bezeichner vonCoroutineContext.Element.
Sie können diesen Code hier ausprobieren.
Beispiel aus der Praxis
Stellen wir uns vor, wir haben unseren API-Dienst. Normalerweise benötigen wir eine Autorisierungsebene, daher betrachten wir das nächste Beispiel (als solches habe ich gRPC genommen):
// definieren wir ein Element, das in `CoroutineContext` bestehen bleibt
data class AuthorizationContext(
val accessHash: String?,
val provider: AuthorizationProvider,
) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<AuthorizationContext>
override val key: CoroutineContext.Key<*> = Key
}
// jetzt definieren wir unseren Interceptor
class AuthorizationInterceptor(
private val authorizationProvider: AuthorizationProvider,
) : CoroutineContextServerInterceptor() {
companion object {
private val ACCESS_TOKEN_METADATA_KEY: Metadata.Key<String> =
Metadata.Key.of("access-token", Metadata.ASCII_STRING_MARSHALLER)
}
override fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext {
return AuthorizationContext(
accessHash = headers.get(ACCESS_TOKEN_METADATA_KEY),
provider = authorizationProvider,
)
}
}
CoroutineContext in kotlinx.coroutines ist buchstäblich nur eine Map<K : CoroutineContext.Key, V : CoroutineContext.Element> (genauer gesagt, es ist eine ConcurrentHashMap auf der JVM, zum Beispiel), genau wie in unserem obigen Beispiel. Aber, wenn wir über kotlinx.coroutines sprechen, wird es an alle Kind-Coroutinen innerhalb der gewünschten Coroutine weitergegeben (einen solchen Mechanismus hatten wir nicht).
So können wir es jetzt in Kind-Coroutinen abrufen:
suspend inline fun provideAuthorization(block: (UserId) -> Unit) {
val authContext = coroutineContext[AuthorizationContext]
authContext.accessHash ?: throw StatusException(Status.UNAUTHORIZED)
val userId = authContext.provider.provide(authContext.accessHash)
return block(userId)
}
Interessante Tatsache: coroutineContext ist die einzige Eigenschaft in Kotlin, die den
suspend-Modifikator hat. 👀
Für gRPC müssen wir auch unseren Interceptor registrieren und unsere RPCs schreiben. Aber die Idee dieser Lösung für gRPC ist einfach – Logik entkoppeln und die Entwicklererfahrung vereinfachen.
Für Java verwendet gRPC ThreadLocal, daher können wir
CoroutineContextauch als Alternative zuThreadLocalbetrachten. Wir könnenThreadLocalinnerhalb von Coroutinen nicht verwenden, da eine Coroutine normalerweise nicht mit einem bestimmten Thread verknüpft ist (insbesondere, wenn wir überwithContextsprechen). Coroutinen werden eher auf einem anderen Thread fortgesetzt (außerdem können verschiedene Coroutinen auf einem einzigen Thread ausgeführt werden).
Aber bedeutet das nicht, dass der einzige Grund, warum Coroutinen existieren, die Nebenläufigkeit ist? Lassen Sie es mich erklären.
Sequence
Eines der häufigsten Beispiele ist die – kotlin.sequences.Sequence<T>. Kurz gesagt, es ist eine faule Sammlung, die nur dann iteriert, wenn Sie Elemente konsumieren. Sie können mehr darüber hier lesen.
Wenn Sie jemals die SequenceScope-Quellen betrachtet haben, verwendet sie unter der Haube Suspend-Funktionen:
@RestrictSuspension
public abstract class SequenceScope<in T> internal constructor() {
/**
* Liefert einen Wert an den erstellten [Iterator] und suspendiert
* bis der nächste Wert angefordert wird.
*/
public abstract suspend fun yield(value: T)
// ... andere
}
@RestrictSuspensionverbietet Konsumenten, nicht-Mitglieds-Suspend-Funktionen aufzurufen.
Die Idee ist also, dass Elemente faul konsumiert werden. Sie können sie als reguläre Sammlungen verwenden und die Vorteile der faulen Iteration nutzen.
Aber wie funktioniert es unter der Haube? Werfen wir einen Blick auf die Implementierungsquellen:
private typealias State = Int
private const val State_NotReady: State = 0
private const val State_ManyNotReady: State = 1
private const val State_ManyReady: State = 2
private const val State_Ready: State = 3
private const val State_Done: State = 4
private const val State_Failed: State = 5
private class SequenceBuilderIterator<T> : SequenceScope<T>(), Iterator<T>, Continuation<Unit> {
private var state = State_NotReady
private var nextValue: T? = null
private var nextIterator: Iterator<T>? = null
var nextStep: Continuation<Unit>? = null
override fun hasNext(): Boolean {
while (true) {
when (state) {
State_NotReady -> {}
State_ManyNotReady ->
if (nextIterator!!.hasNext()) {
state = State_ManyReady
return true
} else {
nextIterator = null
}
State_Done -> return false
State_Ready, State_ManyReady -> return true
else -> throw exceptionalState()
}
state = State_Failed
val step = nextStep!!
nextStep = null
step.resume(Unit) // WICHTIG: Es beginnt die Ausführung des nächsten Yields
}
}
override fun next(): T {
when (state) {
State_NotReady, State_ManyNotReady -> return nextNotReady()
State_ManyReady -> {
state = State_ManyNotReady
return nextIterator!!.next()
}
State_Ready -> {
state = State_NotReady
@Suppress("UNCHECKED_CAST")
val result = nextValue as T
nextValue = null
return result
}
else -> throw exceptionalState()
}
}
private fun nextNotReady(): T {
if (!hasNext()) throw NoSuchElementException() else return next()
}
private fun exceptionalState(): Throwable = when (state) {
State_Done -> NoSuchElementException()
State_Failed -> IllegalStateException("Iterator has failed.")
else -> IllegalStateException("Unexpected state of the iterator: $state")
}
override suspend fun yield(value: T) {
nextValue = value
state = State_Ready
return suspendCoroutineUninterceptedOrReturn { c ->
nextStep = c
COROUTINE_SUSPENDED
}
}
override fun resumeWith(result: Result<Unit>) {
result.getOrThrow() // wirft einfach die Ausnahme erneut, falls vorhanden
state = State_Done
}
override val context: CoroutineContext
get() = EmptyCoroutineContext
}
COROUTINE_SUSPENDEDist eine spezielle Konstante, die intern vom Kotlin-Compiler verwendet wird, um die Suspendierung und Wiederaufnahme von Coroutinen zu verwalten. Es ist nichts, womit Entwickler normalerweise direkt interagieren, sondern dient als internes Signal innerhalb des Coroutinen-Mechanismus.
Sieht etwas schwierig zu lesen aus, nicht wahr? Gehen wir Schritt für Schritt vor:
- Zuerst beginnen wir mit Zuständen. Wir haben die nächsten Zustände, lassen Sie uns kurz darüber sprechen:
- State_NotReady: Der Iterator ist derzeit nicht bereit, ein Element bereitzustellen. Er wartet möglicherweise auf eine Operation oder weitere Verarbeitung, um ein Element verfügbar zu machen.
- State_ManyNotReady: Der Iterator ist darauf vorbereitet, mehrere Elemente bereitzustellen, aber sie sind nicht sofort verfügbar. Er wartet auf ein Signal, dass Elemente zum Verbrauch bereit sind (im Grunde wartet er auf einen Terminaloperator).
- State_ManyReady: Der Iterator ist jetzt bereit, mehrere Elemente bereitzustellen. Er kann sofort das nächste Element aus der Sequenz liefern.
- State_Ready: Der Iterator hat ein einzelnes Element bereit, das bereitgestellt werden kann. Er ist darauf eingestellt, das Element sofort auf Anfrage zu liefern.
- State_Done: Der Iterator hat keine weiteren Elemente mehr bereitzustellen. Er hat seine Aufgabe, Elemente aus der Sequenz zu erzeugen, abgeschlossen. Diesen Zustand erreichen wir, wenn wir
SequenceBuilderverlassen. - State_Failed: Etwas Unerwartetes ist passiert, und der Iterator ist auf ein Problem gestoßen. Normalerweise sollte dies nicht passieren.
hasNextgibt je nach Zustand einen Wert oder eine Reihe von Werten zurück, wenn er bereit ist, zu konsumieren. Darüber hinaus startet er die Ausführung der Sequenz bei jeder Iteration innerhalb vonwhile. Wenn alsoState_NotReadyvorliegt, wird es durch Ausführen der nächsten Yields bereit gemacht.- Die Funktion
nextruft das nächste Element aus dem Iterator basierend auf seinem aktuellen Zustand ab (ähnlich wie beihasNext). WennnextohnehasNextaufgerufen wurde, können SienextNotReady()erreichen. In anderen Situationen wird einfach der Wert zurückgegeben. - Die
yield-Funktion ändert lediglich die Zustände der Sequenziterator-Implementierung. Wenn neue Elemente hinzugefügt werden, wechselt sie zuState_Ready. Die Verwendung vonsuspendCoroutineUninterceptedOrReturnsuspendiert die Coroutine (Ausführung) und setzt sie später fort. Sie wird gestartet, wenn die vorherige Coroutine (Suspend-Punkt) beendet ist.
Um meine Erklärung abzuschließen, beenden wir sie einfach damit, wie wir die gleiche Funktionalität nur mit Callbacks realisieren könnten:
val sequence = sequence {
yield(1) {
yield(2) {
yield(3) { /* ... */ }
}
}
}
Aber es sieht etwas schwierig zu lesen aus, nicht wahr? Deshalb sind Coroutinen in dieser speziellen Situation nützlich.
Am Ende sieht es doch gar nicht so komplex aus, oder?
DeepRecursiveScope
Nun wollen wir uns einen weiteren Anwendungsfall für Kotlin Coroutinen ansehen – DeepRecursiveScope. Wie Sie wahrscheinlich wissen, haben wir normalerweise, wenn eine bestimmte Funktion sich selbst aufruft, eine Wahrscheinlichkeit, einen StackOverflowError zu erhalten, da jeder Aufruf dazu beiträgt, unseren Stack zu füllen.
Zu diesem Zweck existiert zum Beispiel auch die Sprachkonstruktion
tailrec. Der Unterschied ist, dasstailreckeine Verzweigungen (bedingte Prüfungen) mit Aufrufen anderer Funktionen haben kann.Sie können mehr darüber hier lesen.
DeepRecursiveScope stützt sich also nicht auf den traditionellen Stack-Flow, sondern nutzt alle Funktionen, die Coroutinen bieten. Um es besser zu verstehen, betrachten wir das klassische Beispiel mit Fibonacci-Zahlen:
val fibonacci = DeepRecursiveFunction<Int, Int> { x ->
when {
x < 0 -> 0
x == 1 -> 1
else -> callRecursive(x - 1) + callRecursive(x - 2)
}
}
println(fibonacci(12)) // Ausgabe: 144
Für ein komplexeres Beispiel können Sie die kdoc konsultieren.
Wir werden uns nicht mit den genauen Implementierungsdetails von DeepRecursiveScope aufhalten (Sie können sie hier nachlesen), da sie die gleiche Idee wie Sequence mit zusätzlichem Verhalten zur Unterstützung der bereitgestellten Mechanismen haben, aber lassen Sie uns diskutieren, wie Kotlin Coroutinen dieses spezielle Problem lösen. Außerdem gibt es einen sehr guten Artikel darüber von Roman Elizarov.
Coroutinen intern
Wie genau löst es das Problem? Wie ich bereits erwähnt habe, sind Coroutinen von CPS (Continuation Passing Style) inspiriert, aber es ist nicht genau das, was der Kotlin-Compiler tut, um Coroutinen so effizient zu handhaben.
Der Kotlin-Compiler verwendet eine Kombination von Optimierungen, um den Coroutinen-Stack und die Ausführung effizient zu verwalten. Schauen wir uns an, was genau er tut:
- Compiler-Transformationen: Der Kotlin-Compiler generiert einen Zustandsautomaten, ähnlich dem, was wir in den Implementierungsdetails von Sequenzen gesehen haben. Es beseitigt nicht alle Stack-Aufrufe, aber reduziert sie ausreichend, um keinen
StackOverflowErrorzu erhalten. - Heap-Allokation von Continuations: In einer traditionellen Callback-Kette schiebt jeder Funktionsaufruf Daten auf den Call-Stack. Wenn die Kette tief ist, kann dies viel Stack-Speicher verbrauchen. Beim Coroutine-Ansatz wird, wenn eine Coroutine suspendiert wird, ihre Continuation als Objekt auf dem Heap gespeichert. Dieses Continuation-Objekt enthält die notwendigen Informationen (Stack, Dispatcher usw.), um die Ausführung der Coroutine fortzusetzen. Diese Heap-Speicherung ermöglicht eine viel größere Kapazität, tiefe Aufrufketten zu verarbeiten, ohne das Risiko eines Stack-Überlaufs.
Der genaue Mechanismus von Coroutinen ist der folgende:
- Serialisierung: Der Stack-Zustand der suspendierten Coroutine wird in einem Heap-allokierten Continuation-Objekt gespeichert.
- Wiederaufnahme: Wenn die Wiederaufnahme bereit ist, richtet das Framework einen nativen Stack ein, um den erfassten Zustand nachzubilden.
- Speicherkopie: Der serialisierte Stack-Zustand wird vom Continuation-Objekt in den nativen Stack kopiert.
- Kontextkonfiguration: Der Ausführungskontext wird an den ursprünglichen Zustand angepasst.
- Programmzeiger: Der Programmzeiger wird auf den gespeicherten Wert für die korrekte Anweisung gesetzt.
- Aufruf: Der Continuation-Code wird mit CPS aufgerufen, wodurch die Ausführung fortgesetzt wird.
Die Stack-Wiederherstellung hilft uns auch bei der Wiederaufnahme auf verschiedenen Threads, da diese nichts über den Stack unserer Coroutine wissen.
So können wir von nun an verstehen, wie Coroutinen intern funktionieren. Gehen wir zu anderen Beispielen über, in denen Kotlin Coroutinen über die Nebenläufigkeit hinaus verwendet werden.
Jetpack Compose
Wenn Sie jemals mit Compose gearbeitet und beispielsweise Zeigerereignisse verarbeitet haben, haben Sie wahrscheinlich bemerkt, dass einige Hacks aus Coroutinen verwendet werden, um auf Updates zu warten:
@RestrictsSuspension // <---
@JvmDefaultWithCompatibility
interface AwaitPointerEventScope : Density {
// ...
suspend fun awaitPointerEvent(
pass: PointerEventPass = PointerEventPass.Main
): PointerEvent
suspend fun <T> withTimeoutOrNull(
timeMillis: Long,
block: suspend AwaitPointerEventScope.() -> T
): T? = block()
suspend fun <T> withTimeout(
timeMillis: Long,
block: suspend AwaitPointerEventScope.() -> T
): T = block()
// ...
}
Wie Sie sehen, ist der Scope, der zur Verarbeitung von Zeigerereignissen verwendet wird, mit @RestrictsSuspension markiert. Wenn wir die bereitgestellte Dokumentation konsultieren, sehen wir Folgendes:
Dies ist ein eingeschränkter Suspendierungsbereich. Code in diesem Bereich wird
immer un-dispatched aufgerufen und darf nur für Aufrufe von
[awaitPointerEvent] suspendiert werden. Diese Funktionen werden synchron fortgesetzt und der Aufrufer kann das Ergebnis
**vor** dem nächsten await-Aufruf mutieren, um die nächste Stufe der Eingabeverarbeitungspipeline zu beeinflussen.
awaitPointerEvent wird mit Kotlin-Primitiven verarbeitet, ohne kotlinx.coroutines. Da wir in einer solchen Situation keine kotlinx.coroutines-Logik benötigen (es ist im Grunde nur ein Callback, der nach einer Benutzeraktion vom Main-Looper-Thread aufgerufen wird).
Fazit
Zusammenfassend hat dieser Artikel verschiedene Facetten von Kotlin Coroutinen beleuchtet und ihre Vielseitigkeit jenseits traditioneller Nebenläufigkeitsaufgaben hervorgehoben. Wir haben uns mit den inneren Mechanismen der Coroutinen-Primitive befasst, ihre Verwendung in Sequenzen und komplexen Problemlösungsszenarien wie tiefer Rekursion diskutiert und reale Beispiele untersucht, die ihre breite Anwendbarkeit demonstrieren. Der Titel "Coroutinen gehen über die reine Nebenläufigkeit hinaus" spiegelt treffend die vielfältigen Fähigkeiten wider, die Kotlin Coroutinen in der modernen Softwareentwicklung bieten.
Fühlen Sie sich frei, Ihr Fachwissen zwanglos in den Kaffeepausen-Chat einzubringen!