Task Queue Module
This module provides a task queue system for distributed task processing using Redis as the backend.
RedisTaskQueue
- class redis_allocator.task_queue.RedisTaskQueue[source]
Provides a distributed task queue using Redis lists and key/value storage.
Enables submitting tasks (represented by RedisTask objects) to named queues and having them processed either locally (if task_fn is provided) or by remote listeners polling the corresponding Redis list.
Key Concepts: - Task Queues: Redis lists (<prefix>|<suffix>|task-queue|<name>) where task IDs
are pushed for remote workers.
- Task Data: Serialized RedisTask objects stored in Redis keys
(<prefix>|<suffix>|task-result:<id>) with an expiry time. This stores parameters, progress, results, and errors.
- Listeners: Remote workers use BLPOP on queue lists. To signal their presence,
they periodically set a listener key (<prefix>|<suffix>|task-listen|<name>).
- Execution Policies (TaskExecutePolicy): Control whether a task is executed
locally, remotely, or attempts one then the other. Auto mode checks for a listener key.
- Task Function (task_fn): A user-provided function that takes a RedisTask
and performs the actual work locally.
- redis
StrictRedis client instance.
- prefix
Prefix for all Redis keys.
- suffix
Suffix for Redis keys (default: ‘task-queue’).
- timeout
Default expiry/timeout for tasks and listener keys (seconds).
- interval
Polling interval for remote task fetching (seconds).
- task_fn
Callable[[RedisTask], Any] to execute tasks locally.
- __init__(redis, prefix, suffix='task-queue', timeout=300, interval=5, task_fn=None)[source]
Initialize a RedisTaskQueue instance.
- Parameters:
redis (Redis) – The Redis client used for interacting with Redis.
prefix (str) – The prefix to be added to the task queue key.
suffix – The suffix to be added to the task queue key. Default is ‘task-queue’.
timeout – The query timeout in seconds. Default is 300s.
interval – The query interval in seconds. Default is 5s.
task_fn (Callable[[RedisTask], Any] | None) – A function to execute tasks locally. Takes a RedisTask and returns Any.
- execute_task_remotely(task, timeout=None, once=False)[source]
Execute a task remotely by pushing it to the queue.
- Parameters:
- Returns:
The result of the task
- Raises:
TimeoutError – If the task times out
Exception – Any exception raised during task execution
- Return type:
- set_queue_listened(name)[source]
Set the queue as being listened to for the given task name.
- Parameters:
name (str) – The task name.
- Return type:
None
- listen(names, workers=128, event=None)[source]
Listen for tasks on the specified queues.
This method continuously polls the specified queues for tasks, and executes tasks locally when they are received.
- query(id, name, params, timeout=None, policy=TaskExecutePolicy.Auto, once=False)[source]
Execute a task according to the specified policy.
This method provides a flexible way to execute tasks with different strategies based on the specified policy.
- Parameters:
- Returns:
The result of the task.
- Raises:
Exception – Any exception raised during task execution.
- Return type:
RedisTask
- class redis_allocator.task_queue.RedisTask[source]
Represents a task to be processed via the RedisTaskQueue.
Encapsulates task details like ID, category (name), parameters, and its current state (progress, result, error). It includes methods for saving state and updating progress.
- expiry
Absolute Unix timestamp when the task should be considered expired. Used both locally and remotely to timeout waiting operations.
- Type:
- result
Stores the successful return value of the task execution.
- Type:
Any
- error
Stores any exception raised during task execution.
- Type:
Any
- _save
Internal callback function provided by RedisTaskQueue to persist the task’s state (result, error, progress) back to Redis.
- Type:
Callable[[], None]
- update(current_progress, total_progress)[source]
Update the progress of the task.
- Parameters:
- Raises:
TimeoutError – If the task has expired
- __init__(id, name, params, expiry, result=None, error=None, update_progress_time=0.0, current_progress=0.0, total_progress=100.0, _save=None)
TaskExecutePolicy
Task Queue
Redis-based task queue system for distributed task management.
This module provides a RedisTaskQueue class that enables distributed task management and coordination using Redis as the underlying infrastructure.
- class redis_allocator.task_queue.TaskExecutePolicy[source]
Bases:
Enum
Defines different policies for task execution.
- Local
Execute task only locally
- Remote
Execute task only remotely
- LocalFirst
Try to execute locally first, then remotely if it fails
- RemoteFirst
Try to execute remotely first, then locally if it fails
- Auto
Choose execution mode based on whether a remote listener exists
- Local = 0
- Remote = 1
- LocalFirst = 2
- RemoteFirst = 3
- Auto = 4
- class redis_allocator.task_queue.RedisTask[source]
Bases:
object
Represents a task to be processed via the RedisTaskQueue.
Encapsulates task details like ID, category (name), parameters, and its current state (progress, result, error). It includes methods for saving state and updating progress.
- expiry
Absolute Unix timestamp when the task should be considered expired. Used both locally and remotely to timeout waiting operations.
- Type:
- result
Stores the successful return value of the task execution.
- Type:
Any
- error
Stores any exception raised during task execution.
- Type:
Any
- _save
Internal callback function provided by RedisTaskQueue to persist the task’s state (result, error, progress) back to Redis.
- Type:
Callable[[], None]
- update(current_progress, total_progress)[source]
Update the progress of the task.
- Parameters:
- Raises:
TimeoutError – If the task has expired
- __init__(id, name, params, expiry, result=None, error=None, update_progress_time=0.0, current_progress=0.0, total_progress=100.0, _save=None)
- class redis_allocator.task_queue.RedisTaskQueue[source]
Bases:
object
Provides a distributed task queue using Redis lists and key/value storage.
Enables submitting tasks (represented by RedisTask objects) to named queues and having them processed either locally (if task_fn is provided) or by remote listeners polling the corresponding Redis list.
Key Concepts: - Task Queues: Redis lists (<prefix>|<suffix>|task-queue|<name>) where task IDs
are pushed for remote workers.
- Task Data: Serialized RedisTask objects stored in Redis keys
(<prefix>|<suffix>|task-result:<id>) with an expiry time. This stores parameters, progress, results, and errors.
- Listeners: Remote workers use BLPOP on queue lists. To signal their presence,
they periodically set a listener key (<prefix>|<suffix>|task-listen|<name>).
- Execution Policies (TaskExecutePolicy): Control whether a task is executed
locally, remotely, or attempts one then the other. Auto mode checks for a listener key.
- Task Function (task_fn): A user-provided function that takes a RedisTask
and performs the actual work locally.
- redis
StrictRedis client instance.
- prefix
Prefix for all Redis keys.
- suffix
Suffix for Redis keys (default: ‘task-queue’).
- timeout
Default expiry/timeout for tasks and listener keys (seconds).
- interval
Polling interval for remote task fetching (seconds).
- task_fn
Callable[[RedisTask], Any] to execute tasks locally.
- __init__(redis, prefix, suffix='task-queue', timeout=300, interval=5, task_fn=None)[source]
Initialize a RedisTaskQueue instance.
- Parameters:
redis (Redis) – The Redis client used for interacting with Redis.
prefix (str) – The prefix to be added to the task queue key.
suffix – The suffix to be added to the task queue key. Default is ‘task-queue’.
timeout – The query timeout in seconds. Default is 300s.
interval – The query interval in seconds. Default is 5s.
task_fn (Callable[[RedisTask], Any] | None) – A function to execute tasks locally. Takes a RedisTask and returns Any.
- execute_task_remotely(task, timeout=None, once=False)[source]
Execute a task remotely by pushing it to the queue.
- Parameters:
- Returns:
The result of the task
- Raises:
TimeoutError – If the task times out
Exception – Any exception raised during task execution
- Return type:
- set_queue_listened(name)[source]
Set the queue as being listened to for the given task name.
- Parameters:
name (str) – The task name.
- Return type:
None
- listen(names, workers=128, event=None)[source]
Listen for tasks on the specified queues.
This method continuously polls the specified queues for tasks, and executes tasks locally when they are received.
- query(id, name, params, timeout=None, policy=TaskExecutePolicy.Auto, once=False)[source]
Execute a task according to the specified policy.
This method provides a flexible way to execute tasks with different strategies based on the specified policy.
- Parameters:
- Returns:
The result of the task.
- Raises:
Exception – Any exception raised during task execution.
- Return type:
Simplified Task Queue Flow
This diagram shows the interaction between a Client submitting a task, the RedisTaskQueue, Redis itself, and a Listener processing the task.
Submission: The client calls query(). The RedisTaskQueue serializes the task data and stores it in a Redis key (result:<id>) with an expiry. It then pushes the task ID onto the corresponding queue list (queue:<name>).
Listening: A listener calls listen() and performs a blocking pop (BLPOP) on the queue list.
Processing: When a task ID is popped, the listener retrieves the task data from Redis, executes the user-defined task_fn, and updates the task data in Redis with the result or error.
Result Retrieval: The original client can periodically call get_task() to fetch the updated task data and retrieve the result or error.
sequenceDiagram participant Client participant TaskQueue participant Redis participant Listener Client->>TaskQueue: query(id, name, params) note right of Redis: Stores task state: SETEX result:<id> pickled_task TaskQueue->>Redis: SETEX result:<id> pickled_task note right of Redis: Adds ID to queue: RPUSH queue:<name> <id> TaskQueue->>Redis: RPUSH queue:<name> <id> Redis-->>TaskQueue: OK TaskQueue-->>Client: (Waits or returns if local) Listener->>TaskQueue: listen([name]) loop Poll Queue note over TaskQueue,Redis: BLPOP queue:<name> timeout TaskQueue->>Redis: BLPOP queue:<name> timeout alt Task Available Redis-->>TaskQueue: [queue:<name>, <id>] note over TaskQueue,Redis: GET result:<id> TaskQueue->>Redis: GET result:<id> Redis-->>TaskQueue: pickled_task TaskQueue->>Listener: Execute task_fn(task) Listener-->>TaskQueue: result/error note right of Redis: Updates task state: SETEX result:<id> updated_pickled_task TaskQueue->>Redis: SETEX result:<id> updated_pickled_task else Timeout Redis-->>TaskQueue: nil end end Client->>TaskQueue: get_task(id) (Periodically or when notified) note over TaskQueue,Redis: GET result:<id> TaskQueue->>Redis: GET result:<id> Redis-->>TaskQueue: updated_pickled_task TaskQueue-->>Client: Task result/error