import functools
from argparse import ArgumentParser
from dataclasses import dataclass
from time import perf_counter, sleep
from typing import Any

import requests
import stem.process
from stem import StreamStatus
from stem.control import Controller, EventType

from term import Term

# ANSI escape sequences used to colour status codes in the result table.
_ANSI_GREEN = "\u001b[32m"
_ANSI_RED = "\u001b[31m"
_ANSI_RESET = "\u001b[0m"

# Timeout (seconds) applied to every HTTP request made by this script.
_REQUEST_TIMEOUT = 120


@dataclass
class TORProc:
    """A launched TOR process together with a controller attached to it.

    The controller is opened on ``port + 2000`` (the same offset that
    ``TORConnections.start_tor_proc`` uses for ``ControlPort``) and listens
    for STREAM events so the exit relay of the latest successful stream can
    be recorded in ``last_exit_node``.
    """

    torproc: Any  # process handle returned by stem.process.launch_tor_with_config
    port: int  # SOCKS port of this TOR instance
    exit_node: str  # country code this instance was asked to exit through

    def __del__(self):
        # Best-effort fallback only; callers should use close() explicitly.
        self.close()

    def __post_init__(self):
        self.last_exit_node = None
        # Define the attribute before connecting so that close()/__del__ stay
        # safe even when attaching the controller raises.
        self.controller = None
        self.controller = Controller.from_port(port=self.port + 2000)
        self.controller.connect()
        self.controller.authenticate()
        self.stream_listener = functools.partial(self.stream_event, self.controller)
        self.controller.add_event_listener(self.stream_listener, EventType.STREAM)

    def close(self):
        """Kill the TOR process and close the controller (idempotent)."""
        if getattr(self, "_closed", False):
            return
        self._closed = True
        # getattr: __del__ may run on an instance whose __init__ never finished.
        process = getattr(self, "torproc", None)
        controller = getattr(self, "controller", None)
        try:
            if process is not None:
                process.kill()
        finally:
            if controller is not None:
                controller.close()

    def stream_event(self, controller, event):
        """Record the exit relay of a stream that has just succeeded.

        Stores ``(locale, address, fingerprint, nickname)`` in
        ``last_exit_node``. Any failure during the lookup is logged and
        otherwise ignored, leaving the previous value in place.
        """
        if event.status == StreamStatus.SUCCEEDED:
            try:
                circ = controller.get_circuit(event.circ_id)
                exit_fingerprint = circ.path[-1][0]
                exit_relay = controller.get_network_status(exit_fingerprint)
                locale = controller.get_info(
                    "ip-to-country/%s" % exit_relay.address, 'unknown'
                )
                self.last_exit_node = (
                    locale,
                    exit_relay.address,
                    exit_relay.fingerprint,
                    exit_relay.nickname,
                )
            except Exception as e:
                Term.log(e)

    @property
    def proxies(self):
        """``requests``-style proxy mapping pointing at this SOCKS port."""
        return {
            'http': f'socks5h://127.0.0.1:{self.port}',
            'https': f'socks5h://127.0.0.1:{self.port}'
        }

    def get_proxy_info(self):
        """Check the exit country both remotely (ipinfo.io) and locally.

        Returns:
            ``(status, remote_info, local_info)`` where ``status`` is
            ``0`` when both views match ``exit_node``, ``2`` when the local
            view matches but no remote opinion was available, and ``1``
            otherwise. Element ``[0]`` of each info tuple is a printable
            message and element ``[1]`` is the match flag (``None`` when
            unknown).

        Raises:
            requests.RequestException: if the request through the proxy fails.
        """
        response = requests.get(
            'https://www.ipinfo.io', proxies=self.proxies, timeout=_REQUEST_TIMEOUT
        )
        try:
            info = response.json()
        except ValueError:
            # Non-JSON reply: treat it as "no second opinion" instead of failing.
            info = {}

        # NOTE(review): this fallback has 4 elements while the success tuple
        # below has 5; lengths are left as they were for compatibility.
        remote_info = ("Could not get second opinion on exit node...", None, None, None)
        if ("ip" in info) and ("country" in info) and ("region" in info):
            remote_info = (
                f"Remote reported as {info['country']}, {info['region']} (ip {info['ip']})",
                info["country"].lower() == self.exit_node.lower(),
                info["country"],
                info["ip"],
                info["region"]
            )

        # last_exit_node is filled in by stream_event; it is still None when no
        # SUCCEEDED stream event has been handled or the relay lookup failed.
        exit_info = self.last_exit_node
        if exit_info is None:
            local_info = (
                "Could not determine exit node from TOR controller...",
                None, None, None, None, None,
            )
        else:
            locale, address, fingerprint, nickname = exit_info
            local_info = (
                f"Exit node \"{nickname}\" {fingerprint} in {locale} (ip {address})",
                str(locale).lower() == self.exit_node.lower(),
                *exit_info
            )

        status = 1
        if remote_info[1] and local_info[1]:
            status = 0
        elif (remote_info[1] is None) and local_info[1]:
            status = 2
        return status, remote_info, local_info


