"""Probe a list of sites through Tor exit nodes in selected countries.

For every requested country a dedicated Tor process is launched with its
exit restricted to that country, each site is requested through it, and the
status code (or the error) plus the elapsed time is collected.  The same
sites are finally requested without a proxy and everything is rendered as a
table.
"""

import functools
from argparse import ArgumentParser
from dataclasses import dataclass
from time import perf_counter, sleep
from typing import Any, Optional

import requests
import stem.process
from stem import Signal, StreamStatus
from stem.control import Controller, EventType

from term import Term

# Seconds to wait for any single HTTP request before giving up.  Without a
# timeout `requests` may block forever on an unresponsive circuit.
REQUEST_TIMEOUT = 60

# Offset between a Tor instance's SOCKS port and its control port.
CONTROL_PORT_OFFSET = 2000


@dataclass
class TORProc:
    """A running Tor process together with its attached controller.

    Attributes:
        torproc: Handle of the launched Tor process (must offer ``kill()``).
        port: SOCKS port of the process; the control port is
            ``port + CONTROL_PORT_OFFSET``.
        exit_node: Country code the exit node is restricted to.
    """

    torproc: Any
    port: int
    exit_node: str

    def __post_init__(self):
        """Attach a controller and start listening for stream events."""
        self._closed = False
        self.last_exit_node = None
        self.controller = Controller.from_port(
            port=self.port + CONTROL_PORT_OFFSET
        )
        self.controller.connect()
        self.controller.authenticate()
        self.stream_listener = functools.partial(
            self.stream_event, self.controller
        )
        self.controller.add_event_listener(
            self.stream_listener, EventType.STREAM
        )

    def __del__(self):
        # Safety net only; owners should call close() explicitly because the
        # listener registered on the controller keeps a reference back to
        # this object, which delays finalisation until a GC cycle pass.
        self.close()

    def close(self) -> None:
        """Kill the Tor process and close the controller (idempotent).

        Safe to call on a partially initialised instance, e.g. when
        ``__post_init__`` failed before the controller was created.
        """
        if getattr(self, "_closed", False):
            return
        self._closed = True
        torproc = getattr(self, "torproc", None)
        if torproc is not None:
            torproc.kill()
        controller = getattr(self, "controller", None)
        if controller is not None:
            controller.close()

    def stream_event(self, controller, event):
        """Record the exit relay of every successfully opened stream.

        Stores ``(locale, address, fingerprint, nickname)`` in
        ``self.last_exit_node``.  Failures are logged and otherwise ignored
        so that a lookup problem never breaks the listener.
        """
        if event.status != StreamStatus.SUCCEEDED:
            return
        try:
            circ = controller.get_circuit(event.circ_id)
            exit_fingerprint = circ.path[-1][0]
            exit_relay = controller.get_network_status(exit_fingerprint)
            locale = controller.get_info(
                "ip-to-country/%s" % exit_relay.address, 'unknown'
            )
            self.last_exit_node = (
                locale,
                exit_relay.address,
                exit_relay.fingerprint,
                exit_relay.nickname,
            )
        except Exception as e:
            Term.log(e)

    @property
    def proxies(self):
        """Proxy mapping for ``requests`` that routes through this process."""
        proxy_url = f'socks5h://127.0.0.1:{self.port}'
        return {'http': proxy_url, 'https': proxy_url}

    def get_proxy_info(self):
        """Check where the proxy exits, locally and via a remote service.

        Returns:
            ``(status, remote_info, local_info)`` where ``status`` is ``0``
            when both the remote service and the local controller confirm
            the requested country, ``2`` when only the local view confirms
            it and no remote opinion is available, and ``1`` otherwise.
            The first element of each info tuple is a human readable
            message, the second the match flag (``None`` if unknown).
        """
        info = requests.get(
            'https://www.ipinfo.io',
            proxies=self.proxies,
            timeout=REQUEST_TIMEOUT,
        ).json()
        wanted = self.exit_node.lower()

        remote_info = (
            "Could not get second opinion on exit node...", None, None, None
        )
        if ("ip" in info) and ("country" in info) and ("region" in info):
            remote_info = (
                f"Remote reported as {info['country']}, {info['region']} (ip {info['ip']})",
                info["country"].lower() == wanted,
                info["country"],
                info["ip"],
                info["region"],
            )

        # The stream listener fills last_exit_node from another thread, so it
        # can still be unset here; report that instead of raising TypeError.
        exit_node = self.last_exit_node
        if exit_node is None:
            local_info = (
                "Exit node not reported by controller", None,
                None, None, None, None,
            )
        else:
            locale, address, fingerprint, nickname = exit_node
            local_info = (
                f"Exit node \"{nickname}\" {fingerprint} in {locale} (ip {address})",
                locale == wanted,
                *exit_node,
            )

        status = 1
        if remote_info[1] and local_info[1]:
            status = 0
        elif (remote_info[1] is None) and local_info[1]:
            status = 2
        return status, remote_info, local_info


