Task Queue Module

This module provides a task queue system for distributed task processing using Redis as the backend.

RedisTaskQueue

class redis_allocator.task_queue.RedisTaskQueue[source]

Provides a distributed task queue using Redis lists and key/value storage.

Enables submitting tasks (represented by RedisTask objects) to named queues and having them processed either locally (if task_fn is provided) or by remote listeners polling the corresponding Redis list.

Key Concepts: - Task Queues: Redis lists (<prefix>|<suffix>|task-queue|<name>) where task IDs

are pushed for remote workers.

  • Task Data: Serialized RedisTask objects stored in Redis keys

    (<prefix>|<suffix>|task-result:<id>) with an expiry time. This stores parameters, progress, results, and errors.

  • Listeners: Remote workers use BLPOP on queue lists. To signal their presence,

    they periodically set a listener key (<prefix>|<suffix>|task-listen|<name>).

  • Execution Policies (TaskExecutePolicy): Control whether a task is executed

    locally, remotely, or attempts one then the other. Auto mode checks for a listener key.

  • Task Function (task_fn): A user-provided function that takes a RedisTask

    and performs the actual work locally.

redis

StrictRedis client instance.

prefix

Prefix for all Redis keys.

suffix

Suffix for Redis keys (default: ‘task-queue’).

timeout

Default expiry/timeout for tasks and listener keys (seconds).

interval

Polling interval for remote task fetching (seconds).

task_fn

Callable[[RedisTask], Any] to execute tasks locally.

__init__(redis, prefix, suffix='task-queue', timeout=300, interval=5, task_fn=None)[source]

Initialize a RedisTaskQueue instance.

Parameters:
  • redis (Redis) – The Redis client used for interacting with Redis.

  • prefix (str) – The prefix to be added to the task queue key.

  • suffix – The suffix to be added to the task queue key. Default is ‘task-queue’.

  • timeout – The query timeout in seconds. Default is 300s.

  • interval – The query interval in seconds. Default is 5s.

  • task_fn (Callable[[RedisTask], Any] | None) – A function to execute tasks locally. Takes a RedisTask and returns Any.

build_task(id, name, params)[source]

Create a new RedisTask instance with the given parameters.

Parameters:
  • id (str) – Unique identifier for the task

  • name (str) – Name of the task category

  • params (dict) – Dictionary of parameters for the task

Returns:

A new RedisTask instance with a save function configured

Return type:

RedisTask

execute_task_remotely(task, timeout=None, once=False)[source]

Execute a task remotely by pushing it to the queue.

Parameters:
  • task (RedisTask) – The RedisTask to execute

  • timeout (float | None) – Optional timeout in seconds, defaults to self.timeout

  • once (bool) – Whether to delete the result after getting it

Returns:

The result of the task

Raises:
Return type:

Any

execute_task_locally(task, timeout=None)[source]

Execute a task locally using the task_fn.

Parameters:
  • task (RedisTask) – The RedisTask to execute

  • timeout (float | None) – Optional timeout in seconds, updates task.expiry if provided

Returns:

The result of the task

Raises:

Exception – Any exception raised during task execution

Return type:

Any

set_queue_listened(name)[source]

Set the queue as being listened to for the given task name.

Parameters:

name (str) – The task name.

Return type:

None

set_task(task)[source]

Save a task to Redis.

Parameters:

task (RedisTask) – The RedisTask to save.

Returns:

The task ID.

Return type:

str

get_task(task_id, once=False)[source]

Get a task from Redis.

Parameters:
  • task_id (str) – The task ID.

  • once (bool) – Whether to delete the task after getting it.

Returns:

The RedisTask, or None if no task is available.

Return type:

RedisTask | None

has_task(task_id)[source]

Check if a task exists.

Parameters:

task_id (str) – The task ID.

Returns:

True if the task exists, False otherwise.

Return type:

bool

listen(names, workers=128, event=None)[source]

Listen for tasks on the specified queues.

