Initial commit

Change-Id: I2b916ff0acd2a88aeef709cf4f900503e823d44d
diff --git a/client/src/lib.rs b/client/src/lib.rs
new file mode 100644
index 0000000..2d5fc87
--- /dev/null
+++ b/client/src/lib.rs
@@ -0,0 +1,3 @@
+pub mod rpc;
+pub mod schema;
+mod transports;
diff --git a/client/src/rpc.rs b/client/src/rpc.rs
new file mode 100644
index 0000000..4b6b372
--- /dev/null
+++ b/client/src/rpc.rs
@@ -0,0 +1,55 @@
+use crate::{
+    schema::{DatabaseSchema, MonitorRequest, TableUpdate},
+    transports::{ipc, tcp},
+};
+use jsonrpsee::{async_client::ClientBuilder, core::client::SubscriptionClientT, proc_macros::rpc};
+use std::{collections::HashMap, path::Path};
+use tokio::net::ToSocketAddrs;
+
+/// Client-side bindings for three OVSDB JSON-RPC methods.
+///
+/// The trait is expanded by jsonrpsee's `#[rpc(client)]` macro. The section
+/// numbers quoted in the method docs presumably refer to RFC 7047 (the OVSDB
+/// management protocol) -- TODO confirm against the spec.
+#[rpc(client)]
+pub trait Rpc {
+    /// 4.1.1.  List Databases
+    ///
+    /// This operation retrieves an array whose elements are the names of the
+    /// databases that can be accessed over this management protocol
+    /// connection.
+    ///
+    /// Sent on the wire as method `list_dbs`; it takes no parameters.
+    #[method(name = "list_dbs")]
+    async fn list_databases(&self) -> Result<Vec<String>, ErrorObjectOwned>;
+
+    /// 4.1.2.  Get Schema
+    ///
+    /// This operation retrieves a <database-schema> that describes hosted
+    /// database <db-name>.
+    ///
+    /// Sent on the wire as method `get_schema` with `db_name` as its only
+    /// parameter; the reply is decoded into a [`DatabaseSchema`].
+    #[method(name = "get_schema")]
+    async fn get_schema(&self, db_name: &str) -> Result<DatabaseSchema, ErrorObjectOwned>;
+
+    /// 4.1.5.  Monitor
+    ///
+    /// The "monitor" request enables a client to replicate tables or subsets
+    /// of tables within an OVSDB database by requesting notifications of
+    /// changes to those tables and by receiving the complete initial state
+    /// of a table or a subset of a table.
+    ///
+    /// Sent on the wire as method `monitor` with the parameters `db_name`,
+    /// `matcher` and `requests`, in that order. `requests` holds one
+    /// [`MonitorRequest`] per map key (presumably the table name -- confirm).
+    /// NOTE(review): `matcher` looks like the opaque value that later comes
+    /// back as `UpdateNotification::id` -- confirm against the spec.
+    #[method(name = "monitor")]
+    async fn monitor(
+        &self,
+        db_name: &str,
+        matcher: Option<&str>,
+        requests: HashMap<String, MonitorRequest>,
+    ) -> Result<TableUpdate<serde_json::Value>, ErrorObjectOwned>;
+}
+
+/// Connects to `tcp` over TCP and builds a jsonrpsee client on top of the
+/// resulting transport halves.
+///
+/// # Errors
+///
+/// Returns the [`std::io::Error`] reported by the TCP transport when the
+/// connection cannot be established.
+pub async fn connect_tcp(
+    tcp: impl ToSocketAddrs,
+) -> Result<impl SubscriptionClientT, std::io::Error> {
+    // `tcp::connect` names the transport module; the bare `tcp` is the argument.
+    tcp::connect(tcp)
+        .await
+        .map(|(tx, rx)| ClientBuilder::default().build_with_tokio(tx, rx))
+}
+
+/// Connects to the Unix domain socket at `socket_path` and builds a jsonrpsee
+/// client on top of the resulting transport halves.
+///
+/// # Errors
+///
+/// Returns the [`std::io::Error`] reported by the IPC transport when the
+/// connection cannot be established.
+pub async fn connect_unix(
+    socket_path: impl AsRef<Path>,
+) -> Result<impl SubscriptionClientT, std::io::Error> {
+    ipc::connect(socket_path)
+        .await
+        .map(|(tx, rx)| ClientBuilder::default().build_with_tokio(tx, rx))
+}
diff --git a/client/src/schema.rs b/client/src/schema.rs
new file mode 100644
index 0000000..f17e757
--- /dev/null
+++ b/client/src/schema.rs
@@ -0,0 +1,120 @@
+use serde::de::{self, SeqAccess, Visitor};
+use serde::{Deserialize, Deserializer, Serialize};
+use std::collections::HashMap;
+use std::fmt;
+use std::marker::PhantomData;
+
+/// Database schema, as returned by the `get_schema` RPC method.
+#[derive(Debug, Deserialize)]
+pub struct DatabaseSchema {
+    /// Database name.
+    pub name: String,
+
+    /// Schema version string.
+    pub version: String,
+
+    /// Optional checksum; read from the `cksum` key.
+    #[serde(rename = "cksum")]
+    pub checksum: Option<String>,
+
+    /// Table schemas; presumably keyed by table name -- TODO confirm.
+    pub tables: HashMap<String, TableSchema>,
+}
+
+/// Schema of one table inside a [`DatabaseSchema`].
+///
+/// Wire keys are the camelCase form of the field names (`columns`, `maxRows`,
+/// `isRoot`, `indexes`). Every field except `columns` may be absent.
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TableSchema {
+    /// Column schemas; presumably keyed by column name -- TODO confirm.
+    pub columns: HashMap<String, ColumnSchema>,
+
+    /// Read from the `maxRows` key.
+    pub max_rows: Option<u64>,
+
+    /// Read from the `isRoot` key.
+    pub is_root: Option<bool>,
+
+    /// Lists of strings; presumably each inner list names the columns of one
+    /// index -- TODO confirm.
+    pub indexes: Option<Vec<Vec<String>>>,
+}
+
+/// Schema of one column inside a [`TableSchema`].
+#[derive(Debug, Deserialize)]
+pub struct ColumnSchema {
+    /// Column type, read from the `type` key and kept as raw JSON.
+    pub r#type: serde_json::Value,
+
+    /// Read from the `ephemeral` key when present.
+    pub ephemeral: Option<bool>,
+
+    /// Read from the `mutable` key when present.
+    pub mutable: Option<bool>,
+}
+
+/// One entry of the `requests` map passed to the `monitor` RPC method.
+///
+/// Both fields are left out of the serialized form when they are `None`.
+#[derive(Debug, Default, Deserialize, Serialize)]
+pub struct MonitorRequest {
+    /// Column names; presumably restricts monitoring to these columns --
+    /// TODO confirm.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub columns: Option<Vec<String>>,
+
+    /// Optional [`MonitorRequestSelect`] flags for this entry.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub select: Option<MonitorRequestSelect>,
+}
+
+/// Optional boolean flags carried in [`MonitorRequest::select`].
+///
+/// The fields are public and the type implements `Default`, so callers can
+/// build a value with struct-update syntax, e.g.
+/// `MonitorRequestSelect { initial: Some(false), ..Default::default() }`.
+///
+/// A flag that is `None` is omitted from the serialized request instead of
+/// being sent as `null`, because `null` is not a boolean.
+/// NOTE(review): the names suggest the flags toggle reporting of the initial
+/// table contents and of inserted, deleted and modified rows -- confirm
+/// against the OVSDB spec.
+#[derive(Debug, Default, Deserialize, Serialize)]
+pub struct MonitorRequestSelect {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub initial: Option<bool>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub insert: Option<bool>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub delete: Option<bool>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub modify: Option<bool>,
+}
+
+/// Two-level map returned by the `monitor` RPC method and carried by
+/// [`UpdateNotification`]. Presumably table name -> row UUID -> `T` --
+/// TODO confirm against the OVSDB spec.
+pub type TableUpdate<T> = HashMap<String, TableUpdateRows<T>>;
+/// Inner level of [`TableUpdate`]: the entries stored under one outer key.
+pub type TableUpdateRows<T> = HashMap<String, T>;
+
+/// Pair of optional `old` and `new` values of type `T` for one row.
+///
+/// NOTE(review): presumably `old` is absent for inserted rows and `new` for
+/// deleted rows -- confirm against the OVSDB spec.
+#[derive(Debug, Deserialize)]
+pub struct RowUpdate<T> {
+    /// Value read from the `old` key, if present.
+    pub old: Option<T>,
+    /// Value read from the `new` key, if present.
+    pub new: Option<T>,
+}
+
+/// Payload of an update notification.
+///
+/// Deserialized from a two-element JSON array `[id, message]` by the manual
+/// `Deserialize` implementation for this type.
+#[derive(Debug)]
+pub struct UpdateNotification<T> {
+    /// First array element; may be `null`.
+    pub id: Option<String>,
+    /// Second array element: the table updates.
+    pub message: TableUpdate<T>,
+}
+
+impl<'de, T> Deserialize<'de> for UpdateNotification<T>
+where
+    T: Deserialize<'de>,
+{
+    /// Decodes a two-element sequence `[id, message]` into an
+    /// [`UpdateNotification`]; a shorter sequence is an `invalid_length` error.
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        /// Sequence visitor; the `PhantomData` only pins the row type `T`.
+        struct PairVisitor<T>(PhantomData<T>);
+
+        impl<'de, T> Visitor<'de> for PairVisitor<T>
+        where
+            T: Deserialize<'de>,
+        {
+            type Value = UpdateNotification<T>;
+
+            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                f.write_str("an array with two elements: Option<String> and a TableUpdate<T>")
+            }
+
+            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+            where
+                A: SeqAccess<'de>,
+            {
+                // Element 0: the (possibly null) identifier.
+                let Some(id) = seq.next_element::<Option<String>>()? else {
+                    return Err(de::Error::invalid_length(0, &self));
+                };
+                // Element 1: the table updates.
+                let Some(message) = seq.next_element::<TableUpdate<T>>()? else {
+                    return Err(de::Error::invalid_length(1, &self));
+                };
+
+                Ok(UpdateNotification { id, message })
+            }
+        }
+
+        deserializer.deserialize_seq(PairVisitor(PhantomData))
+    }
+}
diff --git a/client/src/transports/codec.rs b/client/src/transports/codec.rs
new file mode 100644
index 0000000..6e69571
--- /dev/null
+++ b/client/src/transports/codec.rs
@@ -0,0 +1,37 @@
+use bytes::{BufMut, BytesMut};
+use serde_json::Value;
+use std::io;
+use tokio_util::codec::{Decoder, Encoder};
+
+/// Codec that writes pre-serialized bytes as-is and reads `serde_json::Value`s.
+pub struct JsonCodec;
+
+impl Encoder<BytesMut> for JsonCodec {
+    type Error = io::Error;
+
+    /// Appends the already-serialized bytes in `data` to `buf` unchanged; no
+    /// delimiter or length prefix is added. Never fails.
+    fn encode(&mut self, data: BytesMut, buf: &mut BytesMut) -> Result<(), Self::Error> {
+        // `put_slice` on a `BytesMut` grows the buffer as needed.
+        buf.put_slice(&data);
+
+        Ok(())
+    }
+}
+
+impl Decoder for JsonCodec {
+    type Item = Value;
+    type Error = io::Error;
+
+    /// Extracts the next complete JSON value from the front of `src`.
+    ///
+    /// This codec uses no delimiter or length prefix, so a single read may
+    /// hold several values back to back, or one value followed by the start of
+    /// the next. Only the bytes of the value that was parsed are consumed;
+    /// everything after it stays in `src` for the following call.
+    ///
+    /// Returns `Ok(None)` when `src` does not yet hold a complete value.
+    ///
+    /// # Errors
+    ///
+    /// Returns an [`io::Error`] when the buffered bytes are not valid JSON.
+    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Value>, io::Error> {
+        if src.is_empty() {
+            return Ok(None);
+        }
+
+        // The stream deserializer stops after one value and reports how many
+        // bytes it read, whereas `serde_json::from_slice` rejects any trailing
+        // data. The inner scope ends the borrow of `src` before it is mutated.
+        let (next, consumed) = {
+            let mut values = serde_json::Deserializer::from_slice(&src[..]).into_iter::<Value>();
+            let next = values.next();
+            (next, values.byte_offset())
+        };
+
+        match next {
+            Some(Ok(value)) => {
+                // Drop exactly the bytes that made up `value`.
+                let _ = src.split_to(consumed);
+
+                Ok(Some(value))
+            }
+            // Only whitespace was buffered: discard it and wait for more data.
+            None => {
+                src.clear();
+
+                Ok(None)
+            }
+            // The value is incomplete: keep the bytes until the rest arrives.
+            Some(Err(e)) if e.is_eof() => Ok(None),
+            Some(Err(e)) => Err(e.into()),
+        }
+    }
+}
diff --git a/client/src/transports/ipc.rs b/client/src/transports/ipc.rs
new file mode 100644
index 0000000..09798a7
--- /dev/null
+++ b/client/src/transports/ipc.rs
@@ -0,0 +1,18 @@
+use crate::transports::{Receiver, Sender, codec::JsonCodec};
+use futures_util::stream::StreamExt;
+use jsonrpsee::core::client::{TransportReceiverT, TransportSenderT};
+use std::{io::Error, path::Path};
+use tokio::net::UnixStream;
+use tokio_util::codec::Framed;
+
+/// Connects to the Unix domain socket at `socket` and returns the sender and
+/// receiver transport halves, framed with [`JsonCodec`].
+///
+/// # Errors
+///
+/// Returns the I/O error from [`UnixStream::connect`] when the connection
+/// cannot be established.
+pub async fn connect(
+    socket: impl AsRef<Path>,
+) -> Result<(impl TransportSenderT + Send, impl TransportReceiverT + Send), Error> {
+    let stream = UnixStream::connect(socket).await?;
+    let (outgoing, incoming) = Framed::new(stream, JsonCodec).split();
+
+    Ok((Sender { inner: outgoing }, Receiver { inner: incoming }))
+}
diff --git a/client/src/transports/mod.rs b/client/src/transports/mod.rs
new file mode 100644
index 0000000..7456643
--- /dev/null
+++ b/client/src/transports/mod.rs
@@ -0,0 +1,115 @@
+mod codec;
+pub mod ipc;
+pub mod tcp;
+
+use bytes::BytesMut;
+use futures_util::{Sink, SinkExt, Stream, stream::StreamExt};
+use jsonrpsee::core::{
+    async_trait,
+    client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
+};
+use serde_json::{Value, json};
+use thiserror::Error;
+
+/// Errors reported by the [`Sender`] and [`Receiver`] transport halves.
+#[derive(Debug, Error)]
+enum TransportError {
+    /// The incoming stream ended.
+    #[error("Connection closed.")]
+    ConnectionClosed,
+
+    /// The incoming stream yielded an I/O error.
+    #[error("IO error: {0}")]
+    Io(#[from] std::io::Error),
+
+    /// Any other failure (JSON parsing, sink errors), carried as message text.
+    #[error("Unknown error: {0}")]
+    Unknown(String),
+}
+
+/// Sending half of the transport: wraps a sink that accepts `BytesMut` frames.
+struct Sender<T: Send + Sink<BytesMut>> {
+    // The wrapped sink that serialized messages are written to.
+    inner: T,
+}
+
+#[async_trait]
+impl<T: Send + Sink<BytesMut, Error = impl std::error::Error> + Unpin + 'static> TransportSenderT
+    for Sender<T>
+{
+    type Error = TransportError;
+
+    /// Adapts an outgoing jsonrpsee message for the remote and writes it to
+    /// the sink.
+    ///
+    /// Messages whose `method` is `"update"` are dropped without being sent.
+    /// For every other JSON object the `jsonrpc` member is removed and an
+    /// empty `params` array is added when `params` is missing. A body that is
+    /// valid JSON but not an object (for example a batch array) is forwarded
+    /// unchanged.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`TransportError::Unknown`] when `body` is not valid JSON or
+    /// when the sink rejects the message.
+    async fn send(&mut self, body: String) -> Result<(), Self::Error> {
+        let mut message: Value =
+            serde_json::from_str(&body).map_err(|e| TransportError::Unknown(e.to_string()))?;
+
+        // NOTE(mnaser): In order to be able to use the subscription client, we need to
+        //               drop the subscription message for the "update" method, as the
+        //               remote doesn't support JSON-RPC 2.0.
+        if message.get("method").and_then(Value::as_str) == Some("update") {
+            return Ok(());
+        }
+
+        // Only objects are rewritten; unwrapping here would panic on any
+        // other JSON value.
+        if let Some(request) = message.as_object_mut() {
+            // NOTE(mnaser): jsonrpsee runs using JSON-RPC 2.0 only which the remote doesn't
+            //               support, so we intercept the message, remove "jsonrpc" and then
+            //               send the message.
+            request.remove("jsonrpc");
+
+            // NOTE(mnaser): OVSDB expects all requests to have a "params" key, so we add an
+            //               empty array if it doesn't exist.
+            request.entry("params").or_insert_with(|| json!([]));
+        }
+
+        self.inner
+            .send(BytesMut::from(message.to_string().as_str()))
+            .await
+            .map_err(|e| TransportError::Unknown(e.to_string()))
+    }
+
+    /// Closes the underlying sink.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`TransportError::Unknown`] when the sink fails to close.
+    async fn close(&mut self) -> Result<(), Self::Error> {
+        self.inner
+            .close()
+            .await
+            .map_err(|e| TransportError::Unknown(e.to_string()))
+    }
+}
+
+/// Receiving half of the transport: wraps a stream of incoming messages.
+struct Receiver<T: Send + Stream> {
+    // The wrapped stream that incoming messages are read from.
+    inner: T,
+}
+
+#[async_trait]
+impl<T: Send + Stream<Item = Result<Value, std::io::Error>> + Unpin + 'static> TransportReceiverT
+    for Receiver<T>
+{
+    type Error = TransportError;
+
+    /// Reads the next message from the stream and adapts it to the JSON-RPC
+    /// 2.0 shape jsonrpsee expects.
+    ///
+    /// For a JSON object this adds `"jsonrpc": "2.0"`, keeps only one of
+    /// `result` / `error`, and removes a `null` `id`. A frame that is valid
+    /// JSON but not an object is forwarded unchanged.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`TransportError::ConnectionClosed`] when the stream has ended
+    /// and [`TransportError::Io`] when the stream yields an I/O error.
+    async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
+        let mut message = match self.inner.next().await {
+            None => return Err(TransportError::ConnectionClosed),
+            Some(Err(e)) => return Err(TransportError::Io(e)),
+            Some(Ok(message)) => message,
+        };
+
+        // Only objects are rewritten; unwrapping here would panic on any
+        // other JSON value sent by the peer.
+        if let Some(reply) = message.as_object_mut() {
+            // NOTE(mnaser): jsonrpsee runs using JSON-RPC 2.0 only which the remote doesn't
+            //               support, so we intercept the message and add "jsonrpc" before
+            //               handing it over.
+            reply.insert("jsonrpc".to_string(), json!("2.0"));
+
+            // jsonrpsee accepts either "result" or "error", never both. A
+            // JSON-RPC 1.0 reply carries both and sets the unused one to null,
+            // so the choice must be made on null-ness, not on key presence:
+            // otherwise a failed call turns into a successful `null` result.
+            if reply.get("error").is_some_and(|e| !e.is_null()) {
+                reply.remove("result");
+                if let Some(raw) = reply.remove("error") {
+                    reply.insert("error".to_string(), to_jsonrpc2_error(raw));
+                }
+            } else if reply.contains_key("result") {
+                reply.remove("error");
+            }
+
+            // NOTE(mnaser): If a message comes in with its "id" field set to null, then
+            //               we remove it.
+            if reply.get("id").is_some_and(Value::is_null) {
+                reply.remove("id");
+            }
+        }
+
+        Ok(ReceivedMessage::Bytes(message.to_string().into_bytes()))
+    }
+}
+
+/// Reshapes the `error` member of a JSON-RPC 1.0 reply into a JSON-RPC 2.0
+/// error object (`code`, `message`, `data`).
+///
+/// A value that already has an integer `code` and a string `message` is
+/// returned unchanged. Otherwise the original value is kept in `data`, and
+/// `message` is taken from its string `error` member, from the value itself
+/// when it is a string, or from its JSON text as a last resort.
+fn to_jsonrpc2_error(raw: Value) -> Value {
+    // First code of the range JSON-RPC 2.0 reserves for implementation-defined
+    // server errors.
+    const SERVER_ERROR_CODE: i64 = -32000;
+
+    let already_v2 = raw.get("code").is_some_and(Value::is_i64)
+        && raw.get("message").is_some_and(Value::is_string);
+    if already_v2 {
+        return raw;
+    }
+
+    let message = raw
+        .get("error")
+        .and_then(Value::as_str)
+        .or_else(|| raw.as_str())
+        .map_or_else(|| raw.to_string(), str::to_owned);
+
+    json!({ "code": SERVER_ERROR_CODE, "message": message, "data": raw })
+}
diff --git a/client/src/transports/tcp.rs b/client/src/transports/tcp.rs
new file mode 100644
index 0000000..2599b64
--- /dev/null
+++ b/client/src/transports/tcp.rs
@@ -0,0 +1,18 @@
+use crate::transports::{Receiver, Sender, codec::JsonCodec};
+use futures_util::stream::StreamExt;
+use jsonrpsee::core::client::{TransportReceiverT, TransportSenderT};
+use std::io::Error;
+use tokio::net::{TcpStream, ToSocketAddrs};
+use tokio_util::codec::Framed;
+
+/// Connects to `socket` over TCP and returns the sender and receiver
+/// transport halves, framed with [`JsonCodec`].
+///
+/// # Errors
+///
+/// Returns the I/O error from [`TcpStream::connect`] when the connection
+/// cannot be established.
+pub async fn connect(
+    socket: impl ToSocketAddrs,
+) -> Result<(impl TransportSenderT + Send, impl TransportReceiverT + Send), Error> {
+    let stream = TcpStream::connect(socket).await?;
+    let (outgoing, incoming) = Framed::new(stream, JsonCodec).split();
+
+    Ok((Sender { inner: outgoing }, Receiver { inner: incoming }))
+}