Example: Managing a Proxy Pool with Automatic Updates
This example demonstrates how RedisAllocator can manage a dynamic proxy pool, using a RedisAllocatorUpdater integrated with DefaultRedisAllocatorPolicy to keep the pool updated automatically.
Scenario
We need a system where:
1. Proxies are fetched periodically from different sources (e.g., APIs, databases).
2. These fetched proxies automatically update the central Redis pool.
3. Multiple worker threads allocate exclusive proxies from this pool.
4. Workers use the proxies for tasks.
5. Proxies are automatically returned/recovered via free() and gc().
6. (Future) Priority and Soft Binding features enhance allocation.
Core RedisAllocator Concepts Used
RedisAllocator(policy=…): Using a policy to control behavior.
RedisAllocatorUpdater: Base class for defining how to fetch resources.
DefaultRedisAllocatorPolicy(updater=…): Policy that uses an updater to refresh the pool periodically.
malloc(): Workers acquire proxies (triggering policy checks).
free(): Workers return proxies.
gc(): Background recovery of stale allocations (a minimal lifecycle sketch follows this list).
Resource Prioritization & Soft Binding: (Mentioned as TODO / Future).
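Before the full implementation, here is a minimal sketch of the allocate/use/free lifecycle the concepts above describe. It assumes an already-configured RedisAllocator instance and a hypothetical do_work callable; it illustrates the pattern rather than being a complete program:

def run_one_task(allocator, do_work, timeout=120):
    """Allocate a proxy, run one task with it, and always free it."""
    obj = allocator.malloc(timeout=timeout)  # may trigger the policy's GC/updater hooks
    if obj is None:
        return False                         # pool empty; caller can back off and retry
    try:
        do_work(obj.key)                     # use the exclusively held proxy key
        return True
    finally:
        allocator.free(obj)                  # return the proxy even if the task fails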
Implementation Sketch
import redis
import threading
import time
import random
from typing import Sequence
from redis_allocator import RedisAllocator
# Import the updater base class and the default policy
from redis_allocator.allocator import RedisAllocatorUpdater, DefaultRedisAllocatorPolicy

# --- Configuration ---
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
PROXY_POOL_PREFIX = 'myapp'
PROXY_POOL_SUFFIX = 'proxies'
NUM_WORKERS = 5
ALLOCATION_TIMEOUT = 120  # Seconds a proxy can be held

# --- Custom Updater Implementation ---

class ProxySourceUpdater(RedisAllocatorUpdater):
    """Fetches proxy lists from different predefined sources."""

    def fetch(self, source_id: str) -> Sequence[str]:
        """Simulates fetching proxies from a specific source URL/ID."""
        print(f"[{threading.current_thread().name}] Fetching proxies from source: {source_id}...")
        # Replace with actual fetching logic (e.g., API call, DB query)
        time.sleep(0.5)  # Simulate fetch time
        if source_id == 'source_A_api':
            # Simulate getting a list of proxy strings
            new_proxies = [f'proxy_A_{i}:8000' for i in range(random.randint(1, 3))]
        elif source_id == 'source_B_db':
            new_proxies = [f'proxy_B_{i}:9000' for i in range(random.randint(0, 2))]
        else:
            new_proxies = []  # Simulate the source being unavailable
        print(f"[{threading.current_thread().name}] Fetched {len(new_proxies)} proxies from {source_id}: {new_proxies}")
        # TODO: Return priority info when the allocator supports it: [('proxy_A_0:8000', 10), ...]
        return new_proxies

# --- Setup ---
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

# Define the sources the updater will cycle through
PROXY_SOURCES = ['source_A_api', 'source_B_db']
# How often the policy should *try* to run the updater (in seconds).
# The updater only runs if the update lock is acquired during a malloc call.
UPDATE_INTERVAL = 60
# Optional: default expiry for items added by the updater (-1 = no expiry)
UPDATER_ITEM_EXPIRY = 3600

# 1. Instantiate the custom updater
proxy_updater = ProxySourceUpdater(params=PROXY_SOURCES)

# 2. Instantiate the policy, providing the updater
default_policy = DefaultRedisAllocatorPolicy(
    updater=proxy_updater,
    update_interval=UPDATE_INTERVAL,
    expiry_duration=UPDATER_ITEM_EXPIRY,
    gc_count=5  # How many items GC checks per malloc call
)

# 3. Instantiate the allocator using the configured policy
proxy_allocator = RedisAllocator(
    redis_client,
    prefix=PROXY_POOL_PREFIX,
    suffix=PROXY_POOL_SUFFIX,
    shared=False,           # Proxies are used exclusively
    policy=default_policy   # IMPORTANT: use the policy
)

# --- Proxy Usage Simulation ---

def get_proxy_config_url(proxy_key):
    """Simulates fetching configuration for a given proxy key."""
    # In reality, this might query a database or another Redis key
    print(f"[{threading.current_thread().name}] Fetched config for {proxy_key}")
    return f"http://config.server/{proxy_key}"

def start_local_proxy(config_url):
    """Simulates configuring and starting a local proxy process/connection."""
    print(f"[{threading.current_thread().name}] Starting proxy using {config_url}")
    time.sleep(0.1)  # Simulate startup time
    return True      # Simulate success

# --- Scrape Target Simulation ---

def fetch_scrape_target():
    """Simulates fetching the next high-priority item to scrape."""
    # TODO: Target fetching logic is application-specific.
    # It could use another Redis list, a queue, or even another allocator
    # if targets themselves are managed resources. Priority is needed here too.
    targets = ["item_A", "item_B", "item_C", "item_D", "item_E"]
    target = random.choice(targets)
    print(f"[{threading.current_thread().name}] Fetched target: {target}")
    return target

# --- Worker Thread Logic ---

def worker_thread_logic(worker_id):
    """Main logic for each scraper worker thread."""
    print(f"[Worker-{worker_id}] Started.")
    allocated_proxy_obj = None
    retry_delay = 1
    while True:  # Keep trying to allocate and work
        try:
            # 1. Allocate a proxy (this triggers policy checks -> GC and the updater)
            print(f"[Worker-{worker_id}] Attempting to allocate proxy...")
            allocated_proxy_obj = proxy_allocator.malloc(timeout=ALLOCATION_TIMEOUT)

            if allocated_proxy_obj:
                retry_delay = 1  # Reset delay on success
                proxy_key = allocated_proxy_obj.key
                print(f"[Worker-{worker_id}] Allocated proxy: {proxy_key}")

                # 2. Simulate using the proxy
                config_url = get_proxy_config_url(proxy_key)
                if start_local_proxy(config_url):
                    print(f"[Worker-{worker_id}] Proxy started successfully.")

                    # 3. Fetch target and simulate work
                    target = fetch_scrape_target()
                    if target:
                        print(f"[Worker-{worker_id}] Simulating scrape of {target} via {proxy_key}...")
                        time.sleep(random.uniform(0.5, 2.0))  # Simulate work duration
                        print(f"[Worker-{worker_id}] Finished scrape of {target}.")
                else:
                    print(f"[Worker-{worker_id}] Failed to start proxy {proxy_key}.")
                    # Optionally, report this proxy as bad

                # 4. Free the proxy only once this task is completely done with it
                print(f"[Worker-{worker_id}] Freeing proxy: {allocated_proxy_obj.key}")
                proxy_allocator.free(allocated_proxy_obj)
                allocated_proxy_obj = None  # Clear the reference
                # Small delay before the next allocation attempt
                time.sleep(random.uniform(0.1, 0.5))
            else:
                # No proxy available, wait and retry with exponential backoff
                print(f"[Worker-{worker_id}] No proxy available. Retrying in {retry_delay}s...")
                time.sleep(retry_delay)
                retry_delay = min(retry_delay * 2, 30)  # Exponential backoff up to 30s

        except Exception as e:
            print(f"[Worker-{worker_id}] Error: {e}")
            # Ensure the proxy is freed even if an error occurs mid-task
            if allocated_proxy_obj:
                print(f"[Worker-{worker_id}] Freeing proxy due to error: {allocated_proxy_obj.key}")
                proxy_allocator.free(allocated_proxy_obj)
                allocated_proxy_obj = None
            time.sleep(5)  # Wait after an error before retrying

# --- Running the Example ---

if __name__ == "__main__":
    print("Starting proxy pool example with automatic updates...")
    # No initial pool population is needed: the updater configured in the policy
    # handles it. The first malloc() calls by workers will likely trigger it.

    threads = []
    for i in range(NUM_WORKERS):
        thread = threading.Thread(target=worker_thread_logic, args=(i,), name=f"Worker-{i}")
        threads.append(thread)
        thread.start()

    # Keep the main thread alive to observe workers (or add proper shutdown logic)
    try:
        for thread in threads:
            thread.join()  # Wait for all worker threads indefinitely
    except KeyboardInterrupt:
        print("\nCtrl+C detected. Exiting example...")
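In the sketch above, GC and updater runs are triggered opportunistically by the policy during malloc() calls (see gc_count and update_interval). If recovery should also happen while no worker is allocating, a small dedicated loop is one option. The snippet below is a hedged sketch meant to be started alongside the workers in the __main__ block; it assumes gc() can be called directly on the allocator with no arguments, so verify the signature of the installed version before relying on it:

# Optional: background GC loop, complementing the policy-driven GC in malloc().
stop_event = threading.Event()

def gc_loop(interval=30):
    """Periodically reclaim stale/expired proxies until stop_event is set."""
    while not stop_event.wait(interval):
        try:
            proxy_allocator.gc()     # assumption: allocator-level GC entry point
        except Exception as e:       # keep the loop alive on transient Redis errors
            print(f"[GC] error: {e}")

gc_thread = threading.Thread(target=gc_loop, name="GC", daemon=True)
gc_thread.start()
# During shutdown, call stop_event.set() so the loop exits cleanly.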
Future Enhancements Shown
Resource Prioritization: The TODO comments mark where proxy and target priorities would be integrated once the feature is implemented in RedisAllocator.
Soft Binding: Soft binding is listed above as a future concept; it could eventually dedicate specific proxies to high-frequency tasks, but the sketch does not use it yet.
Garbage Collection: In the sketch, gc() is driven by the policy during malloc() (via gc_count); the optional background loop above shows where an explicit gc() call would typically go.
This example provides a skeleton for building a robust proxy management system on top of RedisAllocator, highlighting how its features map to real-world requirements.