Initial commit
Change-Id: I2b916ff0acd2a88aeef709cf4f900503e823d44d
diff --git a/client/Cargo.toml b/client/Cargo.toml
new file mode 100644
index 0000000..6773ee0
--- /dev/null
+++ b/client/Cargo.toml
@@ -0,0 +1,23 @@
+[package]
+name = "ovsdb-client"
+version = "0.0.1"
+edition = "2021"
+description = "Async Rust client for the Open vSwitch Database Protocol with monitoring support"
+license = "Apache-2.0"
+keywords = ["ovsdb", "ovs", "openvswitch", "database", "networking"]
+categories = ["database", "network-programming", "api-bindings", "asynchronous"]
+repository = "https://review.vexxhost.dev/plugins/gitiles/ovsdb"
+
+[dependencies]
+bytes = "1.10.1"
+futures-util = { version = "0.3.31" }
+jsonrpsee = { version = "0.24.8", features = ["async-client", "client-core", "macros"] }
+serde = "1.0.218"
+serde_json = "1.0.140"
+thiserror = "2.0.12"
+tokio = { version = "1.43.0", features = ["net", "rt-multi-thread"] }
+tokio-util = { version = "0.7.13", features = ["codec"] }
+
+[dev-dependencies]
+tracing = "0.1.41"
+tracing-subscriber = "0.3.19"
diff --git a/client/README.md b/client/README.md
new file mode 100644
index 0000000..a8fed3a
--- /dev/null
+++ b/client/README.md
@@ -0,0 +1,143 @@
+# ovsdb-client
+
+A Rust implementation of the OVSDB protocol client based on [RFC7047](https://datatracker.ietf.org/doc/html/rfc7047).
+
+## Overview
+
+This crate provides a client implementation for the Open vSwitch Database Management Protocol (OVSDB), allowing Rust applications to:
+
+- Connect to OVSDB servers over TCP or Unix sockets
+- Query database schemas
+- Monitor tables for changes in real-time
+- Execute transactions against OVSDB databases (planned; `transact` is not implemented yet, see "OVSDB Protocol Support" below)
+
+## Features
+
+- **Multiple Transport Options**: Connect via TCP or Unix socket
+- **Schema Handling**: Retrieve and parse database schemas
+- **Monitoring**: Subscribe to changes in database tables
+- **JSON-RPC**: Built on top of `jsonrpsee` for reliable RPC communication
+- **Async API**: Fully async API designed for use with Tokio
+
+## Quick Start
+
+```rust
+use ovsdb_client::{
+ rpc::{self, RpcClient},
+ schema::{MonitorRequest, UpdateNotification},
+};
+use std::collections::HashMap;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ // Connect to an OVSDB server on localhost
+ let client = rpc::connect_tcp("127.0.0.1:6641").await?;
+
+ // List available databases
+ let databases = client.list_databases().await?;
+ println!("Available databases: {:?}", databases);
+
+ // Get schema for a specific database
+ let schema = client.get_schema("OVN_Northbound").await?;
+
+ // Set up monitoring for a table
+ let mut requests = HashMap::new();
+ requests.insert(
+ "NB_Global".to_owned(),
+ MonitorRequest {
+ columns: Some(vec!["name".to_owned(), "nb_cfg".to_owned()]),
+ ..Default::default()
+ },
+ );
+
+ // Start monitoring and get initial state
+ let initial = client.monitor("OVN_Northbound", None, requests).await?;
+ println!("Initial state: {:?}", initial);
+
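+    // Subscribe to updates. `subscribe_to_method` is provided by
+    // `jsonrpsee::core::client::SubscriptionClientT`, which must be in scope, and the
+    // notification type has to be written out because it cannot be inferred.
+    let mut stream: jsonrpsee::core::client::Subscription<UpdateNotification<serde_json::Value>> =
+        client.subscribe_to_method("update").await?;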
+ while let Some(update) = stream.next().await {
+ match update {
+ Ok(update) => println!("Received update: {:?}", update),
+ Err(e) => eprintln!("Error: {:?}", e),
+ }
+ }
+
+ Ok(())
+}
+```
+
+## API Overview
+
+### Connections
+
+```rust
+// Connect via TCP
+let client = rpc::connect_tcp("127.0.0.1:6641").await?;
+
+// Connect via Unix socket
+let client = rpc::connect_unix("/var/run/openvswitch/db.sock").await?;
+```
+
+### Basic Operations
+
+```rust
+// List databases
+let databases = client.list_databases().await?;
+
+// Get schema
+let schema = client.get_schema("OVN_Northbound").await?;
+```
+
+### Monitoring
+
+```rust
+// Create monitor request
+let mut requests = HashMap::new();
+requests.insert(
+ "Table_Name".to_owned(),
+ MonitorRequest {
+ columns: Some(vec!["column1".to_owned(), "column2".to_owned()]),
+ ..Default::default()
+ },
+);
+
+// Start monitoring
+let initial_state = client.monitor("Database_Name", None, requests).await?;
+
+// Subscribe to updates
+let mut stream = client.subscribe_to_method("update").await?;
+while let Some(update) = stream.next().await {
+ // Process updates
+}
+```
+
+## Development Setup
+
+To develop or test with this crate, you'll need an OVSDB server. You can use Docker to run one:
+
+```bash
+docker run -it --rm -p 6641:6641 registry.atmosphere.dev/library/ovn-central:main /bin/bash -c "mkdir /etc/ovn; /root/ovnkube.sh nb-ovsdb"
+```
+
+This starts an OVN Northbound OVSDB server that listens on port 6641.
+
+## OVSDB Protocol Support
+
+This implementation supports the following OVSDB operations as defined in RFC7047:
+
+- List Databases (Section 4.1.1)
+- Get Schema (Section 4.1.2)
+- Monitor (Section 4.1.5)
+- Update Notifications (Section 4.1.6)
+
+Future versions will add support for additional operations such as Transact (Section 4.1.3) and Monitor Cancellation (Section 4.1.7).
+
+## Related Crates
+
+- [ovsdb-schema](https://crates.io/crates/ovsdb-schema): Core OVSDB data types and serialization
+- [ovsdb-derive](https://crates.io/crates/ovsdb-derive): Derive macros for OVSDB struct generation
+
+## License
+
+This project is licensed under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0).
diff --git a/client/examples/ovsdb-monitor.rs b/client/examples/ovsdb-monitor.rs
new file mode 100644
index 0000000..bd5362a
--- /dev/null
+++ b/client/examples/ovsdb-monitor.rs
@@ -0,0 +1,59 @@
+use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
+use ovsdb_client::{
+ rpc::{self, RpcClient},
+ schema::{MonitorRequest, UpdateNotification},
+};
+use std::collections::HashMap;
+use tracing::Level;
+use tracing_subscriber::FmtSubscriber;
+
+/// Connects to a local OVSDB server, prints the initial contents of one table and then
+/// prints every `update` notification until the subscription stream ends.
+///
+/// # Errors
+///
+/// Returns an error if the tracing subscriber cannot be installed, if connecting or any
+/// RPC call fails, or if the monitored table is missing from the retrieved schema.
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Install a global subscriber that prints events up to TRACE level.
+    let subscriber = FmtSubscriber::builder()
+        .with_max_level(Level::TRACE)
+        .finish();
+    tracing::subscriber::set_global_default(subscriber)?;
+
+    let socket_addr = "127.0.0.1:6641";
+    let database = "OVN_Northbound";
+    let table = "NB_Global";
+
+    let client = rpc::connect_tcp(socket_addr).await?;
+
+    // 4.1.1. List Databases
+    let _databases = client.list_databases().await?;
+
+    // 4.1.2. Get Schema
+    let schema = client.get_schema(database).await?;
+    // Monitor every column the schema declares for the table. A missing table is reported
+    // as an error rather than a panic, since `main` already returns `Result`.
+    let columns = schema
+        .tables
+        .get(table)
+        .ok_or_else(|| format!("table {table} not found in database {database}"))?
+        .columns
+        .keys()
+        .cloned()
+        .collect::<Vec<_>>();
+
+    let mut requests = HashMap::new();
+    requests.insert(
+        table.to_owned(),
+        MonitorRequest {
+            columns: Some(columns),
+            ..Default::default()
+        },
+    );
+
+    // 4.1.5. Monitor: use the same database the schema was fetched from.
+    let initial = client.monitor(database, None, requests).await?;
+    println!("Initial state: {:?}", initial);
+
+    // The notification type must be annotated; it cannot be inferred from the call.
+    let mut stream: Subscription<UpdateNotification<serde_json::Value>> =
+        client.subscribe_to_method("update").await?;
+
+    while let Some(update) = stream.next().await {
+        match update {
+            Ok(update) => println!("Received update: {:?}", update),
+            Err(e) => eprintln!("Error receiving update: {:?}", e),
+        }
+    }
+
+    Ok(())
+}
diff --git a/client/src/lib.rs b/client/src/lib.rs
new file mode 100644
index 0000000..2d5fc87
--- /dev/null
+++ b/client/src/lib.rs
@@ -0,0 +1,3 @@
+/// JSON-RPC method definitions and connection helpers for talking to an OVSDB server.
+pub mod rpc;
+/// Serde data types for schemas, monitor requests and update notifications.
+pub mod schema;
+// Internal transport layer (JSON codec plus TCP / Unix-socket connectors); not exported.
+mod transports;
diff --git a/client/src/rpc.rs b/client/src/rpc.rs
new file mode 100644
index 0000000..4b6b372
--- /dev/null
+++ b/client/src/rpc.rs
@@ -0,0 +1,55 @@
+use crate::{
+ schema::{DatabaseSchema, MonitorRequest, TableUpdate},
+ transports::{ipc, tcp},
+};
+use jsonrpsee::{async_client::ClientBuilder, core::client::SubscriptionClientT, proc_macros::rpc};
+use std::{collections::HashMap, path::Path};
+use tokio::net::ToSocketAddrs;
+
+/// JSON-RPC methods of the OVSDB management protocol.
+///
+/// The section numbers in the method docs refer to RFC 7047. The `#[rpc(client)]`
+/// attribute generates the client-side trait from these signatures; elsewhere in this
+/// change it is imported as `RpcClient`.
+#[rpc(client)]
+pub trait Rpc {
+ /// 4.1.1. List Databases
+ ///
+ /// This operation retrieves an array whose elements are the names of the
+ /// databases that can be accessed over this management protocol
+ /// connection.
+ ///
+ /// Sent on the wire as the `list_dbs` method.
+ #[method(name = "list_dbs")]
+ async fn list_databases(&self) -> Result<Vec<String>, ErrorObjectOwned>;
+
+ /// 4.1.2. Get Schema
+ ///
+ /// This operation retrieves a `<database-schema>` that describes hosted
+ /// database `<db-name>`.
+ ///
+ /// `db_name` is the name of the database whose schema is requested.
+ #[method(name = "get_schema")]
+ async fn get_schema(&self, db_name: &str) -> Result<DatabaseSchema, ErrorObjectOwned>;
+
+ /// 4.1.5. Monitor
+ ///
+ /// The "monitor" request enables a client to replicate tables or subsets
+ /// of tables within an OVSDB database by requesting notifications of
+ /// changes to those tables and by receiving the complete initial state
+ /// of a table or a subset of a table.
+ ///
+ /// * `db_name` - database to monitor.
+ /// * `matcher` - second positional parameter of the request; presumably the value the
+ ///   server echoes back in later update notifications (TODO confirm against RFC 7047).
+ /// * `requests` - one [`MonitorRequest`] per table name.
+ ///
+ /// The result is the initial state, keyed by table name.
+ #[method(name = "monitor")]
+ async fn monitor(
+ &self,
+ db_name: &str,
+ matcher: Option<&str>,
+ requests: HashMap<String, MonitorRequest>,
+ ) -> Result<TableUpdate<serde_json::Value>, ErrorObjectOwned>;
+}
+
+/// Opens a TCP connection to an OVSDB server and wraps it in a jsonrpsee client.
+///
+/// `tcp` is any address accepted by [`ToSocketAddrs`]. The client is created with
+/// [`ClientBuilder::build_with_tokio`] using the builder's default settings.
+///
+/// # Errors
+///
+/// Returns the [`std::io::Error`] produced while establishing the connection.
+pub async fn connect_tcp(
+    tcp: impl ToSocketAddrs,
+) -> Result<impl SubscriptionClientT, std::io::Error> {
+    tcp::connect(tcp)
+        .await
+        .map(|(tx, rx)| ClientBuilder::default().build_with_tokio(tx, rx))
+}
+
+/// Opens a Unix-domain-socket connection to an OVSDB server and wraps it in a jsonrpsee
+/// client.
+///
+/// `socket_path` is the filesystem path of the server's socket. The client is created
+/// with [`ClientBuilder::build_with_tokio`] using the builder's default settings.
+///
+/// # Errors
+///
+/// Returns the [`std::io::Error`] produced while establishing the connection.
+pub async fn connect_unix(
+    socket_path: impl AsRef<Path>,
+) -> Result<impl SubscriptionClientT, std::io::Error> {
+    ipc::connect(socket_path)
+        .await
+        .map(|(tx, rx)| ClientBuilder::default().build_with_tokio(tx, rx))
+}
diff --git a/client/src/schema.rs b/client/src/schema.rs
new file mode 100644
index 0000000..f17e757
--- /dev/null
+++ b/client/src/schema.rs
@@ -0,0 +1,120 @@
+use serde::de::{self, SeqAccess, Visitor};
+use serde::{Deserialize, Deserializer, Serialize};
+use std::collections::HashMap;
+use std::fmt;
+use std::marker::PhantomData;
+
+/// Description of a database, as returned by the `get_schema` RPC.
+#[derive(Debug, Deserialize)]
+pub struct DatabaseSchema {
+ /// Name of the database.
+ pub name: String,
+
+ /// Schema version string.
+ pub version: String,
+
+ /// Schema checksum, read from the `cksum` member; `None` when the member is absent.
+ #[serde(rename = "cksum")]
+ pub checksum: Option<String>,
+
+ /// Table schemas keyed by table name.
+ pub tables: HashMap<String, TableSchema>,
+}
+
+/// Schema of a single table inside a [`DatabaseSchema`].
+///
+/// Wire member names are camelCase (`maxRows`, `isRoot`); single-word members keep their
+/// name unchanged.
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TableSchema {
+    /// Column schemas keyed by column name.
+    pub columns: HashMap<String, ColumnSchema>,
+
+    /// Value of the `maxRows` member, if present.
+    pub max_rows: Option<u64>,
+
+    /// Value of the `isRoot` member, if present.
+    pub is_root: Option<bool>,
+
+    /// Value of the `indexes` member, if present: a list of column-name lists.
+    pub indexes: Option<Vec<Vec<String>>>,
+}
+
+/// Schema of a single column inside a [`TableSchema`].
+#[derive(Debug, Deserialize)]
+pub struct ColumnSchema {
+    /// The `type` member, kept as raw JSON rather than parsed into a typed structure.
+    pub r#type: serde_json::Value,
+
+    /// Value of the `ephemeral` member, if present.
+    pub ephemeral: Option<bool>,
+
+    /// Value of the `mutable` member, if present.
+    pub mutable: Option<bool>,
+}
+
+/// Per-table entry of a `monitor` request.
+///
+/// Both members are optional and are left out of the serialized JSON when `None`.
+#[derive(Debug, Default, Deserialize, Serialize)]
+pub struct MonitorRequest {
+ /// Names of the columns to monitor; omitted from the request when `None`.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub columns: Option<Vec<String>>,
+
+ /// Selection flags for the monitor; omitted from the request when `None`.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub select: Option<MonitorRequestSelect>,
+}
+
+/// The `select` member of a [`MonitorRequest`]: four optional boolean flags.
+///
+/// The fields are public and the type implements `Default`, so it can be built outside
+/// this crate, e.g. `MonitorRequestSelect { initial: Some(false), ..Default::default() }`.
+/// Flags left as `None` are omitted from the serialized request instead of being sent as
+/// JSON `null`, leaving the choice to the server.
+#[derive(Debug, Default, Deserialize, Serialize)]
+pub struct MonitorRequestSelect {
+    /// The `initial` flag; omitted when `None`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub initial: Option<bool>,
+
+    /// The `insert` flag; omitted when `None`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub insert: Option<bool>,
+
+    /// The `delete` flag; omitted when `None`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub delete: Option<bool>,
+
+    /// The `modify` flag; omitted when `None`.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub modify: Option<bool>,
+}
+
+/// Updates for a whole database: table name mapped to that table's row updates.
+pub type TableUpdate<T> = HashMap<String, TableUpdateRows<T>>;
+/// Updates for one table: a string key mapped to a value of the caller-chosen row type `T`.
+// NOTE(review): the key is presumably the row UUID, and `T` is presumably meant to be a
+// `RowUpdate<_>`; neither is enforced here. Confirm against callers.
+pub type TableUpdateRows<T> = HashMap<String, T>;
+
+/// A pair of optional row values read from the `old` and `new` members of a JSON object.
+#[derive(Debug, Deserialize)]
+pub struct RowUpdate<T> {
+ /// Value of the `old` member, if present.
+ pub old: Option<T>,
+ /// Value of the `new` member, if present.
+ pub new: Option<T>,
+}
+
+/// Parameters of an `update` notification.
+///
+/// Deserialized from a two-element JSON array rather than from an object; see the manual
+/// `Deserialize` implementation for this type.
+#[derive(Debug)]
+pub struct UpdateNotification<T> {
+ /// First array element: an optional string identifier.
+ // NOTE(review): presumably the value passed as `matcher` to `monitor`; confirm.
+ pub id: Option<String>,
+ /// Second array element: the table updates.
+ pub message: TableUpdate<T>,
+}
+
+impl<'de, T> Deserialize<'de> for UpdateNotification<T>
+where
+    T: Deserialize<'de>,
+{
+    /// Reads an update notification from its wire form: a sequence whose first element
+    /// is the optional identifier and whose second element is the table updates.
+    ///
+    /// # Errors
+    ///
+    /// Fails with an invalid-length error when either element is missing, or with the
+    /// underlying error when an element does not deserialize into its expected type.
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        // Zero-sized visitor; the `PhantomData` only ties it to the row type `T`.
+        struct PairVisitor<T>(PhantomData<T>);
+
+        impl<'de, T> Visitor<'de> for PairVisitor<T>
+        where
+            T: Deserialize<'de>,
+        {
+            type Value = UpdateNotification<T>;
+
+            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+                formatter
+                    .write_str("an array with two elements: Option<String> and a TableUpdate<T>")
+            }
+
+            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+            where
+                A: SeqAccess<'de>,
+            {
+                // Position 0: the identifier, which may itself be null.
+                let id = match seq.next_element::<Option<String>>()? {
+                    Some(id) => id,
+                    None => return Err(de::Error::invalid_length(0, &self)),
+                };
+
+                // Position 1: the per-table updates.
+                let message = match seq.next_element::<TableUpdate<T>>()? {
+                    Some(message) => message,
+                    None => return Err(de::Error::invalid_length(1, &self)),
+                };
+
+                Ok(UpdateNotification { id, message })
+            }
+        }
+
+        deserializer.deserialize_seq(PairVisitor(PhantomData))
+    }
+}
diff --git a/client/src/transports/codec.rs b/client/src/transports/codec.rs
new file mode 100644
index 0000000..6e69571
--- /dev/null
+++ b/client/src/transports/codec.rs
@@ -0,0 +1,37 @@
+use bytes::{BufMut, BytesMut};
+use serde_json::Value;
+use std::io;
+use tokio_util::codec::{Decoder, Encoder};
+
+/// Stateless codec: writes pre-serialized bytes as-is and decodes incoming bytes into
+/// `serde_json::Value`s.
+pub struct JsonCodec;
+
+impl Encoder<BytesMut> for JsonCodec {
+    type Error = io::Error;
+
+    /// Appends an already-serialized message to the outgoing buffer verbatim.
+    ///
+    /// No delimiter or length prefix is added, and this implementation never returns an
+    /// error.
+    fn encode(&mut self, data: BytesMut, buf: &mut BytesMut) -> Result<(), io::Error> {
+        // Copies the bytes onto the end of `buf`; `BytesMut` grows as required.
+        buf.put_slice(&data);
+        Ok(())
+    }
+}
+
+impl Decoder for JsonCodec {
+    type Item = Value;
+    type Error = io::Error;
+
+    /// Decodes one JSON value from the front of `src`.
+    ///
+    /// Only the bytes belonging to the decoded value are consumed. Anything after it,
+    /// such as a second message delivered in the same read, stays in `src` for the next
+    /// call.
+    ///
+    /// Returns `Ok(None)` when `src` is empty, holds only whitespace, or ends in the
+    /// middle of a value, so the caller can wait for more bytes.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `io::Error` converted from the `serde_json` error when the buffered
+    /// bytes are not valid JSON.
+    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Value>, io::Error> {
+        // Parse as a stream instead of with `from_slice`, which rejects any bytes that
+        // follow the first value and therefore fails when messages arrive back to back.
+        let mut values = serde_json::Deserializer::from_slice(src).into_iter::<Value>();
+        let next = values.next();
+        // Offset just past the last successfully parsed value.
+        let consumed = values.byte_offset();
+
+        match next {
+            Some(Ok(val)) => {
+                // Discard exactly the decoded prefix and keep the remainder buffered.
+                let _ = src.split_to(consumed);
+
+                Ok(Some(val))
+            }
+            Some(Err(ref e)) if e.is_eof() => Ok(None),
+            Some(Err(e)) => Err(e.into()),
+            None => Ok(None),
+        }
+    }
+}
diff --git a/client/src/transports/ipc.rs b/client/src/transports/ipc.rs
new file mode 100644
index 0000000..09798a7
--- /dev/null
+++ b/client/src/transports/ipc.rs
@@ -0,0 +1,18 @@
+use crate::transports::{Receiver, Sender, codec::JsonCodec};
+use futures_util::stream::StreamExt;
+use jsonrpsee::core::client::{TransportReceiverT, TransportSenderT};
+use std::{io::Error, path::Path};
+use tokio::net::UnixStream;
+use tokio_util::codec::Framed;
+
+/// Connects to the Unix-domain socket at `socket` and returns the two transport halves.
+///
+/// The stream is framed with [`JsonCodec`] and split into a sending half and a receiving
+/// half, each wrapped in this crate's `Sender` / `Receiver` adapters.
+///
+/// # Errors
+///
+/// Returns the I/O error reported by `UnixStream::connect`.
+pub async fn connect(
+    socket: impl AsRef<Path>,
+) -> Result<(impl TransportSenderT + Send, impl TransportReceiverT + Send), Error> {
+    let framed = Framed::new(UnixStream::connect(socket).await?, JsonCodec);
+    let (sink, stream) = framed.split();
+
+    Ok((Sender { inner: sink }, Receiver { inner: stream }))
+}
diff --git a/client/src/transports/mod.rs b/client/src/transports/mod.rs
new file mode 100644
index 0000000..7456643
--- /dev/null
+++ b/client/src/transports/mod.rs
@@ -0,0 +1,115 @@
+mod codec;
+pub mod ipc;
+pub mod tcp;
+
+use bytes::BytesMut;
+use futures_util::{Sink, SinkExt, Stream, stream::StreamExt};
+use jsonrpsee::core::{
+ async_trait,
+ client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
+};
+use serde_json::{Value, json};
+use thiserror::Error;
+
+/// Errors reported by the transport adapters in this module.
+#[derive(Debug, Error)]
+enum TransportError {
+    /// The underlying stream ended.
+    #[error("Connection closed.")]
+    ConnectionClosed,
+
+    /// An I/O error from the underlying stream.
+    #[error("IO error: {0}")]
+    Io(#[from] std::io::Error),
+
+    /// Any other failure, carried as its display text.
+    #[error("Unknown error: {0}")]
+    Unknown(String),
+}
+
+/// Sending half of a transport: wraps a sink that accepts raw `BytesMut` frames.
+struct Sender<T: Send + Sink<BytesMut>> {
+ /// The wrapped sink that outgoing frames are written to.
+ inner: T,
+}
+
+#[async_trait]
+impl<T: Send + Sink<BytesMut, Error = impl std::error::Error> + Unpin + 'static> TransportSenderT
+    for Sender<T>
+{
+    type Error = TransportError;
+
+    /// Rewrites an outgoing jsonrpsee message for the remote and writes it to the sink.
+    ///
+    /// # Errors
+    ///
+    /// Returns `TransportError::Unknown` when `body` is not valid JSON, when it is not a
+    /// JSON object (for example a batch array), or when the sink reports an error.
+    async fn send(&mut self, body: String) -> Result<(), Self::Error> {
+        let mut message: Value =
+            serde_json::from_str(&body).map_err(|e| TransportError::Unknown(e.to_string()))?;
+
+        // Only single JSON objects can be rewritten below. Anything else is reported as
+        // an error instead of panicking.
+        let Some(request) = message.as_object_mut() else {
+            return Err(TransportError::Unknown(
+                "JSON-RPC payload is not an object; batch requests are not supported"
+                    .to_string(),
+            ));
+        };
+
+        // NOTE(mnaser): In order to be able to use the subscription client, we need to
+        //               drop the subscription message for the "update" method, as the
+        //               remote doesn't support JSON-RPC 2.0.
+        if request.get("method").and_then(Value::as_str) == Some("update") {
+            return Ok(());
+        }
+
+        // NOTE(mnaser): jsonrpsee runs using JSON-RPC 2.0 only which the remote doesn't
+        //               support, so we intercept the message, remove "jsonrpc" and then
+        //               send the message.
+        request.remove("jsonrpc");
+
+        // NOTE(mnaser): OVSDB expects all requests to have a "params" key, so we add an
+        //               empty array if it doesn't exist.
+        request.entry("params").or_insert_with(|| json!([]));
+
+        self.inner
+            .send(BytesMut::from(message.to_string().as_str()))
+            .await
+            .map_err(|e| TransportError::Unknown(e.to_string()))?;
+
+        Ok(())
+    }
+
+    /// Closes the underlying sink.
+    ///
+    /// # Errors
+    ///
+    /// Returns `TransportError::Unknown` carrying the sink's error text.
+    async fn close(&mut self) -> Result<(), Self::Error> {
+        self.inner
+            .close()
+            .await
+            .map_err(|e| TransportError::Unknown(e.to_string()))?;
+
+        Ok(())
+    }
+}
+
+/// Receiving half of a transport: wraps the stream that incoming messages are read from.
+struct Receiver<T: Send + Stream> {
+ /// The wrapped stream of incoming messages.
+ inner: T,
+}
+
+#[async_trait]
+impl<T: Send + Stream<Item = Result<Value, std::io::Error>> + Unpin + 'static> TransportReceiverT
+    for Receiver<T>
+{
+    type Error = TransportError;
+
+    /// Reads the next message from the stream and reshapes it into JSON-RPC 2.0 form
+    /// before handing it to jsonrpsee.
+    ///
+    /// Messages that are not JSON objects are forwarded unchanged.
+    ///
+    /// # Errors
+    ///
+    /// Returns `TransportError::ConnectionClosed` when the stream has ended and
+    /// `TransportError::Io` when the stream yields an I/O error.
+    async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
+        match self.inner.next().await {
+            None => Err(TransportError::ConnectionClosed),
+            Some(Err(e)) => Err(TransportError::Io(e)),
+            Some(Ok(mut message)) => {
+                // The remote is untrusted input: only rewrite when it really sent an
+                // object, instead of panicking on anything else.
+                if let Some(obj) = message.as_object_mut() {
+                    // NOTE(mnaser): jsonrpsee runs using JSON-RPC 2.0 only which the
+                    //               remote doesn't support, so we intercept the message
+                    //               and add "jsonrpc".
+                    obj.insert("jsonrpc".to_string(), json!("2.0"));
+
+                    // The remote sends both "result" and "error" and sets the unused
+                    // one to null, while jsonrpsee expects exactly one of them. Keep
+                    // whichever carries the outcome so failures are not mistaken for a
+                    // null result.
+                    let failed = obj.get("error").is_some_and(|error| !error.is_null());
+                    if failed {
+                        obj.remove("result");
+                        if let Some(error) = obj.remove("error") {
+                            obj.insert("error".to_string(), to_v2_error(error));
+                        }
+                    } else if obj.contains_key("result") {
+                        obj.remove("error");
+                    }
+
+                    // NOTE(mnaser): If a message comes in with its "id" field set to
+                    //               null, then we remove it.
+                    if matches!(obj.get("id"), Some(Value::Null)) {
+                        obj.remove("id");
+                    }
+                }
+
+                Ok(ReceivedMessage::Bytes(message.to_string().into_bytes()))
+            }
+        }
+    }
+}
+
+/// Converts a remote error value into a JSON-RPC 2.0 error object.
+///
+/// A value that already has an integer `code` and a string `message` is returned as-is.
+/// Otherwise the original value is kept under `data`, the message is taken from the
+/// string itself or from its `error` member (falling back to the serialized value), and
+/// the code is -32000, the start of the implementation-defined server-error range.
+fn to_v2_error(error: Value) -> Value {
+    let already_v2 = error.get("code").is_some_and(Value::is_i64)
+        && error.get("message").is_some_and(Value::is_string);
+    if already_v2 {
+        return error;
+    }
+
+    let summary = match &error {
+        Value::String(text) => text.clone(),
+        other => other
+            .get("error")
+            .and_then(Value::as_str)
+            .map_or_else(|| other.to_string(), str::to_owned),
+    };
+
+    json!({ "code": -32000, "message": summary, "data": error })
+}
diff --git a/client/src/transports/tcp.rs b/client/src/transports/tcp.rs
new file mode 100644
index 0000000..2599b64
--- /dev/null
+++ b/client/src/transports/tcp.rs
@@ -0,0 +1,18 @@
+use crate::transports::{Receiver, Sender, codec::JsonCodec};
+use futures_util::stream::StreamExt;
+use jsonrpsee::core::client::{TransportReceiverT, TransportSenderT};
+use std::io::Error;
+use tokio::net::{TcpStream, ToSocketAddrs};
+use tokio_util::codec::Framed;
+
+/// Connects to `socket` over TCP and returns the two transport halves.
+///
+/// The stream is framed with [`JsonCodec`] and split into a sending half and a receiving
+/// half, each wrapped in this crate's `Sender` / `Receiver` adapters.
+///
+/// # Errors
+///
+/// Returns the I/O error reported by `TcpStream::connect`.
+pub async fn connect(
+    socket: impl ToSocketAddrs,
+) -> Result<(impl TransportSenderT + Send, impl TransportReceiverT + Send), Error> {
+    let framed = Framed::new(TcpStream::connect(socket).await?, JsonCodec);
+    let (sink, stream) = framed.split();
+
+    Ok((Sender { inner: sink }, Receiver { inner: stream }))
+}