aboutsummaryrefslogtreecommitdiff
path: root/tor-site-tester.py
diff options
context:
space:
mode:
Diffstat (limited to 'tor-site-tester.py')
-rw-r--r--tor-site-tester.py52
1 files changed, 28 insertions, 24 deletions
diff --git a/tor-site-tester.py b/tor-site-tester.py
index 912117d..7ce28ed 100644
--- a/tor-site-tester.py
+++ b/tor-site-tester.py
@@ -1,12 +1,13 @@
-import requests
-from stem import Signal, StreamStatus
-from stem.control import Controller, EventType
-import stem.process
-from dataclasses import dataclass
-from typing import Any
-from argparse import ArgumentParser
import functools
+from argparse import ArgumentParser
+from dataclasses import dataclass
from time import perf_counter, sleep
+from typing import Any
+
+import requests
+import stem.process
+from stem import StreamStatus
+from stem.control import Controller, EventType
from term import Term
@@ -26,20 +27,21 @@ class TORProc:
def __post_init__(self):
self.last_exit_node = None
self.controller = Controller.from_port(port=self.port + 2000)
- self.controller.connect()
+ self.controller.connect()
self.controller.authenticate()
self.stream_listener = functools.partial(self.stream_event, self.controller)
self.controller.add_event_listener(self.stream_listener, EventType.STREAM)
def stream_event(self, controller, event):
- if event.status == StreamStatus.SUCCEEDED:# and (event.circ_id == 1):
+ if event.status == StreamStatus.SUCCEEDED: # and (event.circ_id == 1):
try:
circ = controller.get_circuit(event.circ_id)
exit_fingerprint = circ.path[-1][0]
exit_relay = controller.get_network_status(exit_fingerprint)
locale = controller.get_info("ip-to-country/%s" % exit_relay.address, 'unknown')
self.last_exit_node = (locale, exit_relay.address, exit_relay.fingerprint, exit_relay.nickname)
- except Exception as e: Term.log(e)
+ except Exception as e:
+ Term.log(e)
@property
def proxies(self):
@@ -66,44 +68,42 @@ class TORProc:
*self.last_exit_node
)
status = 1
- if remote_info[1] and local_info[1]: status = 0
- elif (remote_info[1] is None) and local_info[1]: status = 2
+ if remote_info[1] and local_info[1]:
+ status = 0
+ elif (remote_info[1] is None) and local_info[1]:
+ status = 2
return status, remote_info, local_info
class TORConnections:
-
- def __init__(self, start_port = 7000):
+ def __init__(self, start_port=7000):
self._tor_procs = []
self._start_port = start_port
self._cport = self._start_port
-
def start_tor_proc(self, exit_node):
port = self._cport
self._cport += 1
torproc = stem.process.launch_tor_with_config(
- config = {
+ config={
"ControlPort": str(port + 2000),
"SocksPort": str(port),
"ExitNodes": "{" + exit_node + "}",
"DataDirectory": "tordata/tor" + str(port)
},
- init_msg_handler = Term.proc_state,
- take_ownership = True
+ init_msg_handler=Term.proc_state,
+ take_ownership=True
)
Term.proc_state("Attaching controller to TOR process")
torproc = TORProc(torproc, port, exit_node)
self._tor_procs.append(torproc)
return torproc
-
def close_tor_procs(self):
for proc in self._tor_procs:
proc.torproc.kill()
-
def get_country_proc(self, exit_node):
for proc in self._tor_procs:
if proc.exit_node == exit_node:
@@ -114,10 +114,13 @@ class TORConnections:
del self._tor_procs[self._tor_procs.index(torproc)]
-def do_request(url, proxies = None):
+def do_request(url, proxies=None):
t1 = perf_counter()
+ headers = {
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:104.0) Gecko/20100101 Firefox/104.0",
+ }
try:
- res = requests.get(url, proxies=proxies)
+ res = requests.get(url, proxies=proxies, headers=headers)
return (res.status_code, perf_counter() - t1)
except requests.ConnectionError as e:
return (e, perf_counter() - t1)
@@ -144,7 +147,7 @@ def test_sites(tor: TORConnections, sites: list[str], countries: list[str]):
Term.proc_end()
tor.remove_tor_proc(torproc)
-
+
Term.proc_start("Test sites without proxy", spin=True)
for i, site in enumerate(sites):
Term.proc_state(f"Site {i}/{len(sites)}")
@@ -153,7 +156,7 @@ def test_sites(tor: TORConnections, sites: list[str], countries: list[str]):
return results
-def pp_results(countries: list[str], results: dict[str, dict[str, Any]], fn = None):
+def pp_results(countries: list[str], results: dict[str, dict[str, Any]], fn=None):
countries.insert(0, "")
tbl = [["site"]]
for country in countries:
@@ -195,6 +198,7 @@ def table(tbl):
tablestr += divider
Term.log(tablestr)
+
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("--port-start", type=int, default=7000)