class TORConnections:
    """Launches TOR processes on consecutive ports and keeps track of them."""

    def __init__(self, start_port=7000):
        self._tor_procs = []
        self._start_port = start_port
        self._cport = self._start_port

    def start_tor_proc(self, exit_node):
        """Launch a TOR process exiting in ``exit_node`` and attach a controller.

        The SOCKS port is the next unused port counted up from ``start_port``;
        the control port is that port plus 2000.

        Returns:
            The tracked ``TORProc`` wrapper.
        """
        port = self._cport
        self._cport += 1
        process = stem.process.launch_tor_with_config(
            config={
                "ControlPort": str(port + 2000),
                "SocksPort": str(port),
                "ExitNodes": "{" + exit_node + "}",
                "DataDirectory": "tordata/tor" + str(port)
            },
            init_msg_handler=Term.proc_state,
            take_ownership=True
        )
        Term.proc_state("Attaching controller to TOR process")
        try:
            torproc = TORProc(process, port, exit_node)
        except Exception:
            # Do not leave an untracked TOR process running if the controller
            # could not be attached.
            process.kill()
            raise
        self._tor_procs.append(torproc)
        return torproc

    def close_tor_procs(self):
        """Shut down every tracked TOR process and forget about them."""
        procs, self._tor_procs = self._tor_procs, []
        for proc in procs:
            try:
                proc.close()
            except Exception as e:
                # Keep going so one failure does not leave the others running.
                Term.log(e)

    def get_country_proc(self, exit_node):
        """Return the tracked process whose ``exit_node`` equals ``exit_node``.

        Raises:
            Exception: if no such process is tracked.
        """
        for proc in self._tor_procs:
            if proc.exit_node == exit_node:
                return proc
        raise Exception("No TOR proxy with that exit country running, are you sure you have started it?")

    def remove_tor_proc(self, torproc: TORProc):
        """Stop tracking ``torproc`` and shut it down.

        Raises:
            ValueError: if ``torproc`` is not tracked (it is still closed).
        """
        try:
            self._tor_procs.remove(torproc)
        finally:
            # Close explicitly instead of relying on garbage collection.
            torproc.close()


def do_request(url, proxies=None):
    """GET ``url`` (optionally through ``proxies``) and time the attempt.

    Returns:
        ``(status_code, seconds)`` on success, or ``(exception, seconds)``
        when ``requests`` raised, so a single bad site never aborts a run.
    """
    t1 = perf_counter()
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:104.0) Gecko/20100101 Firefox/104.0",
    }
    try:
        res = requests.get(url, proxies=proxies, headers=headers, timeout=_REQUEST_TIMEOUT)
        Term.log(str(res.text), display=False)
        return (res.status_code, perf_counter() - t1)
    except requests.RequestException as e:
        # RequestException also covers the ReadTimeout/ConnectionError cases
        # plus malformed URLs, redirect loops, etc.
        return (e, perf_counter() - t1)


