Getting Started

This guide will help you get started with the RedisAllocator library. It covers the basic concepts and provides examples of the main functionality.

Basic Concepts

RedisAllocator is designed for distributed resource management using Redis as the backend. It provides several key components:

  • RedisLock: Distributed locking mechanism

  • RedisAllocator: Resource allocation and management

  • RedisTaskQueue: Distributed task processing

Examples

Using RedisLock for Distributed Locking

RedisLock provides a robust distributed locking mechanism with several important characteristics:

  • Single Redis Instance Support: Works only with a single Redis instance, not with Redis Cluster

  • Automatic Expiry: Locks automatically expire after a configured timeout to prevent deadlocks

  • Active Update Required: Lock holders must actively update locks to maintain ownership

  • Thread Identification: Locks can be associated with thread/process identifiers

  • Reentrant Locking: Support for reentrant locks with the rlock method

from redis import Redis
from redis_allocator import RedisLock
import threading
import time

# Initialize Redis client (requires a single Redis instance)
redis = Redis(host='localhost', port=6379, decode_responses=True)

# Create a RedisLock instance
lock = RedisLock(redis, "myapp", "resource-lock")

# Use the current thread ID as the lock identifier
thread_id = str(threading.get_ident())

# Acquire a lock with a 60-second timeout
if lock.lock("resource-123", value=thread_id, timeout=60):
    try:
        # Perform operations with the locked resource
        print("Resource locked successfully")

        # For long-running operations, periodically update the lock
        # to prevent timeout expiration
        for _ in range(5):
            time.sleep(10)  # Do some work

            # Extend the lock's lifetime by updating it
            lock.update("resource-123", value=thread_id, timeout=60)
            print("Lock updated, timeout extended")

        # Example of reentrant locking with rlock (succeeds because same thread_id)
        if lock.rlock("resource-123", value=thread_id):
            print("Successfully re-locked the resource")
    finally:
        # Release the lock when done
        lock.unlock("resource-123")
        print("Resource unlocked")
else:
    print("Could not acquire lock - resource is busy")

Understanding Lock Timeouts and Updates

The distributed lock mechanism is designed to prevent deadlocks in case of client failures:

  1. When a lock is acquired, it has a specified timeout (in seconds) after which it’s automatically released

  2. Lock holders must periodically update the lock before the timeout expires to maintain ownership, as shown in the keep-alive sketch after this list

  3. If a client crashes or becomes unresponsive, the lock will be automatically released after the timeout

  4. Other clients can acquire the lock after the timeout period expires
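
For long-running work, one way to satisfy the active-update requirement is to refresh the lock from a background thread while the main thread does the work. The keep_alive helper below is an illustrative sketch, not part of the library; it refreshes at half the timeout so an update always lands before the lock expires:

from redis import Redis
from redis_allocator import RedisLock
import threading

redis = Redis(host='localhost', port=6379, decode_responses=True)
lock = RedisLock(redis, "myapp", "resource-lock")
holder_id = str(threading.get_ident())

def keep_alive(key, value, timeout, stop_event):
    # Illustrative helper: refresh the lock at half its timeout until stopped
    while not stop_event.wait(timeout / 2):
        lock.update(key, value=value, timeout=timeout)

stop_event = threading.Event()
if lock.lock("resource-123", value=holder_id, timeout=60):
    keeper = threading.Thread(
        target=keep_alive,
        args=("resource-123", holder_id, 60, stop_event),
        daemon=True,
    )
    keeper.start()
    try:
        ...  # long-running work goes here
    finally:
        stop_event.set()             # stop refreshing first...
        keeper.join()
        lock.unlock("resource-123")  # ...then release the lock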

Thread Identification and Reentrant Locks

When working with locks, proper thread identification is essential:

  1. Use unique identifiers (like thread ID or process ID) as the lock value

  2. The same thread/process can reacquire its own lock using the rlock method, as shown in the sketch after this list

  3. Different threads/processes cannot acquire a lock while it’s held by another

  4. In distributed systems, ensure each node uses a globally unique identifier
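
The sketch below illustrates these rules, assuming lock refuses a key that is already held while rlock succeeds only for the identifier that currently holds it (the worker-A and worker-B identifiers are illustrative):

from redis import Redis
from redis_allocator import RedisLock

redis = Redis(host='localhost', port=6379, decode_responses=True)
lock = RedisLock(redis, "myapp", "resource-lock")

if lock.lock("shared-key", value="worker-A", timeout=60):
    # rlock succeeds for the identifier that already holds the key...
    assert lock.rlock("shared-key", value="worker-A")
    # ...but fails for any other identifier
    assert not lock.rlock("shared-key", value="worker-B")
    # a plain lock() on a held key fails regardless of identifier
    assert not lock.lock("shared-key", value="worker-B", timeout=60)
    lock.unlock("shared-key")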

Redis Instance Requirements

Important considerations regarding Redis deployment:

  1. RedisLock requires a single Redis instance (standalone or master in master-replica setup)

  2. It is not compatible with Redis Cluster, as cluster mode does not guarantee atomicity for the multi-key operations the lock depends on

  3. For Redis Cluster environments, consider alternative solutions such as RedLock

Using RedisAllocator for Resource Management

RedisAllocator manages a pool of resources that can be allocated, freed, and garbage collected:

from redis import Redis
from redis_allocator import RedisAllocator

# Initialize Redis client (decode_responses=True, as in the lock example)
redis = Redis(host='localhost', port=6379, decode_responses=True)

# Create a RedisAllocator instance
allocator = RedisAllocator(
    redis,
    prefix='myapp',
    suffix='allocator',
    shared=False  # Whether resources can be shared
)

# Add resources to the pool
allocator.extend(['resource-1', 'resource-2', 'resource-3'])

# Allocate a resource key (returns only the key)
key = allocator.malloc_key(timeout=120)
if key:
    try:
        # Use the allocated resource
        print(f"Allocated resource: {key}")
    finally:
        # Free the resource when done
        allocator.free_keys(key)

# Allocate a resource as an object (returns a RedisAllocatorObject)
allocated_obj = allocator.malloc(timeout=120)
if allocated_obj:
    try:
        # The key is available as a property
        print(f"Allocated resource: {allocated_obj.key}")

        # Update the resource's lock timeout
        allocated_obj.update(timeout=60)
    finally:
        # Free the resource when done
        allocator.free(allocated_obj)

# Using soft binding (associates a name with a resource)
allocator.update_soft_bind("worker-1", "resource-1")
# Later...
allocator.unbind_soft_bind("worker-1")

# Garbage collection (reclaims unused resources)
allocator.gc(count=10)  # Check 10 items for cleanup
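
In a long-running service, garbage collection is typically run on a schedule rather than once. Below is a minimal sketch reusing the allocator from the example above; the gc_loop helper, the 30-second interval, and the count of 10 are illustrative choices, not library requirements:

import threading
import time

def gc_loop(allocator, interval=30, count=10):
    # Illustrative helper: reclaim unused resources on a fixed interval
    while True:
        allocator.gc(count=count)
        time.sleep(interval)

# Run garbage collection in the background for the lifetime of the process
threading.Thread(target=gc_loop, args=(allocator,), daemon=True).start()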

Using RedisTaskQueue for Distributed Task Processing

RedisTaskQueue enables distributed task processing across multiple workers:

from redis import Redis
from redis_allocator import RedisTaskQueue, TaskExecutePolicy
import json

# Initialize Redis client (decode_responses=True, as in the lock example)
redis = Redis(host='localhost', port=6379, decode_responses=True)

# Process tasks in a worker
def process_task(task):
    # Process the task (task is a RedisTask object)
    # You can access task.id, task.name, task.params
    # You can update progress with task.update(current, total)
    return json.dumps({"result": "processed"})

# Create a task queue
task_queue = RedisTaskQueue(redis, "myapp", task_fn=process_task)

# Submit a task with the query method
result = task_queue.query(
    id="task-123",
    name="example-task",
    params={"input": "data"},
    timeout=300,  # Optional timeout in seconds
    policy=TaskExecutePolicy.Auto,  # Execution policy
    once=False  # Whether to delete the result after getting it
)

# Start listening for tasks
task_queue.listen(
    names=["example-task"],  # List of task names to listen for
    workers=128,  # Number of worker threads
    event=None  # Optional event to signal when to stop listening
)
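
In practice, listen runs in worker processes while clients call query. The sketch below splits the two roles across threads in a single process, reusing the task_queue from the example above and assuming listen returns once the supplied event is set:

import threading

# Worker side: process tasks on a background thread until stop_event is set
stop_event = threading.Event()
worker = threading.Thread(
    target=task_queue.listen,
    kwargs={"names": ["example-task"], "workers": 8, "event": stop_event},
    daemon=True,
)
worker.start()

# Client side: submit a task and collect the result produced by process_task
result = task_queue.query(
    id="task-456",
    name="example-task",
    params={"input": "data"},
    timeout=300,
)
print(result)

# Signal the worker to stop listening and wait for it to finish
stop_event.set()
worker.join()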

Advanced Usage

For more advanced usage examples and the complete API reference, please refer to the API Reference.