From 08969f44c80139416fb948ec8c02c55b8dac6791 Mon Sep 17 00:00:00 2001 From: amorfo77 Date: Sat, 11 Feb 2023 21:41:53 +0100 Subject: [PATCH] some pylint suggestions applied - Consider using dict literal instead of a call to 'dict' - Redefining built-in 'object' --- data/Dockerfiles/netfilter/server.py | 256 ++++++++++++--------------- 1 file changed, 110 insertions(+), 146 deletions(-) diff --git a/data/Dockerfiles/netfilter/server.py b/data/Dockerfiles/netfilter/server.py index 17f9f5c7..e4713244 100644 --- a/data/Dockerfiles/netfilter/server.py +++ b/data/Dockerfiles/netfilter/server.py @@ -7,13 +7,13 @@ import time import atexit import signal import ipaddress -import nftables from collections import Counter from random import randint from threading import Thread from threading import Lock -import redis import json +import redis +import nftables import iptc import dns.resolver import dns.exception @@ -157,14 +157,13 @@ def search_current_chains(): 'ip6': {'filter': {'input': None, 'forward': None}, 'nat': {'postrouting': None} } } # Command: 'nft list chains' - _list_opts = dict(chains='null') - _list = dict(list=_list_opts) + _list = {'list' : {'chains': 'null'} } command = get_base_dict() command['nftables'].append(_list) kernel_ruleset = nft_exec_dict(command) if kernel_ruleset: - for object in kernel_ruleset['nftables']: - chain = object.get("chain") + for _object in kernel_ruleset['nftables']: + chain = _object.get("chain") if not chain: continue _family = chain['family'] @@ -188,8 +187,8 @@ def search_current_chains(): def search_for_chain(kernel_ruleset: dict, chain_name: str): found = False - for object in kernel_ruleset["nftables"]: - chain = object.get("chain") + for _object in kernel_ruleset["nftables"]: + chain = _object.get("chain") if not chain: continue ch_name = chain.get("name") @@ -200,12 +199,8 @@ def search_for_chain(kernel_ruleset: dict, chain_name: str): def get_chain_dict(_family: str, _name: str): # nft (add | create) chain [] - _chain_opts = dict(family = _family, - table = 'filter', - name = _name ) - - _chain = dict(chain = _chain_opts) - _add = dict(add = _chain) + _chain_opts = {'family': _family, 'table': 'filter', 'name': _name } + _add = {'add': {'chain': _chain_opts} } final_chain = get_base_dict() final_chain["nftables"].append(_add) return final_chain @@ -213,23 +208,21 @@ def get_chain_dict(_family: str, _name: str): def get_mailcow_jump_rule_dict(_family: str, _chain: str): _jump_rule = get_base_dict() _expr_opt=[] - _expr_counter = dict(family = _family, table = 'filter', packets = 0, bytes = 0) - _counter_dict = dict(counter = _expr_counter) + _expr_counter = {'family': _family, 'table': 'filter', 'packets': 0, 'bytes': 0} + _counter_dict = {'counter': _expr_counter} _expr_opt.append(_counter_dict) - _expr_jump = dict(target = 'MAILCOW') - _jump_opts = dict(jump = _expr_jump) + _jump_opts = {'jump': {'target': 'MAILCOW'} } _expr_opt.append(_jump_opts) - _rule_params = dict(family = _family, - table = 'filter', - chain = _chain, - expr = _expr_opt, - comment = "mailcow" - ) - _opts_rule = dict(rule = _rule_params) - _add_rule = dict(insert = _opts_rule) + _rule_params = {'family': _family, + 'table': 'filter', + 'chain': _chain, + 'expr': _expr_opt, + 'comment': "mailcow" } + + _add_rule = {'insert': {'rule': _rule_params} } _jump_rule["nftables"].append(_add_rule) @@ -239,9 +232,8 @@ def insert_mailcow_chains(_family: str): nft_input_chain = nft_chain_names[_family]['filter']['input'] nft_forward_chain = nft_chain_names[_family]['filter']['forward'] # Command: 'nft list table filter' - _table_opts = dict(family=_family, name='filter') - _table = dict(table=_table_opts) - _list = dict(list=_table) + _table_opts = {'family': _family, 'name': 'filter'} + _list = {'list': {'table': _table_opts} } command = get_base_dict() command['nftables'].append(_list) kernel_ruleset = nft_exec_dict(command) @@ -249,16 +241,16 @@ def insert_mailcow_chains(_family: str): # MAILCOW chain if not search_for_chain(kernel_ruleset, "MAILCOW"): cadena = get_chain_dict(_family, "MAILCOW") - if(nft_exec_dict(cadena)): + if nft_exec_dict(cadena): logInfo(f"MAILCOW {_family} chain created successfully.") input_jump_found, forward_jump_found = False, False - for object in kernel_ruleset["nftables"]: - if not object.get("rule"): + for _object in kernel_ruleset["nftables"]: + if not _object.get("rule"): continue - rule = object["rule"] + rule = _object["rule"] if nft_input_chain and rule["chain"] == nft_input_chain: if rule.get("comment") and rule["comment"] == "mailcow": input_jump_found = True @@ -276,13 +268,11 @@ def insert_mailcow_chains(_family: str): def delete_nat_rule(_family:str, _chain: str, _handle:str): delete_command = get_base_dict() - _rule_opts = dict(family = _family, - table = 'nat', - chain = _chain, - handle = _handle - ) - _rule = dict(rule = _rule_opts) - _delete = dict(delete = _rule) + _rule_opts = {'family': _family, + 'table': 'nat', + 'chain': _chain, + 'handle': _handle } + _delete = {'delete': {'rule': _rule_opts} } delete_command["nftables"].append(_delete) return nft_exec_dict(delete_command) @@ -294,9 +284,8 @@ def snat_rule(_family: str, snat_target: str): if not chain_name: return # Command: nft list chain nat - _chain_opts = dict(family=_family, table='nat', name=chain_name) - _chain = dict(chain=_chain_opts) - _list = dict(list=_chain) + _chain_opts = {'family': _family, 'table': 'nat', 'name': chain_name} + _list = {'list':{'chain': _chain_opts} } command = get_base_dict() command['nftables'].append(_list) kernel_ruleset = nft_exec_dict(command) @@ -306,18 +295,18 @@ def snat_rule(_family: str, snat_target: str): rule_position = 0 rule_handle = None rule_found = False - for object in kernel_ruleset["nftables"]: - if not object.get("rule"): + for _object in kernel_ruleset["nftables"]: + if not _object.get("rule"): continue - rule = object["rule"] + rule = _object["rule"] if not rule.get("comment") or not rule["comment"] == "mailcow": rule_position +=1 continue - else: - rule_found = True - rule_handle = rule["handle"] - break + + rule_found = True + rule_handle = rule["handle"] + break if _family == "ip": source_address = os.getenv('IPV4_NETWORK', '172.22.1') + '.0/24' @@ -357,40 +346,32 @@ def snat_rule(_family: str, snat_target: str): # rule not found json_command = get_base_dict() try: - payload_fields = dict(protocol = _family, field = "saddr") - payload_dict = dict(payload = payload_fields) - payload_fields2 = dict(protocol = _family, field = "daddr") - payload_dict2 = dict(payload = payload_fields2) - prefix_fields=dict(addr = dest_ip, len = int(dest_len)) - prefix_dict=dict(prefix = prefix_fields) + snat_dict = {'snat': {'addr': snat_target} } - snat_addr = dict(addr = snat_target) - snat_dict = dict(snat = snat_addr) + expr_counter = {'family': _family, 'table': 'nat', 'packets': 0, 'bytes': 0} + counter_dict = {'counter': expr_counter} - expr_counter = dict(family = _family, table = "nat", packets = 0, bytes = 0) - counter_dict = dict(counter = expr_counter) + prefix_dict = {'prefix': {'addr': dest_ip, 'len': int(dest_len)} } + payload_dict = {'payload': {'protocol': _family, 'field': "saddr"} } + match_dict1 = {'match': {'op': '==', 'left': payload_dict, 'right': prefix_dict} } - match_fields1 = dict(op = "==", left = payload_dict, right = prefix_dict) - match_dict1 = dict(match = match_fields1) - - match_fields2 = dict(op = "!=", left = payload_dict2, right = prefix_dict ) - match_dict2 = dict(match = match_fields2) + payload_dict2 = {'payload': {'protocol': _family, 'field': "daddr"} } + match_dict2 = {'match': {'op': '!=', 'left': payload_dict2, 'right': prefix_dict } } expr_list = [ match_dict1, match_dict2, counter_dict, snat_dict ] - rule_fields = dict(family = _family, - table = "nat", - chain = chain_name, - comment = "mailcow", - expr = expr_list - ) - rule_dict = dict(rule = rule_fields) - insert_dict = dict(insert = rule_dict) + rule_fields = {'family': _family, + 'table': 'nat', + 'chain': chain_name, + 'comment': "mailcow", + 'expr': expr_list } + + insert_dict = {'insert': {'rule': rule_fields} } json_command["nftables"].append(insert_dict) - if(nft_exec_dict(json_command)): + if nft_exec_dict(json_command): logInfo(f"Added {_family} POSTROUTING rule for source network {dest_ip} to {snat_target}") except: logCrit(f"Error running SNAT on {_family}, retrying...") @@ -398,17 +379,15 @@ def snat_rule(_family: str, snat_target: str): def get_chain_handle(_family: str, _table: str, chain_name: str): chain_handle = None # Command: 'nft list chains {family}' - _chain_opts = dict(family=_family) - _chain = dict(chains=_chain_opts) - _list = dict(list=_chain) + _list = {'list': {'chains': {'family': _family} } } command = get_base_dict() command['nftables'].append(_list) kernel_ruleset = nft_exec_dict(command) if kernel_ruleset: - for object in kernel_ruleset["nftables"]: - if not object.get("chain"): + for _object in kernel_ruleset["nftables"]: + if not _object.get("chain"): continue - chain = object["chain"] + chain = _object["chain"] if chain["family"] == _family and chain["table"] == _table and chain["name"] == chain_name: chain_handle = chain["handle"] break @@ -417,19 +396,18 @@ def get_chain_handle(_family: str, _table: str, chain_name: str): def get_rules_handle(_family: str, _table: str, chain_name: str): rule_handle = [] # Command: 'nft list chain {family} {table} {chain_name}' - _chain_opts = dict(family=_family, table=_table, name=chain_name) - _chain = dict(chain=_chain_opts) - _list = dict(list=_chain) + _chain_opts = {'family': _family, 'table': _table, 'name': chain_name} + _list = {'list': {'chain': _chain_opts} } command = get_base_dict() command['nftables'].append(_list) kernel_ruleset = nft_exec_dict(command) if kernel_ruleset: - for object in kernel_ruleset["nftables"]: - if not object.get("rule"): + for _object in kernel_ruleset["nftables"]: + if not _object.get("rule"): continue - rule = object["rule"] + rule = _object["rule"] if rule["family"] == _family and rule["table"] == _table and rule["chain"] == chain_name: if rule.get("comment") and rule["comment"] == "mailcow": rule_handle.append(rule["handle"]) @@ -440,30 +418,23 @@ def get_ban_ip_dict(ipaddr: str, _family: str): expr_opt = [] if re.search(r'/', ipaddr): - divided = re.split(r'/', ipaddr) - prefix_dict=dict(addr = divided[0], - len = int(divided[1]) ) - right_dict = dict(prefix = prefix_dict) + tmp_data = re.split(r'/', ipaddr) + right_dict = {'prefix': {'addr': tmp_data[0], 'len': int(tmp_data[1]) } } else: right_dict = ipaddr - payload_dict = dict(protocol = _family, field="saddr" ) - left_dict = dict(payload = payload_dict) - match_dict = dict(op = "==", left = left_dict, right = right_dict ) - match_base = dict(match = match_dict) - expr_opt.append(match_base) + left_dict = {'payload': {'protocol': _family, 'field': 'saddr'} } + match_dict = {'op': '==', 'left': left_dict, 'right': right_dict } + expr_opt.append({'match': match_dict}) - expr_counter = dict(family = _family, table = "filter", packets = 0, bytes = 0) - counter_dict = dict(counter = expr_counter) + counter_dict = {'counter': {'family': _family, 'table': "filter", 'packets': 0, 'bytes': 0} } expr_opt.append(counter_dict) - drop_dict = dict(drop = "null") - expr_opt.append(drop_dict) + expr_opt.append({'drop': "null"}) - rule_dict = dict(family = _family, table = "filter", chain = "MAILCOW", expr = expr_opt) + rule_dict = {'family': _family, 'table': "filter", 'chain': "MAILCOW", 'expr': expr_opt} - base_rule = dict(rule = rule_dict) - base_dict = dict(insert = base_rule) + base_dict = {'insert': {'rule': rule_dict} } json_command["nftables"].append(base_dict) return json_command @@ -471,19 +442,18 @@ def get_ban_ip_dict(ipaddr: str, _family: str): def get_unban_ip_dict(ipaddr:str, _family: str): json_command = get_base_dict() # Command: 'nft list chain {s_family} filter MAILCOW' - _chain_opts = dict(family=_family, table='filter', name='MAILCOW') - _chain = dict(chain=_chain_opts) - _list = dict(list=_chain) + _chain_opts = {'family': _family, 'table': 'filter', 'name': 'MAILCOW'} + _list = {'list': {'chain': _chain_opts} } command = get_base_dict() command['nftables'].append(_list) kernel_ruleset = nft_exec_dict(command) rule_handle = None if kernel_ruleset: - for object in kernel_ruleset["nftables"]: - if not object.get("rule"): + for _object in kernel_ruleset["nftables"]: + if not _object.get("rule"): continue - rule = object["rule"]["expr"][0]["match"] + rule = _object["rule"]["expr"][0]["match"] left_opt = rule["left"]["payload"] if not left_opt["protocol"] == _family: continue @@ -501,9 +471,9 @@ def get_unban_ip_dict(ipaddr:str, _family: str): # ip to ban if re.search(r'/', ipaddr): - divided = re.split(r'/', ipaddr) - candidate_ip = divided[0] - candidate_len = int(divided[1]) + tmp_data = re.split(r'/', ipaddr) + candidate_ip = tmp_data[0] + candidate_len = int(tmp_data[1]) else: candidate_ip = ipaddr candidate_len = 32 if _family == 'ip' else 128 @@ -511,13 +481,12 @@ def get_unban_ip_dict(ipaddr:str, _family: str): if all((current_rule_ip == candidate_ip, current_rule_len and candidate_len, current_rule_len == candidate_len )): - rule_handle = object["rule"]["handle"] + rule_handle = _object["rule"]["handle"] break if rule_handle is not None: - mailcow_rule = dict(family = _family, table = "filter", chain = "MAILCOW", handle = rule_handle) - del_rule = dict(rule = mailcow_rule) - delete_rule=dict(delete = del_rule) + mailcow_rule = {'family': _family, 'table': 'filter', 'chain': 'MAILCOW', 'handle': rule_handle} + delete_rule = {'delete': {'rule': mailcow_rule} } json_command["nftables"].append(delete_rule) else: return False @@ -531,17 +500,16 @@ def check_mailcow_chains(family: str, chain: str): if not chain_name: return None - _chain_opts = dict(family=family, table='filter', name=chain_name) - _chain = dict(chain=_chain_opts) - _list = dict(list=_chain) + _chain_opts = {'family': family, 'table': 'filter', 'name': chain_name} + _list = {'list': {'chain': _chain_opts}} command = get_base_dict() command['nftables'].append(_list) kernel_ruleset = nft_exec_dict(command) if kernel_ruleset: - for object in kernel_ruleset["nftables"]: - if not object.get("rule"): + for _object in kernel_ruleset["nftables"]: + if not _object.get("rule"): continue - rule = object["rule"] + rule = _object["rule"] if rule.get("comment") and rule["comment"] == "mailcow": rule_found = True break @@ -627,7 +595,7 @@ def ban(address): net = ipaddress.ip_network((address + (NETBAN_IPV4 if type(ip) is ipaddress.IPv4Address else NETBAN_IPV6)), strict=False) net = str(net) - if not net in bans or time.time() - bans[net]['last_attempt'] > RETRY_WINDOW: + if net not in bans or time.time() - bans[net]['last_attempt'] > RETRY_WINDOW: bans[net] = { 'attempts': 0 } active_window = RETRY_WINDOW else: @@ -692,7 +660,7 @@ def unban(net): else: dict_unban = get_unban_ip_dict(net, "ip") if dict_unban: - if(nft_exec_dict(dict_unban)): + if nft_exec_dict(dict_unban): logInfo(f"Unbanned ip: {net}") else: with lock: @@ -707,7 +675,7 @@ def unban(net): else: dict_unban = get_unban_ip_dict(net, "ip6") if dict_unban: - if(nft_exec_dict(dict_unban)): + if nft_exec_dict(dict_unban): logInfo(f"Unbanned ip6: {net}") r.hdel('F2B_ACTIVE_BANS', '%s' % net) @@ -736,13 +704,13 @@ def permBan(net, unban=False): else: if not unban: ban_dict = get_ban_ip_dict(net, "ip") - if(nft_exec_dict(ban_dict)): + if nft_exec_dict(ban_dict): logCrit('Add host/network %s to blacklist' % net) r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time()))) elif unban: dict_unban = get_unban_ip_dict(net, "ip") if dict_unban: - if(nft_exec_dict(dict_unban)): + if nft_exec_dict(dict_unban): logCrit('Remove host/network %s from blacklist' % net) r.hdel('F2B_PERM_BANS', '%s' % net) else: @@ -764,13 +732,13 @@ def permBan(net, unban=False): else: if not unban: ban_dict = get_ban_ip_dict(net, "ip6") - if(nft_exec_dict(ban_dict)): + if nft_exec_dict(ban_dict): logCrit('Add host/network %s to blacklist' % net) r.hset('F2B_PERM_BANS', '%s' % net, int(round(time.time()))) elif unban: dict_unban = get_unban_ip_dict(net, "ip6") if dict_unban: - if(nft_exec_dict(dict_unban)): + if nft_exec_dict(dict_unban): logCrit('Remove host/network %s from blacklist' % net) r.hdel('F2B_PERM_BANS', '%s' % net) @@ -814,9 +782,8 @@ def clear(): if chain_handle is not None: is_empty_dict = False # flush chain MAILCOW - mailcow_chain = dict(family=_family, table="filter", name="MAILCOW") - mc_chain_base = dict(chain=mailcow_chain) - flush_chain = dict(flush=mc_chain_base) + mailcow_chain = {'family': _family, 'table': 'filter', 'name': 'MAILCOW'} + flush_chain = {'flush': {'chain': mailcow_chain}} json_command["nftables"].append(flush_chain) # remove rule in forward chain @@ -831,29 +798,25 @@ def clear(): if rules_handle is not None: for r_handle in rules_handle: is_empty_dict = False - mailcow_rule = dict(family=_family, - table="filter", - chain=chain_base, - handle=r_handle - ) - del_rule = dict(rule=mailcow_rule) - delete_rules=dict(delete=del_rule) + mailcow_rule = {'family':_family, + 'table': 'filter', + 'chain': chain_base, + 'handle': r_handle } + delete_rules = {'delete': {'rule': mailcow_rule} } json_command["nftables"].append(delete_rules) # remove chain MAILCOW # after delete all rules referencing this chain if chain_handle is not None: - mc_chain_handle = dict(family=_family, - table="filter", - name="MAILCOW", - handle=chain_handle - ) - del_chain=dict(chain=mc_chain_handle) - delete_chain = dict(delete=del_chain) + mc_chain_handle = {'family':_family, + 'table': 'filter', + 'name': 'MAILCOW', + 'handle': chain_handle } + delete_chain = {'delete': {'chain': mc_chain_handle} } json_command["nftables"].append(delete_chain) if is_empty_dict == False: - if(nft_exec_dict(json_command)): + if nft_exec_dict(json_command): logInfo(f"Clear completed: {_family}") r.delete('F2B_ACTIVE_BANS') @@ -1103,6 +1066,7 @@ if __name__ == '__main__': if backend == 'nftables': search_current_chains() + # In case a previous session was killed without cleanup clear() # Reinit MAILCOW chain