edx_event_bus_kafka.internal package#

Submodules#

edx_event_bus_kafka.internal.config module#

Configuration loading and validation.

This module is for internal use only.

edx_event_bus_kafka.internal.config.get_full_topic(base_topic: str) → str#

Given a base topic name, add a prefix (if configured).
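
A minimal usage sketch; the EVENT_BUS_TOPIC_PREFIX setting name and the joining format shown in the comments are assumptions for illustration:

    from edx_event_bus_kafka.internal.config import get_full_topic

    # With a prefix configured (e.g. EVENT_BUS_TOPIC_PREFIX = "prod"),
    # the base topic is expanded; without one, it is returned unchanged.
    full_topic = get_full_topic("user-login")  # e.g. "prod-user-login"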

edx_event_bus_kafka.internal.config.get_schema_registry_client()#

Create a schema registry client from common settings.

This is cached on the assumption of a performance benefit (avoiding reloading settings and reconstructing the client), but the client may also keep long-lived connections around that we benefit from reusing.

Returns

None if the confluent_kafka library is not available or the settings are invalid; a SchemaRegistryClient otherwise.

edx_event_bus_kafka.internal.config.load_common_settings() → dict | None#

Load common settings, a base for either producer or consumer configuration.

Warns and returns None if essential settings are missing.
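
A hedged sketch of typical usage: configure the essential settings, then layer producer- or consumer-specific keys on top of the common base. The Django setting names in the comments are assumptions for illustration; 'group.id' is a standard confluent_kafka configuration key.

    # Assumed Django settings (names for illustration):
    #   EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL = "http://localhost:8091"
    #   EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
    from edx_event_bus_kafka.internal.config import load_common_settings

    common = load_common_settings()
    if common is not None:
        # Add consumer-specific configuration on top of the common base.
        consumer_config = {**common, 'group.id': 'my-consumer-group'}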

edx_event_bus_kafka.internal.consumer module#

Core consumer and event-loop code.

exception edx_event_bus_kafka.internal.consumer.EventConsumptionException#

Bases: Exception

Indicates that we had an issue in event consumption. Useful for filtering on later.

class edx_event_bus_kafka.internal.consumer.KafkaEventConsumer(topic, group_id, offset_time=None)#

Bases: EventBusConsumer

Construct a consumer for the given topic and group. The consumer can then emit events from the event bus using the signal named in the message headers.

Note that the topic should be specified here without the optional environment prefix.

Can also consume messages indefinitely off the queue.
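
A minimal construction sketch, assuming Kafka connection settings are already configured; the topic and group names are hypothetical:

    from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer

    consumer = KafkaEventConsumer(topic="user-login", group_id="my-service")
    consumer.consume_indefinitely()  # blocks, emitting signals as events arrive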

topic#

Topic to consume (without environment prefix).

group_id#

Consumer group id.

consumer#

Actual kafka consumer instance.

offset_time#

The timestamp (in ISO format) to reset the consumer offsets to. If this is set, the consumer will only reset the offsets of the topic and will not actually consume or process any messages.

consume_indefinitely()#

Consume events from the topic in an infinite loop. If offset_time is set, instead reset any assigned partitions to the given offset and sleep indefinitely.

determine_signal(msg) → OpenEdxPublicSignal#

Determine which OpenEdxPublicSignal should be used to emit the event data in a message.

Parameters:

msg (Message) – Consumed message

Returns:

The OpenEdxPublicSignal instance corresponding to the ce_type header on the message.
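
A schematic sketch of the lookup this implies, not the actual implementation; it assumes the ce_type header value has already been read off the message and uses OpenEdxPublicSignal.get_signal_by_type from openedx-events:

    from openedx_events.tooling import OpenEdxPublicSignal

    # The ce_type header carries the event type string, e.g.:
    event_type = "org.openedx.learning.auth.session.login.completed.v1"
    signal = OpenEdxPublicSignal.get_signal_by_type(event_type)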

emit_signals_from_message(msg, signal)#

Send the event from the message via the given signal.

Assumes the message has been deserialized and the signal matches the event_type of the message header.

Parameters:
  • msg (Message) – Deserialized message.

  • signal (OpenEdxPublicSignal) – Signal - must match the event_type of the message header.

record_event_consuming_error(run_context, error, maybe_message)#

Record an error caught while consuming an event, both to the logs and to telemetry.

Parameters:
  • run_context – Dictionary of contextual information: full_topic, consumer_group, and expected_signal.

  • error – An exception instance

  • maybe_message – None if event could not be fetched or decoded, or a Kafka Message if one was successfully deserialized but could not be processed for some reason

reset_offsets_and_sleep_indefinitely()#

Reset any assigned partitions to the given offset, and sleep indefinitely.

exception edx_event_bus_kafka.internal.consumer.ReceiverError(message: str, causes: list)#

Bases: Exception

Indicates that one or more receivers of a signal raised an exception when called.

exception edx_event_bus_kafka.internal.consumer.UnusableMessageError#

Bases: Exception

Indicates that a message was successfully received but could not be processed.

This could be invalid headers, an unknown signal, or other issue specific to the contents of the message.

edx_event_bus_kafka.internal.consumer.get_deserializer(signal: OpenEdxPublicSignal, schema_registry_client)#

Get the value deserializer for a signal.

This is cached in order to save work re-transforming classes into Avro schemas. We do not deserialize the key because we don’t need it for anything yet. Also see openedx/openedx-events#86 for some challenges in determining the key schema.

Parameters:
  • signal – The OpenEdxPublicSignal to make a deserializer for.

  • schema_registry_client – The SchemaRegistryClient instance for the consumer

Returns:

AvroSignalDeserializer for the event value.
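
A hedged usage sketch, combining this with get_schema_registry_client from the config module; SESSION_LOGIN_COMPLETED is a real openedx-events signal, used here only as an example:

    from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
    from edx_event_bus_kafka.internal.config import get_schema_registry_client
    from edx_event_bus_kafka.internal.consumer import get_deserializer

    client = get_schema_registry_client()
    if client is not None:
        deserializer = get_deserializer(SESSION_LOGIN_COMPLETED, client)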

edx_event_bus_kafka.internal.producer module#

Produce Kafka events from signals.

The main function is create_producer(), which the EVENT_BUS_PRODUCER setting should reference.
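
For reference, a Django settings sketch: EVENT_BUS_PRODUCER is a setting read by openedx-events, and the path below points at the public re-export of create_producer():

    # Django settings
    EVENT_BUS_PRODUCER = "edx_event_bus_kafka.create_producer"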

exception edx_event_bus_kafka.internal.producer.EventProductionException#

Bases: Exception

An exception we can check for when errors occur in event production code.

class edx_event_bus_kafka.internal.producer.KafkaEventProducer(producer)#

Bases: EventBusProducer

API singleton for event production to Kafka.

This is just a wrapper around a confluent_kafka Producer that knows how to serialize a signal into the event wire format.

Only one instance (of Producer or this wrapper) should be created, since it is stateful and needs lifecycle management.

prepare_for_shutdown()#

Prepare producer for a clean shutdown.

Flush pending outbound events, wait for acknowledgement, and process callbacks.

send(*, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, event_metadata: EventsMetadata) → None#

Send a signal event to the event bus under the specified topic.

Parameters:
  • signal – The original OpenEdxPublicSignal the event was sent to

  • topic – The base (un-prefixed) event bus topic for the event

  • event_key_field – Path to the event data field to use as the event key (period-delimited string naming the dictionary keys to descend)

  • event_data – The event data (kwargs) sent to the signal

  • event_metadata – An EventsMetadata object with all the metadata necessary for the CloudEvent spec
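
A hedged usage sketch; SESSION_LOGIN_COMPLETED is a real openedx-events signal, but the topic, key field path, and placeholder values are hypothetical:

    from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
    from edx_event_bus_kafka.internal.producer import create_producer

    user_data = ...  # placeholder: event data matching the signal's schema
    metadata = ...   # placeholder: an EventsMetadata instance

    producer = create_producer()  # cache this; may be None if misconfigured
    if producer is not None:
        producer.send(
            signal=SESSION_LOGIN_COMPLETED,
            topic="user-login",                   # base topic, without prefix
            event_key_field="user.pii.username",  # hypothetical key path
            event_data={"user": user_data},
            event_metadata=metadata,
        )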

class edx_event_bus_kafka.internal.producer.ProducingContext(*, full_topic: str | None = None, event_key: str | None = None, signal: OpenEdxPublicSignal | None = None, initial_topic: str | None = None, event_key_field: str | None = None, event_data: dict | None = None, event_metadata: EventsMetadata | None = None, event_data_as_json: str | None = None, event_metadata_as_json: str | None = None)#

Bases: object

Wrapper class to allow us to link a call to produce() with the on_event_deliver callback.

on_event_deliver(err, evt)#

Simple callback method for debugging event production.

If there is any error, log all the known information about the calling context so the event can be recreated and/or resent later. This log will not contain the exact headers but will contain the EventsMetadata object that can be used to recreate them.

Parameters:
  • err – Error if event production failed

  • evt – Event that was delivered (or failed to be delivered)

edx_event_bus_kafka.internal.producer.create_producer() → KafkaEventProducer | None#

Create a Producer API instance. Caller should cache the returned object.

If the confluent-kafka library or essential settings are missing, warns and returns None.

edx_event_bus_kafka.internal.producer.descend_avro_schema(serializer_schema: dict, field_path: List[str]) → dict#

Extract a subfield within an Avro schema, recursively.

Parameters:
  • serializer_schema – An Avro schema (nested dictionaries)

  • field_path – List of strings matching the ‘name’ of successively deeper subfields

Returns:

The schema for the requested subfield.

Note: These Avro helpers could be moved to openedx_events.event_bus.avro.serializer for use by event bus implementations other than Kafka.
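
An illustrative sketch with a hand-written nested record schema; the exact shape of the returned subschema is an assumption:

    from edx_event_bus_kafka.internal.producer import descend_avro_schema

    schema = {
        "name": "CloudEvent",
        "type": "record",
        "fields": [
            {"name": "user", "type": {
                "name": "user",
                "type": "record",
                "fields": [{"name": "id", "type": "long"}],
            }},
        ],
    }
    # Descend through "user" to "id" and extract that subfield's schema.
    sub_schema = descend_avro_schema(schema, ["user", "id"])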

edx_event_bus_kafka.internal.producer.extract_event_key(event_data: dict, event_key_field: str) → Any#

From an event object, extract a Kafka event key (not yet serialized).

Parameters:
  • event_data – The event data (kwargs) sent to the signal

  • event_key_field – Path to the event data field to use as the event key (period-delimited string naming the dictionary keys to descend)

Returns:

Key data, which might be an integer, string, dictionary, etc.
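
An illustrative example of the period-delimited descent:

    from edx_event_bus_kafka.internal.producer import extract_event_key

    event_data = {"user": {"id": 42}}
    extract_event_key(event_data, "user.id")  # -> 42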

edx_event_bus_kafka.internal.producer.extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field: str) → str#

From a signal’s serializer, extract just the part of the Avro schema that will be used for the Kafka event key.

Parameters:
  • signal_serializer – The signal serializer to extract a sub-schema from

  • event_key_field – Path to the event data field to use as the event key (period-delimited string naming the dictionary keys to descend)

Returns:

The key’s schema, as a string.

edx_event_bus_kafka.internal.producer.get_serializers(signal: OpenEdxPublicSignal, event_key_field: str)#

Get the full key and value serializers for a signal and a key field path.

This is cached in order to save work re-transforming AvroSignalSerializers into AvroSerializers.

Parameters:
  • signal – The OpenEdxPublicSignal to make a serializer for.

  • event_key_field – Path to descend in the signal schema to find the subschema for the key (period-delimited string naming the field names to descend).

Returns:

2-tuple of AvroSerializers, for the event key and value.

edx_event_bus_kafka.internal.producer.get_signal_serializer(signal: OpenEdxPublicSignal)#

Get the AvroSignalSerializer for a signal.

This is cached in order to save work re-transforming classes into Avro schemas.

Parameters:

signal – The OpenEdxPublicSignal to make a serializer for.

Returns:

An AvroSignalSerializer configured to serialize event data to dictionaries.

edx_event_bus_kafka.internal.producer.poll_indefinitely(api_weakref: KafkaEventProducer)#

Poll the producer indefinitely to ensure delivery/stats/etc. callbacks are triggered.

The thread stops automatically once the producer is garbage-collected.

See ADR for more information: openedx/event-bus-kafka

edx_event_bus_kafka.internal.producer.record_producing_error(error, context)#

Record an error in producing an event to both the monitoring system and the regular logs.

Parameters:
  • error – The exception or error raised during producing

  • context – An instance of ProducingContext containing additional information about the message

edx_event_bus_kafka.internal.utils module#

Utilities for converting between message headers and EventsMetadata.

class edx_event_bus_kafka.internal.utils.MessageHeader(message_header_key, event_metadata_field=None, to_metadata=None, from_metadata=None)#

Bases: object

Utility class for converting between message headers and EventsMetadata objects.

instances#

List of the ten MessageHeader instances defined by this module.

edx_event_bus_kafka.internal.utils.get_message_header_values(headers: List, header: MessageHeader) → List[str]#

Return all values for this header.

Parameters:
  • headers – List of key/value tuples. Keys are strings, values are bytestrings.

  • header – The MessageHeader to look for.

Returns:

List of zero or more header values decoded as strings.
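
An illustrative sketch; the header key is the CloudEvents type header, and the MessageHeader is constructed standalone here purely for demonstration (real instances are registered in MessageHeader.instances):

    from edx_event_bus_kafka.internal.utils import (
        MessageHeader, get_message_header_values,
    )

    headers = [("ce_type", b"org.openedx.learning.auth.session.login.completed.v1")]
    header = MessageHeader("ce_type")  # demo-only instance
    get_message_header_values(headers, header)
    # -> ["org.openedx.learning.auth.session.login.completed.v1"]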

edx_event_bus_kafka.internal.utils.last_message_header_value(headers: List, header: MessageHeader) → str | None#

Return the value for the header with the specified key, if there is at least one.

We should not ordinarily expect there to be more than one instance of a header. However, if there is one, this function will return the last value of it. (The latest value may have been intended to override an earlier value.)

Parameters:
  • headers – List of key/value tuples. Keys are strings, values are bytestrings.

  • header – The MessageHeader to look for.

Returns:

Decoded value of the last header with this key, or None if there are none.
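
A short sketch of the last-value-wins behavior:

    from edx_event_bus_kafka.internal.utils import MessageHeader, last_message_header_value

    headers = [("id", b"first"), ("id", b"second")]
    last_message_header_value(headers, MessageHeader("id"))  # -> "second"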

Module contents#

Most of the implementation of the package is here and is internal-only.

Public API will be in the edx_event_bus_kafka module for the most part.

See ADR docs/decisions/0006-public-api-and-app-organization.rst for the reasoning.