Allow non strict (#5)
* Allow non-strict for MAC address
* Add docs for limitation and use cases
diff --git a/README.rst b/README.rst
index 3db4f1b..64caeab 100644
--- a/README.rst
+++ b/README.rst
@@ -4,8 +4,12 @@
This is a simple server which can be used to manage complex Neutron policies
which are not possible to be managed using the default Neutron ``policy.json``
-file due to the lack of programmatic control. It covers the following use
-cases:
+file due to the lack of programmatic control.
+
+You can reference policy example in
+https://github.com/vexxhost/atmosphere/blob/main/roles/neutron/vars/main.yml#L125-L130
+
+It covers the following use cases:
-------------------------------------------
Allowed Address Pairs for Provider Networks
@@ -25,3 +29,58 @@
``allowed_address_pairs`` attribute with the same MAC & IP address.
- Users cannot modify the ``fixed_ips`` attribute of a port if another port on
the same network has an ``allowed_address_pairs`` attribute with the IP.
+
+---------
+Use cases
+---------
+
+Here is a example policy.yaml file for Neutron to use Neutron policy server:
+
+.. code-block:: yaml
+
+ delete_port: ((rule:admin_only) or (rule:service_api) or role:member and rule:network_owner
+ or role:member and project_id:%(project_id)s) and http://neutron-server:9697/port-delete
+ update_port:allowed_address_pairs: ((rule:admin_only) or (role:member and rule:network_owner)
+ or role:manager and project_id:%(project_id)s) or (role:member and project_id:%(project_id)s
+ and http://neutron-server:9697/address-pair )
+ update_port:allowed_address_pairs:ip_address: ((rule:admin_only) or (role:member and
+ rule:network_owner) or role:manager and project_id:%(project_id)s) or (role:member
+ and project_id:%(project_id)s)
+ update_port:allowed_address_pairs:mac_address: ((rule:admin_only) or (role:member
+ and rule:network_owner) or role:manager and project_id:%(project_id)s) or (role:member
+ and project_id:%(project_id)s)
+ update_port:fixed_ips: ((rule:admin_only) or (rule:service_api) or role:manager and
+ project_id:%(project_id)s or role:member and rule:network_owner) and http://neutron-server:9697/port-update
+ update_port:mac_address: ((rule:admin_only) or (rule:service_api) or role:manager
+ and project_id:%(project_id)s) and http://neutron-server:9697/port-update
+
+All rules above contains original rules with Neutron policy server URL integrated.
+Environment can consider make Neutron policy server URL a hard condition like above if
+wish the protection for allowed address pair exists across network ownership when
+update or delete ports.
+
+-----------
+Strict Mode
+-----------
+
+By default MAC address need to also match for add allowed address pairs,
+update port and delete port cases, but it can be disabled by provide query parameter
+`strict=0`. Like `http://neutron-server:9697/port-delete?strict=0`.
+With strict disabled, Mac address will not required to match.
+Policy can pass with only IP address match. This is useful with some HA structure
+which one IP might needs to switch cross two instances.
+
+-----------------
+Known Limitations
+-----------------
+
+Current limitation for cross-ownership network port address pair binding only
+allows fixed IP address format x.x.x.x without CIDR format like
+`/32` or `/24`. And the reason for that limitation is, when using CIDR like
+`10.10.10.0/24`, it will lock all ports with IPs under 10.10.10.0/24 to prevent
+delete actions. But that’s pretty damage to security consider user doesn’t get
+the ownership to the entire network. Currently it can directly put in fixed IP
+address like 10.10.10.4.
+Also worth to mentioned that, CIDR format limitation are not affected on any
+existing use cases (which user actually owned the network).
+So network owner can add allowed address pair with CIDRs like 10.10.10.0/24.
diff --git a/neutron_policy_server/tests/test_neutron_address_pair.py b/neutron_policy_server/tests/test_neutron_address_pair.py
index f9d2e08..aa33ab5 100644
--- a/neutron_policy_server/tests/test_neutron_address_pair.py
+++ b/neutron_policy_server/tests/test_neutron_address_pair.py
@@ -18,7 +18,10 @@
def setUp(self, plugin=None, ext_mgr=None):
super(TestAddressPairCasesFlaskBase, self).setUp(plugin)
- address_pairs = [{"mac_address": "00:00:00:00:00:01", "ip_address": "10.0.0.1"}]
+ address_pairs = [
+ {"mac_address": "00:00:00:00:00:01", "ip_address": "10.0.0.1"},
+ {"mac_address": "00:00:00:00:00:02", "ip_address": "10.0.0.2"},
+ ]
db_options.set_defaults(cfg.CONF, connection="sqlite://")
with self.network() as net:
@@ -26,6 +29,9 @@
fixed_ips = [
{"subnet_id": subnet["subnet"]["id"], "ip_address": "10.0.0.1"}
]
+ fixed_ips2 = [
+ {"subnet_id": subnet["subnet"]["id"], "ip_address": "10.0.0.2"}
+ ]
self.port_resp = self._create_port(
self.fmt,
@@ -35,6 +41,14 @@
)
self.port = self.deserialize(self.fmt, self.port_resp)
+ self.port_2_resp = self._create_port(
+ self.fmt,
+ net["network"]["id"],
+ mac_address="00:00:00:00:00:03",
+ fixed_ips=fixed_ips2,
+ )
+ self.port2 = self.deserialize(self.fmt, self.port_2_resp)
+
self.port_resp_dep = self._create_port(
self.fmt,
net["network"]["id"],
@@ -52,6 +66,14 @@
"project_id": self.port["port"]["project_id"],
},
}
+ self.delete_port_strict = {
+ "rule": "delete_port",
+ "target": self.port2["port"],
+ "credentials": {
+ "user_id": "fake_user",
+ "project_id": self.port2["port"]["project_id"],
+ },
+ }
self.delete_port_dep_json = {
"rule": "delete_port",
"target": self.port_dep["port"],
@@ -102,11 +124,33 @@
"target": self.port["port"].copy(),
"credentials": {
"user_id": "fake_user",
- "project_id": self.port_dep["port"]["project_id"],
+ "project_id": self.port["port"]["project_id"],
},
}
- self.update_port["target"]["attributes_to_update"] = ["mac_address"]
- self.update_port["target"]["mac_address"] = "52:54:00:41:a4:97"
+ self.update_port["target"]["attributes_to_update"] = [
+ "fixed_ips",
+ "mac_address",
+ ]
+ self.update_port["target"]["fixed_ips"] = [
+ {"subnet_id": subnet["subnet"]["id"], "ip_address": "10.0.0.1"}
+ ]
+
+ self.update_port_strict = {
+ "rule": "update_port",
+ "target": self.port2["port"].copy(),
+ "credentials": {
+ "user_id": "fake_user",
+ "project_id": self.port2["port"]["project_id"],
+ },
+ }
+ self.update_port_strict["target"]["attributes_to_update"] = [
+ "fixed_ips",
+ "mac_address",
+ ]
+ self.update_port_strict["target"]["fixed_ips"] = [
+ {"subnet_id": subnet["subnet"]["id"], "ip_address": "10.0.0.2"}
+ ]
+ self.update_port_strict["target"]["mac_address"] = "00:00:00:00:20:01"
self.allowed_address_pairs = {
"rule": "allowed_address_pairs",
@@ -126,6 +170,10 @@
self.allowed_address_pairs_not_found["target"]["allowed_address_pairs"] = [
{"ip_address": "10.96.250.203", "mac_address": "fa:16:3e:da:ed:0b"}
]
+ self.allowed_address_pairs_only_ip_found = deepcopy(self.allowed_address_pairs)
+ self.allowed_address_pairs_only_ip_found["target"]["allowed_address_pairs"] = [
+ {"ip_address": "10.0.0.1", "mac_address": "00:00:00:00:00:31"}
+ ]
self.allowed_address_pairs_address_not_found = deepcopy(
self.allowed_address_pairs
)
@@ -170,14 +218,53 @@
def test_port_delete_success(self):
response = self.client.post( # pylint: disable=E1101
- "/port-delete", json=self.delete_port_dep_json
+ "/port-delete?strict=0", json=self.delete_port_dep_json
) # pylint: disable=E1101
self.assertEqual(b"True", response.data)
self.assertEqual(200, response.status_code)
+ def test_port_delete_fail_with_only_ip_dep(self):
+ response = self.client.post( # pylint: disable=E1101
+ "/port-delete?strict=0", json=self.delete_port_strict
+ ) # pylint: disable=E1101
+ self.assertEqual(
+ bytes(
+ (
+ "Address pairs dependency found for port: "
+ f"{self.port2['port']['id']}"
+ ),
+ "utf-8",
+ ),
+ response.data,
+ )
+ self.assertEqual(403, response.status_code)
+
+ def test_strict_port_delete_success_with_only_ip_dep(self):
+ response = self.client.post( # pylint: disable=E1101
+ "/port-delete?strict=1", json=self.delete_port_strict
+ ) # pylint: disable=E1101
+ self.assertEqual(b"True", response.data)
+ self.assertEqual(200, response.status_code)
+
+ def test_strict_port_delete_fail_with_dep(self):
+ response = self.client.post( # pylint: disable=E1101
+ "/port-delete?strict=1", json=self.delete_port_json
+ ) # pylint: disable=E1101
+ self.assertEqual(
+ bytes(
+ (
+ "Address pairs dependency found for port: "
+ f"{self.port['port']['id']}"
+ ),
+ "utf-8",
+ ),
+ response.data,
+ )
+ self.assertEqual(403, response.status_code)
+
def test_port_delete_fail_with_dep(self):
response = self.client.post( # pylint: disable=E1101
- "/port-delete", json=self.delete_port_json
+ "/port-delete?strict=0", json=self.delete_port_json
) # pylint: disable=E1101
self.assertEqual(
bytes(
@@ -193,28 +280,48 @@
def test_port_update_success(self):
response = self.client.post( # pylint: disable=E1101
- "/port-update", json=self.update_port_dep
+ "port-update?strict=0", json=self.update_port_dep
) # pylint: disable=E1101
self.assertEqual(b"True", response.data)
self.assertEqual(200, response.status_code)
def test_port_update_success_no_address_change(self):
response = self.client.post( # pylint: disable=E1101
- "/port-update", json=self.update_port_no_address
+ "port-update?strict=0", json=self.update_port_no_address
) # pylint: disable=E1101
self.assertEqual(b"True", response.data)
self.assertEqual(200, response.status_code)
def test_port_update_fail_no_match_port(self):
response = self.client.post( # pylint: disable=E1101
- "/port-update", json=self.update_port_not_exist
+ "port-update?strict=0", json=self.update_port_not_exist
) # pylint: disable=E1101
self.assertEqual(b"True", response.data)
self.assertEqual(200, response.status_code)
- def test_port_update_fail(self):
+ def test_port_update_fail_with_ip_dep_found(self):
+ """Failed if IP address dep found"""
+
response = self.client.post( # pylint: disable=E1101
- "/port-update", json=self.update_port
+ "port-update?strict=0", json=self.update_port_strict
+ )
+ self.assertEqual(
+ bytes(
+ (
+ "Address pairs dependency found for port: "
+ f"{self.port2['port']['id']}"
+ ),
+ "utf-8",
+ ),
+ response.data,
+ )
+ self.assertEqual(403, response.status_code)
+
+ def test_strict_port_update_fail(self):
+ """Failed only if IP and MAC address pair dep found"""
+
+ response = self.client.post( # pylint: disable=E1101
+ "/port-update?strict=1", json=self.update_port
)
self.assertEqual(
bytes(
@@ -228,13 +335,22 @@
)
self.assertEqual(403, response.status_code)
+ def test_strict_port_update_success_with_mac_not_match(self):
+ """Failed only if IP and MAC address pair dep found"""
+
+ response = self.client.post( # pylint: disable=E1101
+ "/port-update?strict=1", json=self.update_port_strict
+ )
+ self.assertEqual(b"True", response.data)
+ self.assertEqual(200, response.status_code)
+
def test_health_check_success(self):
response = self.client.get("/health") # pylint: disable=E1101
self.assertEqual(200, response.status_code)
def test_address_pair_success(self):
response = self.client.post( # pylint: disable=E1101
- "/address-pair", json=self.allowed_address_pairs
+ "/address-pair?strict=0", json=self.allowed_address_pairs
)
self.assertEqual(
b"True",
@@ -242,9 +358,30 @@
)
self.assertEqual(200, response.status_code)
+ def test_address_pair_failed_only_ip_match(self):
+ response = self.client.post( # pylint: disable=E1101
+ "/address-pair?strict=0", json=self.allowed_address_pairs_only_ip_found
+ )
+ self.assertEqual(
+ b"True",
+ response.data,
+ )
+ self.assertEqual(200, response.status_code)
+
+ def test_strict_address_pair_failed_only_ip_match(self):
+ response = self.client.post( # pylint: disable=E1101
+ "/address-pair?strict=1", json=self.allowed_address_pairs_only_ip_found
+ )
+
+ self.assertEqual(
+ b"Zero or Multiple match port found with MAC address 00:00:00:00:00:31.",
+ response.data,
+ )
+ self.assertEqual(403, response.status_code)
+
def test_address_pair_success_no_attributes_to_update(self):
response = self.client.post( # pylint: disable=E1101
- "/address-pair", json=self.allowed_address_pairs_no_attribute
+ "/address-pair?strict=0", json=self.allowed_address_pairs_no_attribute
)
self.assertEqual(
b"True",
@@ -254,7 +391,7 @@
def test_address_pair_success_empty(self):
response = self.client.post( # pylint: disable=E1101
- "/address-pair", json=self.allowed_address_pairs_empty
+ "/address-pair?strict=0", json=self.allowed_address_pairs_empty
)
self.assertEqual(
b"True",
@@ -264,7 +401,7 @@
def test_address_pair_success_not_in_attributes(self):
response = self.client.post( # pylint: disable=E1101
- "/address-pair", json=self.allowed_address_pairs_not_in_attribute
+ "/address-pair?strict=0", json=self.allowed_address_pairs_not_in_attribute
)
self.assertEqual(
b"True",
@@ -274,7 +411,7 @@
def test_address_pair_fail_target_not_found(self):
response = self.client.post( # pylint: disable=E1101
- "/address-pair", json=self.allowed_address_pairs_target_not_found
+ "/address-pair?strict=0", json=self.allowed_address_pairs_target_not_found
)
portname = self.allowed_address_pairs_target_not_found["target"]["id"]
self.assertEqual(
@@ -285,9 +422,21 @@
)
self.assertEqual(403, response.status_code)
- def test_address_pair_fail_mac_not_found(self):
+ def test_address_pair_fail_not_found(self):
response = self.client.post( # pylint: disable=E1101
- "/address-pair", json=self.allowed_address_pairs_not_found
+ "/address-pair?strict=0", json=self.allowed_address_pairs_not_found
+ )
+ self.assertEqual(
+ f"IP address not exists in network from project {self.port['port']['project_id']}.".encode(
+ "utf-8"
+ ),
+ response.data,
+ )
+ self.assertEqual(403, response.status_code)
+
+ def test_strict_address_pair_fail_mac_not_found(self):
+ response = self.client.post( # pylint: disable=E1101
+ "/address-pair?strict=1", json=self.allowed_address_pairs_not_found
)
self.assertEqual(
b"Zero or Multiple match port found with MAC address fa:16:3e:da:ed:0b.",
@@ -297,7 +446,7 @@
def test_address_pair_fail_address_not_found(self):
response = self.client.post( # pylint: disable=E1101
- "/address-pair", json=self.allowed_address_pairs_address_not_found
+ "/address-pair?strict=0", json=self.allowed_address_pairs_address_not_found
)
self.assertEqual(
f"IP address not exists in network from project {self.port['port']['project_id']}.".encode(
diff --git a/neutron_policy_server/wsgi.py b/neutron_policy_server/wsgi.py
index dc27baa..00349d3 100644
--- a/neutron_policy_server/wsgi.py
+++ b/neutron_policy_server/wsgi.py
@@ -47,6 +47,9 @@
@app.route("/address-pair", methods=["POST"])
def enforce_address_pair():
+ """Check if allowed address pair set to valid target IP address and MAC"""
+ # Check only IP address if strict is 0
+ strict = bool(request.args.get("strict", default=1, type=int))
if "attributes_to_update" not in g.target:
LOG.info("No attributes_to_update found, skip check.")
return Response("True", status=200, mimetype="text/plain")
@@ -85,13 +88,15 @@
for pair in target_pairs:
if pair.get("ip_address") not in db_pairs_dict:
verify_address_pairs.append(pair)
- elif pair.get("mac_address") and db_pairs_dict[
- pair.get("ip_address")
- ] != pair.get("mac_address"):
+ elif (
+ strict
+ and pair.get("mac_address")
+ and db_pairs_dict[pair.get("ip_address")] != pair.get("mac_address")
+ ):
verify_address_pairs.append(pair)
for allowed_address_pair in verify_address_pairs:
- if "mac_address" in allowed_address_pair:
+ if strict and "mac_address" in allowed_address_pair:
with db_api.CONTEXT_READER.using(g.ctx):
ports = port_obj.Port.get_objects(
g.ctx,
@@ -136,15 +141,24 @@
@app.route("/port-update", methods=["POST"])
def enforce_port_update():
+ """Check if IP or MAC has address pair dependency
+
+ Make sure we allow update IP or MAC only if they don't
+ have any allowed address pair dependency
+ """
+ # Check only IP address if strict is 0
+ strict = bool(request.args.get("strict", default=1, type=int))
+
if "attributes_to_update" not in g.target:
LOG.info("No attributes_to_update found, skip check.")
return Response("True", status=200, mimetype="text/plain")
- elif ("mac_address" not in g.target["attributes_to_update"]) and (
+ elif (not strict or ("mac_address" not in g.target["attributes_to_update"])) and (
"fixed_ips" not in g.target["attributes_to_update"]
):
+ msg = ""
LOG.info(
- "No mac_address or fixed_ips in update targets "
- f"for port {g.target['id']}, skip check."
+ f"No {'mac_address or fixed_ips' if strict else 'fixed_ips'} in "
+ f"update targets for port {g.target['id']}, skip check."
)
return Response("True", status=200, mimetype="text/plain")
@@ -163,40 +177,40 @@
fixed_ips = [str(fixed_ip["ip_address"]) for fixed_ip in ports[0].fixed_ips]
- query = (
- g.ctx.session.query(models.AllowedAddressPair)
- .filter(
+ query = g.ctx.session.query(models.AllowedAddressPair).filter(
+ models.AllowedAddressPair.ip_address.in_(fixed_ips)
+ )
+ if strict:
+ query = query.filter(
models.AllowedAddressPair.mac_address.in_([str(ports[0].mac_address)])
)
- .filter(models.AllowedAddressPair.ip_address.in_(fixed_ips))
- )
- pairs = query.all()
- pairs = [
- aap_obj.AllowedAddressPair._load_object(context, db_obj)
- for db_obj in query.all()
- ]
+ pairs = [
+ aap_obj.AllowedAddressPair._load_object(context, db_obj)
+ for db_obj in query.all()
+ ]
if len(pairs) > 0:
msg = f"Address pairs dependency found for port: {g.target['id']}"
LOG.info(msg)
return Response(msg, status=403, mimetype="text/plain")
-
LOG.info(f"Update check passed for port: {g.target['id']}")
return Response("True", status=200, mimetype="text/plain")
@app.route("/port-delete", methods=["POST"])
def enforce_port_delete():
+ # Check only IP address if strict is 0
+ strict = bool(request.args.get("strict", default=1, type=int))
fixed_ips = [str(fixed_ip["ip_address"]) for fixed_ip in g.target["fixed_ips"]]
with db_api.CONTEXT_READER.using(g.ctx):
- query = (
- g.ctx.session.query(models.AllowedAddressPair)
- .filter(
+ query = g.ctx.session.query(models.AllowedAddressPair).filter(
+ models.AllowedAddressPair.ip_address.in_(fixed_ips)
+ )
+ if strict:
+ query = query.filter(
models.AllowedAddressPair.mac_address.in_(
[str(g.target["mac_address"])]
)
)
- .filter(models.AllowedAddressPair.ip_address.in_(fixed_ips))
- )
pairs = query.all()
pairs = [