Metadata-Version: 2.4
Name: mini-job
Version: 0.2.1
Summary: 极简分布式延迟任务队列 - 基于 Redis，支持 Cron 周期任务、异步协程、多执行器
Project-URL: Homepage, https://gitee.com/xiwenhaochi/mini_job
Project-URL: Repository, https://gitee.com/xiwenhaochi/mini_job.git
Project-URL: Issues, https://gitee.com/xiwenhaochi/mini_job/issues
Author-email: xiwen <xxx@example.com>
License: MIT
License-File: LICENSE
Keywords: asyncio,cron,delay-queue,distributed,job-scheduler,redis,task-queue
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Distributed Computing
Requires-Python: >=3.12
Requires-Dist: croniter>=6.2.0
Requires-Dist: pydantic-settings>=2.0.0
Requires-Dist: python-dotenv>=1.0.0
Requires-Dist: redis>=7.4.0
Provides-Extra: dev
Requires-Dist: pytest>=8.0; extra == 'dev'
Requires-Dist: ruff>=0.8; extra == 'dev'
Provides-Extra: script
Requires-Dist: numpy>=2.0.0; extra == 'script'
Requires-Dist: pandas>=3.0.0; extra == 'script'
Description-Content-Type: text/markdown

# mini-job

极简分布式延迟任务队列 — 基于 Redis，支持 Cron 周期任务、异步协程和多执行器。

## 特性

| 特性                 | 说明                                                                       |
| -------------------- | -------------------------------------------------------------------------- |
| **延迟任务**         | 设定延迟秒数，到期自动执行                                                 |
| **Cron 周期调度**    | 支持标准 cron 表达式（分 时 日 月 星期）                                   |
| **三种执行器**       | `async` 协程（IO 密集）、`thread` 线程（通用）、`process` 进程（CPU 密集） |
| **队列级执行器隔离** | Redis Key `{ns}:ready:{executor}` 三队列隔离，消费者只拉取专属队列，零竞争 |
| **死信队列**         | 失败的一次性任务自动进入死信队列，可排查重试                               |
| **可见性超时回收**   | 消费者崩溃后任务自动回收重入队列，不丢任务                                 |
| **命名空间隔离**     | 多环境共用同一 Redis 实例，Key 前缀隔离                                    |
| **监控指标**         | 内置 `QueueMetrics`，统计各生命周期计数                                    |
| **背压控制**         | 队列深度超阈值自动告警                                                     |
| **Pydantic 配置**    | 集中配置管理，环境变量覆盖，类型校验                                       |
| **Lua 原子操作**     | 抢占和回收均为 Redis 端原子执行，无竞态                                    |
| **优雅关闭**         | SIGTERM/SIGINT 信号处理，等待任务完成                                      |

## 安装

```bash
pip install mini-job              # 核心依赖
pip install mini-job[script]      # 含 pandas/numpy（脚本执行模式）
```

依赖：Python >= 3.12，Redis >= 7.4，croniter，pydantic-settings，python-dotenv

## 快速开始

### 1. 确保 Redis 运行

```bash
redis-cli ping  # PONG
```

### 2. 生产者 — 发布任务

```python
from mini_job import DelayQueue

dq = DelayQueue(namespace="myapp")

# 注册脚本（动态执行模式）
dq.register_script(
    "send_email",
    """
def handler(payload):
    to_email = payload.get('to')
    print(f'发送邮件到: {to_email}')
    return {'status': 'sent', 'to': to_email}
    """,
)

# 发布任务 — executor 参数指定执行器类型
dq.publish(
    "send_email",
    {"to": "user@example.com", "subject": "欢迎", "content": "注册成功"},
    executor="async",   # async / thread / process
)

# 延迟 30 秒执行
dq.publish("send_email", {...}, delay_seconds=30)

# 每天凌晨 2 点执行（cron 表达式：分 时 日 月 星期）
dq.publish("send_email", {...}, cron="0 2 * * *")

# 查询任务结果
result = dq.get_task_result(task_id)
```

### 3. 消费者 — 按类型独立启动

```python
from mini_job import DelayQueue

# 注册本地函数
def send_sms(payload):
    print(f"发送短信 -> {payload['phone']}")

TASK_REGISTRY = {
    "send_sms": send_sms,
}

dq = DelayQueue(namespace="myapp")
dq.start(
    task_registry=TASK_REGISTRY,
    executor_type="async",    # 本进程只消费 async 任务
)
```

启动不同执行器类型的消费者（3 个终端）：

```bash
python consumer.py async     # 协程消费者 — IO 密集任务
python consumer.py thread    # 线程消费者 — 通用任务
python consumer.py process   # 进程消费者 — CPU 密集任务
```