def test_sites(tor: TORConnections, sites: list[str], countries: list[str]):
    """Request every site through a proxy per country, then without a proxy.

    Returns:
        ``{site: {country: (status_or_exception, seconds)}}``; the direct
        (no proxy) measurement is stored under the key ``""``. Countries
        whose proxy could not be started or checked are skipped.
    """
    results = {site: dict() for site in sites}
    total = len(sites)

    for country in countries:
        Term.proc_start(f"Start TOR Proxy with endpoint in {country}", spin=True)
        torproc = None
        try:
            torproc = tor.start_tor_proc(country)
            Term.proc_state("Checking connection endpoint")
            status, r_info, l_info = torproc.get_proxy_info()
            Term.proc_end(msg=f"{l_info[0]} | {r_info[0]}", result=status)
        except Exception as e:
            Term.proc_end(msg=f"Could not start proxy ({e}).", result=1)
            if torproc is not None:
                # The process started but the endpoint check failed.
                tor.remove_tor_proc(torproc)
            continue

        Term.proc_start(f"Test sites via {country}", spin=True)
        for i, site in enumerate(sites, start=1):
            Term.proc_state(f"Site {i}/{total}")
            results[site][country] = do_request(site, torproc.proxies)
            sleep(1)
        Term.proc_end()
        tor.remove_tor_proc(torproc)

    Term.proc_start("Test sites without proxy", spin=True)
    for i, site in enumerate(sites, start=1):
        Term.proc_state(f"Site {i}/{total}")
        results[site][""] = do_request(site)
    Term.proc_end()
    return results


def pp_results(countries: list[str], results: dict[str, dict[str, Any]], fn=None):
    """Render ``results`` as a table and optionally write it to ``fn``.

    The table has a ``site`` column followed by a status and a time column
    for the direct connection (``""``) and for each entry of ``countries``.
    ``countries`` itself is not modified.
    """
    # Work on a copy: the direct-connection column must not leak into the
    # caller's list.
    columns = ["", *countries]

    header = ["site"]
    for country in columns:
        header.append(f"{country}")
        header.append(f"{country} time")

    tbl = [header]
    for site, per_country in results.items():
        row = ["" for _ in header]
        row[0] = site
        for country, outcome in per_country.items():
            row[header.index(f"{country}")] = str(outcome[0])
            row[header.index(f"{country} time")] = f"{outcome[1]:7.5f}"
        tbl.append(tuple(row))

    if fn is not None:
        with open(fn, "w", encoding="utf-8") as f:
            f.writelines(", ".join(row) + "\n" for row in tbl)

    table(tbl)


def table(tbl):
    """Log ``tbl`` (a list of rows, header first) as an ASCII table.

    Cells equal to ``"200"`` are printed green and ``"403"`` red. An empty
    ``tbl`` logs nothing.
    """
    if not tbl:
        return

    widths = [0 for _ in tbl[0]]
    for row in tbl:
        for i, cell in enumerate(row):
            widths[i] = max(len(str(cell)), widths[i])

    divider = "+" + "+".join("-" * (x + 2) for x in widths) + "+\n"

    parts = [divider]
    for j, row in enumerate(tbl):
        for i, cell in enumerate(row):
            # Pad first, colour afterwards: the escape codes are invisible and
            # must not be counted towards the column width.
            text = f"{cell:{widths[i]}}"
            if cell == "200":
                text = f"{_ANSI_GREEN}{text}{_ANSI_RESET}"
            elif cell == "403":
                text = f"{_ANSI_RED}{text}{_ANSI_RESET}"
            parts.append(f"| {text} ")
        parts.append("|\n")
        if j == 0:
            parts.append(divider)
    parts.append(divider)

    Term.log("".join(parts))


def main():
    """Parse the command line, run the site tests and print the results."""
    parser = ArgumentParser()
    parser.add_argument("--port-start", type=int, default=7000)
    parser.add_argument("-c", "--countries", type=str, default="")
    parser.add_argument("-s", "--sites-file", type=str, required=True)
    parser.add_argument("-o", "--out-file", type=str, default=None)
    args = parser.parse_args()

    Term.begin()
    Term.proc_start("Read sites", spin=True)
    with open(args.sites_file, "r") as f:
        sites = [line.strip() for line in f if line.strip() != ""]
    Term.proc_end(msg=f"got {len(sites)} to test")

    tor = TORConnections(args.port_start)
    try:
        countries = [x.strip() for x in args.countries.split(",") if x.strip() != ""]
        results = test_sites(tor, sites, countries)
        pp_results(countries, results, args.out_file)
    except KeyboardInterrupt:
        Term.log("Interrupt received, exiting")
    except Exception as e:
        print(e)
        raise
    finally:
        # Always shut down whatever TOR processes are still tracked.
        tor.close_tor_procs()


if __name__ == "__main__":
    main()