class TORConnections:
    """Launches Tor processes on consecutive ports and tracks them."""

    def __init__(self, start_port = 7000):
        self._tor_procs = []
        self._start_port = start_port
        self._cport = self._start_port

    def start_tor_proc(self, exit_node):
        """Launch a Tor process exiting in ``exit_node`` and attach to it.

        Returns:
            The tracked ``TORProc``.  Whatever launching or attaching raises
            is propagated; the launched process is killed first when the
            controller cannot be attached.
        """
        port = self._cport
        self._cport += 1
        process = stem.process.launch_tor_with_config(
            config = {
                "ControlPort": str(port + CONTROL_PORT_OFFSET),
                "SocksPort": str(port),
                "ExitNodes": "{" + exit_node + "}",
                "DataDirectory": "tordata/tor" + str(port),
            },
            init_msg_handler = Term.proc_state,
            take_ownership = True,
        )
        Term.proc_state("Attaching controller to TOR process")
        try:
            torproc = TORProc(process, port, exit_node)
        except BaseException:
            # Do not leave an orphaned Tor process behind.
            process.kill()
            raise
        self._tor_procs.append(torproc)
        return torproc

    def close_tor_procs(self):
        """Shut down every tracked process and forget about all of them."""
        for proc in self._tor_procs:
            try:
                proc.close()
            except Exception as e:
                # Keep going so one failure does not leak the remaining ones.
                Term.log(e)
        self._tor_procs.clear()

    def get_country_proc(self, exit_node):
        """Return the tracked process exiting in ``exit_node``.

        Raises:
            Exception: If no such process is tracked.
        """
        for proc in self._tor_procs:
            if proc.exit_node == exit_node:
                return proc
        raise Exception("No TOR proxy with that exit country running, are you sure you have started it?")

    def remove_tor_proc(self, torproc: TORProc):
        """Stop tracking ``torproc`` and shut it down.

        Raises:
            ValueError: If ``torproc`` is not tracked.
        """
        self._tor_procs.remove(torproc)
        torproc.close()


def do_request(url, proxies = None, timeout: Optional[float] = REQUEST_TIMEOUT):
    """GET ``url`` and time it.

    Args:
        url: Address to request.
        proxies: Optional ``requests`` proxy mapping.
        timeout: Seconds before the request is abandoned (``None`` waits
            indefinitely).

    Returns:
        ``(status_code, seconds)`` on success, ``(exception, seconds)`` when
        the request fails for any ``requests`` related reason.
    """
    t1 = perf_counter()
    try:
        res = requests.get(url, proxies=proxies, timeout=timeout)
        return (res.status_code, perf_counter() - t1)
    except requests.RequestException as e:
        # RequestException also covers timeouts, malformed URLs and redirect
        # loops, which must be recorded rather than abort the whole run.
        return (e, perf_counter() - t1)


