Consumer - Getting Started

The consumer is a NestJS microservice that runs purely as a Kafka consumer — it does not bind to any HTTP port. Its sole job is to receive raw AA news content messages, call the ML model service over gRPC to classify each article into one of 7 Turkish news categories, persist the classification results to Postgres, cache the outcome in Redis, and emit a processed event to a downstream Kafka topic.
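As a rough orientation, a Kafka-only NestJS microservice of this shape is bootstrapped along the following lines. This is a hedged sketch only: the module name, file layout, and extra client options are assumptions, but the broker default, the consumer group ID, and the absence of an HTTP listener match the behaviour described on this page.

```typescript
// main.ts - illustrative sketch; actual file and option names may differ.
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: (process.env.KAFKA_BROKERS ?? 'kafka:9092').split(','),
      },
      consumer: {
        groupId: 'consumer-group', // matches the group ID noted under Operational Notes
      },
    },
  });
  await app.listen(); // starts the Kafka consumer; no HTTP port is ever opened
}
bootstrap();
```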

Prerequisites

  • Postgres: the service reads and writes content rows, inserts detailed classification results (including the full per-category confidence scores), and records per-message analytics.
  • Redis: used exclusively for deduplication. A short-lived key is set for each successfully processed source ID so that replayed Kafka messages are skipped without hitting the database or the model.
  • Kafka: the service consumes from one topic and publishes to one topic. Both the broker connection and consumer group are configured at startup.
  • gRPC Model Service: every message that passes deduplication triggers a synchronous classification call to the model service. The consumer cannot process articles without this service being reachable.

Configuration Keys

The service validates all configuration at startup using a Joi schema. Missing required values cause the application to refuse to start. Key names and their defaults (from the config schema) include:

  • KAFKA_BROKERS (default: kafka:9092)
  • REDIS_URL (default: redis://redis:6379)
  • DB_HOST (default: postgres), DB_PORT (default: 5432), DB_USER (default: postgres), DB_PASS (default: postgres), DB_NAME (default: aan_db)
  • GRPC_MODEL_URL (default: model:50051)
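
A Joi schema covering the keys above would look roughly like the following. This is a sketch based on the defaults listed here; the real schema lives in the service's config module and may mark additional keys as required or include keys not shown on this page.

```typescript
// config.schema.ts - hedged sketch of the startup validation; the real schema may differ.
import * as Joi from 'joi';

export const configSchema = Joi.object({
  KAFKA_BROKERS: Joi.string().default('kafka:9092'),
  REDIS_URL: Joi.string().default('redis://redis:6379'),
  DB_HOST: Joi.string().default('postgres'),
  DB_PORT: Joi.number().default(5432),
  DB_USER: Joi.string().default('postgres'),
  DB_PASS: Joi.string().default('postgres'),
  DB_NAME: Joi.string().default('aan_db'),
  GRPC_MODEL_URL: Joi.string().default('model:50051'),
});
```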

The service never auto-syncs the database schema — synchronize is set to false in the TypeORM config, so migrations must be applied externally before the service starts.
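A minimal sketch of how the TypeORM connection is typically wired from these config keys is shown below; the use of forRootAsync, autoLoadEntities, and the module layout are assumptions, but synchronize: false reflects the behaviour described above.

```typescript
// app.module.ts (excerpt) - hedged sketch; the real entity list and extra options may differ.
import { Module } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';

@Module({
  imports: [
    TypeOrmModule.forRootAsync({
      inject: [ConfigService],
      useFactory: (config: ConfigService) => ({
        type: 'postgres',
        host: config.get<string>('DB_HOST'),
        port: config.get<number>('DB_PORT'),
        username: config.get<string>('DB_USER'),
        password: config.get<string>('DB_PASS'),
        database: config.get<string>('DB_NAME'),
        autoLoadEntities: true,   // assumption; entities may be listed explicitly instead
        synchronize: false,       // never auto-sync; migrations are applied externally
      }),
    }),
  ],
})
export class AppModule {}
```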

What the Consumer Processes and Produces

The service consumes the raw_content_aa topic. Each message on this topic is expected to carry three fields: a source identifier (id), a headline (baslik), and an abstract (ozet). These are the only fields used during inference — no full article body is passed to the model.
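The incoming payload can be modeled with a Zod schema along these lines. The field names come from the description above; the types and constraints are assumptions (for example, id is typed as a string here but the real schema may differ).

```typescript
// Hedged sketch of the raw_content_aa message shape; constraints are illustrative.
import { z } from 'zod';

export const rawContentSchema = z.object({
  id: z.string(),      // source identifier (type assumed)
  baslik: z.string(),  // headline
  ozet: z.string(),    // abstract
});

export type RawContentMessage = z.infer<typeof rawContentSchema>;

// safeParse lets the handler log and drop malformed messages instead of throwing,
// matching the validation-failure behaviour under Operational Notes.
const parsed = rawContentSchema.safeParse({ id: 'aa-1', baslik: 'headline', ozet: 'abstract' });
if (!parsed.success) {
  // warning is logged and the message is dropped
}
```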

On successful classification, the service publishes a single message to the processed_content_aa topic, containing the source ID, the resolved category (model_kategori), and the top confidence score. This downstream message is only emitted when the full processing chain succeeds; skipped or failed messages do not produce any output event.

The Kafka producer client is configured with allowAutoTopicCreation: false, so the processed_content_aa topic must exist on the broker before the service starts.
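A sketch of the producer client registration and the downstream event shape follows. The injection token and the payload field names other than model_kategori are assumptions; allowAutoTopicCreation: false and the default broker address reflect the configuration described above.

```typescript
// Hedged sketch of the Kafka producer registration and the processed event payload.
import { ClientsModule, Transport } from '@nestjs/microservices';

export const kafkaProducerModule = ClientsModule.register([
  {
    name: 'KAFKA_PRODUCER', // hypothetical injection token
    transport: Transport.KAFKA,
    options: {
      client: { brokers: (process.env.KAFKA_BROKERS ?? 'kafka:9092').split(',') },
      producer: { allowAutoTopicCreation: false }, // processed_content_aa must pre-exist
    },
  },
]);

// Shape of the event emitted on success, e.g. client.emit('processed_content_aa', event):
export interface ProcessedContentEvent {
  id: string;             // source ID
  model_kategori: string; // resolved category
  confidence: number;     // top confidence score (field name assumed)
}
```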

Data Storage Overview

Three tables are written to during processing:

Content rows (contents): if a content row for the given source ID already exists but has no category assigned, the consumer fills in model_kategori and processed_at. If no row exists at all, the consumer creates a minimal record containing only the source ID, source enum, category, and timestamp — body fields are not written here.
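The update-or-create logic for the contents table can be sketched as follows. The entity shape, column names, and enum value are stand-ins, not the real schema; the branching mirrors the behaviour described above.

```typescript
// Hedged sketch of the contents upsert; column and enum names are assumptions.
import { Repository } from 'typeorm';

interface Content {
  source_id: string;
  source: string;
  model_kategori: string | null;
  processed_at: Date | null;
}

async function upsertContent(
  contentRepo: Repository<Content>,
  sourceId: string,
  category: string,
): Promise<void> {
  const existing = await contentRepo.findOne({ where: { source_id: sourceId } });

  if (existing && !existing.model_kategori) {
    // Row exists but has no category yet: fill in only the classification fields.
    existing.model_kategori = category;
    existing.processed_at = new Date();
    await contentRepo.save(existing);
  } else if (!existing) {
    // No row at all: create a minimal record; body fields are not written here.
    await contentRepo.save(
      contentRepo.create({
        source_id: sourceId,
        source: 'AA', // assumed source enum value
        model_kategori: category,
        processed_at: new Date(),
      }),
    );
  }
}
```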

Classification results (content_results): a new row is always inserted on success, storing the predicted category index, the top confidence score, and the full confidence map for all 7 categories as a JSONB column.
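A rough entity sketch for this table is shown below; the column names are assumptions, while the predicted index, top confidence, and JSONB confidence map correspond to the fields described above.

```typescript
// Hedged sketch of the content_results row written on each success.
import { Column, CreateDateColumn, Entity, PrimaryGeneratedColumn } from 'typeorm';

@Entity('content_results')
export class ContentResult {
  @PrimaryGeneratedColumn()
  id: number;

  @Column()
  source_id: string;

  @Column('int')
  predicted_index: number; // predicted category index

  @Column('float')
  confidence: number; // top confidence score

  @Column('jsonb')
  confidences: Record<string, number>; // full confidence map for all 7 categories

  @CreateDateColumn()
  created_at: Date;
}
```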

Consumer analytics (consumer_analytics): a row is written in the finally block of every processing attempt, regardless of outcome. It records the source ID, end-to-end processing latency in milliseconds, whether the attempt succeeded, and an error type classification (grpc or internal) when it did not.
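The try/catch/finally structure around each message roughly follows the sketch below. processMessage, isGrpcError, and saveAnalytics are hypothetical stand-ins for the real handler, error classifier, and repository write; the finally-block write, latency measurement, and error typing match the behaviour described above.

```typescript
// Hedged sketch of the per-message analytics write performed in the finally block.
type ErrorType = 'grpc' | 'internal';

async function handleMessage(
  sourceId: string,
  processMessage: (sourceId: string) => Promise<void>,
  isGrpcError: (err: unknown) => boolean,
  saveAnalytics: (row: {
    source_id: string;
    duration_ms: number;
    success: boolean;
    error_type: ErrorType | null;
  }) => Promise<void>,
): Promise<void> {
  const startedAt = Date.now();
  let success = false;
  let errorType: ErrorType | null = null;

  try {
    await processMessage(sourceId); // classify via gRPC, persist, cache, emit
    success = true;
  } catch (err) {
    // The error is swallowed: the Kafka message is consumed either way (no DLQ, no retry).
    errorType = isGrpcError(err) ? 'grpc' : 'internal';
  } finally {
    await saveAnalytics({
      source_id: sourceId,
      duration_ms: Date.now() - startedAt, // end-to-end processing latency
      success,
      error_type: errorType,
    });
  }
}
```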

Operational Notes

  • Deduplication: a two-level guard prevents reprocessing replayed messages (a sketch follows this list). A Redis key is checked first; if it exists, the message is skipped immediately. If Redis misses, the database is queried; if the content row already has a category, the Redis key is backfilled and the message is skipped. This means a Redis flush does not cause double-inference, because the Postgres check acts as a fallback.
  • Validation failures: messages that cannot be parsed by the Zod schema (missing or wrong-shaped fields) are logged as warnings and then dropped. They are consumed from Kafka and are not retried at the application level.
  • gRPC failures: errors from the model service are caught, logged, and classified. The message is marked as failed in analytics, but it is still consumed — there is no dead letter queue and no application-level retry. Kafka-level retries (5 attempts, 300ms initial delay) apply only to broker connectivity issues at startup.
  • Consumer group: the Kafka consumer group ID is consumer-group. The service subscribes with fromBeginning: false, so only messages produced after startup are processed.
  • No HTTP port: the application is created via NestFactory.createMicroservice, not NestFactory.create. There is no REST interface, health check endpoint, or HTTP listener of any kind.
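
The two-level deduplication guard described in the first note can be sketched as follows. The key format, TTL, and the lookup helper are assumptions; the Redis-first check, the Postgres fallback, and the backfill of the Redis key reflect the behaviour described above.

```typescript
// Hedged sketch of the two-level dedup guard; key format and TTL are illustrative.
import Redis from 'ioredis';

// findCategory(sourceId) is a hypothetical lookup returning the stored
// model_kategori (or null) for that source ID.
async function alreadyProcessed(
  redis: Redis,
  findCategory: (sourceId: string) => Promise<string | null>,
  sourceId: string,
): Promise<boolean> {
  const key = `processed:${sourceId}`; // hypothetical key format
  const DEDUP_TTL_SECONDS = 3600;      // illustrative short-lived TTL

  // Level 1: Redis. A hit means this is a replayed message and can be skipped.
  if (await redis.exists(key)) return true;

  // Level 2: Postgres fallback, so a Redis flush does not cause double-inference.
  const category = await findCategory(sourceId);
  if (category) {
    await redis.set(key, '1', 'EX', DEDUP_TTL_SECONDS); // backfill the Redis key
    return true;
  }

  return false;
}
```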