Getting Started
This guide will help you get started with the RedisAllocator library. It covers the basic concepts and provides examples for the main functionality.
Basic Concepts
RedisAllocator is designed for distributed resource management using Redis as the backend. It provides several key components:
RedisLock: Distributed locking mechanism
RedisAllocator: Resource allocation and management
RedisTaskQueue: Distributed task processing
Examples
Using RedisLock for Distributed Locking
RedisLock provides a robust distributed locking mechanism with several important characteristics:
Single Redis Instance Support: Works only with a single Redis instance, not with Redis Cluster
Automatic Expiry: Locks automatically expire after a configured timeout to prevent deadlocks
Active Update Required: Lock holders must actively update locks to maintain ownership
Thread Identification: Locks can be associated with thread/process identifiers
Reentrant Locking: Support for reentrant locks with the rlock method
from redis import Redis
from redis_allocator import RedisLock
import threading
import time
# Initialize Redis client (requires a single Redis instance)
redis = Redis(host='localhost', port=6379, decode_responses=True)
# Create a RedisLock instance
lock = RedisLock(redis, "myapp", "resource-lock")
# Use the current thread ID as the lock identifier
thread_id = str(threading.get_ident())
# Acquire a lock with a 60-second timeout
if lock.lock("resource-123", value=thread_id, timeout=60):
try:
# Perform operations with the locked resource
print("Resource locked successfully")
# For long-running operations, periodically update the lock
# to prevent timeout expiration
for _ in range(5):
time.sleep(10) # Do some work
# Extend the lock's lifetime by updating it
lock.update("resource-123", value=thread_id, timeout=60)
print("Lock updated, timeout extended")
# Example of reentrant locking with rlock (succeeds because same thread_id)
if lock.rlock("resource-123", value=thread_id):
print("Successfully re-locked the resource")
finally:
# Release the lock when done
lock.unlock("resource-123")
print("Resource unlocked")
else:
print("Could not acquire lock - resource is busy")
Understanding Lock Timeouts and Updates
The distributed lock mechanism is designed to prevent deadlocks in case of client failures:
When a lock is acquired, it has a specified timeout (in seconds) after which it’s automatically released
Lock holders must periodically update the lock before the timeout expires to maintain ownership
If a client crashes or becomes unresponsive, the lock will be automatically released after the timeout
Other clients can acquire the lock after the timeout period expires
Thread Identification and Reentrant Locks
When working with locks, proper thread identification is essential:
Use unique identifiers (like thread ID or process ID) as the lock value
The same thread/process can reacquire its own lock using the rlock method
Different threads/processes cannot acquire a lock while it’s held by another
In distributed systems, ensure each node uses a globally unique identifier
Redis Instance Requirements
Important considerations regarding Redis deployment:
RedisLock requires a single Redis instance (standalone or master in master-replica setup)
It is not compatible with Redis Cluster, as cluster mode doesn’t guarantee atomicity
For Redis Cluster environments, consider alternative solutions such as RedLock
Using RedisAllocator for Resource Management
RedisAllocator manages a pool of resources that can be allocated, freed, and garbage collected:
from redis import Redis
from redis_allocator import RedisAllocator
# Initialize Redis client
redis = Redis(host='localhost', port=6379)
# Create a RedisAllocator instance
allocator = RedisAllocator(
redis,
prefix='myapp',
suffix='allocator',
shared=False # Whether resources can be shared
)
# Add resources to the pool
allocator.extend(['resource-1', 'resource-2', 'resource-3'])
# Allocate a resource key (returns only the key)
key = allocator.malloc_key(timeout=120)
if key:
try:
# Use the allocated resource
print(f"Allocated resource: {key}")
finally:
# Free the resource when done
allocator.free_keys(key)
# Allocate a resource with object (returns a RedisAllocatorObject)
allocated_obj = allocator.malloc(timeout=120)
if allocated_obj:
try:
# The key is available as a property
print(f"Allocated resource: {allocated_obj.key}")
# Update the resource's lock timeout
allocated_obj.update(timeout=60)
finally:
# Free the resource when done
allocator.free(allocated_obj)
# Using soft binding (associates a name with a resource)
allocator.update_soft_bind("worker-1", "resource-1")
# Later...
allocator.unbind_soft_bind("worker-1")
# Garbage collection (reclaims unused resources)
allocator.gc(count=10) # Check 10 items for cleanup
Using RedisTaskQueue for Distributed Task Processing
RedisTaskQueue enables distributed task processing across multiple workers:
from redis import Redis
from redis_allocator import RedisTaskQueue, TaskExecutePolicy
import json
# Initialize Redis client
redis = Redis(host='localhost', port=6379)
# Process tasks in a worker
def process_task(task):
# Process the task (task is a RedisTask object)
# You can access task.id, task.name, task.params
# You can update progress with task.update(current, total)
return json.dumps({"result": "processed"})
# Create a task queue
task_queue = RedisTaskQueue(redis, "myapp", task_fn=process_task)
# Submit a task with query method
result = task_queue.query(
id="task-123",
name="example-task",
params={"input": "data"},
timeout=300, # Optional timeout in seconds
policy=TaskExecutePolicy.Auto, # Execution policy
once=False # Whether to delete the result after getting it
)
# Start listening for tasks
task_queue.listen(
names=["example-task"], # List of task names to listen for
workers=128, # Number of worker threads
event=None # Optional event to signal when to stop listening
)
Advanced Usage
For more advanced usage examples and the complete API reference, please refer to the API Reference.