## 核心概念

### 执行器类型

| 类型      | 适用场景                              | 实现                  | 推荐并发数 |
| --------- | ------------------------------------- | --------------------- | ---------- |
| `async`   | IO 密集（发邮件、HTTP 请求、DB 操作） | `asyncio` 协程        | 100~500    |
| `thread`  | 通用任务、阻塞操作                    | `ThreadPoolExecutor`  | 30~100     |
| `process` | CPU 密集（数据处理、报表生成）        | `ProcessPoolExecutor` | CPU 核数   |

### 任务路由表

```python
TASK_REGISTRY = {
    # 简单格式：默认 async 执行器
    "send_sms": send_sms,

    # 带配置格式：指定执行器类型
    "daily_report": (daily_report, {"executor": "thread"}),
}
```

### 状态生命周期

```
pending → running → completed
                ↘ failed → 死信队列（一次性任务）
                           下次重试（周期任务）
```

### Redis Key 设计

```
{namespace}:ready:{executor}  — 按执行器隔离的就绪 ZSet（async/thread/process）
{namespace}:processing:{id}   — 消费者专属处理列表
{namespace}:processing:timeout — 全局超时追踪 ZSet
{namespace}:dead_letter        — 死信队列
{namespace}:dead_letter:detail — 死信详情
{namespace}:task:meta          — 任务元数据
{namespace}:task:result:{id}   — 任务结果（独立 TTL）
{namespace}:scripts            — 注册脚本
```

## API 参考

### DelayQueue

```python
dq = DelayQueue(namespace="myapp")
# 或使用配置对象
from mini_job import QueueConfig
dq = DelayQueue(QueueConfig(namespace="myapp"))
```

**生产者方法：**

| 方法                                                                   | 说明                    |
| ---------------------------------------------------------------------- | ----------------------- |
| `publish(func, payload, delay_seconds=0, cron=None, executor="async")` | 发布任务 → 返回 task_id |
| `register_script(name, content, language="python", use=[])`            | 注册动态脚本            |
| `get_script(name)`                                                     | 获取脚本信息            |
| `delete_script(name)`                                                  | 删除脚本                |
| `list_scripts()`                                                       | 列出所有脚本            |
| `get_task_result(task_id)`                                             | 查询任务状态和结果      |

**消费者方法：**

| 方法                                                    | 说明             |
| ------------------------------------------------------- | ---------------- |
| `start(task_registry, executor_type="async", **kwargs)` | 启动消费者       |
| `stop()`                                                | 手动触发优雅关闭 |

**`start()` 参数：**

| 参数                 | 默认值    | 说明                                 |
| -------------------- | --------- | ------------------------------------ |
| `task_registry`      | (必填)    | 任务路由表 `{"name": func}`          |
| `executor_type`      | `"async"` | 执行器类型：async / thread / process |
| `poll_interval`      | `0.5`     | 轮询间隔（秒）                       |
| `grab_limit`         | `80`      | 每次最多抢占任务数                   |
| `worker_threads`     | `50`      | 工作线程/协程/进程数                 |
| `task_timeout`       | `30`      | 单个任务超时（秒）                   |
| `visibility_timeout` | `60`      | 可见性超时（秒）                     |

## 配置

通过 Pydantic Settings 管理，支持 `.env` 文件、环境变量覆盖、类型校验。

### 队列配置 `DQ_*`

| 参数               | 环境变量              | 默认值   | 类型  | 说明                                            |
| ------------------ | --------------------- | -------- | ----- | ----------------------------------------------- |
| `namespace`        | `DQ_NAMESPACE`        | `"dq"`   | `str` | Redis Key 命名空间前缀，多环境隔离              |
| `consumer_id`      | `DQ_CONSUMER_ID`      | 自动生成 | `str` | 消费者唯一标识，默认 `worker-` + 8 位 hex       |
| `result_ttl`       | `DQ_RESULT_TTL`       | `86400`  | `int` | 任务结果保留时间（秒），默认 1 天               |
| `reclaim_interval` | `DQ_RECLAIM_INTERVAL` | `10`     | `int` | 超时回收检查间隔（轮询周期数），每 N 轮检查一次 |

### Redis 连接配置 `DQ_REDIS_*`

