Skip to content

kRPC: Endpoints do not terminate when the transport does #100

Open
@ShayOinif

Description

Hello.
I don't know if it is an issue or not, but I definitely find it weird.
It seems like there is no way of telling when the client is disconnected like a status flow, or on disconnection lambda to register.
Right now when I consume flow and server goes away, the flow is now completed.
As a workaround I created my little helper flows (Also to maintain single client for my app), like so:

      internal val clientCoroutineScope = CoroutineScope(Job())
        
      internal val httpClientSharedFlow = channelFlow {
          val client = HttpClient { installRPC() }
          send(client)
          awaitClose {
              client.close()
          }
      }.shareIn(clientCoroutineScope, SharingStarted.WhileSubscribed(3.seconds), 1)
        
        internal sealed interface ClientState {
            data class Disconnected(val message: String) : ClientState
            data class Connected(val client: RPCClient) : ClientState
        }
        
        @OptIn(ExperimentalCoroutinesApi::class)
        internal val myServiceClientStateFlow = httpClientSharedFlow.flatMapLatest<HttpClient, ClientState> {
            val client = it.rpc {
                url("ws://0.0.0.0:3024/firstRoute")
                rpcConfig { serialization { json() } }
            }
        
            flow {
                emit(ClientState.Connected(client))
                client.coroutineContext.job.join()
            }.onCompletion {
                client.cancel()
                throw Throwable("Remote disconnected")
            }
        }.retryWhen { cause, _ ->
            repeat(3) {
                emit(ClientState.Disconnected("Error: $cause, retrying in ${3 - it} seconds..."))
                delay(1.seconds)
            }
            true
        }.stateIn(
            clientCoroutineScope,
            SharingStarted.WhileSubscribed(3.seconds),
            ClientState.Disconnected("Initial")
        )
        
        val uiClientStateFlow = myServiceClientStateFlow.map {
            when (it) {
                is ClientState.Connected -> "Connected"
                is ClientState.Disconnected -> "Disconnected - ${it.message}"
            }
        }
        
        @OptIn(ExperimentalCoroutinesApi::class, InternalRPCApi::class)
        val uiRpcFlow = myServiceClientStateFlow.transformLatest {
            if (it is ClientState.Connected) {
                streamScoped {
                    emitAll(
                        it.client.withService<MyService>().coldFlow(getPlatform().name)
                    )
                }
            }
        }.retryWhen { cause, attempt ->
            emit("Error - $attempt: $cause")
            delay(1.seconds)
            true
        }

I wonder if it is the intended use or am I missing something or should the interface or behavior change?
Thanks,
Shay Oinif

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions