Metadata-Version: 2.1
Name: ZeroMqEventManager
Version: 2.3.0
Author-email: Eduardo Almeida <eduardo.almeida@e-deploy.com.br>
Maintainer-email: Eduardo Almeida <eduardo.almeida@e-deploy.com.br>
Requires-Python: >=3.12
Description-Content-Type: text/markdown
Requires-Dist: edphelper==1.0.0
Requires-Dist: MessageBus==1.2.0
Requires-Dist: MessageProcessor==4.3.0
Requires-Dist: pytz-deprecation-shim==0.1.0.post0
Requires-Dist: pyzmq==26.2.1
Requires-Dist: tzdata==2025.1
Requires-Dist: tzlocal==4.3.1
Requires-Dist: Unidecode==1.3.8
Requires-Dist: ZeroMqEventManagerApi==1.0.0
Requires-Dist: ZeroMqMessageHandler==2.5.0
Requires-Dist: ZeroMqThreadPool==2.4.0

# ZeroMqEventManager

ZeroMQ-based event management service for the e-deploy POS ecosystem. Provides async and sync pub/sub event routing between services using ZMQ devices (FORWARDER for pub/sub, STREAMER with ROUTER sockets for per-service routing).

## Requirements

- Python 3.12+
- ZeroMQ (pyzmq)
- ServiceLocator running and reachable

## Installation

```bash
pip install ZeroMqEventManager --extra-index-url https://pip.e-deploy.com.br
```

## Running

```bash
python src/main.py
```

## Architecture

The EventManager acts as a central event routing hub:

1. **Publishers** connect a PUB socket to the ForwarderDevice (default port 5000).
2. The **ForwarderDevice** (ZMQ FORWARDER) bridges PUB/SUB sockets, broadcasting all messages to connected StreamerDevices.
3. One **StreamerDevice** is created per subscribing service, with a ROUTER socket bound to a dynamic port (range 5001-5100). It filters messages by subject prefix and routes them to consumer workers.
4. **Consumers** connect DEALER sockets to the StreamerDevice's ROUTER and send a `READY` handshake to begin receiving events.

```
Publisher (PUB) ──connect──> ForwarderDevice (SUB:5000 -> PUB:5001)
                                     |
                              StreamerDevice (SUB ──connect──> :5001)
                                     |
                              ROUTER (:dynamic port)
                                     |
                             Consumer (DEALER) ──connect──> ROUTER
```

### Subscription Flow

1. Consumer sends `TK_EVENT_MANAGER_SUBSCRIBE` with `sub_id\0service_name\0subject1\0subject2...`
2. EventManager creates/reuses an `AsyncSubscription` (one per service) with its own `StreamerDevice`.
3. Response: `sub_id\0pub_address\0pull_address` — the consumer uses `pub_address` to publish and `pull_address` to receive events.
4. Consumer must send periodic `TK_EVENT_MANAGER_KEEP_ALIVE` with `sub_id` to prevent subscription expiry.

### Event Delivery

- **Async events**: Published via PUB socket to ForwarderDevice. StreamerDevice filters by subject and routes to consumer workers via ROUTER.
- **Sync events**: Sent via `TK_EVENT_MANAGER_PUBLISH` to EventManager, which forwards to subscribed services via MessageBus.

## Configuration

Configuration is loaded from `config.json` and environment variables:

| Config Key | Default | Env Var | Description |
|---|---|---|---|
| `pubAddress` | `tcp://*:5000` | - | ForwarderDevice SUB (input) bind address |
| `subAddress` | `tcp://*:5001` | - | ForwarderDevice PUB (output) bind address |
| `pullAddress` | `tcp://*` | - | StreamerDevice ROUTER bind address |
| `minStreamerPort` | `5001` | - | Min port for dynamic StreamerDevice binding |
| `maxStreamerPort` | `5100` | - | Max port for dynamic StreamerDevice binding |
| `publicHostname` | `localhost` | `PUBLIC_HOSTNAME` | Hostname returned to clients in subscribe response |
| `locatorAddress` | `tcp://localhost:6000` | `SERVICE_LOCATOR_ADDRESS` | ServiceLocator address |
| - | `./config.json` | `CONFIG_PATH` | Path to config file |
| - | (same as config) | `LOG_PATH` | Path to log config file |
| - | (disabled) | `STARTUP_GUARD` | Enable startup guard (`true`/`1`/`yes`) |
| - | (disabled) | `DIAGNOSTIC_MONITOR` | Enable diagnostic monitor (`true`/`1`/`yes`) |
| - | `1800` | `DIAGNOSTIC_INTERVAL` | Diagnostic snapshot interval in seconds |

## Tokens

The EventManager uses message group `2`:

| Token | ID | Description |
|---|---|---|
| `TK_EVENT_MANAGER_SUBSCRIBE` | 2.1 | Subscribe to subjects (defined in API package) |
| `TK_EVENT_MANAGER_UNSUBSCRIBE` | 2.2 | Unsubscribe (not yet implemented) |
| `TK_EVENT_MANAGER_PUBLISH` | 2.3 | Publish a sync event |
| `TK_EVENT_MANAGER_KEEP_ALIVE` | 2.4 | Refresh subscription lifetime |

## Usage Example

```python
import zmq
from messagebus import SubscriptionInfo
from zeromqmessagehandler import ZeroMqMessageBus

context = zmq.Context()
message_bus = ZeroMqMessageBus("MyService", context)

message_bus.subscribe([
    SubscriptionInfo("OrderCreated", SubscriptionInfo.TYPE_NON_REENTRANT),
    SubscriptionInfo("PaymentReceived", SubscriptionInfo.TYPE_REENTRANT),
])
```

After subscribing, the `ZeroMqMessageBus` automatically:
- Sends a subscribe request to the EventManager via the ServiceLocator
- Receives the `pub_address` and `pull_address` from the response
- Connects worker threads (DEALER sockets) to the StreamerDevice's ROUTER socket
- Sends `READY` to begin receiving events
- Runs a keep-alive thread to prevent subscription expiry

## Startup Guard

When `STARTUP_GUARD=true`, the service queries the ServiceLocator for an existing EventManager before starting. If a live predecessor is found, the service blocks (polling every 2s, up to 60s) until it becomes unreachable. This prevents split-brain scenarios where two EventManager instances run simultaneously, causing event delivery loss when publishers and consumers connect to different instances.
