edx_event_bus_kafka.internal package
Subpackages
- edx_event_bus_kafka.internal.tests package
- Submodules
- edx_event_bus_kafka.internal.tests.test_config module
- edx_event_bus_kafka.internal.tests.test_consumer module
TestEmitSignals
TestEmitSignals.TEST_CONSUME_ERROR_FATAL
TestEmitSignals.TEST_CONSUME_ERROR_NO_MESSAGE
TestEmitSignals.TEST_CONSUME_ERROR_WITH_MESSAGE
TestEmitSignals.TEST_FAILED_MESSAGE
TestEmitSignals.TEST_KAFKA_ERROR
TestEmitSignals.TEST_KAFKA_EXCEPTION
TestEmitSignals.TEST_KAFKA_FATAL_ERROR
TestEmitSignals.assert_signal_sent_with()
TestEmitSignals.setUp()
TestEmitSignals.tearDown()
TestEmitSignals.test_bad_headers()
TestEmitSignals.test_check_event_error()
TestEmitSignals.test_connection_reset_1__False__False__False_()
TestEmitSignals.test_connection_reset_2__True__False__True_()
TestEmitSignals.test_connection_reset_3__False__True__False_()
TestEmitSignals.test_consecutive_error_limit()
TestEmitSignals.test_consume_loop()
TestEmitSignals.test_consume_loop_disabled()
TestEmitSignals.test_deserialize_type_mismatch()
TestEmitSignals.test_determine_signal_success()
TestEmitSignals.test_emit()
TestEmitSignals.test_emit_success_1_True()
TestEmitSignals.test_emit_success_2_False()
TestEmitSignals.test_emit_success_tolerates_missing_timestamp()
TestEmitSignals.test_emit_type_mismatch()
TestEmitSignals.test_malformed_receiver_errors()
TestEmitSignals.test_multiple_types()
TestEmitSignals.test_no_commit_if_no_error_logged()
TestEmitSignals.test_no_consume_with_bad_offset_timestamp()
TestEmitSignals.test_no_consume_with_offsets()
TestEmitSignals.test_no_deserializer_if_no_registry_client()
TestEmitSignals.test_no_type()
TestEmitSignals.test_non_consecutive_errors()
TestEmitSignals.test_record_error_for_various_errors_1()
TestEmitSignals.test_record_error_for_various_errors_2()
TestEmitSignals.test_record_error_for_various_errors_3()
TestEmitSignals.test_record_error_for_various_errors_4()
TestEmitSignals.test_record_error_for_various_errors_5()
TestEmitSignals.test_reset_offsets_and_sleep_indefinitely()
TestEmitSignals.test_reset_offsets_and_sleep_indefinitely_with_none_offset()
TestEmitSignals.test_unexpected_signal_type_in_header()
fake_receiver_raises_error()
fake_receiver_returns_quietly()
- edx_event_bus_kafka.internal.tests.test_producer module
TestCommand
TestEventProducer
TestEventProducer.setUp()
TestEventProducer.test_create_producer_configured()
TestEventProducer.test_create_producer_unconfigured()
TestEventProducer.test_descend_avro_schema()
TestEventProducer.test_extract_event_key()
TestEventProducer.test_extract_key_schema()
TestEventProducer.test_full_event_data_present_in_kafka_error()
TestEventProducer.test_full_event_data_present_in_key_extraction_error()
TestEventProducer.test_on_event_deliver_1_True()
TestEventProducer.test_on_event_deliver_2_False()
TestEventProducer.test_polling_loop_robust()
TestEventProducer.test_polling_loop_terminates()
TestEventProducer.test_send_to_event_bus()
TestEventProducer.test_serialize_and_produce_to_same_topic()
TestEventProducer.test_serializers_configured()
TestEventProducer.test_serializers_unconfigured()
- edx_event_bus_kafka.internal.tests.test_utils module
FakeMessage
TestTestHelpers
TestUtils
TestUtils.TEST_UUID_BYTES
TestUtils.test_generate_metadata_fails_with_duplicate_headers()
TestUtils.test_generate_metadata_from_missing_or_bad_headers_1()
TestUtils.test_generate_metadata_from_missing_or_bad_headers_2()
TestUtils.test_generate_metadata_from_missing_or_bad_headers_3()
TestUtils.test_generate_metadata_from_missing_or_bad_headers_4()
TestUtils.test_generate_metadata_from_missing_or_bad_headers_5__None__None__None__True_()
TestUtils.test_headers_from_event_metadata()
TestUtils.test_metadata_from_headers()
side_effects()
- Module contents
Submodules
edx_event_bus_kafka.internal.config module
Configuration loading and validation.
This module is for internal use only.
- edx_event_bus_kafka.internal.config.get_full_topic(base_topic: str) → str
Given a base topic name, add a prefix (if configured).
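A minimal sketch of the prefixing rule, assuming the prefix is passed in directly rather than read from Django settings (the real function reads it from settings; a setting name such as EVENT_BUS_TOPIC_PREFIX is an assumption) and assuming the prefix and base topic are joined with a hyphen. Both the `topic_prefix` parameter and the separator are hypothetical.

```python
from typing import Optional


def get_full_topic(base_topic: str, topic_prefix: Optional[str] = None) -> str:
    """Return base_topic, prefixed with topic_prefix if one is configured."""
    # Assumption: prefix and base topic are joined with a hyphen.
    if topic_prefix:
        return f"{topic_prefix}-{base_topic}"
    # No prefix configured (None or empty string): use the base topic as-is.
    return base_topic
```

Keeping the prefix logic in one function is what lets callers such as KafkaEventConsumer and send() work with un-prefixed base topics.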
- edx_event_bus_kafka.internal.config.get_schema_registry_client()
Create a schema registry client from common settings.
This is cached on the assumption that caching improves performance (it avoids reloading settings and reconstructing the client); the client may also keep long-lived connections open that benefit from reuse.
- Returns:
None if the confluent_kafka library is not available or the settings are invalid; otherwise, a SchemaRegistryClient.
edx_event_bus_kafka.internal.consumer module
Core consumer and event-loop code.
- exception edx_event_bus_kafka.internal.consumer.EventConsumptionException
Bases: Exception
Indicates that we had an issue in event consumption. Useful for filtering on later.
- class edx_event_bus_kafka.internal.consumer.KafkaEventConsumer(topic, group_id, offset_time=None)
Bases: EventBusConsumer
Construct a consumer for the given topic and group. The consumer can then emit events from the event bus using the signal indicated by the message headers.
Note that the topic should be specified here without the optional environment prefix.
The consumer can also consume messages indefinitely off the queue.
- topic
Topic to consume (without environment prefix).
- group_id
Consumer group id.
- consumer
The underlying Kafka consumer instance.
- offset_time
The timestamp (in ISO format) to reset the consumers to. If this is set, the consumers only reset the topic's offsets; they do not consume or process any messages.
- consume_indefinitely()
If offset_time is not set, consume events from the topic in an infinite loop. Otherwise, reset any assigned partitions to the given offset and sleep indefinitely.
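A minimal sketch of the control flow described above, assuming a consumer with a confluent_kafka-style poll(timeout) that returns None or a message exposing error() and value(). FakeMessage, FakeConsumer, process, reset_and_sleep, and max_polls are hypothetical names introduced for illustration; the real method also deserializes messages, emits signals, and records errors, none of which is modeled here.

```python
from typing import Callable, Iterable, Optional


class FakeMessage:
    """Hypothetical stand-in for confluent_kafka.Message."""

    def __init__(self, value, err=None):
        self._value, self._err = value, err

    def value(self):
        return self._value

    def error(self):
        return self._err


class FakeConsumer:
    """Hypothetical consumer whose poll() mimics confluent_kafka.Consumer.poll()."""

    def __init__(self, messages: Iterable[Optional[FakeMessage]]):
        self._messages = iter(messages)

    def poll(self, timeout: float) -> Optional[FakeMessage]:
        # Returns None when nothing arrives within the timeout.
        return next(self._messages, None)


def consume_indefinitely(consumer, process: Callable[[FakeMessage], None],
                         offset_time: Optional[str] = None,
                         reset_and_sleep: Callable[[], None] = lambda: None,
                         max_polls: Optional[int] = None) -> None:
    if offset_time is not None:
        # Offset-reset mode: reset partitions and never process messages.
        reset_and_sleep()
        return
    polls = 0
    # max_polls is a hypothetical escape hatch so the sketch can terminate in tests.
    while max_polls is None or polls < max_polls:
        polls += 1
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # nothing available yet
        if msg.error():
            continue  # the real consumer would record the error here
        process(msg)
```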
- determine_signal(msg) → OpenEdxPublicSignal
Determine which OpenEdxPublicSignal should be used to emit the event data in a message.
- Parameters:
msg (Message) – Consumed message
- Returns:
The OpenEdxPublicSignal instance corresponding to the ce_type header on the message
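A minimal sketch of the lookup, assuming headers arrive as (str, bytes) tuples (as described for the utils module) and using a hypothetical dict in place of the openedx-events signal registry. Unlike the documented method, it takes the header list rather than a Message, and returns a signal name rather than a signal object; the event type string is made up.

```python
from typing import Dict, List, Tuple


class UnusableMessageError(Exception):
    """Mirrors the documented exception for messages that cannot be processed."""


# Hypothetical registry mapping event types to signal names.
SIGNALS_BY_TYPE: Dict[str, str] = {
    "org.example.thing.created.v1": "THING_CREATED",
}


def determine_signal(headers: List[Tuple[str, bytes]]) -> str:
    # Decode every ce_type header; if several are present, the last one wins.
    event_types = [value.decode("utf-8") for key, value in headers if key == "ce_type"]
    if not event_types:
        raise UnusableMessageError("Missing ce_type header on message")
    event_type = event_types[-1]
    try:
        return SIGNALS_BY_TYPE[event_type]
    except KeyError:
        raise UnusableMessageError(f"Unrecognized event type {event_type!r}") from None
```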
- emit_signals_from_message(msg, signal)
Send the event from the message via the given signal.
Assumes the message has been deserialized and the signal matches the event_type of the message header.
- Parameters:
msg (Message) – Deserialized message.
signal (OpenEdxPublicSignal) – Signal - must match the event_type of the message header.
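The ReceiverError exception documented in this module suggests that emission collects receiver failures rather than stopping at the first one. A minimal sketch of that pattern, assuming the signal offers a Django-style send_robust() that returns (receiver, response) pairs with any raised exception in the response slot. FakeSignal is hypothetical and simplified (no sender argument), and this sketch takes a plain dict of event data rather than a deserialized Message.

```python
from typing import Any, Callable, List, Tuple


class ReceiverError(Exception):
    """Mirrors the documented ReceiverError(message, causes)."""

    def __init__(self, message: str, causes: list):
        super().__init__(message, causes)
        self.causes = causes


class FakeSignal:
    """Hypothetical signal; send_robust() loosely mimics Django's Signal.send_robust()."""

    def __init__(self, receivers: List[Callable[..., Any]]):
        self.receivers = receivers

    def send_robust(self, **kwargs) -> List[Tuple[Callable, Any]]:
        results = []
        for receiver in self.receivers:
            try:
                results.append((receiver, receiver(**kwargs)))
            except Exception as exc:  # errors are returned, not raised
                results.append((receiver, exc))
        return results


def emit_signals_from_message(event_data: dict, signal: FakeSignal) -> None:
    results = signal.send_robust(**event_data)
    # Gather every receiver exception so none is silently dropped.
    errors = [response for _, response in results if isinstance(response, Exception)]
    if errors:
        raise ReceiverError(f"{len(errors)} receiver(s) raised errors", errors)
```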
- record_event_consuming_error(run_context, error, maybe_message)
Record an error caught while consuming an event, both to the logs and to telemetry.
- Parameters:
run_context – Dictionary of contextual information: full_topic, consumer_group, and expected_signal.
error – An exception instance
maybe_message – None if the event could not be fetched or decoded, or a Kafka Message if one was successfully deserialized but could not be processed for some reason.
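A minimal sketch of recording an error to both a log and a telemetry sink, assuming a confluent_kafka-style message with partition() and offset() methods. TELEMETRY is a hypothetical list standing in for a monitoring backend, and the log text is illustrative, not the package's actual message.

```python
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger("consumer_sketch")
TELEMETRY: List[Dict[str, Any]] = []  # hypothetical monitoring sink


def record_event_consuming_error(run_context: Dict[str, Any], error: Exception,
                                 maybe_message: Optional[Any]) -> str:
    details = dict(run_context)
    if maybe_message is not None:
        # confluent_kafka.Message exposes partition() and offset().
        details.update(partition=maybe_message.partition(), offset=maybe_message.offset())
    summary = f"Error consuming event from Kafka: {error!r} in context {details}"
    logger.error(summary)
    TELEMETRY.append({"error_class": type(error).__name__, **details})
    return summary
```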
- reset_offsets_and_sleep_indefinitely()
Reset any assigned partitions to the offsets corresponding to offset_time, and sleep indefinitely.
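With confluent_kafka, Consumer.offsets_for_times() takes TopicPartition objects whose offset field holds a timestamp in milliseconds, and returns for each partition the earliest offset whose timestamp is at or after that time. The sketch below covers only the preparation step (turning the ISO offset_time into epoch milliseconds per partition) and is not the package's implementation; treating timestamps without a UTC offset as UTC, and the partitions_with_timestamps helper, are assumptions.

```python
from datetime import datetime, timezone
from typing import List, Tuple


def offset_time_to_ms(offset_time: str) -> int:
    """Convert an ISO 8601 timestamp to milliseconds since the Unix epoch."""
    dt = datetime.fromisoformat(offset_time)
    if dt.tzinfo is None:
        # Assumption: timestamps without an offset are treated as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


def partitions_with_timestamps(topic: str, partitions: List[int],
                               offset_time: str) -> List[Tuple[str, int, int]]:
    """Hypothetical helper: (topic, partition, timestamp_ms) triples to look up."""
    ts = offset_time_to_ms(offset_time)
    return [(topic, p, ts) for p in partitions]
```

Each triple maps onto TopicPartition(topic, partition, timestamp_ms) before calling offsets_for_times().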
- exception edx_event_bus_kafka.internal.consumer.ReceiverError(message: str, causes: list)
Bases: Exception
Indicates that one or more receivers of a signal raised an exception when called.
- exception edx_event_bus_kafka.internal.consumer.UnusableMessageError
Bases: Exception
Indicates that a message was successfully received but could not be processed.
This could be invalid headers, an unknown signal, or other issue specific to the contents of the message.
- edx_event_bus_kafka.internal.consumer.get_deserializer(signal: OpenEdxPublicSignal, schema_registry_client)
Get the value deserializer for a signal.
This is cached in order to save work re-transforming classes into Avro schemas. We do not deserialize the key because we don't need it for anything yet. Also see openedx/openedx-events#86 for some challenges in determining the key schema.
- Parameters:
signal – The OpenEdxPublicSignal to make a deserializer for.
schema_registry_client – The SchemaRegistryClient instance for the consumer
- Returns:
AvroSignalDeserializer for event value
edx_event_bus_kafka.internal.producer module
Produce Kafka events from signals.
The main function is create_producer(), which should be referred to from EVENT_BUS_PRODUCER.
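Assuming the usual openedx-events convention, EVENT_BUS_PRODUCER is a Django setting holding the dotted path of a producer factory, which the event bus loads at runtime. The settings line and the generic loader below are a sketch; load_from_dotted_path is a hypothetical helper, and the real loader lives in openedx-events and may do more (for example, validating the returned object).

```python
import importlib
from typing import Any

# Example settings fragment (assumption: the public create_producer is
# exposed from the top-level edx_event_bus_kafka package).
EVENT_BUS_PRODUCER = "edx_event_bus_kafka.create_producer"


def load_from_dotted_path(path: str) -> Any:
    """Hypothetical helper: import 'package.module.attr' and return attr."""
    module_path, _, attr_name = path.rpartition(".")
    module = importlib.import_module(module_path)
    return getattr(module, attr_name)
```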
- exception edx_event_bus_kafka.internal.producer.EventProductionException
Bases: Exception
An exception we can check for when errors occur in event production code.
- class edx_event_bus_kafka.internal.producer.KafkaEventProducer(producer)
Bases: EventBusProducer
API singleton for event production to Kafka.
This is just a wrapper around a confluent_kafka Producer that knows how to serialize a signal to event wire format.
Only one instance (of Producer or this wrapper) should be created, since it is stateful and needs lifecycle management.
- prepare_for_shutdown()
Prepare producer for a clean shutdown.
Flush pending outbound events, wait for acknowledgement, and process callbacks.
- send(*, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, event_metadata: EventsMetadata) → None
Send a signal event to the event bus under the specified topic.
- Parameters:
signal – The original OpenEdxPublicSignal the event was sent to
topic – The base (un-prefixed) event bus topic for the event
event_key_field – Path to the event data field to use as the event key (period-delimited string naming the dictionary keys to descend)
event_data – The event data (kwargs) sent to the signal
event_metadata – An EventsMetadata object with all the metadata necessary for the CloudEvent spec
- class edx_event_bus_kafka.internal.producer.ProducingContext(*, full_topic: str | None = None, event_key: str | None = None, signal: OpenEdxPublicSignal | None = None, initial_topic: str | None = None, event_key_field: str | None = None, event_data: dict | None = None, event_metadata: EventsMetadata | None = None, event_data_as_json: str | None = None, event_metadata_as_json: str | None = None)
Bases: object
Wrapper class to allow us to link a call to produce() with the on_event_deliver callback.
- on_event_deliver(err, evt)
Simple callback method for debugging event production.
If there is any error, log all the known information about the calling context so the event can be recreated and/or resent later. This log will not contain the exact headers but will contain the EventsMetadata object that can be used to recreate them.
- Parameters:
err – Error if event production failed
evt – Event that was delivered (or failed to be delivered)
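confluent_kafka's Producer.produce() accepts an on_delivery callback that is later called with (err, msg). Passing a bound method of a per-call context object is one way to carry the event's topic, key, and data into that callback. The sketch below is pared down: the `logged` list stands in for real logging, and FakeProducer is hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional


@dataclass
class ProducingContext:
    """Pared-down sketch: keeps just enough context to explain a failed delivery."""
    full_topic: Optional[str] = None
    event_key: Optional[str] = None
    event_data: Optional[dict] = None
    # Hypothetical: collect log lines in a list instead of writing to a logger.
    logged: List[str] = field(default_factory=list)

    def on_event_deliver(self, err: Any, evt: Any) -> None:
        if err is not None:
            self.logged.append(
                f"Failed to send event to {self.full_topic} with key {self.event_key}: "
                f"{err}; event data: {self.event_data}"
            )


class FakeProducer:
    """Hypothetical producer that reports delivery through on_delivery(err, msg)."""

    def __init__(self, fail: bool):
        self.fail = fail

    def produce(self, topic: str, value: bytes, key: bytes,
                on_delivery: Callable[[Any, Any], None]) -> None:
        # confluent_kafka invokes this callback later, from poll() or flush();
        # the fake calls it immediately to keep the example short.
        on_delivery("delivery failed" if self.fail else None, (topic, key, value))
```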
- edx_event_bus_kafka.internal.producer.create_producer() → KafkaEventProducer | None
Create a Producer API instance. Caller should cache the returned object.
If the confluent-kafka library or essential settings are missing, log a warning and return None.
- edx_event_bus_kafka.internal.producer.descend_avro_schema(serializer_schema: dict, field_path: List[str]) → dict
Extract a subfield within an Avro schema, recursively.
- Parameters:
serializer_schema – An Avro schema (nested dictionaries)
field_path – List of strings matching the ‘name’ of successively deeper subfields
- Returns:
Schema for some field
- Note: Avro helpers could be moved to openedx_events.event_bus.avro.serializer so that event bus implementations other than Kafka can use them.
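A minimal sketch of the recursive descent, assuming the usual Avro JSON layout: a record has a "fields" list, and each field entry wraps its own record type under "type". The error type (ValueError) and the example schema are assumptions made for illustration.

```python
from typing import List


def descend_avro_schema(serializer_schema: dict, field_path: List[str]) -> dict:
    subschema = serializer_schema
    for field_name in field_path:
        # A field entry wraps its record type in "type"; unwrap it before searching.
        if "fields" not in subschema:
            subschema = subschema["type"]
        if not isinstance(subschema, dict) or subschema.get("type") != "record":
            raise ValueError(f"Cannot descend into non-record schema at {field_name!r}")
        matches = [f for f in subschema["fields"] if f["name"] == field_name]
        if not matches:
            raise ValueError(f"No field named {field_name!r}")
        subschema = matches[0]
    return subschema


# Hypothetical example schema with one nested record.
EXAMPLE_SCHEMA = {
    "name": "CloudEvent",
    "type": "record",
    "fields": [
        {"name": "user", "type": {
            "name": "UserData", "type": "record",
            "fields": [{"name": "id", "type": "long"},
                       {"name": "is_active", "type": "boolean"}],
        }},
    ],
}
```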
- edx_event_bus_kafka.internal.producer.extract_event_key(event_data: dict, event_key_field: str) → Any
From an event object, extract a Kafka event key (not yet serialized).
- Parameters:
event_data – The event data (kwargs) sent to the signal
event_key_field – Path to the event data field to use as the event key (period-delimited string naming the dictionary keys to descend)
- Returns:
Key data, which might be an integer, string, dictionary, etc.
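A minimal sketch of resolving a period-delimited key path. Because openedx-events payloads are typically attrs data classes rather than plain dicts, this sketch assumes it must follow object attributes as well as dictionary keys; UserData and the exception types are hypothetical.

```python
from dataclasses import dataclass
from typing import Any


def extract_event_key(event_data: dict, event_key_field: str) -> Any:
    current: Any = event_data
    for field_name in event_key_field.split("."):
        if isinstance(current, dict):
            if field_name not in current:
                raise KeyError(f"Missing key {field_name!r} while resolving {event_key_field!r}")
            current = current[field_name]
        elif hasattr(current, field_name):
            # Event data values may be objects (e.g. attrs classes) rather than dicts.
            current = getattr(current, field_name)
        else:
            raise AttributeError(f"No field {field_name!r} while resolving {event_key_field!r}")
    return current


@dataclass
class UserData:
    """Hypothetical event data class used in the example."""
    id: int
    username: str
```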
- edx_event_bus_kafka.internal.producer.extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field: str) → str
From a signal’s serializer, extract just the part of the Avro schema that will be used for the Kafka event key.
- Parameters:
signal_serializer – The signal serializer to extract a sub-schema from
event_key_field – Path to the event data field to use as the event key (period-delimited string naming the dictionary keys to descend)
- Returns:
The key’s schema, as a string.
- edx_event_bus_kafka.internal.producer.get_serializers(signal: OpenEdxPublicSignal, event_key_field: str)
Get the full key and value serializers for a signal and a key field path.
This is cached in order to save work re-transforming AvroSignalSerializers into AvroSerializers.
- Parameters:
signal – The OpenEdxPublicSignal to make a serializer for.
event_key_field – Path to descend in the signal schema to find the subschema for the key (period-delimited string naming the field names to descend).
- Returns:
2-tuple of AvroSerializers, for event key and value
- edx_event_bus_kafka.internal.producer.get_signal_serializer(signal: OpenEdxPublicSignal)
Get the AvroSignalSerializer for a signal.
This is cached in order to save work re-transforming classes into Avro schemas.
- Parameters:
signal – The OpenEdxPublicSignal to make a serializer for.
- Returns:
An AvroSignalSerializer configured to serialize event data to dictionaries
- edx_event_bus_kafka.internal.producer.poll_indefinitely(api_weakref: KafkaEventProducer)
Poll the producer indefinitely to ensure delivery/stats/etc. callbacks are triggered.
The thread stops automatically once the producer is garbage-collected.
See the ADR for more information: openedx/event-bus-kafka
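A minimal sketch of a weakref-driven polling thread: the loop holds only a weak reference between iterations, so once the last strong reference to the producer wrapper disappears, the next dereference returns None and the thread exits. FakeApi and the interval value are hypothetical; in CPython, reference counting frees the object as soon as the last strong reference is dropped.

```python
import threading
import time
import weakref
from typing import Callable, Optional


class FakeApi:
    """Hypothetical producer wrapper exposing a poll() method."""

    def __init__(self):
        self.polls = 0

    def poll(self, timeout: float) -> None:
        self.polls += 1


def poll_indefinitely(api_weakref: Callable[[], Optional[FakeApi]],
                      interval: float = 0.01) -> None:
    while True:
        api = api_weakref()  # returns None once the object has been collected
        if api is None:
            return
        api.poll(0.0)
        del api  # drop the temporary strong reference before sleeping
        time.sleep(interval)
```

Usage: start it with threading.Thread(target=poll_indefinitely, args=(weakref.ref(api),), daemon=True).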
- edx_event_bus_kafka.internal.producer.record_producing_error(error, context)
Record an error in producing an event to both the monitoring system and the regular logs.
- Parameters:
error – The exception or error raised during producing
context – An instance of ProducingContext containing additional information about the message
edx_event_bus_kafka.internal.utils module
Utilities for converting between message headers and EventsMetadata.
- class edx_event_bus_kafka.internal.utils.MessageHeader(message_header_key, event_metadata_field=None, to_metadata=None, from_metadata=None)
Bases: object
Utility class for converting between message headers and EventsMetadata objects.
- instances
List of 10 MessageHeader objects.
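A pared-down sketch of how a registry of MessageHeader objects could convert metadata to headers and back, assuming each header maps one metadata field and that to_metadata/from_metadata are string converters. The ce_type key appears elsewhere in this documentation; ce_id, the Metadata fields, and the two helper functions are assumptions, and the identity defaults differ from the documented None defaults.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Tuple
from uuid import UUID


@dataclass
class Metadata:
    """Hypothetical subset of EventsMetadata."""
    id: UUID
    event_type: str


class MessageHeader:
    instances: List["MessageHeader"] = []

    def __init__(self, message_header_key: str, event_metadata_field: Optional[str] = None,
                 to_metadata: Callable[[str], Any] = lambda x: x,
                 from_metadata: Callable[[Any], str] = lambda x: x):
        self.message_header_key = message_header_key
        self.event_metadata_field = event_metadata_field
        self.to_metadata = to_metadata
        self.from_metadata = from_metadata
        self.instances.append(self)  # register every header in the shared list


HEADER_ID = MessageHeader("ce_id", "id", to_metadata=UUID, from_metadata=str)
HEADER_EVENT_TYPE = MessageHeader("ce_type", "event_type")


def headers_from_metadata(metadata: Metadata) -> List[Tuple[str, bytes]]:
    return [(h.message_header_key,
             h.from_metadata(getattr(metadata, h.event_metadata_field)).encode("utf-8"))
            for h in MessageHeader.instances if h.event_metadata_field]


def metadata_from_headers(headers: List[Tuple[str, bytes]]) -> Metadata:
    values = {}
    for key, raw in headers:
        for h in MessageHeader.instances:
            if h.message_header_key == key and h.event_metadata_field:
                values[h.event_metadata_field] = h.to_metadata(raw.decode("utf-8"))
    return Metadata(**values)
```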
- edx_event_bus_kafka.internal.utils.get_message_header_values(headers: List, header: MessageHeader) → List[str]
Return all values for this header.
- Parameters:
headers – List of key/value tuples. Keys are strings, values are bytestrings.
header – The MessageHeader to look for.
- Returns:
List of zero or more header values decoded as strings.
- edx_event_bus_kafka.internal.utils.last_message_header_value(headers: List, header: MessageHeader) → str | None
Return the value for the header with the specified key, if there is at least one.
We should not ordinarily expect there to be more than one instance of a header. However, if there are several, this function returns the last value. (The latest value may have been intended to override an earlier value.)
- Parameters:
headers – List of key/value tuples. Keys are strings, values are bytestrings.
header – The MessageHeader to look for.
- Returns:
Decoded value of the last header with this key, or None if there are none.
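A minimal sketch of both header helpers, assuming headers are (str, bytes) tuples as documented and that values are UTF-8 encoded. The MessageHeader here is pared down to its key only; the real class also carries metadata conversion functions.

```python
from typing import List, Optional, Tuple


class MessageHeader:
    """Pared-down sketch: only the header key is modeled here."""

    def __init__(self, message_header_key: str):
        self.message_header_key = message_header_key


def get_message_header_values(headers: List[Tuple[str, bytes]],
                              header: MessageHeader) -> List[str]:
    # Keep every matching header, in order, decoded to str.
    return [value.decode("utf-8") for key, value in headers
            if key == header.message_header_key]


def last_message_header_value(headers: List[Tuple[str, bytes]],
                              header: MessageHeader) -> Optional[str]:
    values = get_message_header_values(headers, header)
    # A later duplicate is taken to override an earlier one.
    return values[-1] if values else None
```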
Module contents
Most of the implementation of the package is here and is internal-only.
The public API will be in the edx_event_bus_kafka module for the most part.
See ADR docs/decisions/0006-public-api-and-app-organization.rst for the reasoning.