This method continuously polls the specified queues for tasks, and executes tasks locally when they are received.

Parameters:
  • names (List[str]) – The names of the queues to listen to.

  • workers (int) – The number of worker threads to use. Default is 128.

  • event (Event | None) – An event to signal when to stop listening. Default is None.

Return type:

None

query(id, name, params, timeout=None, policy=TaskExecutePolicy.Auto, once=False)[source]

Execute a task according to the specified policy.

This method provides a flexible way to execute tasks with different strategies based on the specified policy.

Parameters:
  • id (str) – The task ID.

  • name (str) – The task name.

  • params (dict) – The task parameters.

  • timeout (float | None) – Optional timeout override.

  • policy (TaskExecutePolicy) – The execution policy to use.

  • once (bool) – Whether to delete the result after getting it.

Returns:

The result of the task.

Raises:

Exception – Any exception raised during task execution.

Return type:

Any

RedisTask

class redis_allocator.task_queue.RedisTask[source]

Represents a task to be processed via the RedisTaskQueue.

Encapsulates task details like ID, category (name), parameters, and its current state (progress, result, error). It includes methods for saving state and updating progress.

id

Unique identifier for this specific task instance.

Type:

str

name

Categorical name for the task (used for routing in the queue).

Type:

str

params

Dictionary containing task-specific input parameters.

Type:

dict

expiry

Absolute Unix timestamp when the task should be considered expired. Used both locally and remotely to timeout waiting operations.

Type:

float

result

Stores the successful return value of the task execution.

Type:

Any

error

Stores any exception raised during task execution.

Type:

Any

update_progress_time

Timestamp of the last progress update.

Type:

float

current_progress

Current progress value (e.g., items processed).

Type:

float

total_progress

Total expected value for completion (e.g., total items).

Type:

float

_save

Internal callback function provided by RedisTaskQueue to persist the task’s state (result, error, progress) back to Redis.

Type:

Callable[[], None]

id: str
name: str
params: dict
expiry: float
result: Any = None
error: Any = None
update_progress_time: float = 0.0
current_progress: float = 0.0
total_progress: float = 100.0
save()[source]

Save the current state of the task to Redis.

update(current_progress, total_progress)[source]

Update the progress of the task.

Parameters:
  • current_progress (float) – The current progress value

  • total_progress (float) – The total progress value for completion

Raises:

TimeoutError – If the task has expired

__init__(id, name, params, expiry, result=None, error=None, update_progress_time=0.0, current_progress=0.0, total_progress=100.0, _save=None)
Parameters:
Return type:

None

TaskExecutePolicy

Task Queue

Redis-based task queue system for distributed task management.

This module provides a RedisTaskQueue class that enables distributed task management and coordination using Redis as the underlying infrastructure.

class redis_allocator.task_queue.TaskExecutePolicy[source]

Bases: Enum

Defines different policies for task execution.

Local

Execute task only locally

Remote

Execute task only remotely

LocalFirst

Try to execute locally first, then remotely if it fails

RemoteFirst

Try to execute remotely first, then locally if it fails

Auto

Choose execution mode based on whether a remote listener exists

Local = 0
Remote = 1
LocalFirst = 2
RemoteFirst = 3
Auto = 4
class redis_allocator.task_queue.RedisTask[source]

Bases: object

Represents a task to be processed via the RedisTaskQueue.

Encapsulates task details like ID, category (name), parameters, and its current state (progress, result, error). It includes methods for saving state and updating progress.

id

Unique identifier for this specific task instance.

Type:

str

name

Categorical name for the task (used for routing in the queue).

Type:

str

params

Dictionary containing task-specific input parameters.

Type:

dict

expiry

Absolute Unix timestamp when the task should be considered expired. Used both locally and remotely to timeout waiting operations.

Type:

float

result

Stores the successful return value of the task execution.

Type:

Any

error

Stores any exception raised during task execution.

Type:

Any

update_progress_time

Timestamp of the last progress update.

Type:

float

current_progress

Current progress value (e.g., items processed).

