Skip to content

Task Queue

Manage background tasks for data collection, backtests, and other long-running operations.

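| Method | Endpoint | Auth | Description |
|--------|----------|------|-------------|
| POST | /queue | Yes | Create task |
| GET | /queue | Yes | List tasks |
| GET | /queue/next | Yes | Get next pending task |
| GET | /queue/{id} | Yes | Get task details |
| POST | /queue/{id}/execute | Yes | Execute task immediately |
| POST | /queue/{id}/cancel | Yes | Cancel task |
| DELETE | /queue/{id} | Yes | Delete task |
| GET | /queue/stats/queue | Yes | Get queue statistics |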

| Status | Description |
|--------|-------------|
| PENDING | Waiting to be executed |
| RUNNING | Currently executing |
| COMPLETED | Successfully finished |
| FAILED | Execution failed |
| CANCELLED | Manually cancelled |

POST /api/v1/queue
Content-Type: application/json
{
"task_type": "FETCH_DATA",
"parameters": {
"symbols": ["AAPL", "MSFT"],
"interval": "1d",
"start_date": "2024-01-01",
"end_date": "2024-12-31"
},
"priority": 50,
"task_group": "daily_collection_2025_12_17",
"max_retries": 3
}
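
| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| task_type | string | Yes | - | Type of task (see below) |
| parameters | object | Yes | - | Task-specific parameters |
| priority | integer | No | 50 | Priority (0=highest, 100=lowest) |
| task_group | string | No | - | Logical grouping label |
| max_retries | integer | No | 3 | Max retry attempts |
| depends_on | UUID | No | - | Task dependency |

| Priority | Description | Use Case |
|----------|-------------|----------|
| 0 | Immediate | "Run Now" from UI |
| 1-25 | High | Critical data collection |
| 26-50 | Normal | Regular scheduled tasks |
| 51-100 | Low | Background maintenance |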

Download historical market data.

{
"task_type": "FETCH_DATA",
"parameters": {
"symbols": ["AAPL", "MSFT", "GOOGL"],
"interval": "1d",
"start_date": "2024-01-01",
"end_date": "2024-12-31",
"provider": "massive"
}
}

Run a strategy backtest.

{
"task_type": "BACKTEST_STRATEGY",
"parameters": {
"strategy_id": "550e8400-...",
"symbols": ["AAPL"],
"start_date": "2024-01-01",
"end_date": "2024-12-31",
"initial_capital": 100000
}
}

Compare multiple strategies.

{
"task_type": "COMPARE_STRATEGIES",
"parameters": {
"strategy_ids": ["strategy1", "strategy2", "strategy3"],
"symbols": ["AAPL"],
"start_date": "2024-01-01",
"end_date": "2024-12-31"
}
}

Find correlated stocks.

{
"task_type": "CORRELATION_ANALYSIS",
"parameters": {
"symbols": ["AAPL", "MSFT", "GOOGL", "AMZN", "META"],
"method": "pearson",
"max_lag_days": 30,
"min_correlation": 0.7
}
}

Run a task immediately (“Run Now”).

POST /api/v1/queue/cc0e8400-.../execute
{
"id": "cc0e8400-...",
"status": "RUNNING",
"started_at": "2025-12-17T10:31:00Z"
}

Filter and list tasks.

GET /api/v1/queue?status=PENDING&task_type=FETCH_DATA&task_group=daily_collection

| Parameter | Type | Description |
|-----------|------|-------------|
| status | string | Filter by status |
| task_type | string | Filter by type |
| task_group | string | Filter by group |
| limit | integer | Max results |
| offset | integer | Pagination offset |

GET /api/v1/queue/stats/queue
{
"total": 150,
"by_status": {
"PENDING": 45,
"RUNNING": 3,
"COMPLETED": 98,
"FAILED": 4,
"CANCELLED": 0
}
}

Automatically selects the best data provider.

POST /api/v1/tasks/ingest-data-smart/sync
Content-Type: application/json
{
"symbols": ["AAPL", "MSFT"],
"interval": "1d",
"full_history": false
}

Start background collection for all active tickers.

POST /api/v1/tasks/collect-all-data
Content-Type: application/json
{
"intervals": ["1d"],
"full_history": false,
"provider": "massive"
}

Response:

{
"job_id": "dd0e8400-...",
"status": "RUNNING",
"message": "Data collection started",
"progress": {
"total": 50,
"completed": 0,
"failed": 0
}
}
GET /api/v1/tasks/collect-all-data/dd0e8400-.../status
{
"job_id": "dd0e8400-...",
"status": "RUNNING",
"progress": {
"total": 50,
"completed": 35,
"failed": 2,
"current_symbol": "GOOGL"
}
}

// Shared axios client; the request paths used below are relative to this v1 API root.
// NOTE(review): the endpoint table marks every queue route as "Auth: Yes", but no
// credentials are configured on this client — confirm how the token/header is supplied.
const api = axios.create({
baseURL: 'http://localhost:8501/api/v1'
});
// Create and execute task
/**
 * Create a high-priority task, trigger it immediately, and poll until it
 * reaches a terminal status.
 *
 * @param {string} taskType - Value sent as `task_type` (e.g. 'FETCH_DATA').
 * @param {object} parameters - Task-specific parameters, sent as-is.
 * @returns {Promise<*>} The `result` field of the task once it is COMPLETED.
 * @throws {Error} When the task ends in any status other than COMPLETED; the
 *   message is the task's `error_message`, or 'Task failed' if none is set.
 */
async function createAndRunTask(taskType, parameters) {
  // Create task with high priority
  const { data: task } = await api.post('/queue', {
    task_type: taskType,
    parameters,
    priority: 10, // High priority
    max_retries: 3
  });
  console.log(`Created task: ${task.id}`);

  // Execute immediately
  await api.post(`/queue/${task.id}/execute`);

  // Poll until the task reaches a terminal status. Looping only while the
  // status is RUNNING would treat a task that is still PENDING right after
  // the execute call as a failure.
  const terminalStatuses = ['COMPLETED', 'FAILED', 'CANCELLED'];
  let status;
  do {
    await new Promise(r => setTimeout(r, 2000));
    const { data } = await api.get(`/queue/${task.id}`);
    status = data;
    console.log(`Status: ${status.status}, Progress: ${status.progress || 'N/A'}`);
  } while (!terminalStatuses.includes(status.status));

  if (status.status === 'COMPLETED') {
    console.log('Task completed successfully');
    return status.result;
  }
  throw new Error(status.error_message || 'Task failed');
}
// Batch data collection
/**
 * Queue a FETCH_DATA task for the given symbols and wait for its result.
 *
 * @param {string[]} symbols - Ticker symbols to download.
 * @param {string} [interval='1d'] - Bar interval.
 * @param {string} [startDate='2024-01-01'] - Sent as `start_date`.
 * @param {string} [endDate='2024-12-31'] - Sent as `end_date`.
 * @returns {Promise<*>} Whatever createAndRunTask resolves to.
 */
async function collectData(
  symbols,
  interval = '1d',
  startDate = '2024-01-01',
  endDate = '2024-12-31'
) {
  // The date range used to be hard-coded; the defaults keep existing
  // two-argument callers working unchanged.
  return createAndRunTask('FETCH_DATA', {
    symbols,
    interval,
    start_date: startDate,
    end_date: endDate
  });
}
// Monitor queue
/**
 * Fetch queue statistics, print the per-status counts, and return the stats.
 *
 * @returns {Promise<object>} The response body of GET /queue/stats/queue.
 */
async function monitorQueue() {
  const response = await api.get('/queue/stats/queue');
  const stats = response.data;
  const counts = stats.by_status;

  console.log('Queue Status:');
  // [printed label, key in by_status], in display order.
  const rows = [
    ['Pending', 'PENDING'],
    ['Running', 'RUNNING'],
    ['Completed', 'COMPLETED'],
    ['Failed', 'FAILED']
  ];
  for (const [label, key] of rows) {
    console.log(` ${label}: ${counts[key]}`);
  }
  return stats;
}
// Cancel all pending tasks in a group
/**
 * List the PENDING tasks of a task group and cancel them one at a time.
 *
 * @param {string} taskGroup - Value used for the `task_group` filter.
 * @returns {Promise<void>}
 */
async function cancelGroup(taskGroup) {
  const filter = { task_group: taskGroup, status: 'PENDING' };
  const listing = await api.get('/queue', { params: filter });
  const pendingTasks = listing.data;

  // Sequential on purpose: each cancel finishes (and is logged) before the next starts.
  for (const { id } of pendingTasks) {
    await api.post(`/queue/${id}/cancel`);
    console.log(`Cancelled: ${id}`);
  }
}
// Usage
// Top-level `await` is only valid in an ES module; in a CommonJS script wrap
// these calls in an async function instead.
await collectData(['AAPL', 'MSFT', 'GOOGL']);
await monitorQueue();
import requests
import time
from typing import List, Dict, Optional
class TaskQueue:
    """Small HTTP client for the task-queue endpoints.

    Every method issues one request against ``base_url`` and returns the
    decoded JSON body; non-2xx responses raise ``requests.HTTPError`` via
    ``raise_for_status()``.

    NOTE(review): no authentication is sent with these requests — confirm
    how credentials should be supplied for the deployment in use.
    """

    # Statuses after which polling stops.
    _TERMINAL_STATUSES = frozenset({"COMPLETED", "FAILED", "CANCELLED"})

    def __init__(self, base_url: str, timeout: float = 30.0):
        """Store the API root and the per-request HTTP timeout (seconds)."""
        self.base_url = base_url
        # Without a timeout, requests waits forever on an unresponsive server.
        self.timeout = timeout

    def create_task(
        self,
        task_type: str,
        parameters: Dict,
        priority: int = 50,
        task_group: Optional[str] = None,
        max_retries: Optional[int] = None,
    ) -> Dict:
        """Create a new task.

        Optional fields are only included in the request body when a value
        was given, so the server-side defaults apply otherwise.
        """
        payload = {
            "task_type": task_type,
            "parameters": parameters,
            "priority": priority,
        }
        if task_group is not None:
            payload["task_group"] = task_group
        if max_retries is not None:
            payload["max_retries"] = max_retries

        response = requests.post(
            f"{self.base_url}/queue",
            json=payload,
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def execute(self, task_id: str) -> Dict:
        """Execute a task immediately."""
        response = requests.post(
            f"{self.base_url}/queue/{task_id}/execute",
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def get_status(self, task_id: str) -> Dict:
        """Get task status."""
        response = requests.get(
            f"{self.base_url}/queue/{task_id}",
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()

    def wait_for_completion(
        self,
        task_id: str,
        poll_interval: float = 2,
        timeout: Optional[float] = None,
    ) -> Dict:
        """Poll a task until it reaches a terminal status and return it.

        Args:
            task_id: Identifier of the task to poll.
            poll_interval: Seconds to sleep between polls.
            timeout: Maximum seconds to keep polling; ``None`` (the default)
                waits indefinitely.

        Raises:
            TimeoutError: If ``timeout`` elapses before a terminal status.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            status = self.get_status(task_id)
            if status["status"] in self._TERMINAL_STATUSES:
                return status
            print(f"Status: {status['status']}, Progress: {status.get('progress', 'N/A')}")
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(
                    f"Task {task_id} did not finish within {timeout} seconds"
                )
            time.sleep(poll_interval)

    def run_and_wait(
        self,
        task_type: str,
        parameters: Dict,
        priority: int = 10
    ) -> Dict:
        """Create, execute, and wait for task.

        Returns the task's ``result`` field on success.

        Raises:
            RuntimeError: If the task ends in any status other than COMPLETED.
        """
        task = self.create_task(task_type, parameters, priority)
        print(f"Created task: {task['id']}")
        self.execute(task["id"])
        print("Executing...")
        result = self.wait_for_completion(task["id"])
        if result["status"] == "COMPLETED":
            print("Task completed successfully")
            return result["result"]
        # `or` rather than a .get() default: the key may be present but null,
        # which would otherwise produce RuntimeError(None).
        raise RuntimeError(result.get("error_message") or "Task failed")

    def get_stats(self) -> Dict:
        """Get queue statistics."""
        response = requests.get(
            f"{self.base_url}/queue/stats/queue",
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()
# Usage
queue = TaskQueue("http://localhost:8501/api/v1")

# Collect data: creates the task, triggers it, and blocks until it finishes.
result = queue.run_and_wait(
    "FETCH_DATA",
    {
        "symbols": ["AAPL", "MSFT", "GOOGL"],
        "interval": "1d",
        "start_date": "2024-01-01",
        "end_date": "2024-12-31",
    },
)

# Check queue stats
stats = queue.get_stats()
# Plain string: the original f-string had no placeholders.
print("\nQueue Status:")
for label, key in (
    ("Pending", "PENDING"),
    ("Running", "RUNNING"),
    ("Completed", "COMPLETED"),
    ("Failed", "FAILED"),
):
    print(f" {label}: {stats['by_status'][key]}")

Use Task Groups

Group related tasks for easier management and bulk operations.

Set Priorities

Use priority to ensure critical tasks run first.

Monitor Queue

Regularly check queue stats to identify bottlenecks.

Handle Failures

Check failed tasks and retry or investigate errors.