some pylint suggestions applied

- Consider using dict literal instead of a call to 'dict'
- Redefining built-in 'object'
This commit is contained in:
amorfo77 2023-02-11 21:41:53 +01:00
parent 1a5bdb4ee2
commit 08969f44c8
1 changed files with 110 additions and 146 deletions

View File

@ -7,13 +7,13 @@ import time
import atexit
import signal
import ipaddress
import nftables
from collections import Counter
from random import randint
from threading import Thread
from threading import Lock
import redis
import json
import redis
import nftables
import iptc
import dns.resolver
import dns.exception
@ -157,14 +157,13 @@ def search_current_chains():
'ip6': {'filter': {'input': None, 'forward': None}, 'nat': {'postrouting': None} } }
# Command: 'nft list chains'
_list_opts = dict(chains='null')
_list = dict(list=_list_opts)
_list = {'list' : {'chains': 'null'} }
command = get_base_dict()
command['nftables'].append(_list)
kernel_ruleset = nft_exec_dict(command)
if kernel_ruleset:
for object in kernel_ruleset['nftables']:
chain = object.get("chain")
for _object in kernel_ruleset['nftables']:
chain = _object.get("chain")
if not chain: continue
_family = chain['family']
@ -188,8 +187,8 @@ def search_current_chains():
def search_for_chain(kernel_ruleset: dict, chain_name: str):
found = False
for object in kernel_ruleset["nftables"]:
chain = object.get("chain")
for _object in kernel_ruleset["nftables"]:
chain = _object.get("chain")
if not chain:
continue
ch_name = chain.get("name")
@ -200,12 +199,8 @@ def search_for_chain(kernel_ruleset: dict, chain_name: str):
def get_chain_dict(_family: str, _name: str):
# nft (add | create) chain [<family>] <table> <name>
_chain_opts = dict(family = _family,
table = 'filter',
name = _name )
_chain = dict(chain = _chain_opts)
_add = dict(add = _chain)
_chain_opts = {'family': _family, 'table': 'filter', 'name': _name }
_add = {'add': {'chain': _chain_opts} }
final_chain = get_base_dict()
final_chain["nftables"].append(_add)
return final_chain
@ -213,23 +208,21 @@ def get_chain_dict(_family: str, _name: str):
def get_mailcow_jump_rule_dict(_family: str, _chain: str):
_jump_rule = get_base_dict()
_expr_opt=[]
_expr_counter = dict(family = _family, table = 'filter', packets = 0, bytes = 0)
_counter_dict = dict(counter = _expr_counter)
_expr_counter = {'family': _family, 'table': 'filter', 'packets': 0, 'bytes': 0}
_counter_dict = {'counter': _expr_counter}
_expr_opt.append(_counter_dict)
_expr_jump = dict(target = 'MAILCOW')
_jump_opts = dict(jump = _expr_jump)
_jump_opts = {'jump': {'target': 'MAILCOW'} }
_expr_opt.append(_jump_opts)
_rule_params = dict(family = _family,
table = 'filter',
chain = _chain,
expr = _expr_opt,
comment = "mailcow"
)
_opts_rule = dict(rule = _rule_params)
_add_rule = dict(insert = _opts_rule)
_rule_params = {'family': _family,
'table': 'filter',
'chain': _chain,
'expr': _expr_opt,
'comment': "mailcow" }
_add_rule = {'insert': {'rule': _rule_params} }
_jump_rule["nftables"].append(_add_rule)
@ -239,9 +232,8 @@ def insert_mailcow_chains(_family: str):
nft_input_chain = nft_chain_names[_family]['filter']['input']
nft_forward_chain = nft_chain_names[_family]['filter']['forward']
# Command: 'nft list table <family> filter'
_table_opts = dict(family=_family, name='filter')
_table = dict(table=_table_opts)
_list = dict(list=_table)
_table_opts = {'family': _family, 'name': 'filter'}
_list = {'list': {'table': _table_opts} }
command = get_base_dict()
command['nftables'].append(_list)
kernel_ruleset = nft_exec_dict(command)
@ -249,16 +241,16 @@ def insert_mailcow_chains(_family: str):
# MAILCOW chain
if not search_for_chain(kernel_ruleset, "MAILCOW"):
cadena = get_chain_dict(_family, "MAILCOW")
if(nft_exec_dict(cadena)):
if nft_exec_dict(cadena):
logInfo(f"MAILCOW {_family} chain created successfully.")
input_jump_found, forward_jump_found = False, False
for object in kernel_ruleset["nftables"]:
if not object.get("rule"):
for _object in kernel_ruleset["nftables"]:
if not _object.get("rule"):
continue
rule = object["rule"]
rule = _object["rule"]
if nft_input_chain and rule["chain"] == nft_input_chain:
if rule.get("comment") and rule["comment"] == "mailcow":
input_jump_found = True
@ -276,13 +268,11 @@ def insert_mailcow_chains(_family: str):
def delete_nat_rule(_family:str, _chain: str, _handle:str):
delete_command = get_base_dict()
_rule_opts = dict(family = _family,
table = 'nat',
chain = _chain,
handle = _handle
)
_rule = dict(rule = _rule_opts)
_delete = dict(delete = _rule)
_rule_opts = {'family': _family,
'table': 'nat',
'chain': _chain,
'handle': _handle }
_delete = {'delete': {'rule': _rule_opts} }
delete_command["nftables"].append(_delete)
return nft_exec_dict(delete_command)
@ -294,9 +284,8 @@ def snat_rule(_family: str, snat_target: str):
if not chain_name: return
# Command: nft list chain <family> nat <chain_name>
_chain_opts = dict(family=_family, table='nat', name=chain_name)
_chain = dict(chain=_chain_opts)
_list = dict(list=_chain)
_chain_opts = {'family': _family, 'table': 'nat', 'name': chain_name}
_list = {'list':{'chain': _chain_opts} }
command = get_base_dict()
command['nftables'].append(_list)
kernel_ruleset = nft_exec_dict(command)
@ -306,15 +295,15 @@ def snat_rule(_family: str, snat_target: str):
rule_position = 0
rule_handle = None
rule_found = False
for object in kernel_ruleset["nftables"]:
if not object.get("rule"):
for _object in kernel_ruleset["nftables"]:
if not _object.get("rule"):
continue
rule = object["rule"]
rule = _object["rule"]
if not rule.get("comment") or not rule["comment"] == "mailcow":
rule_position +=1
continue
else:
rule_found = True
rule_handle = rule["handle"]
break
@ -357,40 +346,32 @@ def snat_rule(_family: str, snat_target: str):
# rule not found
json_command = get_base_dict()
try:
payload_fields = dict(protocol = _family, field = "saddr")
payload_dict = dict(payload = payload_fields)
payload_fields2 = dict(protocol = _family, field = "daddr")
payload_dict2 = dict(payload = payload_fields2)
prefix_fields=dict(addr = dest_ip, len = int(dest_len))
prefix_dict=dict(prefix = prefix_fields)
snat_dict = {'snat': {'addr': snat_target} }
snat_addr = dict(addr = snat_target)
snat_dict = dict(snat = snat_addr)
expr_counter = {'family': _family, 'table': 'nat', 'packets': 0, 'bytes': 0}
counter_dict = {'counter': expr_counter}
expr_counter = dict(family = _family, table = "nat", packets = 0, bytes = 0)
counter_dict = dict(counter = expr_counter)
prefix_dict = {'prefix': {'addr': dest_ip, 'len': int(dest_len)} }
payload_dict = {'payload': {'protocol': _family, 'field': "saddr"} }
match_dict1 = {'match': {'op': '==', 'left': payload_dict, 'right': prefix_dict} }
match_fields1 = dict(op = "==", left = payload_dict, right = prefix_dict)
match_dict1 = dict(match = match_fields1)
match_fields2 = dict(op = "!=", left = payload_dict2, right = prefix_dict )
match_dict2 = dict(match = match_fields2)
payload_dict2 = {'payload': {'protocol': _family, 'field': "daddr"} }
match_dict2 = {'match': {'op': '!=', 'left': payload_dict2, 'right': prefix_dict } }
expr_list = [
match_dict1,
match_dict2,
counter_dict,
snat_dict
]
rule_fields = dict(family = _family,
table = "nat",
chain = chain_name,
comment = "mailcow",
expr = expr_list
)
rule_dict = dict(rule = rule_fields)
insert_dict = dict(insert = rule_dict)
rule_fields = {'family': _family,
'table': 'nat',
'chain': chain_name,
'comment': "mailcow",
'expr': expr_list }
insert_dict = {'insert': {'rule': rule_fields} }
json_command["nftables"].append(insert_dict)
if(nft_exec_dict(json_command)):
if nft_exec_dict(json_command):
logInfo(f"Added {_family} POSTROUTING rule for source network {dest_ip} to {snat_target}")
except:
logCrit(f"Error running SNAT on {_family}, retrying...")
@ -398,17 +379,15 @@ def snat_rule(_family: str, snat_target: str):
def get_chain_handle(_family: str, _table: str, chain_name: str):
chain_handle = None
# Command: 'nft list chains {family}'
_chain_opts = dict(family=_family)
_chain = dict(chains=_chain_opts)
_list = dict(list=_chain)
_list = {'list': {'chains': {'family': _family} } }
command = get_base_dict()
command['nftables'].append(_list)
kernel_ruleset = nft_exec_dict(command)
if kernel_ruleset:
for object in kernel_ruleset["nftables"]:
if not object.get("chain"):
for _object in kernel_ruleset["nftables"]:
if not _object.get("chain"):
continue
chain = object["chain"]
chain = _object["chain"]
if chain["family"] == _family and chain["table"] == _table and chain["name"] == chain_name:
chain_handle = chain["handle"]
break
@ -417,19 +396,18 @@ def get_chain_handle(_family: str, _table: str, chain_name: str):
def get_rules_handle(_family: str, _table: str, chain_name: str):
rule_handle = []
# Command: 'nft list chain {family} {table} {chain_name}'
_chain_opts = dict(family=_family, table=_table, name=chain_name)
_chain = dict(chain=_chain_opts)
_list = dict(list=_chain)
_chain_opts = {'family': _family, 'table': _table, 'name': chain_name}
_list = {'list': {'chain': _chain_opts} }
command = get_base_dict()
command['nftables'].append(_list)
kernel_ruleset = nft_exec_dict(command)
if kernel_ruleset:
for object in kernel_ruleset["nftables"]:
if not object.get("rule"):
for _object in kernel_ruleset["nftables"]:
if not _object.get("rule"):
continue
rule = object["rule"]
rule = _object["rule"]
if rule["family"] == _family and rule["table"] == _table and rule["chain"] == chain_name:
if rule.get("comment") and rule["comment"] == "mailcow":
rule_handle.append(rule["handle"])
@ -440,30 +418,23 @@ def get_ban_ip_dict(ipaddr: str, _family: str):
expr_opt = []
if re.search(r'/', ipaddr):
divided = re.split(r'/', ipaddr)
prefix_dict=dict(addr = divided[0],
len = int(divided[1]) )
right_dict = dict(prefix = prefix_dict)
tmp_data = re.split(r'/', ipaddr)
right_dict = {'prefix': {'addr': tmp_data[0], 'len': int(tmp_data[1]) } }
else:
right_dict = ipaddr
payload_dict = dict(protocol = _family, field="saddr" )
left_dict = dict(payload = payload_dict)
match_dict = dict(op = "==", left = left_dict, right = right_dict )
match_base = dict(match = match_dict)
expr_opt.append(match_base)
left_dict = {'payload': {'protocol': _family, 'field': 'saddr'} }
match_dict = {'op': '==', 'left': left_dict, 'right': right_dict }
expr_opt.append({'match': match_dict})
expr_counter = dict(family = _family, table = "filter", packets = 0, bytes = 0)
counter_dict = dict(counter = expr_counter)
counter_dict = {'counter': {'family': _family, 'table': "filter", 'packets': 0, 'bytes': 0} }
expr_opt.append(counter_dict)
drop_dict = dict(drop = "null")
expr_opt.append(drop_dict)
expr_opt.append({'drop': "null"})
rule_dict = dict(family = _family, table = "filter", chain = "MAILCOW", expr = expr_opt)
rule_dict = {'family': _family, 'table': "filter", 'chain': "MAILCOW", 'expr': expr_opt}
base_rule = dict(rule = rule_dict)
base_dict = dict(insert = base_rule)
base_dict = {'insert': {'rule': rule_dict} }
json_command["nftables"].append(base_dict)
return json_command
@ -471,19 +442,18 @@ def get_ban_ip_dict(ipaddr: str, _family: str):
def get_unban_ip_dict(ipaddr:str, _family: str):
json_command = get_base_dict()
# Command: 'nft list chain {s_family} filter MAILCOW'
_chain_opts = dict(family=_family, table='filter', name='MAILCOW')
_chain = dict(chain=_chain_opts)
_list = dict(list=_chain)
_chain_opts = {'family': _family, 'table': 'filter', 'name': 'MAILCOW'}
_list = {'list': {'chain': _chain_opts} }
command = get_base_dict()
command['nftables'].append(_list)
kernel_ruleset = nft_exec_dict(command)
rule_handle = None
if kernel_ruleset:
for object in kernel_ruleset["nftables"]:
if not object.get("rule"):
for _object in kernel_ruleset["nftables"]:
if not _object.get("rule"):
continue
rule = object["rule"]["expr"][0]["match"]
rule = _object["rule"]["expr"][0]["match"]
left_opt = rule["left"]["payload"]
if not left_opt["protocol"] == _family:
continue
@ -501,9 +471,9 @@ def get_unban_ip_dict(ipaddr:str, _family: str):
# ip to ban
if re.search(r'/', ipaddr):
divided = re.split(r'/', ipaddr)
candidate_ip = divided[0]
candidate_len = int(divided[1])
tmp_data = re.split(r'/', ipaddr)
candidate_ip = tmp_data[0]
candidate_len = int(tmp_data[1])
else:
candidate_ip = ipaddr
candidate_len = 32 if _family == 'ip' else 128
@ -511,13 +481,12 @@ def get_unban_ip_dict(ipaddr:str, _family: str):
if all((current_rule_ip == candidate_ip,
current_rule_len and candidate_len,
current_rule_len == candidate_len )):
rule_handle = object["rule"]["handle"]
rule_handle = _object["rule"]["handle"]
break
if rule_handle is not None:
mailcow_rule = dict(family = _family, table = "filter", chain = "MAILCOW", handle = rule_handle)
del_rule = dict(rule = mailcow_rule)
delete_rule=dict(delete = del_rule)
mailcow_rule = {'family': _family, 'table': 'filter', 'chain': 'MAILCOW', 'handle': rule_handle}
delete_rule = {'delete': {'rule': mailcow_rule} }
json_command["nftables"].append(delete_rule)
else:
return False
@ -531,17 +500,16 @@ def check_mailcow_chains(family: str, chain: str):
if not chain_name: return None
_chain_opts = dict(family=family, table='filter', name=chain_name)
_chain = dict(chain=_chain_opts)
_list = dict(list=_chain)
_chain_opts = {'family': family, 'table': 'filter', 'name': chain_name}
_list = {'list': {'chain': _chain_opts}}
command = get_base_dict()
command['nftables'].append(_list)
kernel_ruleset = nft_exec_dict(command)
if kernel_ruleset:
for object in kernel_ruleset["nftables"]:
if not object.get("rule"):
for _object in kernel_ruleset["nftables"]:
if not _object.get("rule"):
continue
rule = object["rule"]
rule = _object["rule"]
if rule.get("comment") and rule["comment"] == "mailcow":
rule_found = True
break
@ -627,7 +595,7 @@ def ban(address):
net = ipaddress.ip_network((address + (NETBAN_IPV4 if type(ip) is ipaddress.IPv4Address else NETBAN_IPV6)), strict=False)
net = str(net)
if not net in bans or time.time() - bans[net]['last_attempt'] > RETRY_WINDOW:
if net not in bans or time.time() - bans[net]['last_attempt'] > RETRY_WINDOW:
bans[net] = { 'attempts': 0 }
active_window = RETRY_WINDOW
else:
@ -692,7 +660,7 @@ def unban(net):
else:
dict_unban = get_unban_ip_dict(net, "ip")
if dict_unban:
if(nft_exec_dict(dict_unban)):
if nft_exec_dict(dict_unban):
logInfo(f"Unbanned ip: {net}")
else:
with lock:
@ -707,7 +675,7 @@ def unban(net):
else:
dict_unban = get_unban_ip_dict(net, "ip6")
if dict_unban:
if(nft_exec_dict(dict_unban)):
if nft_exec_dict(dict_unban):
logInfo(f"Unbanned ip6: {net}")
r.hdel('F2B_ACTIVE_BANS', '%s' % net)
@ -736,13 +704,13 @@ def permBan(net, unban=False):
else:
if not unban:
ban_dict = get_ban_ip_dict(net, "ip")
if(nft_exec_dict(ban_dict)):
if nft_exec_dict(ban_dict):
logCrit('Add host/network %s to blacklist' % net)
r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
elif unban:
dict_unban = get_unban_ip_dict(net, "ip")
if dict_unban:
if(nft_exec_dict(dict_unban)):
if nft_exec_dict(dict_unban):
logCrit('Remove host/network %s from blacklist' % net)
r.hdel('F2B_PERM_BANS', '%s' % net)
else:
@ -764,13 +732,13 @@ def permBan(net, unban=False):
else:
if not unban:
ban_dict = get_ban_ip_dict(net, "ip6")
if(nft_exec_dict(ban_dict)):
if nft_exec_dict(ban_dict):
logCrit('Add host/network %s to blacklist' % net)
r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time())))
elif unban:
dict_unban = get_unban_ip_dict(net, "ip6")
if dict_unban:
if(nft_exec_dict(dict_unban)):
if nft_exec_dict(dict_unban):
logCrit('Remove host/network %s from blacklist' % net)
r.hdel('F2B_PERM_BANS', '%s' % net)
@ -814,9 +782,8 @@ def clear():
if chain_handle is not None:
is_empty_dict = False
# flush chain MAILCOW
mailcow_chain = dict(family=_family, table="filter", name="MAILCOW")
mc_chain_base = dict(chain=mailcow_chain)
flush_chain = dict(flush=mc_chain_base)
mailcow_chain = {'family': _family, 'table': 'filter', 'name': 'MAILCOW'}
flush_chain = {'flush': {'chain': mailcow_chain}}
json_command["nftables"].append(flush_chain)
# remove rule in forward chain
@ -831,29 +798,25 @@ def clear():
if rules_handle is not None:
for r_handle in rules_handle:
is_empty_dict = False
mailcow_rule = dict(family=_family,
table="filter",
chain=chain_base,
handle=r_handle
)
del_rule = dict(rule=mailcow_rule)
delete_rules=dict(delete=del_rule)
mailcow_rule = {'family':_family,
'table': 'filter',
'chain': chain_base,
'handle': r_handle }
delete_rules = {'delete': {'rule': mailcow_rule} }
json_command["nftables"].append(delete_rules)
# remove chain MAILCOW
# after delete all rules referencing this chain
if chain_handle is not None:
mc_chain_handle = dict(family=_family,
table="filter",
name="MAILCOW",
handle=chain_handle
)
del_chain=dict(chain=mc_chain_handle)
delete_chain = dict(delete=del_chain)
mc_chain_handle = {'family':_family,
'table': 'filter',
'name': 'MAILCOW',
'handle': chain_handle }
delete_chain = {'delete': {'chain': mc_chain_handle} }
json_command["nftables"].append(delete_chain)
if is_empty_dict == False:
if(nft_exec_dict(json_command)):
if nft_exec_dict(json_command):
logInfo(f"Clear completed: {_family}")
r.delete('F2B_ACTIVE_BANS')
@ -1103,6 +1066,7 @@ if __name__ == '__main__':
if backend == 'nftables':
search_current_chains()
# In case a previous session was killed without cleanup
clear()
# Reinit MAILCOW chain