Example: Managing a Proxy Pool with Automatic Updates

This example demonstrates how RedisAllocator can manage a dynamic proxy pool, using a RedisAllocatorUpdater integrated with DefaultRedisAllocatorPolicy to refresh the pool automatically.

Scenario

We need a system where:

  1. Proxies are fetched periodically from different sources (e.g., APIs, databases).
  2. These fetched proxies automatically update the central Redis pool.
  3. Multiple worker threads allocate exclusive proxies from this pool.
  4. Workers use the proxies for their tasks.
  5. Proxies are automatically returned/recovered via free() and gc().
  6. (Future) Priority and Soft Binding features enhance allocation.

Core RedisAllocator Concepts Used

  • RedisAllocator(policy=…): Using a policy to control behavior.

  • RedisAllocatorUpdater: Base class for defining how to fetch resources.

  • DefaultRedisAllocatorPolicy(updater=…): Policy that uses an updater to refresh the pool periodically.

  • malloc(): Workers acquire proxies (triggering policy checks).

  • free(): Workers return proxies.

  • gc(): Background process for recovery.

  • Resource Prioritization & Soft Binding: (Mentioned as TODO / Future).
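Since every successful malloc() must eventually be paired with a free(), it can help to wrap the pair in a context manager so the resource is returned even when a task raises. The sketch below uses only the malloc()/free() calls listed above; FakeAllocator is a hypothetical in-memory stand-in, used purely to demonstrate the pattern without a Redis connection.

```python
from contextlib import contextmanager

@contextmanager
def leased(allocator, timeout=120):
    """Allocate a resource and guarantee it is freed afterwards."""
    obj = allocator.malloc(timeout=timeout)
    try:
        yield obj  # obj may be None if the pool is empty
    finally:
        if obj is not None:
            allocator.free(obj)

# Hypothetical stand-in for the allocator, for illustration only
class FakeAllocator:
    def __init__(self):
        self.freed = []

    def malloc(self, timeout=None):
        return type('Obj', (), {'key': 'proxy_A_0:8000'})()

    def free(self, obj):
        self.freed.append(obj.key)

alloc = FakeAllocator()
try:
    with leased(alloc) as proxy:
        raise RuntimeError("task failed mid-scrape")
except RuntimeError:
    pass

print(alloc.freed)  # the proxy was freed despite the error
```

The worker loop below frees proxies explicitly instead, which keeps the policy-trigger points visible; a wrapper like this is an alternative when tasks can fail in many places.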

Implementation Sketch

    import redis
    import threading
    import time
    import random
    from typing import Sequence
    from redis_allocator import RedisAllocator
    # Import base class and default policy
    from redis_allocator.allocator import RedisAllocatorUpdater, DefaultRedisAllocatorPolicy

    # --- Configuration ---
    REDIS_HOST = 'localhost'
    REDIS_PORT = 6379
    PROXY_POOL_PREFIX = 'myapp'
    PROXY_POOL_SUFFIX = 'proxies'
    NUM_WORKERS = 5
    ALLOCATION_TIMEOUT = 120  # Seconds a proxy can be held

    # --- Custom Updater Implementation ---

    class ProxySourceUpdater(RedisAllocatorUpdater):
        """Fetches proxy lists from different predefined sources."""

        def fetch(self, source_id: str) -> Sequence[str]:
            """Simulates fetching proxies from a specific source URL/ID."""
            print(f"[{threading.current_thread().name}] Fetching proxies from source: {source_id}...")
            # Replace with actual fetching logic (e.g., API call, DB query)
            time.sleep(0.5)  # Simulate fetch time
            if source_id == 'source_A_api':
                # Simulate getting a list of proxy strings
                new_proxies = [f'proxy_A_{i}:8000' for i in range(random.randint(1, 3))]
            elif source_id == 'source_B_db':
                new_proxies = [f'proxy_B_{i}:9000' for i in range(random.randint(0, 2))]
            else:
                new_proxies = []  # Simulate source being unavailable
            print(f"[{threading.current_thread().name}] Fetched {len(new_proxies)} proxies from {source_id}: {new_proxies}")
            # TODO: Return priority info when the allocator supports it: [('proxy_A_0:8000', 10), ...]
            return new_proxies

    # --- Setup ---
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)

    # Define the sources the updater will cycle through
    PROXY_SOURCES = ['source_A_api', 'source_B_db']
    # How often the policy should *try* to run the updater (in seconds).
    # The updater only runs if the update lock is acquired during a malloc call.
    UPDATE_INTERVAL = 60
    # Optional: Default expiry for items added by the updater (-1 = no expiry)
    UPDATER_ITEM_EXPIRY = 3600

    # 1. Instantiate the custom Updater
    proxy_updater = ProxySourceUpdater(params=PROXY_SOURCES)

    # 2. Instantiate the Policy, providing the updater
    default_policy = DefaultRedisAllocatorPolicy(
        updater=proxy_updater,
        update_interval=UPDATE_INTERVAL,
        expiry_duration=UPDATER_ITEM_EXPIRY,
        gc_count=5  # How many items GC checks per malloc call
    )

    # 3. Instantiate the Allocator using the configured Policy
    proxy_allocator = RedisAllocator(
        redis_client,
        prefix=PROXY_POOL_PREFIX,
        suffix=PROXY_POOL_SUFFIX,
        shared=False,  # Proxies are used exclusively
        policy=default_policy  # IMPORTANT: Use the policy
    )

    # --- Proxy Usage Simulation ---

    def get_proxy_config_url(proxy_key):
        """Simulates fetching configuration for a given proxy key."""
        # In reality, this might query a database or another Redis key
        print(f"[{threading.current_thread().name}] Fetched config for {proxy_key}")
        return f"http://config.server/{proxy_key}"

    def start_local_proxy(config_url):
        """Simulates configuring and starting a local proxy process/connection."""
        print(f"[{threading.current_thread().name}] Starting proxy using {config_url}")
        time.sleep(0.1)  # Simulate startup time
        return True  # Simulate success

    # --- Scrape Target Simulation ---

    def fetch_scrape_target():
        """Simulates fetching the next high-priority item to scrape."""
        # TODO: Target fetching logic is application-specific.
        #       Could use another Redis list, queue, or even another Allocator
        #       if targets themselves are managed resources. Priority needed here too.
        targets = ["item_A", "item_B", "item_C", "item_D", "item_E"]
        target = random.choice(targets)
        print(f"[{threading.current_thread().name}] Fetched target: {target}")
        return target

    # --- Worker Thread Logic ---

    def worker_thread_logic(worker_id):
        """Main logic for each scraper worker thread."""
        print(f"[Worker-{worker_id}] Started.")
        allocated_proxy_obj = None
        retry_delay = 1
        while True:  # Keep trying to allocate and work
            try:
                # 1. Allocate a proxy (this triggers policy checks -> GC and updater)
                print(f"[Worker-{worker_id}] Attempting to allocate proxy...")
                allocated_proxy_obj = proxy_allocator.malloc(timeout=ALLOCATION_TIMEOUT)

                if allocated_proxy_obj:
                    retry_delay = 1  # Reset delay on success
                    proxy_key = allocated_proxy_obj.key
                    print(f"[Worker-{worker_id}] Allocated proxy: {proxy_key}")

                    # 2. Simulate using the proxy
                    config_url = get_proxy_config_url(proxy_key)
                    if start_local_proxy(config_url):
                        print(f"[Worker-{worker_id}] Proxy started successfully.")

                        # 3. Fetch target and simulate work
                        target = fetch_scrape_target()
                        if target:
                            print(f"[Worker-{worker_id}] Simulating scrape of {target} via {proxy_key}...")
                            time.sleep(random.uniform(0.5, 2.0))  # Simulate work duration
                            print(f"[Worker-{worker_id}] Finished scrape of {target}.")
                    else:
                        print(f"[Worker-{worker_id}] Failed to start proxy {proxy_key}.")
                        # Optionally, report this proxy as bad

                    # 4. IMPORTANT: Free the proxy only after the task is completely done with it
                    print(f"[Worker-{worker_id}] Freeing proxy: {allocated_proxy_obj.key}")
                    proxy_allocator.free(allocated_proxy_obj)
                    allocated_proxy_obj = None  # Clear the reference
                    # Small delay before the next allocation attempt
                    time.sleep(random.uniform(0.1, 0.5))

                else:
                    # No proxy available, wait and retry with exponential backoff
                    print(f"[Worker-{worker_id}] No proxy available. Retrying in {retry_delay}s...")
                    time.sleep(retry_delay)
                    retry_delay = min(retry_delay * 2, 30)  # Exponential backoff up to 30s

            except Exception as e:
                print(f"[Worker-{worker_id}] Error: {e}")
                # Ensure the proxy is freed even if an error occurs mid-task
                if allocated_proxy_obj:
                    print(f"[Worker-{worker_id}] Freeing proxy due to error: {allocated_proxy_obj.key}")
                    proxy_allocator.free(allocated_proxy_obj)
                    allocated_proxy_obj = None
                time.sleep(5)  # Wait after error before retrying

    # --- Running the Example ---

    if __name__ == "__main__":
        print("Starting proxy pool example with automatic updates...")
        # No initial pool population needed: the policy's updater handles it.
        # The first calls to malloc() by workers will likely trigger the updater.

        threads = []
        for i in range(NUM_WORKERS):
            thread = threading.Thread(target=worker_thread_logic, args=(i,), name=f"Worker-{i}")
            threads.append(thread)
            thread.start()

        # Keep the main thread alive to observe workers (or add proper shutdown logic)
        try:
            for thread in threads:
                thread.join()  # Wait for all worker threads indefinitely
        except KeyboardInterrupt:
            print("\nCtrl+C detected. Exiting example...")
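The join() loop above blocks indefinitely. For a cleaner exit, a common pattern is to share a threading.Event that workers check between iterations and that the main thread sets on shutdown. This is a generic Python sketch; the stop_event name and the placeholder loop body are illustrative, not part of RedisAllocator:

```python
import threading
import time

stop_event = threading.Event()

def worker(worker_id, results):
    # Loop until the main thread signals shutdown
    while not stop_event.is_set():
        results.append(worker_id)  # placeholder for the allocate/use/free cycle
        time.sleep(0.01)

results = []
threads = [threading.Thread(target=worker, args=(i, results)) for i in range(2)]
for t in threads:
    t.start()

time.sleep(0.05)   # let workers run briefly
stop_event.set()   # signal shutdown (e.g., from a KeyboardInterrupt handler)
for t in threads:
    t.join(timeout=2)  # bounded join instead of waiting forever

print(all(not t.is_alive() for t in threads))
```

With this pattern, the worker would also free any currently held proxy before returning, so a shutdown never strands an allocation until its timeout expires.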

Future Enhancements Shown

  • Resource Prioritization: The comments indicate where proxy and target priorities would be integrated once the feature is implemented in RedisAllocator.

  • Soft Binding: Once supported, soft binding could dedicate specific proxies to high-frequency tasks.

  • Garbage Collection: In this example gc() runs implicitly via the policy (gc_count items are checked on each malloc() call); it can also be invoked explicitly for periodic cleanup.
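Beyond the implicit GC the policy performs during malloc(), gc() can also be driven from a dedicated background thread. The sketch below assumes gc() can be called with no arguments; StubAllocator is a hypothetical stand-in that only records invocations, so the scheduling pattern is shown without a live Redis:

```python
import threading
import time

class StubAllocator:
    """Hypothetical stand-in recording gc() invocations."""
    def __init__(self):
        self.gc_runs = 0
        self._lock = threading.Lock()

    def gc(self):
        with self._lock:
            self.gc_runs += 1

def run_gc_periodically(allocator, interval, stop_event):
    # wait() returns False on timeout (run gc) and True once stop is signaled
    while not stop_event.wait(interval):
        allocator.gc()

allocator = StubAllocator()
stop = threading.Event()
t = threading.Thread(target=run_gc_periodically, args=(allocator, 0.01, stop), daemon=True)
t.start()

time.sleep(0.1)  # let a few GC cycles run
stop.set()
t.join(timeout=2)
print(allocator.gc_runs > 0)
```

Using stop_event.wait(interval) as the sleep makes the thread responsive to shutdown instead of blocking for a full interval.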

This example provides a skeleton for building a robust proxy management system on top of RedisAllocator, highlighting how its features map to real-world requirements.