calls Inbox
An unlimited channel that delivers incoming JsonRpcCalls from the remote peer.
The channel remains open for the lifetime of the session and is closed automatically when close is called or the underlying transport terminates.
Each element is either a JsonRpcCall.Notify (no reply needed) or a JsonRpcCall.ExpectsResponse (a reply is required). Failing to reply to a JsonRpcCall.ExpectsResponse will leave the remote caller suspended until its timeout elapses.
Sequential handler
Use this when each handler completes quickly and does not call request back on the same session.
scope.launch {
for (call in jsonRpc.callsInbox) {
when (call) {
is JsonRpcCall.ExpectsResponse -> when (call.method) {
"greet" -> call.replyWithResult(JsonPrimitive("Hello!"))
else -> call.replyWithMethodNotFound()
}
is JsonRpcCall.Notify -> { /* fire-and-forget, no reply needed */}
}
}
}Concurrent handler
Wrap the handler body in a nested kotlinx.coroutines.launch so the consumer loop stays free to receive the next message while prior handlers are still running. This is required when a handler suspends on request: without it, the loop cannot deliver the awaited response, causing a deadlock.
Use a kotlinx.coroutines.SupervisorJob in the scope so that one failing handler does not cancel the others.
val handlerScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
handlerScope.launch {
for (call in jsonRpc.callsInbox) {
handlerScope.launch { // each call is handled concurrently
when (call) {
is JsonRpcCall.ExpectsResponse -> {
val upstream = otherPeer.request("fetch", call.params!!, 5.seconds)
call.replyWithResult(upstream.getOrThrow())
}
is JsonRpcCall.Notify -> handleNotification(call)
}
}
}
}