| 参数                     | 环境变量                          | 默认值        | 类型    | 说明                   |
| ------------------------ | --------------------------------- | ------------- | ------- | ---------------------- |
| `host`                   | `DQ_REDIS_HOST`                   | `"localhost"` | `str`   | Redis 主机地址         |
| `port`                   | `DQ_REDIS_PORT`                   | `6379`        | `int`   | Redis 端口             |
| `db`                     | `DQ_REDIS_DB`                     | `0`           | `int`   | Redis 数据库编号       |
| `password`               | `DQ_REDIS_PASSWORD`               | `None`        | `str`   | Redis 密码（可选）     |
| `max_connections`        | `DQ_REDIS_MAX_CONNECTIONS`        | `50`          | `int`   | 连接池最大连接数       |
| `socket_timeout`         | `DQ_REDIS_SOCKET_TIMEOUT`         | `5.0`         | `float` | 单次操作超时（秒）     |
| `socket_connect_timeout` | `DQ_REDIS_SOCKET_CONNECT_TIMEOUT` | `5.0`         | `float` | 连接建立超时（秒）     |
| `retry_on_timeout`       | `DQ_REDIS_RETRY_ON_TIMEOUT`       | `True`        | `bool`  | 超时是否自动重试       |
| `health_check_interval`  | `DQ_REDIS_HEALTH_CHECK_INTERVAL`  | `30`          | `int`   | 连接健康检查间隔（秒） |

### 消费者配置 `DQ_CONSUMER_*`

| 参数                 | 环境变量                         | 默认值  | 类型    | 说明                                         |
| -------------------- | -------------------------------- | ------- | ------- | -------------------------------------------- |
| `poll_interval`      | `DQ_CONSUMER_POLL_INTERVAL`      | `0.5`   | `float` | 轮询间隔（秒），影响任务延迟精度             |
| `grab_limit`         | `DQ_CONSUMER_GRAB_LIMIT`         | `80`    | `int`   | 每次最多抢占任务数，建议 worker × 1.5~2      |
| `worker_threads`     | `DQ_CONSUMER_WORKER_THREADS`     | `50`    | `int`   | 工作协程/线程/进程数                         |
| `task_timeout`       | `DQ_CONSUMER_TASK_TIMEOUT`       | `30`    | `int`   | 单个任务执行超时（秒），超时后标记失败       |
| `visibility_timeout` | `DQ_CONSUMER_VISIBILITY_TIMEOUT` | `60`    | `int`   | 可见性超时（秒），消费者需在此时间内完成任务 |
| `shutdown_timeout`   | `DQ_CONSUMER_SHUTDOWN_TIMEOUT`   | `30`    | `int`   | 优雅关闭最大等待时间（秒）                   |
| `max_queue_depth`    | `DQ_CONSUMER_MAX_QUEUE_DEPTH`    | `10000` | `int`   | 队列深度告警阈值，超阈值打印 WARNING         |

### 示例 `.env`

```bash
# 队列
DQ_NAMESPACE=production
DQ_CONSUMER_ID=web-server-01

# Redis
DQ_REDIS_HOST=redis.example.com
DQ_REDIS_PORT=6379
DQ_REDIS_PASSWORD=secret

# 消费者
DQ_CONSUMER_POLL_INTERVAL=0.3
DQ_CONSUMER_GRAB_LIMIT=100
DQ_CONSUMER_WORKER_THREADS=80
DQ_CONSUMER_TASK_TIMEOUT=60
DQ_CONSUMER_VISIBILITY_TIMEOUT=120
```

## 监控

```python
# 获取监控指标快照
snapshot = dq.metrics.snapshot()
# {'published': 1000, 'completed': 980, 'failed': 15, 'timeout': 5, ...}
```

指标说明：

| 指标            | 含义             |
| --------------- | ---------------- |
| `published`     | 已发布任务总数   |
| `completed`     | 成功完成数       |
| `failed`        | 执行失败数       |
| `timeout`       | 超时任务数       |
| `dead_lettered` | 进入死信队列数   |
| `reclaimed`     | 超时回收重入队数 |

## 项目结构

```
mini_job/
├── __init__.py           # 公共导出
├── config.py             # Pydantic Settings 配置
├── core/
│   ├── delay_queue.py    # DelayQueue 核心
│   └── task.py           # 任务模型
├── executor/
│   ├── base.py           # 执行器抽象基类
│   ├── async_io.py       # 协程执行器
│   ├── thread.py         # 线程执行器
│   └── process.py        # 进程执行器
├── redis/
│   ├── client.py         # Redis 连接 + Lua 脚本
│   └── scripts.lua       # 原子 Lua 脚本
├── utils/
│   ├── retry.py          # 重试装饰器
│   ├── metrics.py        # 监控指标
│   └── decorators.py     # 任务装饰器
├── consumer.py           # 消费者示例
└── producer.py           # 生产者示例
```

## License

MIT