Type:

float

total_progress

Total expected value for completion (e.g., total items).

Type:

float

_save

Internal callback function provided by RedisTaskQueue to persist the task’s state (result, error, progress) back to Redis.

Type:

Callable[[], None]

id: str
name: str
params: dict
expiry: float
result: Any = None
error: Any = None
update_progress_time: float = 0.0
current_progress: float = 0.0
total_progress: float = 100.0
save()[source]

Save the current state of the task to Redis.

update(current_progress, total_progress)[source]

Update the progress of the task.

Parameters:
  • current_progress (float) – The current progress value

  • total_progress (float) – The total progress value for completion

Raises:

TimeoutError – If the task has expired

__init__(id, name, params, expiry, result=None, error=None, update_progress_time=0.0, current_progress=0.0, total_progress=100.0, _save=None)
Parameters:
Return type:

None

class redis_allocator.task_queue.RedisTaskQueue[source]

Bases: object

Provides a distributed task queue using Redis lists and key/value storage.

Enables submitting tasks (represented by RedisTask objects) to named queues and having them processed either locally (if task_fn is provided) or by remote listeners polling the corresponding Redis list.

Key Concepts: - Task Queues: Redis lists (<prefix>|<suffix>|task-queue|<name>) where task IDs

are pushed for remote workers.

  • Task Data: Serialized RedisTask objects stored in Redis keys

    (<prefix>|<suffix>|task-result:<id>) with an expiry time. This stores parameters, progress, results, and errors.

  • Listeners: Remote workers use BLPOP on queue lists. To signal their presence,

    they periodically set a listener key (<prefix>|<suffix>|task-listen|<name>).

  • Execution Policies (TaskExecutePolicy): Control whether a task is executed

    locally, remotely, or attempts one then the other. Auto mode checks for a listener key.

  • Task Function (task_fn): A user-provided function that takes a RedisTask

    and performs the actual work locally.

redis

StrictRedis client instance.

prefix

Prefix for all Redis keys.

suffix

Suffix for Redis keys (default: ‘task-queue’).

timeout

Default expiry/timeout for tasks and listener keys (seconds).

interval

Polling interval for remote task fetching (seconds).

task_fn

Callable[[RedisTask], Any] to execute tasks locally.

__init__(redis, prefix, suffix='task-queue', timeout=300, interval=5, task_fn=None)[source]

Initialize a RedisTaskQueue instance.

Parameters:
  • redis (Redis) – The Redis client used for interacting with Redis.

  • prefix (str) – The prefix to be added to the task queue key.

  • suffix – The suffix to be added to the task queue key. Default is ‘task-queue’.

  • timeout – The query timeout in seconds. Default is 300s.

  • interval – The query interval in seconds. Default is 5s.

  • task_fn (Callable[[RedisTask], Any] | None) – A function to execute tasks locally. Takes a RedisTask and returns Any.

build_task(id, name, params)[source]

Create a new RedisTask instance with the given parameters.

Parameters:
  • id (str) – Unique identifier for the task

  • name (str) – Name of the task category

  • params (dict) – Dictionary of parameters for the task

Returns:

A new RedisTask instance with a save function configured

Return type:

RedisTask

execute_task_remotely(task, timeout=None, once=False)[source]

Execute a task remotely by pushing it to the queue.

Parameters:
  • task (RedisTask) – The RedisTask to execute

  • timeout (float | None) – Optional timeout in seconds, defaults to self.timeout

  • once (bool) – Whether to delete the result after getting it

Returns:

The result of the task

Raises:
Return type:

Any

execute_task_locally(task, timeout=None)[source]

Execute a task locally using the task_fn.

Parameters:
  • task (RedisTask) – The RedisTask to execute

  • timeout (float | None) – Optional timeout in seconds, updates task.expiry if provided

Returns:

The result of the task

Raises:

Exception – Any exception raised during task execution

Return type:

Any

set_queue_listened(name)[source]

Set the queue as being listened to for the given task name.

