1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| import time import random import requests import warnings from tqdm.rich import trange from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed warnings.filterwarnings("ignore")
def visit(url, delay): start_time = datetime.now() headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }
try: response = requests.get(url, headers=headers, timeout=10) min_delay, max_delay = delay time.sleep(random.uniform(min_delay, max_delay))
return { 'status_code': response.status_code, 'time': datetime.now() - start_time, 'success': True } except Exception as e: return { 'status_code': None, 'error': str(e), 'time': datetime.now() - start_time, 'success': False }
def execute_round(url, round_config, round_num, rounds, wait_time): times = round_config['requests'] workers = round_config['workers'] delay = round_config['delay']
if round_num == 0: if rounds == 1: print(f"執行第 1/{rounds} 輪") else: print(f"執行第 1/{rounds} 輪,等待 {wait_time:.1f} 秒") elif round_num < rounds - 1: print(f"\n執行第 {round_num + 1}/{rounds} 輪,等待 {wait_time:.1f} 秒") else: print(f"\n執行第 {round_num + 1}/{rounds} 輪")
with ThreadPoolExecutor(max_workers=workers) as executor: futures = [executor.submit(visit, url, delay) for _ in range(times)] with trange(times, desc="進度", ncols=100, unit="次", dynamic_ncols=False, bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]") as progress_bar: for future in as_completed(futures): future.result() progress_bar.update(1) if progress_bar.n < progress_bar.total: progress_bar.update(progress_bar.total - progress_bar.n)
def main(config): url = config['url'] rounds = config['rounds'] round_config = { 'requests': config['requests'], 'workers': config['workers'], 'delay': config['delay'] }
total_start_time = time.time()
min_interval, max_interval = config['DELAY'] wait_times = [random.uniform(min_interval, max_interval) for _ in range(rounds - 1)] + [0]
for round_num in range(rounds): execute_round(url, round_config, round_num, rounds, wait_times[round_num])
if round_num < rounds - 1: time.sleep(wait_times[round_num])
total_time = time.time() - total_start_time print(f"\n(執行時間: {total_time:.2f} 秒)")
if __name__ == '__main__': config = { 'url': "https://example.com", 'rounds': 5, 'requests': 20, 'workers': 10, 'delay': (1.0, 2.0), 'DELAY': (5.0, 8.0) }
main(config)
|