aboutsummaryrefslogtreecommitdiff
path: root/tor-site-tester.py
diff options
context:
space:
mode:
Diffstat (limited to 'tor-site-tester.py')
-rw-r--r--tor-site-tester.py221
1 files changed, 221 insertions, 0 deletions
diff --git a/tor-site-tester.py b/tor-site-tester.py
new file mode 100644
index 0000000..ac1d112
--- /dev/null
+++ b/tor-site-tester.py
@@ -0,0 +1,221 @@
+import requests
+from stem import Signal, StreamStatus
+from stem.control import Controller, EventType
+import stem.process
+from dataclasses import dataclass
+from typing import Any
+from argparse import ArgumentParser
+import functools
+from time import perf_counter, sleep
+
+from term import Term
+
+
+@dataclass
+class TORProc:
+ torproc: any
+ port: int
+ exit_node: str
+
+ def __del__(self):
+ if self.torproc is not None:
+ self.torproc.kill()
+ if self.controller is not None:
+ self.controller.close()
+
+ def __post_init__(self):
+ self.last_exit_node = None
+ self.controller = Controller.from_port(port=self.port + 2000)
+ self.controller.connect()
+ self.controller.authenticate()
+ self.stream_listener = functools.partial(self.stream_event, self.controller)
+ self.controller.add_event_listener(self.stream_listener, EventType.STREAM)
+
+ def stream_event(self, controller, event):
+ if event.status == StreamStatus.SUCCEEDED:# and (event.circ_id == 1):
+ try:
+ circ = controller.get_circuit(event.circ_id)
+ exit_fingerprint = circ.path[-1][0]
+ exit_relay = controller.get_network_status(exit_fingerprint)
+ locale = controller.get_info("ip-to-country/%s" % exit_relay.address, 'unknown')
+ self.last_exit_node = (locale, exit_relay.address, exit_relay.fingerprint, exit_relay.nickname)
+ except Exception as e: Term.log(e)
+
+ @property
+ def proxies(self):
+ return {
+ 'http': f'socks5h://127.0.0.1:{self.port}',
+ 'https': f'socks5h://127.0.0.1:{self.port}'
+ }
+
+ def get_proxy_info(self):
+ info = requests.get('https://www.ipinfo.io', proxies=self.proxies).json()
+ remote_info = ("Could not get second opinion on exit node...", None, None, None)
+ if ("ip" in info) and ("country" in info) and ("region" in info):
+ remote_info = (
+ f"Remote reported as {info['country']}, {info['region']} (ip {info['ip']})",
+ info["country"].lower() == self.exit_node.lower(),
+ info["country"],
+ info["ip"],
+ info["region"]
+ )
+
+ local_info = (
+ f"Exit node \"{self.last_exit_node[3]}\" {self.last_exit_node[2]} in {self.last_exit_node[0]} (ip {self.last_exit_node[1]})",
+ self.last_exit_node[0] == self.exit_node.lower(),
+ *self.last_exit_node
+ )
+ status = 1
+ if remote_info[1] and local_info[1]: status = 0
+ elif (remote_info[1] is None) and local_info[1]: status = 2
+ return status, remote_info, local_info
+
+
+class TORConnections:
+
+
+ def __init__(self, start_port = 7000):
+ self._tor_procs = []
+ self._start_port = start_port
+ self._cport = self._start_port
+
+
+ def start_tor_proc(self, exit_node):
+ port = self._cport
+ self._cport += 1
+ torproc = stem.process.launch_tor_with_config(
+ config = {
+ "ControlPort": str(port + 2000),
+ "SocksPort": str(port),
+ "ExitNodes": "{" + exit_node + "}",
+ "DataDirectory": "tordata/tor" + str(port)
+ },
+ init_msg_handler = Term.proc_state,
+ take_ownership = True
+ )
+ Term.proc_state("Attaching controller to TOR process")
+ torproc = TORProc(torproc, port, exit_node)
+ self._tor_procs.append(torproc)
+ return torproc
+
+
+ def close_tor_procs(self):
+ for proc in self._tor_procs:
+ proc.torproc.kill()
+
+
+ def get_country_proc(self, exit_node):
+ for proc in self._tor_procs:
+ if proc.exit_node == exit_node:
+ return proc
+ raise Exception("No TOR proxy with that exit country running, are you sure you have started it?")
+
+ def remove_tor_proc(self, torproc: TORProc):
+ del self._tor_procs[self._tor_procs.index(torproc)]
+
+
+def do_request(url, proxies = None):
+ t1 = perf_counter()
+ try:
+ res = requests.get(url, proxies=proxies)
+ return (res.status_code, perf_counter() - t1)
+ except requests.ConnectionError as e:
+ return (e, perf_counter() - t1)
+
+
+def test_sites(tor: TORConnections, sites: list[str], countries: list[str]):
+ results = {site: dict() for site in sites}
+ for country in countries:
+ Term.proc_start(f"Start TOR Proxy with endpoint in {country}", spin=True)
+ try:
+ torproc = tor.start_tor_proc(country)
+ Term.proc_state("Checking connection endpoint")
+ status, r_info, l_info = torproc.get_proxy_info()
+ Term.proc_end(msg=f"{l_info[0]} | {r_info[0]}", result=status)
+ except Exception as e:
+ Term.proc_end(msg=f"Could not start proxy ({e}).", result=1)
+ continue
+
+ Term.proc_start(f"Test sites via {country}", spin=True)
+ for i, site in enumerate(sites):
+ Term.proc_state(f"Site {i}/{len(sites)}")
+ results[site][country] = do_request(site, torproc.proxies)
+ sleep(1)
+ Term.proc_end()
+
+ tor.remove_tor_proc(torproc)
+
+ Term.proc_start("Test sites without proxy", spin=True)
+ for i, site in enumerate(sites):
+ Term.proc_state(f"Site {i}/{len(sites)}")
+ results[site][""] = do_request(site)
+ Term.proc_end()
+ return results
+
+
+def pp_results(countries: list[str], results: dict[str, dict[str, Any]], fn = None):
+ countries.insert(0, "")
+ tbl = [["site"]]
+ for country in countries:
+ tbl[0].append(f"{country}")
+ tbl[0].append(f"{country} time")
+ for site, res in results.items():
+ row = ["" for _ in range(len(tbl[0]))]
+ row[0] = site
+ for country, res in res.items():
+ row[tbl[0].index(f"{country}")] = str(res[0])
+ row[tbl[0].index(f"{country} time")] = f"{res[1]:7.5f}"
+ tbl.append(tuple(row))
+ if fn is not None:
+ with open(fn, "w") as f:
+ for row in tbl:
+ f.write(", ".join(row))
+ f.write("\n")
+ table(tbl)
+
+
+def table(tbl):
+ w = [0 for _ in range(len(tbl[0]))]
+ for row in tbl:
+ for i, cell in enumerate(row):
+ w[i] = max(len(str(cell)), w[i])
+
+ print("+" + "+".join("-" * (x+2) for x in w) + "+")
+ for j, row in enumerate(tbl):
+ for i, cell in enumerate(row):
+ c = cell
+ if c == "200": c = f"\u001b[32m{c}\u001b[0m"
+ if c == "403": c = f"\u001b[31m{c}\u001b[0m"
+ print(f"| {c:{w[i]}} ", end="")
+ print("|")
+ if j == 0:
+ print("+" + "+".join("-" * (x+2) for x in w) + "+")
+ print("+" + "+".join("-" * (x+2) for x in w) + "+")
+
+if __name__ == "__main__":
+ parser = ArgumentParser()
+ parser.add_argument("--port-start", type=int, default=7000)
+ parser.add_argument("-c", "--countries", type=str, required=True)
+ parser.add_argument("-s", "--sites-file", type=str, required=True)
+ parser.add_argument("-o", "--out-file", type=str, default=None)
+ args = parser.parse_args()
+
+ Term.begin()
+
+ Term.proc_start("Read sites", spin=True)
+ with open(args.sites_file, "r") as f:
+ sites = [x.strip() for x in f.readlines() if x.strip() != ""]
+ Term.proc_end()
+
+ tor = TORConnections(args.port_start)
+ try:
+ countries = args.countries.split(",")
+ results = test_sites(tor, sites, countries)
+ pp_results(countries, results, args.out_file)
+ except KeyboardInterrupt:
+ Term.log("Interrupt received, exiting")
+ except Exception as e:
+ print(e)
+ raise e
+ finally:
+ tor.close_tor_procs()