Allow non strict (#5)

* Allow non-strict for MAC address

* Add docs for limitation and use cases
diff --git a/README.rst b/README.rst
index 3db4f1b..64caeab 100644
--- a/README.rst
+++ b/README.rst
@@ -4,8 +4,12 @@
 
 This is a simple server which can be used to manage complex Neutron policies
 which are not possible to be managed using the default Neutron ``policy.json``
-file due to the lack of programmatic control.  It covers the following use
-cases:
+file due to the lack of programmatic control.
+
+You can reference policy example in
+https://github.com/vexxhost/atmosphere/blob/main/roles/neutron/vars/main.yml#L125-L130
+
+It covers the following use cases:
 
 -------------------------------------------
 Allowed Address Pairs for Provider Networks
@@ -25,3 +29,58 @@
   ``allowed_address_pairs`` attribute with the same MAC & IP address.
 - Users cannot modify the ``fixed_ips`` attribute of a port if another port on
   the same network has an ``allowed_address_pairs`` attribute with the IP.
+
+---------
+Use cases
+---------
+
+Here is a example policy.yaml file for Neutron to use Neutron policy server:
+
+.. code-block:: yaml
+
+  delete_port: ((rule:admin_only) or (rule:service_api) or role:member and rule:network_owner
+    or role:member and project_id:%(project_id)s) and http://neutron-server:9697/port-delete
+  update_port:allowed_address_pairs: ((rule:admin_only) or (role:member and rule:network_owner)
+    or role:manager and project_id:%(project_id)s) or (role:member and project_id:%(project_id)s
+    and http://neutron-server:9697/address-pair )
+  update_port:allowed_address_pairs:ip_address: ((rule:admin_only) or (role:member and
+    rule:network_owner) or role:manager and project_id:%(project_id)s) or (role:member
+    and project_id:%(project_id)s)
+  update_port:allowed_address_pairs:mac_address: ((rule:admin_only) or (role:member
+    and rule:network_owner) or role:manager and project_id:%(project_id)s) or (role:member
+    and project_id:%(project_id)s)
+  update_port:fixed_ips: ((rule:admin_only) or (rule:service_api) or role:manager and
+    project_id:%(project_id)s or role:member and rule:network_owner) and http://neutron-server:9697/port-update
+  update_port:mac_address: ((rule:admin_only) or (rule:service_api) or role:manager
+    and project_id:%(project_id)s) and http://neutron-server:9697/port-update
+
+All rules above contains original rules with Neutron policy server URL integrated.
+Environment can consider make Neutron policy server URL a hard condition like above if
+wish the protection for allowed address pair exists across network ownership when
+update or delete ports.
+
+-----------
+Strict Mode
+-----------
+
+By default MAC address need to also match for add allowed address pairs,
+update port and delete port cases, but it can be disabled by provide query parameter
+`strict=0`. Like `http://neutron-server:9697/port-delete?strict=0`.
+With strict disabled, Mac address will not required to match.
+Policy can pass with only IP address match. This is useful with some HA structure
+which one IP might needs to switch cross two instances.
+
+-----------------
+Known Limitations
+-----------------
+
+Current limitation for cross-ownership network port address pair binding only
+allows fixed IP address format x.x.x.x without CIDR format like
+`/32` or `/24`. And the reason for that limitation is, when using CIDR like
+`10.10.10.0/24`, it will lock all ports with IPs under 10.10.10.0/24 to prevent
+delete actions. But that’s pretty damage to security consider user doesn’t get
+the ownership to the entire network. Currently it can directly put in fixed IP
+address like 10.10.10.4.
+Also worth to mentioned that, CIDR format limitation are not affected on any
+existing use cases (which user actually owned the network).
+So network owner can add allowed address pair with CIDRs like 10.10.10.0/24.
diff --git a/neutron_policy_server/tests/test_neutron_address_pair.py b/neutron_policy_server/tests/test_neutron_address_pair.py
index f9d2e08..aa33ab5 100644
--- a/neutron_policy_server/tests/test_neutron_address_pair.py
+++ b/neutron_policy_server/tests/test_neutron_address_pair.py
@@ -18,7 +18,10 @@
 
     def setUp(self, plugin=None, ext_mgr=None):
         super(TestAddressPairCasesFlaskBase, self).setUp(plugin)
-        address_pairs = [{"mac_address": "00:00:00:00:00:01", "ip_address": "10.0.0.1"}]
+        address_pairs = [
+            {"mac_address": "00:00:00:00:00:01", "ip_address": "10.0.0.1"},
+            {"mac_address": "00:00:00:00:00:02", "ip_address": "10.0.0.2"},
+        ]
         db_options.set_defaults(cfg.CONF, connection="sqlite://")
 
         with self.network() as net:
@@ -26,6 +29,9 @@
                 fixed_ips = [
                     {"subnet_id": subnet["subnet"]["id"], "ip_address": "10.0.0.1"}
                 ]
+                fixed_ips2 = [
+                    {"subnet_id": subnet["subnet"]["id"], "ip_address": "10.0.0.2"}
+                ]
 
             self.port_resp = self._create_port(
                 self.fmt,
@@ -35,6 +41,14 @@
             )
             self.port = self.deserialize(self.fmt, self.port_resp)
 
+            self.port_2_resp = self._create_port(
+                self.fmt,
+                net["network"]["id"],
+                mac_address="00:00:00:00:00:03",
+                fixed_ips=fixed_ips2,
+            )
+            self.port2 = self.deserialize(self.fmt, self.port_2_resp)
+
             self.port_resp_dep = self._create_port(
                 self.fmt,
                 net["network"]["id"],
@@ -52,6 +66,14 @@
                 "project_id": self.port["port"]["project_id"],
             },
         }
+        self.delete_port_strict = {
+            "rule": "delete_port",
+            "target": self.port2["port"],
+            "credentials": {
+                "user_id": "fake_user",
+                "project_id": self.port2["port"]["project_id"],
+            },
+        }
         self.delete_port_dep_json = {
             "rule": "delete_port",
             "target": self.port_dep["port"],
@@ -102,11 +124,33 @@
             "target": self.port["port"].copy(),
             "credentials": {
                 "user_id": "fake_user",
-                "project_id": self.port_dep["port"]["project_id"],
+                "project_id": self.port["port"]["project_id"],
             },
         }
-        self.update_port["target"]["attributes_to_update"] = ["mac_address"]
-        self.update_port["target"]["mac_address"] = "52:54:00:41:a4:97"
+        self.update_port["target"]["attributes_to_update"] = [
+            "fixed_ips",
+            "mac_address",
+        ]
+        self.update_port["target"]["fixed_ips"] = [
+            {"subnet_id": subnet["subnet"]["id"], "ip_address": "10.0.0.1"}
+        ]
+
+        self.update_port_strict = {
+            "rule": "update_port",
+            "target": self.port2["port"].copy(),
+            "credentials": {
+                "user_id": "fake_user",
+                "project_id": self.port2["port"]["project_id"],
+            },
+        }
+        self.update_port_strict["target"]["attributes_to_update"] = [
+            "fixed_ips",
+            "mac_address",
+        ]
+        self.update_port_strict["target"]["fixed_ips"] = [
+            {"subnet_id": subnet["subnet"]["id"], "ip_address": "10.0.0.2"}
+        ]
+        self.update_port_strict["target"]["mac_address"] = "00:00:00:00:20:01"
 
         self.allowed_address_pairs = {
             "rule": "allowed_address_pairs",
@@ -126,6 +170,10 @@
         self.allowed_address_pairs_not_found["target"]["allowed_address_pairs"] = [
             {"ip_address": "10.96.250.203", "mac_address": "fa:16:3e:da:ed:0b"}
         ]
+        self.allowed_address_pairs_only_ip_found = deepcopy(self.allowed_address_pairs)
+        self.allowed_address_pairs_only_ip_found["target"]["allowed_address_pairs"] = [
+            {"ip_address": "10.0.0.1", "mac_address": "00:00:00:00:00:31"}
+        ]
         self.allowed_address_pairs_address_not_found = deepcopy(
             self.allowed_address_pairs
         )
@@ -170,14 +218,53 @@
 
     def test_port_delete_success(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/port-delete", json=self.delete_port_dep_json
+            "/port-delete?strict=0", json=self.delete_port_dep_json
         )  # pylint: disable=E1101
         self.assertEqual(b"True", response.data)
         self.assertEqual(200, response.status_code)
 
+    def test_port_delete_fail_with_only_ip_dep(self):
+        response = self.client.post(  # pylint: disable=E1101
+            "/port-delete?strict=0", json=self.delete_port_strict
+        )  # pylint: disable=E1101
+        self.assertEqual(
+            bytes(
+                (
+                    "Address pairs dependency found for port: "
+                    f"{self.port2['port']['id']}"
+                ),
+                "utf-8",
+            ),
+            response.data,
+        )
+        self.assertEqual(403, response.status_code)
+
+    def test_strict_port_delete_success_with_only_ip_dep(self):
+        response = self.client.post(  # pylint: disable=E1101
+            "/port-delete?strict=1", json=self.delete_port_strict
+        )  # pylint: disable=E1101
+        self.assertEqual(b"True", response.data)
+        self.assertEqual(200, response.status_code)
+
+    def test_strict_port_delete_fail_with_dep(self):
+        response = self.client.post(  # pylint: disable=E1101
+            "/port-delete?strict=1", json=self.delete_port_json
+        )  # pylint: disable=E1101
+        self.assertEqual(
+            bytes(
+                (
+                    "Address pairs dependency found for port: "
+                    f"{self.port['port']['id']}"
+                ),
+                "utf-8",
+            ),
+            response.data,
+        )
+        self.assertEqual(403, response.status_code)
+
     def test_port_delete_fail_with_dep(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/port-delete", json=self.delete_port_json
+            "/port-delete?strict=0", json=self.delete_port_json
         )  # pylint: disable=E1101
         self.assertEqual(
             bytes(
@@ -193,28 +280,48 @@
 
     def test_port_update_success(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/port-update", json=self.update_port_dep
+            "port-update?strict=0", json=self.update_port_dep
         )  # pylint: disable=E1101
         self.assertEqual(b"True", response.data)
         self.assertEqual(200, response.status_code)
 
     def test_port_update_success_no_address_change(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/port-update", json=self.update_port_no_address
+            "port-update?strict=0", json=self.update_port_no_address
         )  # pylint: disable=E1101
         self.assertEqual(b"True", response.data)
         self.assertEqual(200, response.status_code)
 
     def test_port_update_fail_no_match_port(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/port-update", json=self.update_port_not_exist
+            "port-update?strict=0", json=self.update_port_not_exist
         )  # pylint: disable=E1101
         self.assertEqual(b"True", response.data)
         self.assertEqual(200, response.status_code)
 
-    def test_port_update_fail(self):
+    def test_port_update_fail_with_ip_dep_found(self):
+        """Failed if IP address dep found"""
+
         response = self.client.post(  # pylint: disable=E1101
-            "/port-update", json=self.update_port
+            "port-update?strict=0", json=self.update_port_strict
+        )
+        self.assertEqual(
+            bytes(
+                (
+                    "Address pairs dependency found for port: "
+                    f"{self.port2['port']['id']}"
+                ),
+                "utf-8",
+            ),
+            response.data,
+        )
+        self.assertEqual(403, response.status_code)
+
+    def test_strict_port_update_fail(self):
+        """Failed only if IP and MAC address pair dep found"""
+
+        response = self.client.post(  # pylint: disable=E1101
+            "/port-update?strict=1", json=self.update_port
         )
         self.assertEqual(
             bytes(
@@ -228,13 +335,22 @@
         )
         self.assertEqual(403, response.status_code)
 
+    def test_strict_port_update_success_with_mac_not_match(self):
+        """Failed only if IP and MAC address pair dep found"""
+
+        response = self.client.post(  # pylint: disable=E1101
+            "/port-update?strict=1", json=self.update_port_strict
+        )
+        self.assertEqual(b"True", response.data)
+        self.assertEqual(200, response.status_code)
+
     def test_health_check_success(self):
         response = self.client.get("/health")  # pylint: disable=E1101
         self.assertEqual(200, response.status_code)
 
     def test_address_pair_success(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/address-pair", json=self.allowed_address_pairs
+            "/address-pair?strict=0", json=self.allowed_address_pairs
         )
         self.assertEqual(
             b"True",
@@ -242,9 +358,30 @@
         )
         self.assertEqual(200, response.status_code)
 
+    def test_address_pair_failed_only_ip_match(self):
+        response = self.client.post(  # pylint: disable=E1101
+            "/address-pair?strict=0", json=self.allowed_address_pairs_only_ip_found
+        )
+        self.assertEqual(
+            b"True",
+            response.data,
+        )
+        self.assertEqual(200, response.status_code)
+
+    def test_strict_address_pair_failed_only_ip_match(self):
+        response = self.client.post(  # pylint: disable=E1101
+            "/address-pair?strict=1", json=self.allowed_address_pairs_only_ip_found
+        )
+
+        self.assertEqual(
+            b"Zero or Multiple match port found with MAC address 00:00:00:00:00:31.",
+            response.data,
+        )
+        self.assertEqual(403, response.status_code)
+
     def test_address_pair_success_no_attributes_to_update(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/address-pair", json=self.allowed_address_pairs_no_attribute
+            "/address-pair?strict=0", json=self.allowed_address_pairs_no_attribute
         )
         self.assertEqual(
             b"True",
@@ -254,7 +391,7 @@
 
     def test_address_pair_success_empty(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/address-pair", json=self.allowed_address_pairs_empty
+            "/address-pair?strict=0", json=self.allowed_address_pairs_empty
         )
         self.assertEqual(
             b"True",
@@ -264,7 +401,7 @@
 
     def test_address_pair_success_not_in_attributes(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/address-pair", json=self.allowed_address_pairs_not_in_attribute
+            "/address-pair?strict=0", json=self.allowed_address_pairs_not_in_attribute
         )
         self.assertEqual(
             b"True",
@@ -274,7 +411,7 @@
 
     def test_address_pair_fail_target_not_found(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/address-pair", json=self.allowed_address_pairs_target_not_found
+            "/address-pair?strict=0", json=self.allowed_address_pairs_target_not_found
         )
         portname = self.allowed_address_pairs_target_not_found["target"]["id"]
         self.assertEqual(
@@ -285,9 +422,21 @@
         )
         self.assertEqual(403, response.status_code)
 
-    def test_address_pair_fail_mac_not_found(self):
+    def test_address_pair_fail_not_found(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/address-pair", json=self.allowed_address_pairs_not_found
+            "/address-pair?strict=0", json=self.allowed_address_pairs_not_found
+        )
+        self.assertEqual(
+            f"IP address not exists in network from project {self.port['port']['project_id']}.".encode(
+                "utf-8"
+            ),
+            response.data,
+        )
+        self.assertEqual(403, response.status_code)
+
+    def test_strict_address_pair_fail_mac_not_found(self):
+        response = self.client.post(  # pylint: disable=E1101
+            "/address-pair?strict=1", json=self.allowed_address_pairs_not_found
         )
         self.assertEqual(
             b"Zero or Multiple match port found with MAC address fa:16:3e:da:ed:0b.",
@@ -297,7 +446,7 @@
 
     def test_address_pair_fail_address_not_found(self):
         response = self.client.post(  # pylint: disable=E1101
-            "/address-pair", json=self.allowed_address_pairs_address_not_found
+            "/address-pair?strict=0", json=self.allowed_address_pairs_address_not_found
         )
         self.assertEqual(
             f"IP address not exists in network from project {self.port['port']['project_id']}.".encode(
diff --git a/neutron_policy_server/wsgi.py b/neutron_policy_server/wsgi.py
index dc27baa..00349d3 100644
--- a/neutron_policy_server/wsgi.py
+++ b/neutron_policy_server/wsgi.py
@@ -47,6 +47,9 @@
 
 @app.route("/address-pair", methods=["POST"])
 def enforce_address_pair():
+    """Check if allowed address pair set to valid target IP address and MAC"""
+    # Check only IP address if strict is 0
+    strict = bool(request.args.get("strict", default=1, type=int))
     if "attributes_to_update" not in g.target:
         LOG.info("No attributes_to_update found, skip check.")
         return Response("True", status=200, mimetype="text/plain")
@@ -85,13 +88,15 @@
     for pair in target_pairs:
         if pair.get("ip_address") not in db_pairs_dict:
             verify_address_pairs.append(pair)
-        elif pair.get("mac_address") and db_pairs_dict[
-            pair.get("ip_address")
-        ] != pair.get("mac_address"):
+        elif (
+            strict
+            and pair.get("mac_address")
+            and db_pairs_dict[pair.get("ip_address")] != pair.get("mac_address")
+        ):
             verify_address_pairs.append(pair)
 
     for allowed_address_pair in verify_address_pairs:
-        if "mac_address" in allowed_address_pair:
+        if strict and "mac_address" in allowed_address_pair:
             with db_api.CONTEXT_READER.using(g.ctx):
                 ports = port_obj.Port.get_objects(
                     g.ctx,
@@ -136,15 +141,24 @@
 
 @app.route("/port-update", methods=["POST"])
 def enforce_port_update():
+    """Check if IP or MAC has address pair dependency
+
+    Make sure we allow update IP or MAC only if they don't
+    have any allowed address pair dependency
+    """
+    # Check only IP address if strict is 0
+    strict = bool(request.args.get("strict", default=1, type=int))
+
     if "attributes_to_update" not in g.target:
         LOG.info("No attributes_to_update found, skip check.")
         return Response("True", status=200, mimetype="text/plain")
-    elif ("mac_address" not in g.target["attributes_to_update"]) and (
+    elif (not strict or ("mac_address" not in g.target["attributes_to_update"])) and (
         "fixed_ips" not in g.target["attributes_to_update"]
     ):
+        msg = ""
         LOG.info(
-            "No mac_address or fixed_ips in update targets "
-            f"for port {g.target['id']}, skip check."
+            f"No {'mac_address or fixed_ips' if strict else 'fixed_ips'} in "
+            f"update targets for port {g.target['id']}, skip check."
         )
         return Response("True", status=200, mimetype="text/plain")
 
@@ -163,40 +177,40 @@
 
         fixed_ips = [str(fixed_ip["ip_address"]) for fixed_ip in ports[0].fixed_ips]
 
-        query = (
-            g.ctx.session.query(models.AllowedAddressPair)
-            .filter(
+        query = g.ctx.session.query(models.AllowedAddressPair).filter(
+            models.AllowedAddressPair.ip_address.in_(fixed_ips)
+        )
+        if strict:
+            query = query.filter(
                 models.AllowedAddressPair.mac_address.in_([str(ports[0].mac_address)])
             )
-            .filter(models.AllowedAddressPair.ip_address.in_(fixed_ips))
-        )
-        pairs = query.all()
-    pairs = [
-        aap_obj.AllowedAddressPair._load_object(context, db_obj)
-        for db_obj in query.all()
-    ]
+        pairs = [
+            aap_obj.AllowedAddressPair._load_object(context, db_obj)
+            for db_obj in query.all()
+        ]
     if len(pairs) > 0:
         msg = f"Address pairs dependency found for port: {g.target['id']}"
         LOG.info(msg)
         return Response(msg, status=403, mimetype="text/plain")
-
     LOG.info(f"Update check passed for port: {g.target['id']}")
     return Response("True", status=200, mimetype="text/plain")
 
 
 @app.route("/port-delete", methods=["POST"])
 def enforce_port_delete():
+    # Check only IP address if strict is 0
+    strict = bool(request.args.get("strict", default=1, type=int))
     fixed_ips = [str(fixed_ip["ip_address"]) for fixed_ip in g.target["fixed_ips"]]
     with db_api.CONTEXT_READER.using(g.ctx):
-        query = (
-            g.ctx.session.query(models.AllowedAddressPair)
-            .filter(
+        query = g.ctx.session.query(models.AllowedAddressPair).filter(
+            models.AllowedAddressPair.ip_address.in_(fixed_ips)
+        )
+        if strict:
+            query = query.filter(
                 models.AllowedAddressPair.mac_address.in_(
                     [str(g.target["mac_address"])]
                 )
             )
-            .filter(models.AllowedAddressPair.ip_address.in_(fixed_ips))
-        )
 
     pairs = query.all()
     pairs = [