# Kafka Consumer

## Connection

Connect to your Kafka bootstrap broker(s):

```sh
ilagent daemon --kafka_brokers localhost:9092 --kafka_group_id ilagent \
    -e 'events-topic' -r 'heartbeats-topic'
```

| Flag               | Description                                       | Default      |
| ------------------ | ------------------------------------------------- | ------------ |
| `--kafka_brokers`  | Comma-separated broker list (required)            | —            |
| `--kafka_group_id` | Consumer group ID (required)                      | —            |
| `-e`               | Topic for event messages                          | —            |
| `-r`               | Topic for heartbeat messages                      | —            |
| `-p`               | HTTP server port (for health/readiness endpoints) | *(disabled)* |

{% hint style="info" %}
Kafka mode does not start the HTTP server by default. Pass `-p` explicitly if you need `/health` and `/ready` endpoints (e.g. for Kubernetes probes).
{% endhint %}
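
If you launch ilagent from a script, the flags in the table above can be assembled programmatically. The following is a minimal sketch, not part of ilagent itself: the helper name `ilagent_kafka_args` and its parameters are hypothetical, and it only uses the flags documented on this page.

```python
from typing import List, Optional


def ilagent_kafka_args(
    brokers: List[str],
    group_id: str,
    events_topic: Optional[str] = None,
    heartbeats_topic: Optional[str] = None,
    port: Optional[int] = None,
) -> List[str]:
    """Build an argv list for `ilagent daemon` in Kafka mode (hypothetical helper)."""
    args = [
        "ilagent", "daemon",
        "--kafka_brokers", ",".join(brokers),  # comma-separated broker list
        "--kafka_group_id", group_id,
    ]
    if events_topic is not None:
        args += ["-e", events_topic]
    if heartbeats_topic is not None:
        args += ["-r", heartbeats_topic]
    if port is not None:
        # Only pass -p when the health/readiness endpoints are needed.
        args += ["-p", str(port)]
    return args


argv = ilagent_kafka_args(
    ["localhost:9092"],
    "ilagent",
    events_topic="events-topic",
    heartbeats_topic="heartbeats-topic",
)
print(" ".join(argv))
```

The resulting list can be handed to `subprocess.run(argv)`; because no port is given here, `-p` is left out and the HTTP server stays disabled.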

## Graceful shutdown

The Kafka consumer shuts down gracefully on SIGINT/SIGTERM: in-flight messages finish processing and their offsets are committed before exit. If the HTTP server is running (`-p`), the `/health` endpoint returns 503 during shutdown so that load balancers or Kubernetes can drain traffic.

## At-least-once delivery

In Kafka mode, events are **not** buffered in SQLite. Instead, ilagent relies on the Kafka consumer offset to provide at-least-once delivery. If an event cannot be delivered, its offset is not committed and ilagent exits after 5 seconds, so the event is consumed again on the next start.

We recommend running ilagent so it is automatically restarted on exit:

```sh
docker run --restart always ilert/ilagent daemon \
    --kafka_brokers localhost:9092 --kafka_group_id ilagent \
    -e 'events-topic'
```
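
The offset-based behavior described in this section can be illustrated with a small simulation. This is a conceptual sketch only and not ilagent's actual implementation: the in-memory `log`, the `consume` function, and the `flaky_deliver` sink are all hypothetical stand-ins for a Kafka topic, the consumer loop, and the delivery to ilert.

```python
from typing import Callable, List


def consume(log: List[str], committed: int, deliver: Callable[[str], bool]) -> int:
    """Process messages starting at the committed offset.

    Returns the new committed offset. The offset only advances after a
    successful delivery, so a failed message is read again on the next run.
    """
    offset = committed
    while offset < len(log):
        if not deliver(log[offset]):
            # Delivery failed: stop without committing, as if the process exited.
            break
        offset += 1  # commit only after the message was delivered
    return offset


log = ["event-0", "event-1", "event-2"]
delivered: List[str] = []
failed_once = set()


def flaky_deliver(message: str) -> bool:
    # Hypothetical sink that rejects "event-1" on the first attempt only.
    if message == "event-1" and message not in failed_once:
        failed_once.add(message)
        return False
    delivered.append(message)
    return True


committed = consume(log, 0, flaky_deliver)          # first run stops at offset 1
committed = consume(log, committed, flaky_deliver)  # "restart" resumes at offset 1
```

After the simulated restart, every message in `log` has been delivered and none was skipped, which is why an automatic restart policy pairs well with this mode.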

## Payload format

All message payloads must be JSON. An example event payload using the default field names:

```json
{
  "integrationKey": "il1api123...",
  "eventType": "ALERT",
  "summary": "Hey from Kafka"
}
```

If your payloads use different field names, map them with `--map_key_etype`, `--map_key_summary`, and `--map_key_alert_key`. To use a fixed integration key instead of reading it from the payload, set `--event_key`. See [Event Mapping](/developer-docs/client-libraries/ilagent/event-mapping.md) for details.

{% hint style="warning" %}
The `integrationKey` (or `--event_key`) must belong to a [**Kafka** alert source](https://github.com/iLert/docs/blob/master/product-docs/integrations/inbound-integrations/kafka.md) in ilert. Events sent with integration keys from other alert source types will be rejected.
{% endhint %}

An example heartbeat payload:

```json
{
  "integrationKey": "il1hbt123..."
}
```
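
On the producing side, both payloads can be serialized with any JSON library before being handed to a Kafka producer. The sketch below uses only Python's standard library and the default field names from the examples above; the helper names `event_payload` and `heartbeat_payload` are hypothetical, and the producer call itself is left out because it depends on the Kafka client you use.

```python
import json


def event_payload(integration_key: str, event_type: str, summary: str) -> bytes:
    """Serialize an event using the default field names shown above."""
    return json.dumps(
        {
            "integrationKey": integration_key,
            "eventType": event_type,
            "summary": summary,
        }
    ).encode("utf-8")


def heartbeat_payload(integration_key: str) -> bytes:
    """Serialize a heartbeat with the single field from the example above."""
    return json.dumps({"integrationKey": integration_key}).encode("utf-8")


# Placeholder keys copied from the examples above; replace them with real ones.
event_value = event_payload("il1api123...", "ALERT", "Hey from Kafka")
heartbeat_value = heartbeat_payload("il1hbt123...")
```

The resulting byte strings are meant to be used as the message value on the topics passed to `-e` and `-r` respectively.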

## Sample command

```sh
ilagent daemon -v -v \
    --kafka_brokers localhost:9092 \
    --kafka_group_id ilagent \
    -e 'events-topic' -r 'heartbeats-topic'
```

See [Event Mapping](/developer-docs/client-libraries/ilagent/event-mapping.md) for mapping custom message fields to ilert event properties.