def test_sites(tor: TORConnections, sites: list[str], countries: list[str]):
    """Request every site through every country and once without a proxy.

    Returns:
        ``{site: {country: (status_or_error, seconds)}}``; the direct
        measurement is stored under the empty-string key.  Countries whose
        proxy could not be started are missing from the inner dicts.
    """
    results = {site: {} for site in sites}
    total = len(sites)

    for country in countries:
        Term.proc_start(f"Start TOR Proxy with endpoint in {country}", spin=True)
        torproc = None
        try:
            torproc = tor.start_tor_proc(country)
            Term.proc_state("Checking connection endpoint")
            status, r_info, l_info = torproc.get_proxy_info()
            Term.proc_end(msg=f"{l_info[0]} | {r_info[0]}", result=status)
        except Exception as e:
            Term.proc_end(msg=f"Could not start proxy ({e}).", result=1)
            if torproc is not None:
                # Started but could not be verified: do not leave it running.
                tor.remove_tor_proc(torproc)
            continue

        Term.proc_start(f"Test sites via {country}", spin=True)
        try:
            for i, site in enumerate(sites, 1):
                Term.proc_state(f"Site {i}/{total}")
                results[site][country] = do_request(site, torproc.proxies)
                sleep(1)
            Term.proc_end()
        finally:
            # Release the proxy even when the loop is interrupted.
            tor.remove_tor_proc(torproc)

    Term.proc_start("Test sites without proxy", spin=True)
    for i, site in enumerate(sites, 1):
        Term.proc_state(f"Site {i}/{total}")
        results[site][""] = do_request(site)
    Term.proc_end()
    return results


def pp_results(countries: list[str], results: dict[str, dict[str, Any]], fn = None):
    """Render ``results`` as a table and optionally write it to a file.

    Args:
        countries: Countries to show, in column order.  Not modified.
        results: Mapping as returned by ``test_sites``.
        fn: Optional path; when given, rows are written comma separated.
    """
    # The empty string is the "no proxy" column; work on a copy so the
    # caller's list is left untouched.
    columns = ["", *countries]

    header = ["site"]
    for country in columns:
        header.append(f"{country}")
        header.append(f"{country} time")

    # First occurrence wins, matching list.index semantics.
    col_index = {}
    for idx, name in enumerate(header):
        col_index.setdefault(name, idx)

    tbl = [header]
    for site, by_country in results.items():
        row = [""] * len(header)
        row[0] = site
        for country, outcome in by_country.items():
            row[col_index[f"{country}"]] = str(outcome[0])
            row[col_index[f"{country} time"]] = f"{outcome[1]:7.5f}"
        tbl.append(tuple(row))

    if fn is not None:
        with open(fn, "w") as f:
            for row in tbl:
                f.write(", ".join(row))
                f.write("\n")
    table(tbl)


def table(tbl):
    """Log ``tbl`` (a list of rows, header first) as an ASCII table.

    Cells equal to ``"200"`` are shown in green and ``"403"`` in red.  Cells
    are left aligned and padded before colouring so that the invisible
    escape sequences do not disturb the column alignment.
    """
    if not tbl:
        return

    widths = [0] * len(tbl[0])
    for row in tbl:
        for i, cell in enumerate(row):
            widths[i] = max(widths[i], len(str(cell)))

    divider = "+" + "+".join("-" * (w + 2) for w in widths) + "+\n"
    parts = [divider]
    for j, row in enumerate(tbl):
        for i, cell in enumerate(row):
            text = str(cell).ljust(widths[i])
            if cell == "200":
                text = f"\u001b[32m{text}\u001b[0m"
            elif cell == "403":
                text = f"\u001b[31m{text}\u001b[0m"
            parts.append(f"| {text} ")
        parts.append("|\n")
        if j == 0:
            parts.append(divider)
    parts.append(divider)
    Term.log("".join(parts))


def main():
    """Parse the command line, run the measurements and print the table."""
    parser = ArgumentParser()
    parser.add_argument("--port-start", type=int, default=7000)
    parser.add_argument("-c", "--countries", type=str, default="")
    parser.add_argument("-s", "--sites-file", type=str, required=True)
    parser.add_argument("-o", "--out-file", type=str, default=None)
    args = parser.parse_args()

    Term.begin()
    Term.proc_start("Read sites", spin=True)
    with open(args.sites_file, "r") as f:
        sites = [x.strip() for x in f.readlines() if x.strip() != ""]
    Term.proc_end(msg=f"got {len(sites)} to test")

    tor = TORConnections(args.port_start)
    try:
        countries = [x.strip() for x in args.countries.split(",") if x.strip() != ""]
        results = test_sites(tor, sites, countries)
        pp_results(countries, results, args.out_file)
    except KeyboardInterrupt:
        Term.log("Interrupt received, exiting")
    except Exception as e:
        print(e)
        raise
    finally:
        tor.close_tor_procs()


if __name__ == "__main__":
    main()