Producer - Code Flow

This page explains the runtime sequence implemented by the producer code. The summary below is drawn directly from the producer services and modules.

Discovery & metadata gathering

  • The scheduled or manual trigger starts a fetch run via FetchService.runFetch(). The fetch service pages the AA API using AaApiService.searchPage() to discover new items between a startDate and endDate.
  • For each result returned from the API:
    • If raw XML already exists in the DB for that source_id, the item is counted as a duplicate and skipped.
    • If a metadata row exists but has downloaded: false, the item is re-queued by emitting a new raw_content_aa_download event, so previously discovered but never-downloaded items are retried on every run.
    • If no metadata row exists, a new metadata entry is persisted and a raw_content_aa_download event is emitted for that source_id.
  • Pagination continues in increments of 64 until the last page is reached, a rate limit is hit, or an unexpected error occurs.
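The per-item decision above can be sketched as a pure function. The repository lookups are stubbed with in-memory collections (the real code queries Postgres), and the handling of a row that is marked downloaded but has no raw XML, which this page does not specify, is an assumption:

```typescript
// Sketch of the per-result decision in the discovery loop.
// Maps/Sets stand in for the _rawRepo / _metadataRepo Postgres queries.

type MetadataRow = { sourceId: string; downloaded: boolean };

type Decision = "duplicate" | "requeue_download" | "new_item";

function classifyResult(
  sourceId: string,
  rawXmlIds: Set<string>,             // source_ids that already have raw XML
  metadata: Map<string, MetadataRow>, // existing metadata rows
): Decision {
  if (rawXmlIds.has(sourceId)) return "duplicate"; // counted and skipped
  const row = metadata.get(sourceId);
  if (row && !row.downloaded) return "requeue_download"; // re-emit raw_content_aa_download
  if (!row) return "new_item"; // persist metadata + emit raw_content_aa_download
  return "duplicate"; // assumption: a downloaded row without raw XML is treated as seen
}
```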

Download handling

  • The application controller consumes raw_content_aa_download events and calls FetchService.fetchDocument(sourceId). The fetch logic requests the NewsML XML for the given source_id using AaApiService.fetchDocument(), persists the raw XML into the raw table, and marks the metadata row as downloaded.
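A minimal sketch of that handler, with the API client and both tables stubbed as injected callbacks (the Deps shape is an illustration, not the real service's constructor signature):

```typescript
// Sketch of the raw_content_aa_download event handler.
// deps stands in for AaApiService and the raw/metadata repositories.

interface Deps {
  fetchDocument: (sourceId: string) => Promise<string>; // returns NewsML XML
  saveRaw: (sourceId: string, xml: string) => Promise<void>;
  markDownloaded: (sourceId: string) => Promise<void>;
}

async function handleDownloadEvent(sourceId: string, deps: Deps): Promise<void> {
  const xml = await deps.fetchDocument(sourceId); // AaApiService.fetchDocument()
  await deps.saveRaw(sourceId, xml);              // persist raw XML into the raw table
  await deps.markDownloaded(sourceId);            // set downloaded: true on the metadata row
}
```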

Parsing and persistence

  • After persisting the raw XML, the service parses it with NewsmlParserService.parse(). If the parser returns null, the document is skipped (a db_errors counter is incremented) and no content entity is created.
  • Even when parsing succeeds, if both the ozet (abstract) and icerik (body) fields are empty in the parsed result, the document is skipped: no content entity is saved and no Kafka message is emitted.
  • When valid content is available, the service writes a parsed content row (the content entity) and removes the raw XML row. The parsed content includes fields such as headline, abstract, content body, category, publication date, and language as extracted by the parser.
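The skip/persist branches above can be summarized as a single function over the parser's result (field names follow this page; the Outcome labels are illustrative):

```typescript
// Outcome of handling one document after NewsmlParserService.parse().

type Parsed = { baslik: string; ozet: string; icerik: string };

type Outcome = "skipped_parse_error" | "skipped_empty" | "persisted";

function decideOutcome(parsed: Parsed | null): Outcome {
  if (parsed === null) return "skipped_parse_error"; // db_errors counter incremented
  if (!parsed.ozet && !parsed.icerik) return "skipped_empty"; // no content entity, no Kafka message
  return "persisted"; // write the content entity, then delete the raw XML row
}
```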

Emission to Kafka

  • Once parsed and saved, the producer emits a raw_content_aa message containing the content id (source_id), a cleaned headline (baslik), and a cleaned abstract/body (ozet). The ozet field is populated with cleanAndLowercase(parsed.ozet || parsed.icerik); if the abstract is absent, the full article body is used in its place. The Kafka wrapper emits plain JavaScript objects (no protobuf is used in the producer codebase).
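The message shape can be illustrated as follows. The cleanAndLowercase here is a stand-in (whitespace collapse plus lowercasing), since this page does not spell out the helper's exact cleaning rules:

```typescript
// Stand-in for the producer's cleanAndLowercase helper (exact rules assumed).
function cleanAndLowercase(text: string): string {
  return text.replace(/\s+/g, " ").trim().toLowerCase();
}

// Build the plain-object raw_content_aa payload.
function buildMessage(p: { sourceId: string; baslik: string; ozet: string; icerik: string }) {
  return {
    id: p.sourceId,
    baslik: cleanAndLowercase(p.baslik),
    ozet: cleanAndLowercase(p.ozet || p.icerik), // body fills in when abstract is empty
  };
}
```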

Deduplication and short-lived state

  • Deduplication logic differs between the live API path and the mock path:
    • Live path (gatherMetadata): dedup is performed via direct database queries. The service checks _rawRepo for an existing raw XML record and _metadataRepo for an existing metadata row. No Redis dedup keys are used here.
    • Mock path (fetchFromMock): dedup is handled by DedupService backed by Redis. The service creates a SHA-256-based dedup key prefixed with aan_source_ and uses a TTL of 10800 seconds (3 hours).
  • Both paths update Redis analytics counters (fetched, duplicates, rate_limit_errors, db_errors) and persist run-level aggregate analytics into Postgres after each run completes.
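The mock-path dedup key can be sketched like this. The aan_source_ prefix and 10800-second TTL are stated on this page; hashing the raw source_id (rather than some other input) is an assumption:

```typescript
import { createHash } from "node:crypto";

const DEDUP_TTL_SECONDS = 10800; // 3 hours

// Build the Redis dedup key for one source_id (hash input is an assumption).
function dedupKey(sourceId: string): string {
  const digest = createHash("sha256").update(sourceId).digest("hex");
  return `aan_source_${digest}`;
}

// The real service would then write roughly:
//   SET <key> 1 EX 10800 NX   -- the item is a duplicate if the key already exists
```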

Rate limiting and retries

  • The AA API client raises a rate-limit-specific error on HTTP 429. The fetch workflow is aware of rate limits: it increments the appropriate analytics counter and inserts delays between requests. Operators who require a different retry/backoff policy should review the rate-limit handling in the source.
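If you do need backoff on top of the built-in delays, a generic wrapper might look like the sketch below. RateLimitError is a stand-in name; the producer's actual error class and retry policy live in the source:

```typescript
// Illustrative exponential-backoff wrapper for rate-limited calls.
class RateLimitError extends Error {}

async function withBackoff<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  baseMs = 1000,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn(); // e.g. a call to AaApiService.searchPage()
    } catch (err) {
      if (!(err instanceof RateLimitError) || attempt >= maxRetries) throw err;
      await new Promise((r) => setTimeout(r, baseMs * 2 ** attempt)); // 1s, 2s, 4s, ...
    }
  }
}
```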

Observability and artifacts to check

  • Inspect the metadata table for discovered items and the raw table for raw NewsML while debugging downloads. Check Redis keys for dedup entries and last-fetch timestamps. Run-level analytics are persisted to the analytics table after a fetch completes.

Implementation notes (facts to verify in certain environments)

  • Topic creation via Kafka admin: code attempts to create topics at startup; if your broker disallows topic creation, pre-create topics or adjust permissions.
  • Message contracts: the producer emits plain objects; downstream consumers may expect additional headers or a stricter schema — verify consumer expectations if integrating systems.