Metadata-Version: 2.4
Name: streamix-queue
Version: 0.1.0
Summary: Lightweight Redis Streams event messenger with publish/consume API
Author-email: moundher <bouroumanamoundher@gmail.com>
License: MIT
Project-URL: Homepage, https://github.com/streamix/streamix-queue
Project-URL: Repository, https://github.com/streamix/streamix-queue
Project-URL: Issues, https://github.com/streamix/streamix-queue/issues
Project-URL: Changelog, https://github.com/streamix/streamix-queue/releases
Keywords: redis,streams,event,messaging,queue,async
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Monitoring
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: redis>=5.0.0
Provides-Extra: dev
Requires-Dist: mypy>=1.10.0; extra == "dev"
Requires-Dist: pytest>=7.0.0; extra == "dev"
Requires-Dist: black>=23.0.0; extra == "dev"
Requires-Dist: ruff>=0.1.0; extra == "dev"
Dynamic: license-file

# Streamix Queue

[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

Lightweight Redis Streams event messenger for service-to-service communication.

## Features

- **Simple API**: Just `publish()` and `consume()` functions
- **Redis Streams**: Built on battle-tested Redis infrastructure
- **Consumer Groups**: Multiple services can consume the same events with group-based tracking
- **Automatic Retries**: Configurable retry limit with exponential backoff support
- **Dead-Letter Queue**: Failed messages sent to `<stream>:failed` for inspection
- **Stale Message Recovery**: Automatic reclaim of messages from crashed consumers
- **Structured Schema**: Messages include id, event, data, retries, and timestamps
- **Type Hints**: Full Python type annotations for IDE support
- **Minimal Dependencies**: Only Redis client required

## Installation

```bash
pip install streamix-queue
```

## Quick Start

### Consumer Service A

```python
from streamix_queue import consume

def on_user_created(data):
    print(f"User created: {data['user_id']}")
    # If handler raises an exception, message is automatically retried

consume(
    "user.created",
    on_user_created,
    redis_url="redis://localhost:6379/0",
    stream="app.events",
    group="service-a",
)
```

### Publisher Service B

```python
from streamix_queue import publish

publish(
    "user.created",
    {"user_id": "123", "email": "alice@example.com"},
    redis_url="redis://localhost:6379/0",
    stream="app.events",
    group="service-a",
)
```

## How It Works

1. **Publish**: Event sent to Redis Stream with structured schema
2. **Consumer Group**: Maintains message delivery state and ownership
3. **Processing**: Consumer reads and processes events
4. **Success**: Message acknowledged (removed from pending list)
5. **Failure**: If the handler raises an exception, the message is retried; once `retry_limit` is exceeded, it is moved to the DLQ
6. **Dead-Letter**: Permanently failed messages stored in `<stream>:failed` for debugging
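
The success and failure steps above can be sketched in plain Python. This is an illustration only, not the library's internal code: `Envelope` and `process_once` are hypothetical names, and the exact point at which the retry counter is compared against `retry_limit` is an assumption chosen to match the DLQ example later in this document (three retries, then dead-letter).

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Envelope:
    """Hypothetical stand-in for a message; not the library's StreamMessage."""

    event: str
    data: dict[str, Any]
    retries: int = 0


def process_once(
    msg: Envelope,
    handler: Callable[[dict[str, Any]], None],
    retry_limit: int = 3,
) -> str:
    """Run the handler once and report what should happen to the message."""
    try:
        handler(msg.data)
    except Exception:
        # Assumption: a message that has already used all its retries is dead-lettered.
        if msg.retries >= retry_limit:
            return "dead-letter"
        msg.retries += 1
        return "retry"
    return "ack"
```

With `retry_limit=3`, a handler that always raises yields `retry` three times and `dead-letter` on the fourth attempt.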

## API Reference

### `publish(event, data, **kwargs)`

Publish an event to the stream.

**Parameters:**
- `event` (str): Event name (e.g., "user.created")
- `data` (dict): Event payload
- `redis_url` (str, default="redis://localhost:6379/0"): Redis connection URL
- `stream` (str, default="app.events"): Stream name
- `group` (str, default="app.workers"): Consumer group name

**Returns:** `StreamMessage` object with id, event, data, retries, timestamps

### `consume(event, handler, **kwargs)`

Start a consumer that listens for events.

**Parameters:**
- `event` (str): Event name to listen for
- `handler` (callable): Function called with the message data; if it accepts two or more arguments, it also receives the full message object as the second argument
- `redis_url` (str, default="redis://localhost:6379/0"): Redis connection URL
- `stream` (str, default="app.events"): Stream name
- `group` (str, default="app.workers"): Consumer group name
- `consumer` (str, optional): Consumer instance name (auto-generated if None)
- `retry_limit` (int, default=3): Max retries before sending to DLQ
- `batch_size` (int, default=10): Messages per batch
- `block_ms` (int, default=5000): Blocking timeout for XREADGROUP
- `claim_idle_ms` (int, default=60000): Idle time threshold for stale message reclaim

**Handler signature:**
```python
# Simple - receives data only
def handler(data):
    pass

# Advanced - receives data and full message
def handler(data, message):
    print(message.id)       # Message UUID
    print(message.retries)  # Retry count
    print(message.event)    # Original event name
```
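
One way a dispatcher could choose between the two handler shapes is to inspect the handler's signature. The sketch below is an assumption about how such a check might look, not a description of how `streamix_queue` does it; `call_handler` is a hypothetical helper.

```python
import inspect
from typing import Any, Callable


def call_handler(handler: Callable[..., Any], data: dict, message: Any) -> Any:
    """Pass the full message only if the handler can take a second positional argument."""
    params = list(inspect.signature(handler).parameters.values())
    positional = [
        p for p in params
        if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
    ]
    takes_varargs = any(p.kind is p.VAR_POSITIONAL for p in params)

    if len(positional) >= 2 or takes_varargs:
        return handler(data, message)
    return handler(data)
```

A signature check like this means existing one-argument handlers keep working when a second argument is introduced.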

## Configuration Examples

### Multiple consumers for the same event

```python
# Service A
consume("order.placed", on_order_placed, group="service-a", consumer="worker-1")

# Service B - same event, different group
consume("order.placed", on_order_placed_b, group="service-b", consumer="worker-1")
```

### Different streams per environment

```python
# Dev
publish("user.updated", {...}, stream="dev.events", group="dev-workers")

# Prod
publish("user.updated", {...}, stream="prod.events", group="prod-workers")
```

### Adjust retry behavior

```python
consume(
    "payment.processed",
    handle_payment,
    retry_limit=5,  # More retries
    block_ms=10000,  # Longer blocking timeouts
    claim_idle_ms=120000,  # Reclaim after 2 minutes
)
```

## Message Schema

Every message follows this structure:

```json
{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "event": "user.created",
  "data": {
    "user_id": "123",
    "email": "alice@example.com"
  },
  "retries": 0,
  "timestamps": {
    "created_at": "2026-04-24T17:45:00+00:00",
    "updated_at": "2026-04-24T17:45:00+00:00"
  }
}
```
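
For tests or tooling that need a payload of the same shape, the structure can be reproduced with the standard library. This is a minimal sketch: `build_message` is a hypothetical helper, and the use of a version 4 UUID and second-precision UTC timestamps is inferred from the example above rather than confirmed by the library.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any


def build_message(event: str, data: dict[str, Any]) -> dict[str, Any]:
    """Build a dict shaped like the documented message schema."""
    # UTC timestamp in ISO 8601 form, e.g. 2026-04-24T17:45:00+00:00
    now = datetime.now(timezone.utc).isoformat(timespec="seconds")
    return {
        "id": str(uuid.uuid4()),
        "event": event,
        "data": data,
        "retries": 0,
        "timestamps": {"created_at": now, "updated_at": now},
    }


print(json.dumps(build_message("user.created", {"user_id": "123"}), indent=2))
```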

## Error Handling & Dead-Letter Queue

By default, messages are retried up to 3 times. After exceeding the retry limit, they're sent to the dead-letter stream:

**DLQ Stream:** `<stream>:failed` (e.g., `app.events:failed`)

**DLQ Message Example:**
```json
{
  "id": "...",
  "event": "user.created",
  "data": {
    "original_id": "550e8400-...",
    "original_event": "user.created",
    "original_data": {"user_id": "123"},
    "source_stream_id": "1713982500001-0",
    "retries": 3,
    "error": "Traceback: Connection timeout..."
  },
  "retries": 3,
  "timestamps": {...}
}
```
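
The `data` payload of a DLQ entry can be pictured as a wrapper around the original message. The following sketch builds that wrapper from the field names shown in the example; `dead_letter_stream` and `build_dlq_data` are hypothetical helpers, and the error string format is an assumption (the library may record a full traceback instead).

```python
import traceback
from typing import Any


def dead_letter_stream(stream: str) -> str:
    """Name of the dead-letter stream for a given source stream."""
    return f"{stream}:failed"


def build_dlq_data(
    original: dict[str, Any],
    source_stream_id: str,
    exc: BaseException,
) -> dict[str, Any]:
    """Wrap a failed message in the documented DLQ `data` layout."""
    return {
        "original_id": original["id"],
        "original_event": original["event"],
        "original_data": original["data"],
        "source_stream_id": source_stream_id,
        "retries": original["retries"],
        # Assumption: only the final "ExceptionType: message" line is kept.
        "error": "".join(traceback.format_exception_only(type(exc), exc)).strip(),
    }
```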

## Running in Production

### Docker Example

```dockerfile
FROM python:3.12-slim

WORKDIR /app
RUN pip install streamix-queue

COPY handlers.py .

CMD ["python", "handlers.py"]
```

### Kubernetes Example

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: streamix-consumer
spec:
  containers:
  - name: consumer
    image: myapp:latest
    env:
    - name: REDIS_URL
      value: "redis://redis:6379/0"
    - name: STREAM
      value: "app.events"
    - name: GROUP
      value: "service-a"
```
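
The Dockerfile above runs `handlers.py`, and the Pod spec sets `REDIS_URL`, `STREAM`, and `GROUP`, but the library is not documented as reading these variables itself. A possible `handlers.py` that wires them into `consume()` might look like the sketch below. `load_settings` and `main` are hypothetical names, and the fallback values simply mirror the defaults listed in the API Reference.

```python
import os
from typing import Mapping


def load_settings(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Map the environment variables from the Pod spec to consume() keyword arguments."""
    return {
        "redis_url": env.get("REDIS_URL", "redis://localhost:6379/0"),
        "stream": env.get("STREAM", "app.events"),
        "group": env.get("GROUP", "app.workers"),
    }


def on_user_created(data: dict) -> None:
    print(f"User created: {data['user_id']}")


def main() -> None:
    # Imported here so load_settings() can be tested without the package installed.
    from streamix_queue import consume

    consume("user.created", on_user_created, **load_settings())


# Call main() at the bottom of handlers.py to start consuming.
```

Keeping the environment lookup in its own function makes it easy to check the configuration without connecting to Redis.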

## Performance Tips

1. **Batch Size**: Increase `batch_size` for high throughput (e.g., 10-50 messages per batch)
2. **Block Timeout**: Increase `block_ms` to reduce idle polling and CPU usage (e.g., 5000-30000 ms)
3. **Consumer Instances**: Run multiple consumers in the same group for parallel processing
4. **Redis Persistence**: Enable AOF or RDB persistence in Redis so stream data survives restarts

## Troubleshooting

### Messages stuck in pending

Check the consumer group's pending entries:

```python
from redis import Redis

r = Redis.from_url("redis://localhost:6379/0")
pending = r.xpending("app.events", "service-a")
print(pending)
```
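
The summary above only gives counts. To see which individual entries have been idle long enough to be reclaimed, redis-py's `xpending_range` returns one dict per pending entry. The helper below is a sketch: `stale_entries` is a hypothetical name, and `client` is assumed to be a `redis.Redis` instance.

```python
from typing import Any


def stale_entries(
    client: Any,
    stream: str,
    group: str,
    idle_ms: int = 60000,
    count: int = 100,
) -> list[dict]:
    """Return pending entries that have been idle for at least idle_ms milliseconds."""
    # Each entry has message_id, consumer, time_since_delivered, times_delivered.
    entries = client.xpending_range(stream, group, min="-", max="+", count=count)
    return [e for e in entries if e["time_since_delivered"] >= idle_ms]
```

For example, `stale_entries(r, "app.events", "service-a")` with the client `r` from the snippet above lists entries older than the default `claim_idle_ms` of 60000.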

### Inspect dead-letter stream

```python
from redis import Redis

r = Redis.from_url("redis://localhost:6379/0")
failed = r.xread({"app.events:failed": "0"}, count=10)
for stream, messages in failed:
    for msg_id, data in messages:
        print(msg_id, data)
```
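
By default redis-py returns field names and values as `bytes`, so the printed entries show `b'...'` prefixes unless the client was created with `decode_responses=True`. A small helper can make the output easier to read. `decode_entry` is a hypothetical name, and the sketch makes no assumption about which fields the library stores in each stream entry.

```python
def decode_entry(fields: dict) -> dict[str, str]:
    """Convert a stream entry's bytes keys and values to text."""

    def to_text(value) -> str:
        return value.decode("utf-8") if isinstance(value, bytes) else str(value)

    return {to_text(k): to_text(v) for k, v in fields.items()}
```

In the loop above, replacing `print(msg_id, data)` with `print(msg_id, decode_entry(data))` prints plain strings.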

## License

MIT License. See the [LICENSE](LICENSE) file for details.

## Contributing

Contributions welcome! Please feel free to submit a Pull Request.

## Support

For issues, questions, or feature requests, please open an issue on GitHub.
