blob: bd5362ac68a7d8cc131f5f1fa53fbd44b8bd0d5d [file] [log] [blame]
Mohammed Naser3415a2a2025-03-06 21:16:12 -05001use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
2use ovsdb_client::{
3 rpc::{self, RpcClient},
4 schema::{MonitorRequest, UpdateNotification},
5};
6use std::collections::HashMap;
7use tracing::Level;
8use tracing_subscriber::FmtSubscriber;
9
10#[tokio::main]
11async fn main() -> Result<(), Box<dyn std::error::Error>> {
12 let subscriber = FmtSubscriber::builder()
13 .with_max_level(Level::TRACE)
14 .finish();
15 tracing::subscriber::set_global_default(subscriber)?;
16
17 let socket_addr = "127.0.0.1:6641";
18 let database = "OVN_Northbound";
19 let table = "NB_Global";
20
21 let client = rpc::connect_tcp(socket_addr).await?;
22
23 // // 4.1.1. List Databases
24 let _databases = client.list_databases().await?;
25
26 // // 4.1.2. Get Schema
27 let schema = client.get_schema(database).await?;
28 let columns = schema
29 .tables
30 .get(table)
31 .expect("table not found")
32 .columns
33 .keys()
34 .cloned()
35 .collect::<Vec<_>>();
36
37 let mut requests = HashMap::new();
38 requests.insert(
39 table.to_owned(),
40 MonitorRequest {
41 columns: Some(columns),
42 ..Default::default()
43 },
44 );
45
46 let initial = client.monitor("OVN_Northbound", None, requests).await?;
47 println!("Initial state: {:?}", initial);
48
49 let mut stream: Subscription<UpdateNotification<serde_json::Value>> = client.subscribe_to_method("update").await?;
50
51 while let Some(update) = stream.next().await {
52 match update {
53 Ok(update) => println!("Received update: {:?}", update),
54 Err(e) => eprintln!("Error receiving update: {:?}", e),
55 }
56 }
57
58 Ok(())
59}