Parameters:

name (str) – The task name.

Return type:

None

set_task(task)[source]

Save a task to Redis.

Parameters:

task (RedisTask) – The RedisTask to save.

Returns:

The task ID.

Return type:

str

get_task(task_id, once=False)[source]

Get a task from Redis.

Parameters:
  • task_id (str) – The task ID.

  • once (bool) – Whether to delete the task after getting it.

Returns:

The RedisTask, or None if no task is available.

Return type:

RedisTask | None

has_task(task_id)[source]

Check if a task exists.

Parameters:

task_id (str) – The task ID.

Returns:

True if the task exists, False otherwise.

Return type:

bool

listen(names, workers=128, event=None)[source]

Listen for tasks on the specified queues.

This method continuously polls the specified queues for tasks, and executes tasks locally when they are received.

Parameters:
  • names (List[str]) – The names of the queues to listen to.

  • workers (int) – The number of worker threads to use. Default is 128.

  • event (Event | None) – An event to signal when to stop listening. Default is None.

Return type:

None

query(id, name, params, timeout=None, policy=TaskExecutePolicy.Auto, once=False)[source]

Execute a task according to the specified policy.

This method provides a flexible way to execute tasks with different strategies based on the specified policy.

Parameters:
  • id (str) – The task ID.

  • name (str) – The task name.

  • params (dict) – The task parameters.

  • timeout (float | None) – Optional timeout override.

  • policy (TaskExecutePolicy) – The execution policy to use.

  • once (bool) – Whether to delete the result after getting it.

Returns:

The result of the task.

Raises:

Exception – Any exception raised during task execution.

Return type:

Any

Simplified Task Queue Flow

This diagram shows the interaction between a Client submitting a task, the RedisTaskQueue, Redis itself, and a Listener processing the task.

  1. Submission: The client calls query(). The RedisTaskQueue serializes the task data and stores it in a Redis key (result:<id>) with an expiry. It then pushes the task ID onto the corresponding queue list (queue:<name>).

  2. Listening: A listener calls listen() and performs a blocking pop (BLPOP) on the queue list.

  3. Processing: When a task ID is popped, the listener retrieves the task data from Redis, executes the user-defined task_fn, and updates the task data in Redis with the result or error.

  4. Result Retrieval: The original client can periodically call get_task() to fetch the updated task data and retrieve the result or error.

        sequenceDiagram
    participant Client
    participant TaskQueue
    participant Redis
    participant Listener

    Client->>TaskQueue: query(id, name, params)
    note right of Redis: Stores task state: SETEX result:<id> pickled_task
    TaskQueue->>Redis: SETEX result:<id> pickled_task
    note right of Redis: Adds ID to queue: RPUSH queue:<name> <id>
    TaskQueue->>Redis: RPUSH queue:<name> <id>
    Redis-->>TaskQueue: OK
    TaskQueue-->>Client: (Waits or returns if local)

    Listener->>TaskQueue: listen([name])
    loop Poll Queue
        note over TaskQueue,Redis: BLPOP queue:<name> timeout
        TaskQueue->>Redis: BLPOP queue:<name> timeout
        alt Task Available
            Redis-->>TaskQueue: [queue:<name>, <id>]
            note over TaskQueue,Redis: GET result:<id>
            TaskQueue->>Redis: GET result:<id>
            Redis-->>TaskQueue: pickled_task
            TaskQueue->>Listener: Execute task_fn(task)
            Listener-->>TaskQueue: result/error
            note right of Redis: Updates task state: SETEX result:<id> updated_pickled_task
            TaskQueue->>Redis: SETEX result:<id> updated_pickled_task
        else Timeout
            Redis-->>TaskQueue: nil
        end
    end

    Client->>TaskQueue: get_task(id) (Periodically or when notified)
    note over TaskQueue,Redis: GET result:<id>
    TaskQueue->>Redis: GET result:<id>
    Redis-->>TaskQueue: updated_pickled_task
    TaskQueue-->>Client: Task result/error