Add Rust based ovsinit binary
This binary allows us to more reliably migrate IPs, which also
includes routes as well, with proper rollback in place to avoid
a system falling off the network.
This first iteration is used in the Neutron & OVN charts only,
but the long term plan is to leverage it into the Open vSwitch
charts potentially to have a single "auto_bridge_add" source
of truth.
Change-Id: Ic4de23297b67a602d9aba4b00f0fb234d9d37cfe
(cherry picked from commit 62c4dd963918ea193aea48be8db93f0ea52fa308)
diff --git a/crates/ovsinit/Cargo.toml b/crates/ovsinit/Cargo.toml
new file mode 100644
index 0000000..ac7d810
--- /dev/null
+++ b/crates/ovsinit/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "ovsinit"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+clap = { version = "4.5.29", features = ["derive"] }
+env_logger = { version = "0.11.6", features = ["unstable-kv"] }
+futures = "0.3.31"
+futures-util = "0.3.31"
+ipnet = "2.11.0"
+libc = "0.2.169"
+log = { version = "0.4.25", features = ["kv"] }
+netlink-packet-route = "0.19.0"
+rtnetlink = "0.14.1"
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+thiserror = "2.0.11"
+tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
diff --git a/crates/ovsinit/src/config.rs b/crates/ovsinit/src/config.rs
new file mode 100644
index 0000000..7c3d6b7
--- /dev/null
+++ b/crates/ovsinit/src/config.rs
@@ -0,0 +1,82 @@
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::{fs::File, path::PathBuf};
+use thiserror::Error;
+use log::{error, info};
+
+#[derive(Deserialize)]
+pub struct NetworkConfig {
+ #[serde(flatten)]
+ pub bridges: HashMap<String, Option<String>>,
+}
+
+#[derive(Debug, Error)]
+pub enum NetworkConfigError {
+ #[error("Failed to open file: {0}")]
+ OpenFile(#[from] std::io::Error),
+
+ #[error("Failed to parse JSON: {0}")]
+ ParseJson(#[from] serde_json::Error),
+}
+
+impl NetworkConfig {
+ pub fn from_path(path: &PathBuf) -> Result<Self, NetworkConfigError> {
+ let file = File::open(path)?;
+ NetworkConfig::from_file(file)
+ }
+
+ pub fn from_file(file: File) -> Result<Self, NetworkConfigError> {
+ let config: NetworkConfig = serde_json::from_reader(file)?;
+ Ok(config)
+ }
+
+ pub fn bridges_with_interfaces_iter(&self) -> impl Iterator<Item = (&String, &String)> {
+ self.bridges.iter().filter_map(|(k, v)| {
+ if let Some(v) = v {
+ Some((k, v))
+ } else {
+ info!(bridge = k.as_str(); "Bridge has no interface, skipping.");
+
+ None
+ }
+ })
+ }
+
+ #[allow(dead_code)]
+ pub fn from_string(json: &str) -> Result<Self, NetworkConfigError> {
+ let config: NetworkConfig = serde_json::from_str(json)?;
+ Ok(config)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_null_interface() {
+ let config = NetworkConfig::from_string("{\"br-ex\": null}").unwrap();
+
+ assert_eq!(config.bridges.len(), 1);
+ assert_eq!(config.bridges.get("br-ex"), Some(&None));
+ }
+
+ #[test]
+ fn test_bridges_with_interfaces_iter_with_null_interface() {
+ let config = NetworkConfig::from_string("{\"br-ex\": null}").unwrap();
+
+ let mut iter = config.bridges_with_interfaces_iter();
+ assert_eq!(iter.next(), None);
+ }
+
+ #[test]
+ fn test_bridges_with_interfaces_iter_with_interface() {
+ let config = NetworkConfig::from_string("{\"br-ex\": \"bond0\"}").unwrap();
+
+ let mut iter = config.bridges_with_interfaces_iter();
+ assert_eq!(
+ iter.next(),
+ Some((&"br-ex".to_string(), &"bond0".to_string()))
+ );
+ }
+}
diff --git a/crates/ovsinit/src/lib.rs b/crates/ovsinit/src/lib.rs
new file mode 100644
index 0000000..80fb9cd
--- /dev/null
+++ b/crates/ovsinit/src/lib.rs
@@ -0,0 +1,353 @@
+extern crate ipnet;
+
+mod routes;
+
+use futures_util::stream::TryStreamExt;
+use ipnet::IpNet;
+use log::{error, info};
+use netlink_packet_route::{
+ address::{AddressAttribute, AddressMessage},
+ route::{RouteAttribute, RouteMessage, RouteScope},
+ AddressFamily,
+};
+use rtnetlink::{Handle, IpVersion};
+use std::net::IpAddr;
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum InterfaceError {
+ #[error("Interface {0} not found")]
+ NotFound(String),
+
+ #[error(transparent)]
+ NetlinkError(#[from] rtnetlink::Error),
+
+ #[error(transparent)]
+ IpNetError(#[from] ipnet::PrefixLenError),
+
+ #[error(transparent)]
+ RouteError(#[from] routes::RouteError),
+}
+
+#[derive(Error, Debug)]
+pub enum InterfaceMigrationError {
+ #[error(transparent)]
+ InterfaceError(#[from] InterfaceError),
+
+ #[error("IP configuration on both interfaces")]
+ IpConflict,
+}
+
+pub struct Interface {
+ name: String,
+ index: u32,
+ address_messages: Vec<AddressMessage>,
+ route_messages: Vec<RouteMessage>,
+}
+
+impl Interface {
+ pub async fn new(handle: &Handle, name: String) -> Result<Self, InterfaceError> {
+ let index = handle
+ .link()
+ .get()
+ .match_name(name.clone())
+ .execute()
+ .try_next()
+ .await
+ .map_err(|e| match e {
+ rtnetlink::Error::NetlinkError(inner) if -inner.raw_code() == libc::ENODEV => {
+ InterfaceError::NotFound(name.clone())
+ }
+ _ => InterfaceError::NetlinkError(e),
+ })?
+ .map(|link| link.header.index)
+ .ok_or_else(|| InterfaceError::NotFound(name.clone()))?;
+
+ let address_messages: Vec<AddressMessage> = handle
+ .address()
+ .get()
+ .set_link_index_filter(index)
+ .execute()
+ .map_err(InterfaceError::NetlinkError)
+ .try_filter(|msg| futures::future::ready(msg.header.family == AddressFamily::Inet))
+ .try_collect()
+ .await?;
+
+ let route_messages: Vec<RouteMessage> = handle
+ .route()
+ .get(IpVersion::V4)
+ .execute()
+ .map_err(InterfaceError::NetlinkError)
+ .try_filter(move |route_msg| {
+ let matches = route_msg
+ .attributes
+ .iter()
+ .any(|attr| matches!(attr, RouteAttribute::Oif(idx) if *idx == index))
+ && route_msg.header.kind != netlink_packet_route::route::RouteType::Local;
+
+ futures_util::future::ready(matches)
+ })
+ .try_collect()
+ .await?;
+
+ Ok(Self {
+ name,
+ index,
+ address_messages,
+ route_messages,
+ })
+ }
+
+ fn addresses(&self) -> Vec<IpNet> {
+ self.address_messages
+ .iter()
+ .filter_map(|msg| {
+ msg.attributes.iter().find_map(|nla| {
+ if let AddressAttribute::Address(ip) = nla {
+ IpNet::new(*ip, msg.header.prefix_len).ok()
+ } else {
+ None
+ }
+ })
+ })
+ .collect()
+ }
+
+ fn routes(&self) -> Result<Vec<routes::Route>, routes::RouteError> {
+ self.route_messages
+ .iter()
+ .filter_map(|msg| {
+ if msg.header.scope == RouteScope::Link {
+ return None;
+ }
+
+ Some(routes::Route::from_message(msg.clone()))
+ })
+ .collect::<Result<Vec<routes::Route>, routes::RouteError>>()
+ }
+
+ async fn up(&self, handle: &Handle) -> Result<(), InterfaceError> {
+ handle
+ .link()
+ .set(self.index)
+ .up()
+ .execute()
+ .await
+ .map_err(InterfaceError::NetlinkError)
+ }
+
+ async fn restore(&self, handle: &Handle) -> Result<(), InterfaceError> {
+ self.migrate_addresses_from_interface(handle, self).await?;
+ self.migrate_routes_from_interface(handle, self).await?;
+
+ Ok(())
+ }
+
+ async fn flush(&self, handle: &Handle) -> Result<(), InterfaceError> {
+ for msg in self.address_messages.iter() {
+ handle.address().del(msg.clone()).execute().await?;
+ }
+
+ // NOTE(mnaser): Once the interface has no more addresses, it will
+ // automatically lose all of it's routes.
+
+ Ok(())
+ }
+
+ async fn migrate_addresses_from_interface(
+ &self,
+ handle: &Handle,
+ src_interface: &Interface,
+ ) -> Result<(), InterfaceError> {
+ for msg in src_interface.address_messages.iter() {
+ let ip = msg.attributes.iter().find_map(|nla| match nla {
+ AddressAttribute::Address(ip) => Some(ip),
+ _ => None,
+ });
+
+ if let Some(ip) = ip {
+ handle
+ .address()
+ .add(self.index, *ip, msg.header.prefix_len)
+ .replace()
+ .execute()
+ .await?;
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn migrate_routes_from_interface(
+ &self,
+ handle: &Handle,
+ src_interface: &Interface,
+ ) -> Result<(), InterfaceError> {
+ for route in src_interface.routes()?.iter() {
+ let mut request = handle.route().add();
+ request = request.protocol(route.protocol);
+
+ match route.destination.addr() {
+ IpAddr::V4(ipv4) => {
+ let mut request = request
+ .v4()
+ .replace()
+ .destination_prefix(ipv4, route.destination.prefix_len());
+
+ if let IpAddr::V4(gateway) = route.gateway {
+ request = request.gateway(gateway);
+ }
+
+ request.execute().await?;
+ }
+ IpAddr::V6(ipv6) => {
+ let mut request = request
+ .v6()
+ .replace()
+ .destination_prefix(ipv6, route.destination.prefix_len());
+
+ if let IpAddr::V6(gateway) = route.gateway {
+ request = request.gateway(gateway);
+ }
+
+ request.execute().await?;
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ pub async fn migrate_from_interface(
+ &self,
+ handle: &Handle,
+ src_interface: &Interface,
+ ) -> Result<(), InterfaceMigrationError> {
+ self.up(handle).await?;
+
+ match (
+ src_interface.address_messages.is_empty(),
+ self.address_messages.is_empty(),
+ ) {
+ (false, false) => {
+ // Both source and destination interfaces have IPs assigned
+ error!(
+ src_interface = src_interface.name.as_str(),
+ dst_interface = self.name.as_str(),
+ src_ip_addresses = format!("{:?}", src_interface.addresses()).as_str(),
+ dst_ip_addresses = format!("{:?}", self.addresses()).as_str();
+ "Both source and destination interfaces have IPs assigned. This is not safe in production, please fix manually."
+ );
+
+ Err(InterfaceMigrationError::IpConflict)
+ }
+ (false, true) => {
+ // Source interface has IPs, destination interface has no IPs
+ info!(
+ src_interface = src_interface.name.as_str(),
+ dst_interface = self.name.as_str(),
+ ip_addresses = format!("{:?}", src_interface.addresses()).as_str(),
+ routes = format!("{:?}", src_interface.routes()).as_str();
+ "Migrating IP addresses from interface to bridge."
+ );
+
+ if let Err(e) = src_interface.flush(handle).await {
+ error!(
+ src_interface = src_interface.name.as_str(),
+ error = e.to_string().as_str();
+ "Error while flushing IPs from source interface."
+ );
+
+ if let Err(restore_err) = src_interface.restore(handle).await {
+ error!(
+ src_interface = src_interface.name.as_str(),
+ error = restore_err.to_string().as_str();
+ "Error while restoring IPs to source interface."
+ );
+ }
+
+ return Err(InterfaceMigrationError::InterfaceError(e));
+ }
+
+ info!(
+ src_interface = src_interface.name.as_str(),
+ dst_interface = self.name.as_str();
+ "Successfully flushed IP addresses from source interface."
+ );
+
+ if let Err(e) = self
+ .migrate_addresses_from_interface(handle, src_interface)
+ .await
+ {
+ error!(
+ dst_interface = self.name.as_str(),
+ error = e.to_string().as_str();
+ "Error while migrating IP addresses to destination interface."
+ );
+
+ if let Err(restore_err) = src_interface.restore(handle).await {
+ error!(
+ src_interface = src_interface.name.as_str(),
+ error = restore_err.to_string().as_str();
+ "Error while restoring IPs to source interface."
+ );
+ }
+
+ return Err(InterfaceMigrationError::InterfaceError(e));
+ }
+
+ info!(
+ src_interface = src_interface.name.as_str(),
+ dst_interface = self.name.as_str();
+ "Successfully migrated IP addresseses to new interface."
+ );
+
+ if let Err(e) = self
+ .migrate_routes_from_interface(handle, src_interface)
+ .await
+ {
+ error!(
+ dst_interface = self.name.as_str(),
+ routes = format!("{:?}", src_interface.routes()).as_str(),
+ error = e.to_string().as_str();
+ "Error while migrating routes to destination interface."
+ );
+
+ if let Err(restore_err) = src_interface.restore(handle).await {
+ error!(
+ src_interface = src_interface.name.as_str(),
+ routes = format!("{:?}", src_interface.routes()).as_str(),
+ error = restore_err.to_string().as_str();
+ "Error while restoring source interface."
+ );
+ }
+
+ return Err(InterfaceMigrationError::InterfaceError(e));
+ }
+
+ Ok(())
+ }
+ (true, false) => {
+ // Destination interface has IPs, source interface has no IPs
+ info!(
+ src_interface = src_interface.name.as_str(),
+ dst_interface = self.name.as_str(),
+ ip_addresses = format!("{:?}", self.addresses()).as_str();
+ "Bridge already has IPs assigned. Skipping migration."
+ );
+
+ Ok(())
+ }
+ (true, true) => {
+ // Neither interface has IPs
+ info!(
+ src_interface = src_interface.name.as_str(),
+ dst_interface = self.name.as_str();
+ "Neither interface nor bridge have IPs assigned. Skipping migration."
+ );
+
+ Ok(())
+ }
+ }
+ }
+}
diff --git a/crates/ovsinit/src/main.rs b/crates/ovsinit/src/main.rs
new file mode 100644
index 0000000..fb77530
--- /dev/null
+++ b/crates/ovsinit/src/main.rs
@@ -0,0 +1,63 @@
+mod config;
+
+use clap::Parser;
+use env_logger::Env;
+use log::error;
+use rtnetlink::Handle;
+use std::{path::PathBuf, process};
+
+#[derive(Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Cli {
+ #[arg(default_value = "/tmp/auto_bridge_add", help = "Path to the JSON file")]
+ config: PathBuf,
+}
+
+#[tokio::main]
+async fn main() {
+ let cli = Cli::parse();
+
+ let env = Env::default()
+ .filter_or("MY_LOG_LEVEL", "info")
+ .write_style_or("MY_LOG_STYLE", "always");
+ env_logger::init_from_env(env);
+
+ let network_config = match config::NetworkConfig::from_path(&cli.config) {
+ Ok(network_config) => network_config,
+ Err(e) => {
+ error!("Failed to load network config: {}", e);
+
+ process::exit(1);
+ }
+ };
+
+ let (connection, handle, _) = rtnetlink::new_connection().expect("Failed to create connection");
+ tokio::spawn(connection);
+
+ for (bridge_name, interface_name) in network_config.bridges_with_interfaces_iter() {
+ let interface = get_interface(&handle, interface_name).await;
+ let bridge = get_interface(&handle, bridge_name).await;
+
+ if let Err(e) = bridge.migrate_from_interface(&handle, &interface).await {
+ error!(
+ "Failed to migrate from {} to {}: {}",
+ interface_name, bridge_name, e
+ );
+ process::exit(1);
+ }
+ }
+}
+
+async fn get_interface(handle: &Handle, name: &str) -> ovsinit::Interface {
+ match ovsinit::Interface::new(handle, name.to_string()).await {
+ Ok(interface) => interface,
+ Err(ovsinit::InterfaceError::NotFound(name)) => {
+ error!(interface = name.as_str(); "Interface not found.");
+ process::exit(1);
+ }
+ Err(e) => {
+ error!(error = e.to_string().as_str(); "Failed to lookup interface.");
+ process::exit(1);
+ }
+ }
+}
diff --git a/crates/ovsinit/src/routes.rs b/crates/ovsinit/src/routes.rs
new file mode 100644
index 0000000..a4e0130
--- /dev/null
+++ b/crates/ovsinit/src/routes.rs
@@ -0,0 +1,150 @@
+use ipnet::IpNet;
+use log::error;
+use netlink_packet_route::{
+ route::{RouteAddress, RouteAttribute, RouteMessage, RouteProtocol},
+ AddressFamily,
+};
+use std::{
+ fmt,
+ net::{IpAddr, Ipv4Addr, Ipv6Addr},
+};
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum RouteError {
+ #[error("Invalid gateway")]
+ InvalidGateway,
+
+ #[error("Invalid destination")]
+ InvalidDestination,
+
+ #[error("Invalid prefix length")]
+ InvalidPrefixLength,
+
+ #[error("Missing gateway")]
+ MissingGateway,
+
+ #[error("Missing destination")]
+ MissingDestination,
+}
+
+pub struct Route {
+ pub protocol: RouteProtocol,
+ pub destination: IpNet,
+ pub gateway: IpAddr,
+}
+
+impl fmt::Debug for Route {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "{} via {}", self.destination, self.gateway)
+ }
+}
+
+impl Route {
+ pub fn from_message(message: RouteMessage) -> Result<Self, RouteError> {
+ let mut gateway = None;
+ let mut destination = None;
+
+ for nla in message.attributes.iter() {
+ if let RouteAttribute::Gateway(ip) = nla {
+ gateway = match ip {
+ RouteAddress::Inet(ip) => Some(IpAddr::V4(*ip)),
+ RouteAddress::Inet6(ip) => Some(IpAddr::V6(*ip)),
+ _ => return Err(RouteError::InvalidGateway),
+ };
+ }
+
+ if let RouteAttribute::Destination(ref ip) = nla {
+ destination = match ip {
+ RouteAddress::Inet(ip) => Some(
+ IpNet::new(IpAddr::V4(*ip), message.header.destination_prefix_length)
+ .map_err(|_| RouteError::InvalidPrefixLength)?,
+ ),
+ RouteAddress::Inet6(ip) => Some(
+ IpNet::new(IpAddr::V6(*ip), message.header.destination_prefix_length)
+ .map_err(|_| RouteError::InvalidPrefixLength)?,
+ ),
+ _ => return Err(RouteError::InvalidDestination),
+ };
+ }
+ }
+
+ let gateway = match gateway {
+ Some(gateway) => gateway,
+ None => return Err(RouteError::MissingGateway),
+ };
+
+ let destination = match destination {
+ Some(destination) => destination,
+ None => match message.header.address_family {
+ AddressFamily::Inet => IpNet::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0)
+ .map_err(|_| RouteError::InvalidPrefixLength)?,
+ AddressFamily::Inet6 => IpNet::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0)
+ .map_err(|_| RouteError::InvalidPrefixLength)?,
+ _ => return Err(RouteError::InvalidDestination),
+ },
+ };
+
+ Ok(Route {
+ protocol: message.header.protocol,
+ destination,
+ gateway,
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use netlink_packet_route::AddressFamily;
+ use std::net::Ipv4Addr;
+
+ #[tokio::test]
+ async fn test_default_ipv4_route() {
+ let mut message = RouteMessage::default();
+
+ message.header.address_family = AddressFamily::Inet;
+ message.header.destination_prefix_length = 0;
+ message.header.protocol = RouteProtocol::Static;
+ message
+ .attributes
+ .push(RouteAttribute::Gateway(RouteAddress::Inet(Ipv4Addr::new(
+ 192, 168, 1, 1,
+ ))));
+
+ let route = Route::from_message(message).unwrap();
+
+ assert_eq!(route.protocol, RouteProtocol::Static);
+ assert_eq!(
+ route.destination,
+ IpNet::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0).unwrap()
+ );
+ assert_eq!(route.gateway, IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)));
+ }
+
+ #[tokio::test]
+ async fn test_default_ipv6_route() {
+ let mut message = RouteMessage::default();
+
+ message.header.address_family = AddressFamily::Inet6;
+ message.header.destination_prefix_length = 0;
+ message.header.protocol = RouteProtocol::Static;
+ message
+ .attributes
+ .push(RouteAttribute::Gateway(RouteAddress::Inet6(Ipv6Addr::new(
+ 0, 0, 0, 0, 0, 0, 0, 1,
+ ))));
+
+ let route = Route::from_message(message).unwrap();
+
+ assert_eq!(route.protocol, RouteProtocol::Static);
+ assert_eq!(
+ route.destination,
+ IpNet::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0).unwrap()
+ );
+ assert_eq!(
+ route.gateway,
+ IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1))
+ );
+ }
+}