use 'ipaddress' to manage ips in SNAT
This commit is contained in:
parent
7def99a3c5
commit
3f87df954d
|
@ -313,9 +313,8 @@ def snat_rule(_family: str, snat_target: str):
|
||||||
else:
|
else:
|
||||||
source_address = os.getenv('IPV6_NETWORK', 'fd4d:6169:6c63:6f77::/64')
|
source_address = os.getenv('IPV6_NETWORK', 'fd4d:6169:6c63:6f77::/64')
|
||||||
|
|
||||||
tmp_addr = re.split(r'/', source_address)
|
dest_net = ipaddress.ip_network(source_address)
|
||||||
dest_ip = tmp_addr[0]
|
target_net = ipaddress.ip_network(snat_target)
|
||||||
dest_len = int(tmp_addr[1])
|
|
||||||
|
|
||||||
if rule_found:
|
if rule_found:
|
||||||
saddr_ip = rule["expr"][0]["match"]["right"]["prefix"]["addr"]
|
saddr_ip = rule["expr"][0]["match"]["right"]["prefix"]["addr"]
|
||||||
|
@ -323,11 +322,17 @@ def snat_rule(_family: str, snat_target: str):
|
||||||
|
|
||||||
daddr_ip = rule["expr"][1]["match"]["right"]["prefix"]["addr"]
|
daddr_ip = rule["expr"][1]["match"]["right"]["prefix"]["addr"]
|
||||||
daddr_len = int(rule["expr"][1]["match"]["right"]["prefix"]["len"])
|
daddr_len = int(rule["expr"][1]["match"]["right"]["prefix"]["len"])
|
||||||
|
|
||||||
|
target_ip = rule["expr"][3]["snat"]["addr"]
|
||||||
|
|
||||||
|
saddr_net = ipaddress.ip_network(saddr_ip + '/' + str(saddr_len))
|
||||||
|
daddr_net = ipaddress.ip_network(daddr_ip + '/' + str(daddr_len))
|
||||||
|
current_target_net = ipaddress.ip_network(target_ip)
|
||||||
|
|
||||||
match = all((
|
match = all((
|
||||||
saddr_ip == dest_ip,
|
dest_net == saddr_net,
|
||||||
saddr_len == dest_len,
|
dest_net == daddr_net,
|
||||||
daddr_ip == dest_ip,
|
target_net == current_target_net
|
||||||
daddr_len == dest_len
|
|
||||||
))
|
))
|
||||||
try:
|
try:
|
||||||
if rule_position == 0:
|
if rule_position == 0:
|
||||||
|
@ -345,12 +350,12 @@ def snat_rule(_family: str, snat_target: str):
|
||||||
# rule not found
|
# rule not found
|
||||||
json_command = get_base_dict()
|
json_command = get_base_dict()
|
||||||
try:
|
try:
|
||||||
snat_dict = {'snat': {'addr': snat_target} }
|
snat_dict = {'snat': {'addr': str(target_net.network_address)} }
|
||||||
|
|
||||||
expr_counter = {'family': _family, 'table': 'nat', 'packets': 0, 'bytes': 0}
|
expr_counter = {'family': _family, 'table': 'nat', 'packets': 0, 'bytes': 0}
|
||||||
counter_dict = {'counter': expr_counter}
|
counter_dict = {'counter': expr_counter}
|
||||||
|
|
||||||
prefix_dict = {'prefix': {'addr': dest_ip, 'len': int(dest_len)} }
|
prefix_dict = {'prefix': {'addr': str(dest_net.network_address), 'len': int(dest_net.prefixlen)} }
|
||||||
payload_dict = {'payload': {'protocol': _family, 'field': "saddr"} }
|
payload_dict = {'payload': {'protocol': _family, 'field': "saddr"} }
|
||||||
match_dict1 = {'match': {'op': '==', 'left': payload_dict, 'right': prefix_dict} }
|
match_dict1 = {'match': {'op': '==', 'left': payload_dict, 'right': prefix_dict} }
|
||||||
|
|
||||||
|
@ -371,7 +376,7 @@ def snat_rule(_family: str, snat_target: str):
|
||||||
insert_dict = {'insert': {'rule': rule_fields} }
|
insert_dict = {'insert': {'rule': rule_fields} }
|
||||||
json_command["nftables"].append(insert_dict)
|
json_command["nftables"].append(insert_dict)
|
||||||
if nft_exec_dict(json_command):
|
if nft_exec_dict(json_command):
|
||||||
logInfo(f"Added {_family} POSTROUTING rule for source network {dest_ip} to {snat_target}")
|
logInfo(f'Added {_family} nat {chain_name} rule for source network {dest_net} to {target_net}')
|
||||||
except:
|
except:
|
||||||
logCrit(f"Error running SNAT on {_family}, retrying...")
|
logCrit(f"Error running SNAT on {_family}, retrying...")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue