From 3f87df954d91434506990c804c5cb002c65431fe Mon Sep 17 00:00:00 2001 From: Vicente <45470655+amorfo77@users.noreply.github.com> Date: Sat, 11 Mar 2023 14:13:46 +0100 Subject: [PATCH] use 'ipaddress' to manage ips in SNAT --- data/Dockerfiles/netfilter/server.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/data/Dockerfiles/netfilter/server.py b/data/Dockerfiles/netfilter/server.py index ff3414b6..d9fb9aeb 100644 --- a/data/Dockerfiles/netfilter/server.py +++ b/data/Dockerfiles/netfilter/server.py @@ -313,9 +313,8 @@ def snat_rule(_family: str, snat_target: str): else: source_address = os.getenv('IPV6_NETWORK', 'fd4d:6169:6c63:6f77::/64') - tmp_addr = re.split(r'/', source_address) - dest_ip = tmp_addr[0] - dest_len = int(tmp_addr[1]) + dest_net = ipaddress.ip_network(source_address) + target_net = ipaddress.ip_network(snat_target) if rule_found: saddr_ip = rule["expr"][0]["match"]["right"]["prefix"]["addr"] @@ -323,11 +322,17 @@ def snat_rule(_family: str, snat_target: str): daddr_ip = rule["expr"][1]["match"]["right"]["prefix"]["addr"] daddr_len = int(rule["expr"][1]["match"]["right"]["prefix"]["len"]) + + target_ip = rule["expr"][3]["snat"]["addr"] + + saddr_net = ipaddress.ip_network(saddr_ip + '/' + str(saddr_len)) + daddr_net = ipaddress.ip_network(daddr_ip + '/' + str(daddr_len)) + current_target_net = ipaddress.ip_network(target_ip) + match = all(( - saddr_ip == dest_ip, - saddr_len == dest_len, - daddr_ip == dest_ip, - daddr_len == dest_len + dest_net == saddr_net, + dest_net == daddr_net, + target_net == current_target_net )) try: if rule_position == 0: @@ -345,12 +350,12 @@ def snat_rule(_family: str, snat_target: str): # rule not found json_command = get_base_dict() try: - snat_dict = {'snat': {'addr': snat_target} } + snat_dict = {'snat': {'addr': str(target_net.network_address)} } expr_counter = {'family': _family, 'table': 'nat', 'packets': 0, 'bytes': 0} counter_dict = {'counter': expr_counter} - prefix_dict = {'prefix': {'addr': dest_ip, 'len': int(dest_len)} } + prefix_dict = {'prefix': {'addr': str(dest_net.network_address), 'len': int(dest_net.prefixlen)} } payload_dict = {'payload': {'protocol': _family, 'field': "saddr"} } match_dict1 = {'match': {'op': '==', 'left': payload_dict, 'right': prefix_dict} } @@ -371,7 +376,7 @@ def snat_rule(_family: str, snat_target: str): insert_dict = {'insert': {'rule': rule_fields} } json_command["nftables"].append(insert_dict) if nft_exec_dict(json_command): - logInfo(f"Added {_family} POSTROUTING rule for source network {dest_ip} to {snat_target}") + logInfo(f'Added {_family} nat {chain_name} rule for source network {dest_net} to {target_net}') except: logCrit(f"Error running SNAT on {_family}, retrying...")