Files
ruantiblock_openwrt/ruantiblock-mod-py/files/usr/libexec/ruantiblock/ruab_parser.py
T
2022-12-25 19:45:07 +03:00

536 lines
18 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
(c) 2020 gSpot (https://github.com/gSpotx2f/ruantiblock_openwrt)
Python >= 3.6
"""
from contextlib import contextmanager
import os
import re
import socket
import ssl
import sys
from urllib import request
from ruab_sum_ip import summarize_ip_ranges, summarize_nets
class Config:
    """Settings holder filled from environment variables.

    Every value is stored as a class attribute, so all subclasses read the
    same configuration through ordinary attribute access.
    """

    # Names of the environment variables copied onto the class by
    # load_environ_config().
    environ_list = [
        "BLLIST_SOURCE",
        "BLLIST_MODE",
        "BLLIST_ALT_NSLOOKUP",
        "BLLIST_ALT_DNS_ADDR",
        "BLLIST_ENABLE_IDN",
        "BLLIST_GR_EXCLUDED_SLD",
        "BLLIST_GR_EXCLUDED_MASKS",
        "BLLIST_FQDN_FILTER",
        "BLLIST_FQDN_FILTER_FILE",
        "BLLIST_IP_FILTER",
        "BLLIST_IP_FILTER_FILE",
        "BLLIST_SD_LIMIT",
        "BLLIST_IP_LIMIT",
        "BLLIST_GR_EXCLUDED_NETS",
        "BLLIST_MIN_ENTRIES",
        "BLLIST_STRIP_WWW",
        "DATA_DIR",
        "IPSET_DNSMASQ",
        "IPSET_IP_TMP",
        "IPSET_CIDR_TMP",
        "DNSMASQ_DATA_FILE",
        "IP_DATA_FILE",
        "UPDATE_STATUS_FILE",
        "RBL_ALL_URL",
        "RBL_IP_URL",
        "ZI_ALL_URL",
        "AF_IP_URL",
        "AF_FQDN_URL",
        "RA_IP_IPSET_URL",
        "RA_IP_DMASK_URL",
        "RA_IP_STAT_URL",
        "RA_FQDN_IPSET_URL",
        "RA_FQDN_DMASK_URL",
        "RA_FQDN_STAT_URL",
        "RBL_ENCODING",
        "ZI_ENCODING",
        "AF_ENCODING",
        "RA_ENCODING",
        "BLLIST_SUMMARIZE_IP",
        "BLLIST_SUMMARIZE_CIDR",
    ]
    # Raw pattern strings collected from the filter files.
    BLLIST_FQDN_FILTER_PATTERNS = set()
    BLLIST_IP_FILTER_PATTERNS = set()

    @classmethod
    def _load_config(cls, cfg_dict):
        """Store every item of *cfg_dict* as a class attribute.

        Double quotes are removed from all values.  Keys listed in
        ``config_arrays`` are split on spaces into a set of non-empty
        strings; any other value becomes an ``int`` when it parses as one
        and stays a string otherwise.
        """
        def normalize_string(string):
            return string.replace('"', "")

        # BLLIST_GR_EXCLUDED_MASKS is iterated pattern by pattern in
        # _check_sld_masks(), so it has to be a collection as well: a plain
        # string would be walked one character at a time.
        config_arrays = {
            "BLLIST_GR_EXCLUDED_SLD",
            "BLLIST_GR_EXCLUDED_MASKS",
            "BLLIST_GR_EXCLUDED_NETS",
        }
        for k, v in cfg_dict.items():
            try:
                if k in config_arrays:
                    # Empty items (empty value, doubled spaces) are dropped
                    # so that an unset option yields an empty, falsy set.
                    value = {
                        item
                        for item in map(normalize_string, v.split(" "))
                        if item
                    }
                else:
                    try:
                        value = int(v)
                    except ValueError:
                        value = normalize_string(v)
            except (AttributeError, TypeError):
                # Not a string-like value: loading stays best-effort and the
                # remaining options are still processed.
                continue
            setattr(cls, k, value)

    @classmethod
    def load_environ_config(cls):
        """Load the variables named in ``environ_list`` from ``os.environ``."""
        cls._load_config({
            k: v for k, v in os.environ.items()
            if k in cls.environ_list
        })

    @classmethod
    def _load_filter(cls, file_path, filter_patterns):
        """Add the patterns found in *file_path* to *filter_patterns*.

        One pattern per line; surrounding whitespace is stripped, blank
        lines and lines whose first non-blank character is ``#`` are
        ignored.  A file that cannot be opened or read is silently skipped.
        """
        try:
            with open(file_path, "rt") as file_handler:
                for line in file_handler:
                    pattern = line.strip()
                    if pattern and not pattern.startswith("#"):
                        filter_patterns.add(pattern)
        except OSError:
            # The filter file is optional: without it no patterns are added.
            pass

    @classmethod
    def load_fqdn_filter(cls, file_path=None):
        """Read the FQDN filter file when ``BLLIST_FQDN_FILTER`` is truthy.

        *file_path* overrides ``BLLIST_FQDN_FILTER_FILE``.
        """
        if cls.BLLIST_FQDN_FILTER:
            cls._load_filter(file_path or cls.BLLIST_FQDN_FILTER_FILE, cls.BLLIST_FQDN_FILTER_PATTERNS)

    @classmethod
    def load_ip_filter(cls, file_path=None):
        """Read the IP filter file when ``BLLIST_IP_FILTER`` is truthy.

        *file_path* overrides ``BLLIST_IP_FILTER_FILE``.
        """
        if cls.BLLIST_IP_FILTER:
            cls._load_filter(file_path or cls.BLLIST_IP_FILTER_FILE, cls.BLLIST_IP_FILTER_PATTERNS)
class ParserError(Exception):
    """Base error of the blacklist parsers.

    *reason* is an optional description; it is kept in ``self.reason`` and
    is what ``str()`` of the exception shows.
    """

    def __init__(self, reason=None):
        super().__init__(reason)
        self.reason = reason

    def __str__(self):
        # __str__ has to return a str: the exception is raised without a
        # reason in several places, and returning None there would make
        # str(exc) or an f-string containing it fail with TypeError.
        return "" if self.reason is None else str(self.reason)
class FieldValueError(ParserError):
    """Raised when a record field cannot be accepted as the expected value."""
class BlackListParser(Config):
    """Base blacklist parser.

    Downloads ``self.url``, cuts the stream into records and accumulates
    FQDN, IP and CIDR entries; ``run`` then compacts the collected data.
    Subclasses implement ``parser_func`` for their source format.
    """

    def __init__(self):
        # Dotted quad; group(1) is the first three octets including the
        # trailing dot and serves as the per-/24 grouping key.
        self.ip_pattern = re.compile("(([0-9]{1,3}[.]){3})[0-9]{1,3}")
        self.cidr_pattern = re.compile("([0-9]{1,3}[.]){3}[0-9]{1,3}/[0-9]{1,2}")
        # group(1): leading labels, group(2): the last two labels (the SLD).
        self.fqdn_pattern = re.compile(
            "([а-яёa-z0-9_.*-]*?)([а-яёa-z0-9_-]+[.][а-яёa-z0-9-]+)",
            re.U)
        self.www_pattern = re.compile("^www[0-9]?[.]")
        self.cyr_pattern = re.compile("[а-яё]", re.U)
        # While parsing these two are dicts (fqdn -> SLD, ip -> subnet);
        # the _optimize_* methods replace them with plain sets.
        self.fqdn_set = {}
        self.sld_dict = {}          # SLD -> number of accepted FQDNs
        self.ip_set = {}
        self.ip_subnet_dict = {}    # subnet key -> number of accepted IPs
        self.cidr_set = set()
        self.cidr_count = 0
        self.ip_count = 0
        self.output_fqdn_count = 0
        self.ssl_unverified = False
        self.send_headers_dict = {
            "User-Agent": "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:68.0) Gecko/20100101 Firefox/68.0",
        }
        ### Proxies (ex.: self.proxies = {"http": "http://192.168.0.1:8080", "https": "http://192.168.0.1:8080"})
        self.proxies = None
        self.connect_timeout = None
        self.data_chunk = 2048
        self.url = "http://127.0.0.1"
        self.records_separator = "\n"
        self.fields_separator = ";"
        self.ips_separator = "|"
        self.default_site_encoding = "utf-8"
        self.site_encoding = self.default_site_encoding

    @staticmethod
    def _compile_filter_patterns(filters_seq):
        """Return the set of compiled regexes for *filters_seq*.

        Non-empty strings are compiled; objects that already offer
        ``search`` (patterns compiled by an earlier call) are kept as they
        are, so the conversion can be applied more than once.
        """
        compiled = set()
        for item in filters_seq:
            if isinstance(item, str):
                if item:
                    compiled.add(re.compile(item, re.U))
            elif hasattr(item, "search"):
                compiled.add(item)
        return compiled

    @contextmanager
    def _make_connection(self,
                         url,
                         method="GET",
                         postData=None,
                         send_headers_dict=None,
                         timeout=None):
        """Open *url* and yield ``(connection, http_code, headers)``.

        A failure to open the connection is reported on stderr and all
        three yielded values are ``None``.  Any exception raised inside the
        ``with`` body is re-raised as ParserError; the connection is closed
        on exit.
        """
        conn_object = http_code = received_headers = None
        req_object = request.Request(url,
                                     data=postData,
                                     # Request iterates the mapping, so None
                                     # (the default) must become a dict.
                                     headers=send_headers_dict or {},
                                     method=method)
        opener_args = [request.ProxyHandler(self.proxies)]
        if self.ssl_unverified:
            opener_args.append(request.HTTPSHandler(context=ssl._create_unverified_context()))
        try:
            conn_object = request.build_opener(*opener_args).open(
                req_object,
                timeout=(
                    timeout if isinstance(timeout, (int, float))
                    else socket._GLOBAL_DEFAULT_TIMEOUT
                )
            )
            http_code, received_headers = conn_object.status, conn_object.getheaders()
        except Exception as exception_object:
            print(f" Connection error! {exception_object} ( {url} )",
                  file=sys.stderr)
        try:
            yield (conn_object, http_code, received_headers)
        except Exception as exception_object:
            raise ParserError(
                f"Parser error! {exception_object} ( {self.url} )"
            ) from exception_object
        finally:
            if conn_object:
                conn_object.close()

    def _download_data(self):
        """Yield the response body in ``data_chunk``-sized pieces.

        After the last piece a single ``None`` is yielded as end marker.
        Nothing is yielded unless the HTTP status is 200.
        """
        with self._make_connection(
            self.url,
            send_headers_dict=self.send_headers_dict,
            timeout=self.connect_timeout
        ) as conn_params:
            conn_object, http_code, _ = conn_params
            if http_code == 200:
                while True:
                    chunk = conn_object.read(self.data_chunk)
                    yield (chunk or None)
                    if not chunk:
                        break

    def _align_chunk(self):
        """Yield downloaded data cut at record boundaries.

        The bytes after the last separator of a chunk are carried over to
        the next one; the remainder is yielded when the end marker arrives.
        """
        rest = b""
        for chunk in self._download_data():
            if chunk is None:
                yield rest
                continue
            data, _, rest = (rest + chunk).rpartition(self.records_separator)
            yield data

    def _split_entries(self):
        """Yield the decoded records; undecodable records are skipped."""
        for chunk in self._align_chunk():
            for entry in chunk.split(self.records_separator):
                try:
                    yield entry.decode(
                        self.site_encoding or self.default_site_encoding)
                except UnicodeError:
                    pass

    @staticmethod
    def _check_filter(string, filter_patterns):
        """Return True when any compiled pattern is found in *string*."""
        if not (filter_patterns and string):
            return False
        return any(
            pattern.search(string) for pattern in filter_patterns if pattern
        )

    def _get_subnet(self, ip_addr):
        """Return the first three octets of *ip_addr* (with trailing dot) or None."""
        regexp_obj = self.ip_pattern.fullmatch(ip_addr)
        return regexp_obj.group(1) if regexp_obj else None

    def ip_field_processing(self, string):
        """Collect the IP addresses and CIDR blocks listed in *string*.

        Items are separated by ``ips_separator``.  Filtered items are
        dropped; addresses are counted per subnet key and, unless the key is
        in ``BLLIST_GR_EXCLUDED_NETS``, no longer stored once the count has
        passed ``BLLIST_IP_LIMIT``.
        """
        for i in string.split(self.ips_separator):
            if self.BLLIST_IP_FILTER and self._check_filter(i, self.BLLIST_IP_FILTER_PATTERNS):
                continue
            if self.ip_pattern.fullmatch(i) and i not in self.ip_set:
                subnet = self._get_subnet(i)
                if subnet in self.BLLIST_GR_EXCLUDED_NETS or (
                    not self.BLLIST_IP_LIMIT or (
                        subnet not in self.ip_subnet_dict or self.ip_subnet_dict[subnet] <= self.BLLIST_IP_LIMIT
                    )
                ):
                    self.ip_set[i] = subnet
                    self.ip_subnet_dict[subnet] = self.ip_subnet_dict.get(subnet, 0) + 1
            elif self.cidr_pattern.fullmatch(i) and i not in self.cidr_set:
                self.cidr_set.add(i)

    def _convert_to_punycode(self, string):
        """Return *string* IDNA-encoded when it contains Cyrillic letters.

        Raises FieldValueError for such names when ``BLLIST_ENABLE_IDN`` is
        off.  If the encoding itself fails the name is returned unchanged.
        """
        if self.cyr_pattern.search(string):
            if self.BLLIST_ENABLE_IDN:
                try:
                    string = string.encode("idna").decode(
                        self.site_encoding or self.default_site_encoding)
                except UnicodeError:
                    pass
            else:
                raise FieldValueError()
        return string

    def _get_sld(self, fqdn):
        """Return the last two labels of *fqdn*, or None when it does not match."""
        regexp_obj = self.fqdn_pattern.fullmatch(fqdn)
        return regexp_obj.group(2) if regexp_obj else None

    def fqdn_field_processing(self, string):
        """Validate *string* as a domain name and record it.

        Raises FieldValueError when the value is an IP address or is not a
        well-formed name, which lets callers fall back to the IP field.
        Names matching the FQDN filter are dropped without an error.
        """
        if self.ip_pattern.fullmatch(string):
            raise FieldValueError()
        string = string.strip("*.").lower()
        if self.BLLIST_STRIP_WWW:
            string = self.www_pattern.sub("", string)
        if self.BLLIST_FQDN_FILTER and self._check_filter(string, self.BLLIST_FQDN_FILTER_PATTERNS):
            return
        if not self.fqdn_pattern.fullmatch(string):
            raise FieldValueError()
        string = self._convert_to_punycode(string)
        sld = self._get_sld(string)
        if sld in self.BLLIST_GR_EXCLUDED_SLD or (
            not self.BLLIST_SD_LIMIT or (
                sld not in self.sld_dict or self.sld_dict[sld] < self.BLLIST_SD_LIMIT
            )
        ):
            self.sld_dict[sld] = self.sld_dict.get(sld, 0) + 1
            self.fqdn_set[string] = sld

    def parser_func(self):
        """Must be overridden by a subclass"""
        raise NotImplementedError()

    def _check_sld_masks(self, sld):
        """Return True when *sld* fully matches one of the excluded masks."""
        masks = self.BLLIST_GR_EXCLUDED_MASKS
        if not masks:
            return False
        if isinstance(masks, (str, int)):
            # The option may arrive as one space-separated string (or as an
            # int when the value is all digits); iterating it directly would
            # test single characters instead of whole patterns.
            masks = str(masks).split()
        return any(re.fullmatch(pattern, sld) for pattern in masks)

    def _optimize_fqdn_set(self):
        """Replace ``fqdn_set`` with the set of names to be written.

        An SLD whose counter reached ``BLLIST_SD_LIMIT`` is emitted once in
        place of its individual names, unless it is excluded by name or by
        mask.  Names whose SLD is itself present in the list are dropped in
        favour of that SLD entry.
        """
        optimized_set = set()
        for fqdn, sld in self.fqdn_set.items():
            if sld and (fqdn == sld or sld not in self.fqdn_set) and self.sld_dict.get(sld):
                if (not self._check_sld_masks(sld) and (
                    self.BLLIST_SD_LIMIT and sld not in self.BLLIST_GR_EXCLUDED_SLD
                )) and (self.sld_dict[sld] >= self.BLLIST_SD_LIMIT):
                    record_value = sld
                    # Removing the counter makes the remaining names of this
                    # SLD fail the outer test, so it is emitted only once.
                    del self.sld_dict[sld]
                else:
                    record_value = fqdn
                optimized_set.add(record_value)
                self.output_fqdn_count += 1
        self.fqdn_set = optimized_set

    def _optimize_ip_set(self):
        """Replace ``ip_set`` with the set of addresses to be written.

        A subnet whose counter reached ``BLLIST_IP_LIMIT`` (and that is not
        excluded) is added to ``cidr_set`` as a /24 instead of its addresses.
        """
        optimized_set = set()
        for ip_addr, subnet in self.ip_set.items():
            if subnet in self.ip_subnet_dict:
                if subnet not in self.BLLIST_GR_EXCLUDED_NETS and (
                    self.BLLIST_IP_LIMIT and self.ip_subnet_dict[subnet] >= self.BLLIST_IP_LIMIT
                ):
                    self.cidr_set.add(f"{subnet}0/24")
                    # Later addresses of this subnet are skipped by the
                    # membership test above.
                    del self.ip_subnet_dict[subnet]
                else:
                    optimized_set.add(ip_addr)
                    self.ip_count += 1
        self.ip_set = optimized_set

    def _group_ip_ranges(self):
        """Summarize single addresses when ``BLLIST_SUMMARIZE_IP`` is set.

        NOTE(review): summarize_ip_ranges comes from ruab_sum_ip; the
        recount below suggests it shrinks ``ip_set`` in place - confirm.
        """
        if self.BLLIST_SUMMARIZE_IP:
            for i in summarize_ip_ranges(self.ip_set, True):
                self.cidr_set.add(i.with_prefixlen)
            self.ip_count = len(self.ip_set)

    def _group_cidr_ranges(self):
        """Summarize CIDR blocks when ``BLLIST_SUMMARIZE_CIDR`` is set; always recount."""
        if self.BLLIST_SUMMARIZE_CIDR:
            for i in summarize_nets(self.cidr_set):
                self.cidr_set.add(i.with_prefixlen)
        self.cidr_count = len(self.cidr_set)

    def run(self):
        """Download, parse and compact the blacklist.

        Returns 0 on success and 2 when fewer than ``BLLIST_MIN_ENTRIES``
        entries were collected (the data is then left unoptimized).
        """
        self.BLLIST_FQDN_FILTER_PATTERNS = self._compile_filter_patterns(self.BLLIST_FQDN_FILTER_PATTERNS)
        self.BLLIST_IP_FILTER_PATTERNS = self._compile_filter_patterns(self.BLLIST_IP_FILTER_PATTERNS)
        # The download is processed as bytes.  The check keeps a repeated
        # run() from failing on an already converted separator.
        if isinstance(self.records_separator, str):
            self.records_separator = bytes(self.records_separator, "utf-8")
        self.parser_func()
        total_entries = len(self.ip_set) + len(self.cidr_set) + len(self.fqdn_set)
        if total_entries < self.BLLIST_MIN_ENTRIES:
            return 2
        self._optimize_fqdn_set()
        self._optimize_ip_set()
        self._group_ip_ranges()
        self._group_cidr_ranges()
        return 0
class RblFQDN(BlackListParser):
    """Parser for the source at ``RBL_ALL_URL`` (domains with their IPs).

    Records are delimited by the literal ``{"authority": `` and the
    addresses inside a record by ``", "``.
    """

    def __init__(self):
        super().__init__()
        self.ips_separator = ", "
        self.records_separator = '{"authority": '
        self.url = self.RBL_ALL_URL

    def parser_func(self):
        """Take the domain of each record, falling back to its IP list."""
        record_regexp = re.compile(
            r'"domains": \["?(.*?)"?\].*?"ips": \[([a-f0-9/.:", ]*)\]')
        for entry in self._split_entries():
            found = record_regexp.search(entry)
            if found is None:
                continue
            fqdn_string = found.group(1)
            ip_string = found.group(2).replace('"', "")
            if not fqdn_string:
                self.ip_field_processing(ip_string)
                continue
            try:
                self.fqdn_field_processing(fqdn_string)
            except FieldValueError:
                # The domain field is unusable: use the addresses instead.
                self.ip_field_processing(ip_string)
class RblIp(BlackListParser):
    """Parser for the comma-separated address list at ``RBL_IP_URL``."""

    def __init__(self):
        super().__init__()
        self.records_separator = ","
        self.url = self.RBL_IP_URL

    def parser_func(self):
        """Strip brackets, quotes and spaces from each record and store it."""
        junk_regexp = re.compile(r'[\[\]" ]')
        for entry in self._split_entries():
            cleaned = junk_regexp.sub("", entry)
            self.ip_field_processing(cleaned)
class ZiFQDN(BlackListParser):
    """Parser for the source at ``ZI_ALL_URL``.

    Each record is split on ``fields_separator``; field 0 holds the
    addresses and field 1 the domain name.
    """

    def __init__(self):
        super().__init__()
        self.site_encoding = self.ZI_ENCODING
        self.url = self.ZI_ALL_URL

    def parser_func(self):
        """Prefer the domain field; use the address field when it is empty or invalid."""
        for entry in self._split_entries():
            fields = entry.split(self.fields_separator)
            try:
                ip_field, fqdn_field = fields[0], fields[1]
            except IndexError:
                # Record without a second field: nothing to take from it.
                continue
            if not fqdn_field:
                self.ip_field_processing(ip_field)
                continue
            try:
                self.fqdn_field_processing(fqdn_field)
            except FieldValueError:
                self.ip_field_processing(ip_field)
class ZiIp(ZiFQDN):
    """Variant of ZiFQDN that reads only the address field of each record."""

    def parser_func(self):
        """Pass the first field of every record to the IP processing."""
        for entry in self._split_entries():
            first_field = entry.split(self.fields_separator, 1)[0]
            self.ip_field_processing(first_field)
class AfFQDN(BlackListParser):
    """Parser for the source at ``AF_FQDN_URL`` (one value per record)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.url = self.AF_FQDN_URL

    def parser_func(self):
        """Treat each record as a domain name, or as addresses if that fails."""
        handle_fqdn = self.fqdn_field_processing
        handle_ip = self.ip_field_processing
        for record in self._split_entries():
            try:
                handle_fqdn(record)
            except FieldValueError:
                handle_ip(record)
class AfIp(BlackListParser):
    """Parser for the address list at ``AF_IP_URL``.

    Overrides ``BLLIST_MIN_ENTRIES`` with 100 for this instance.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.BLLIST_MIN_ENTRIES = 100
        self.url = self.AF_IP_URL

    def parser_func(self):
        """Pass every record to the IP processing."""
        handle_ip = self.ip_field_processing
        for record in self._split_entries():
            handle_ip(record)
class RaFQDN(BlackListParser):
    """Downloader for ready-made configuration files.

    Instead of parsing a blacklist it copies three remote files (the
    ``RA_FQDN_*_URL`` settings) to ``IP_DATA_FILE``, ``DNSMASQ_DATA_FILE``
    and ``UPDATE_STATUS_FILE``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.url_ipset = self.RA_FQDN_IPSET_URL
        self.url_dnsmasq = self.RA_FQDN_DMASK_URL
        self.url_stat = self.RA_FQDN_STAT_URL
        self.current_file_handler = None

    def parser_func(self):
        """Copy the body of ``self.url`` into ``current_file_handler``.

        Returns the number of bytes written (0 when nothing was received).
        """
        received = 0
        for chunk in self._download_data():
            if chunk:
                self.current_file_handler.write(chunk)
                received += len(chunk)
        return received

    def download_config(self, url, cfg_file):
        """Download *url* into *cfg_file*; return the number of bytes written."""
        self.url = url
        with open(cfg_file, "wb", buffering=-1) as self.current_file_handler:
            return self.parser_func()

    def run(self):
        """Fetch all three files and install them together.

        Every file is first downloaded next to its destination with a
        ``.tmp`` suffix, so a failed or empty download no longer truncates
        the existing files.  Returns 0 when all files were received and
        installed, and 2 as soon as one download yields no data - the same
        code BlackListParser.run returns when too little data was
        collected.
        """
        targets = (
            (self.url_ipset, self.IP_DATA_FILE),
            (self.url_dnsmasq, self.DNSMASQ_DATA_FILE),
            (self.url_stat, self.UPDATE_STATUS_FILE),
        )
        staged = []
        try:
            for url, cfg_file in targets:
                tmp_file = f"{cfg_file}.tmp"
                staged.append((tmp_file, cfg_file))
                if not self.download_config(url, tmp_file):
                    return 2
            while staged:
                tmp_file, cfg_file = staged.pop(0)
                os.replace(tmp_file, cfg_file)
            return 0
        finally:
            # Whatever is still staged was not installed: remove leftovers.
            for tmp_file, _ in staged:
                try:
                    os.remove(tmp_file)
                except OSError:
                    pass
class RaIp(RaFQDN):
    """Same as RaFQDN, but downloads from the ``RA_IP_*_URL`` settings."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Replace the three source URLs chosen by the parent constructor.
        self.url_ipset, self.url_dnsmasq, self.url_stat = (
            self.RA_IP_IPSET_URL,
            self.RA_IP_DMASK_URL,
            self.RA_IP_STAT_URL,
        )
class WriteConfigFiles(Config):
    """Writes the collected entries to the ipset, dnsmasq and status files."""

    def __init__(self):
        # Passed as ``buffering`` to open() for the two data files.
        self.write_buffer = -1

    def write_ipset_config(self, ip_set, cidr_set):
        """Write one ``add <set> <entry>`` line per address and per CIDR block."""
        with open(self.IP_DATA_FILE, "wt", buffering=self.write_buffer) as out:
            out.writelines(
                f"add {self.IPSET_IP_TMP} {addr}\n" for addr in ip_set
            )
            out.writelines(
                f"add {self.IPSET_CIDR_TMP} {net}\n" for net in cidr_set
            )

    def write_dnsmasq_config(self, fqdn_set):
        """Write an ``ipset=`` line per name, preceded by a ``server=`` line
        when ``BLLIST_ALT_NSLOOKUP`` is set."""
        with open(self.DNSMASQ_DATA_FILE, "wt", buffering=self.write_buffer) as out:
            for name in fqdn_set:
                record = f"ipset=/{name}/{self.IPSET_DNSMASQ}\n"
                if self.BLLIST_ALT_NSLOOKUP:
                    record = f"server=/{name}/{self.BLLIST_ALT_DNS_ADDR}\n" + record
                out.write(record)

    def write_update_status_file(self, ip_count, cidr_count, output_fqdn_count):
        """Write the three counters on one line: CIDR, IP, FQDN."""
        status_line = f"{cidr_count} {ip_count} {output_fqdn_count}"
        with open(self.UPDATE_STATUS_FILE, "wt") as out:
            out.write(status_line)
if __name__ == "__main__":
    # Script entry point: load the settings, run the parser selected by
    # BLLIST_MODE / BLLIST_SOURCE, write the result files and exit with the
    # parser's return code.
    Config.load_environ_config()
    Config.load_fqdn_filter()
    Config.load_ip_filter()
    ctx_dict = {
        "ip": {"rublacklist": RblIp, "zapret-info": ZiIp, "antifilter": AfIp, "ruantiblock": RaIp},
        "fqdn": {"rublacklist": RblFQDN, "zapret-info": ZiFQDN, "antifilter": AfFQDN, "ruantiblock": RaFQDN},
    }
    write_cfg_obj = WriteConfigFiles()
    # getattr: a variable missing from the environment must end in the
    # "Wrong configuration!" message as well, not in an AttributeError.
    bllist_mode = getattr(Config, "BLLIST_MODE", None)
    bllist_source = getattr(Config, "BLLIST_SOURCE", None)
    try:
        parser_class = ctx_dict[bllist_mode][bllist_source]
    except KeyError:
        print("Wrong configuration! (Config.BLLIST_MODE or Config.BLLIST_SOURCE)",
              file=sys.stderr)
        sys.exit(1)
    # Instantiated outside the try block so that a KeyError raised by a
    # constructor is not mistaken for a configuration error.
    ctx = parser_class()
    ret_code = ctx.run()
    # The "ruantiblock" parsers download the final files themselves.
    if ret_code == 0 and bllist_source != "ruantiblock":
        write_cfg_obj.write_dnsmasq_config(ctx.fqdn_set)
        write_cfg_obj.write_ipset_config(ctx.ip_set, ctx.cidr_set)
        write_cfg_obj.write_update_status_file(ctx.ip_count, ctx.cidr_count, ctx.output_fqdn_count)
    sys.exit(ret_code)