Use Task Groups
Group related tasks for easier management and bulk operations.
Manage background tasks for data collection, backtests, and other long-running operations.
| Method | Endpoint | Auth | Description |
|---|---|---|---|
POST | /queue | Yes | Create task |
GET | /queue | Yes | List tasks |
GET | /queue/next | Yes | Get next pending task |
GET | /queue/{id} | Yes | Get task details |
POST | /queue/{id}/execute | Yes | Execute task immediately |
POST | /queue/{id}/cancel | Yes | Cancel task |
DELETE | /queue/{id} | Yes | Delete task |
GET | /queue/stats/queue | Yes | Get queue statistics |
| Status | Description |
|---|---|
| PENDING | Waiting to be executed |
| RUNNING | Currently executing |
| COMPLETED | Successfully finished |
| FAILED | Execution failed |
| CANCELLED | Manually cancelled |
POST /api/v1/queueContent-Type: application/json
{ "task_type": "FETCH_DATA", "parameters": { "symbols": ["AAPL", "MSFT"], "interval": "1d", "start_date": "2024-01-01", "end_date": "2024-12-31" }, "priority": 50, "task_group": "daily_collection_2025_12_17", "max_retries": 3}{ "id": "cc0e8400-e29b-41d4-a716-446655440000", "task_type": "FETCH_DATA", "task_group": "daily_collection_2025_12_17", "priority": 50, "status": "PENDING", "parameters": { "symbols": ["AAPL", "MSFT"], "interval": "1d", "start_date": "2024-01-01", "end_date": "2024-12-31" }, "progress": null, "result": null, "error_message": null, "retry_count": 0, "max_retries": 3, "created_at": "2025-12-17T10:30:00Z", "started_at": null, "completed_at": null}| Field | Type | Required | Default | Description |
|---|---|---|---|---|
task_type | string | Yes | - | Type of task (see below) |
parameters | object | Yes | - | Task-specific parameters |
priority | integer | No | 50 | Priority (0=highest, 100=lowest) |
task_group | string | No | - | Logical grouping label |
max_retries | integer | No | 3 | Max retry attempts |
depends_on | UUID | No | - | Task dependency |
| Priority | Description | Use Case |
|---|---|---|
| 0 | Immediate | ”Run Now” from UI |
| 1-25 | High | Critical data collection |
| 26-50 | Normal | Regular scheduled tasks |
| 51-100 | Low | Background maintenance |
Download historical market data.
{ "task_type": "FETCH_DATA", "parameters": { "symbols": ["AAPL", "MSFT", "GOOGL"], "interval": "1d", "start_date": "2024-01-01", "end_date": "2024-12-31", "provider": "massive" }}Run a strategy backtest.
{ "task_type": "BACKTEST_STRATEGY", "parameters": { "strategy_id": "550e8400-...", "symbols": ["AAPL"], "start_date": "2024-01-01", "end_date": "2024-12-31", "initial_capital": 100000 }}Compare multiple strategies.
{ "task_type": "COMPARE_STRATEGIES", "parameters": { "strategy_ids": ["strategy1", "strategy2", "strategy3"], "symbols": ["AAPL"], "start_date": "2024-01-01", "end_date": "2024-12-31" }}Find correlated stocks.
{ "task_type": "CORRELATION_ANALYSIS", "parameters": { "symbols": ["AAPL", "MSFT", "GOOGL", "AMZN", "META"], "method": "pearson", "max_lag_days": 30, "min_correlation": 0.7 }}Run a task immediately (“Run Now”).
POST /api/v1/queue/cc0e8400-.../execute{ "id": "cc0e8400-...", "status": "RUNNING", "started_at": "2025-12-17T10:31:00Z"}Filter and list tasks.
GET /api/v1/queue?status=PENDING&task_type=FETCH_DATA&task_group=daily_collection| Parameter | Type | Description |
|---|---|---|
status | string | Filter by status |
task_type | string | Filter by type |
task_group | string | Filter by group |
limit | integer | Max results |
offset | integer | Pagination offset |
GET /api/v1/queue/stats/queue{ "total": 150, "by_status": { "PENDING": 45, "RUNNING": 3, "COMPLETED": 98, "FAILED": 4, "CANCELLED": 0 }}Automatically selects the best data provider.
POST /api/v1/tasks/ingest-data-smart/syncContent-Type: application/json
{ "symbols": ["AAPL", "MSFT"], "interval": "1d", "full_history": false}Start background collection for all active tickers.
POST /api/v1/tasks/collect-all-dataContent-Type: application/json
{ "intervals": ["1d"], "full_history": false, "provider": "massive"}Response:
{ "job_id": "dd0e8400-...", "status": "RUNNING", "message": "Data collection started", "progress": { "total": 50, "completed": 0, "failed": 0 }}GET /api/v1/tasks/collect-all-data/dd0e8400-.../status{ "job_id": "dd0e8400-...", "status": "RUNNING", "progress": { "total": 50, "completed": 35, "failed": 2, "current_symbol": "GOOGL" }}const api = axios.create({ baseURL: 'http://localhost:8501/api/v1'});
// Create and execute taskasync function createAndRunTask(taskType, parameters) { // Create task with high priority const { data: task } = await api.post('/queue', { task_type: taskType, parameters, priority: 10, // High priority max_retries: 3 });
console.log(`Created task: ${task.id}`);
// Execute immediately await api.post(`/queue/${task.id}/execute`);
// Poll for completion let status; do { await new Promise(r => setTimeout(r, 2000)); const { data } = await api.get(`/queue/${task.id}`); status = data; console.log(`Status: ${status.status}, Progress: ${status.progress || 'N/A'}`); } while (status.status === 'RUNNING');
if (status.status === 'COMPLETED') { console.log('Task completed successfully'); return status.result; } else { throw new Error(status.error_message || 'Task failed'); }}
// Batch data collectionasync function collectData(symbols, interval = '1d') { return createAndRunTask('FETCH_DATA', { symbols, interval, start_date: '2024-01-01', end_date: '2024-12-31' });}
// Monitor queueasync function monitorQueue() { const { data: stats } = await api.get('/queue/stats/queue');
console.log('Queue Status:'); console.log(` Pending: ${stats.by_status.PENDING}`); console.log(` Running: ${stats.by_status.RUNNING}`); console.log(` Completed: ${stats.by_status.COMPLETED}`); console.log(` Failed: ${stats.by_status.FAILED}`);
return stats;}
// Cancel all pending tasks in a groupasync function cancelGroup(taskGroup) { const { data: tasks } = await api.get('/queue', { params: { task_group: taskGroup, status: 'PENDING' } });
for (const task of tasks) { await api.post(`/queue/${task.id}/cancel`); console.log(`Cancelled: ${task.id}`); }}
// Usageawait collectData(['AAPL', 'MSFT', 'GOOGL']);await monitorQueue();import requestsimport timefrom typing import List, Dict, Optional
class TaskQueue: def __init__(self, base_url: str): self.base_url = base_url
def create_task( self, task_type: str, parameters: Dict, priority: int = 50, task_group: Optional[str] = None ) -> Dict: """Create a new task.""" response = requests.post( f"{self.base_url}/queue", json={ "task_type": task_type, "parameters": parameters, "priority": priority, "task_group": task_group } ) response.raise_for_status() return response.json()
def execute(self, task_id: str) -> Dict: """Execute a task immediately.""" response = requests.post(f"{self.base_url}/queue/{task_id}/execute") response.raise_for_status() return response.json()
def get_status(self, task_id: str) -> Dict: """Get task status.""" response = requests.get(f"{self.base_url}/queue/{task_id}") response.raise_for_status() return response.json()
def wait_for_completion(self, task_id: str, poll_interval: int = 2) -> Dict: """Wait for task to complete.""" while True: status = self.get_status(task_id)
if status["status"] in ["COMPLETED", "FAILED", "CANCELLED"]: return status
print(f"Status: {status['status']}, Progress: {status.get('progress', 'N/A')}") time.sleep(poll_interval)
def run_and_wait( self, task_type: str, parameters: Dict, priority: int = 10 ) -> Dict: """Create, execute, and wait for task.""" task = self.create_task(task_type, parameters, priority) print(f"Created task: {task['id']}")
self.execute(task["id"]) print("Executing...")
result = self.wait_for_completion(task["id"])
if result["status"] == "COMPLETED": print("Task completed successfully") return result["result"] else: raise RuntimeError(result.get("error_message", "Task failed"))
def get_stats(self) -> Dict: """Get queue statistics.""" response = requests.get(f"{self.base_url}/queue/stats/queue") response.raise_for_status() return response.json()
# Usagequeue = TaskQueue("http://localhost:8501/api/v1")
# Collect dataresult = queue.run_and_wait( "FETCH_DATA", { "symbols": ["AAPL", "MSFT", "GOOGL"], "interval": "1d", "start_date": "2024-01-01", "end_date": "2024-12-31" })
# Check queue statsstats = queue.get_stats()print(f"\nQueue Status:")print(f" Pending: {stats['by_status']['PENDING']}")print(f" Running: {stats['by_status']['RUNNING']}")print(f" Completed: {stats['by_status']['COMPLETED']}")print(f" Failed: {stats['by_status']['FAILED']}")Use Task Groups
Group related tasks for easier management and bulk operations.
Set Priorities
Use priority to ensure critical tasks run first.
Monitor Queue
Regularly check queue stats to identify bottlenecks.
Handle Failures
Check failed tasks and retry or investigate errors.