blob: 745664390d4abf4913d5337ab4e18816345f672d [file] [log] [blame]
Mohammed Naser3415a2a2025-03-06 21:16:12 -05001mod codec;
2pub mod ipc;
3pub mod tcp;
4
5use bytes::BytesMut;
6use futures_util::{Sink, SinkExt, Stream, stream::StreamExt};
7use jsonrpsee::core::{
8 async_trait,
9 client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
10};
11use serde_json::{Value, json};
12use thiserror::Error;
13
14#[derive(Debug, Error)]
15enum TransportError {
16 #[error("Connection closed.")]
17 ConnectionClosed,
18
19 #[error("IO error: {0}")]
20 Io(#[from] std::io::Error),
21
22 #[error("Unkown error: {0}")]
23 Unknown(String),
24}
25
26struct Sender<T: Send + Sink<BytesMut>> {
27 inner: T,
28}
29
30#[async_trait]
31impl<T: Send + Sink<BytesMut, Error = impl std::error::Error> + Unpin + 'static> TransportSenderT
32 for Sender<T>
33{
34 type Error = TransportError;
35
36 async fn send(&mut self, body: String) -> Result<(), Self::Error> {
37 let mut message: Value =
38 serde_json::from_str(&body).map_err(|e| TransportError::Unknown(e.to_string()))?;
39
40 // NOTE(mnaser): In order to be able to use the subscription client, we need to
41 // drop the subscription message for the "update" method, as the
42 // remote doesn't support JSON-RPC 2.0.
43 if message["method"] == json!("update") {
44 return Ok(());
45 }
46
47 // NOTE(mnaser): jsonrpsee runs using JSON-RPC 2.0 only which the remote doesn't
48 // support, so we intercept the message, remove "jsonrpc" and then
49 // send the message.
50 message.as_object_mut().map(|obj| obj.remove("jsonrpc"));
51
52 // NOTE(mnaser): OVSDB expects all requests to have a "params" key, so we add an
53 // empty array if it doesn't exist.
54 if !message.as_object().unwrap().contains_key("params") {
55 message["params"] = json!([]);
56 }
57
58 self.inner
59 .send(BytesMut::from(message.to_string().as_str()))
60 .await
61 .map_err(|e| TransportError::Unknown(e.to_string()))?;
62
63 Ok(())
64 }
65
66 async fn close(&mut self) -> Result<(), Self::Error> {
67 self.inner
68 .close()
69 .await
70 .map_err(|e| TransportError::Unknown(e.to_string()))?;
71
72 Ok(())
73 }
74}
75
76struct Receiver<T: Send + Stream> {
77 inner: T,
78}
79
80#[async_trait]
81impl<T: Send + Stream<Item = Result<Value, std::io::Error>> + Unpin + 'static> TransportReceiverT
82 for Receiver<T>
83{
84 type Error = TransportError;
85
86 async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
87 match self.inner.next().await {
88 None => Err(TransportError::ConnectionClosed),
89 Some(Ok(mut message)) => {
90 // NOTE(mnaser): jsonrpsee runs using JSON-RPC 2.0 only which the remote doesn't
91 // support, so we intercept the message, add "jsonrpc" and then
92 // send the message.
93 message
94 .as_object_mut()
95 .map(|obj| obj.insert("jsonrpc".to_string(), json!("2.0")));
96
97 // NOTE(mnaser): jsonrpsee expects no error field if there is a result, due to the
98 // remote not supporting JSON-RPC 2.0, we need to remove the "error"
99 // field if there is a "result" field.
100 if message.as_object().unwrap().contains_key("result") {
101 message.as_object_mut().map(|obj| obj.remove("error"));
102 }
103
104 // NOTE(mnaser): If a message comes in with it's "id" field set to null, then
105 // we remove it.
106 if message.as_object().unwrap().contains_key("id") && message["id"] == json!(null) {
107 message.as_object_mut().map(|obj| obj.remove("id"));
108 }
109
110 Ok(ReceivedMessage::Bytes(message.to_string().into_bytes()))
111 }
112 Some(Err(e)) => Err(TransportError::Io(e)),
113 }
114 }
115}