edx_event_bus_kafka.internal.tests package#

Submodules#

edx_event_bus_kafka.internal.tests.test_config module#

Test common configuration loading.

class edx_event_bus_kafka.internal.tests.test_config.TestCommonSettings(methodName='runTest')#

Bases: TestCase

Test loading of settings common to producer and consumer.

test_full()#
test_minimal()#
test_unconfigured()#
class edx_event_bus_kafka.internal.tests.test_config.TestSchemaRegistryClient(methodName='runTest')#

Bases: TestCase

Test client creation.

test_configured()#
test_unconfigured()#
class edx_event_bus_kafka.internal.tests.test_config.TestTopicPrefixing(methodName='runTest')#

Bases: TestCase

Test autoprefixing of base topic.

test_empty_string_prefix()#

Check that empty string is treated the same as None.

test_no_prefix()#
test_regular_prefix()#
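
The prefixing behavior these tests describe can be illustrated with a minimal sketch. The function name `full_topic_sketch` and the `-` separator are assumptions made for illustration, not the package's actual helper; the only behavior taken from the text above is that an empty-string prefix is treated the same as None.

```python
from typing import Optional


def full_topic_sketch(base_topic: str, prefix: Optional[str] = None) -> str:
    """Hypothetical helper: prepend a prefix to a base topic name.

    Both None and the empty string are falsy, so either one leaves the
    base topic unchanged. The "-" separator is an assumption.
    """
    if prefix:
        return f"{prefix}-{base_topic}"
    return base_topic
```

Relying on truthiness rather than an explicit `is None` check is what makes the empty-string and None cases collapse into one branch.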

edx_event_bus_kafka.internal.tests.test_consumer module#

Tests for event_consumer module.

class edx_event_bus_kafka.internal.tests.test_consumer.TestEmitSignals(methodName='runTest')#

Bases: TestCase

Tests for message parsing and signal-sending.

TEST_CONSUME_ERROR_FATAL = ConsumeError(KafkaError{FATAL,code=INVALID_MSG,val=2,str="Broker: Invalid message"})#
TEST_CONSUME_ERROR_NO_MESSAGE = ConsumeError(KafkaError{code=INVALID_MSG,val=2,str="Broker: Invalid message"})#
TEST_CONSUME_ERROR_WITH_MESSAGE = ConsumeError(KafkaError{code=INVALID_MSG,val=2,str="Broker: Invalid message"})#
TEST_FAILED_MESSAGE = <edx_event_bus_kafka.internal.tests.test_utils.FakeMessage object>#
TEST_KAFKA_ERROR = KafkaError{code=INVALID_MSG,val=2,str="Broker: Invalid message"}#
TEST_KAFKA_EXCEPTION = KafkaException(KafkaError{code=INVALID_MSG,val=2,str="Broker: Invalid message"})#
TEST_KAFKA_FATAL_ERROR = KafkaError{FATAL,code=INVALID_MSG,val=2,str="Broker: Invalid message"}#
assert_signal_sent_with(signal, data)#

Check that a signal-send came in as expected to the mock receiver.

setUp()#

Hook method for setting up the test fixture before exercising it.

tearDown()#

Hook method for deconstructing the test fixture after testing it.

test_bad_headers()#

Check that we raise an UnusableMessageError if we cannot process the message headers.

The various kinds of bad headers are more fully tested in test_utils.

test_check_event_error()#
test_connection_reset_1__False__False__False_(has_connection, is_usable, reconnect_expected, mock_connection)#

Confirm that we reconnect to the database as required.

test_connection_reset_2__True__False__True_(has_connection, is_usable, reconnect_expected, mock_connection)#

Confirm that we reconnect to the database as required.

test_connection_reset_3__False__True__False_(has_connection, is_usable, reconnect_expected, mock_connection)#

Confirm that we reconnect to the database as required.

test_consecutive_error_limit()#

Confirm that consecutive errors can break out of the loop.

test_consume_loop(mock_sleep, mock_logger, mock_set_custom_attribute)#

Check the basic loop lifecycle.

test_consume_loop_disabled(mock_logger)#
test_deserialize_type_mismatch()#
test_determine_signal_success()#
test_emit(mock_logger)#
test_emit_success_1_True(audit_logging, mock_logger, mock_set_attribute)#
test_emit_success_2_False(audit_logging, mock_logger, mock_set_attribute)#
test_emit_success_tolerates_missing_timestamp(mock_logger, mock_set_attribute)#
test_emit_type_mismatch()#
test_malformed_receiver_errors()#

Ensure that even a really messed-up receiver is still reported correctly.

test_multiple_types()#

Very unlikely case, but this gets us coverage.

test_no_commit_if_no_error_logged()#

Check that if there is an error that we fail to log, we do not commit the offset.

test_no_consume_with_bad_offset_timestamp(mock_logger)#
test_no_consume_with_offsets(mock_sleep, mock_logger)#
test_no_deserializer_if_no_registry_client()#
test_no_type()#
test_non_consecutive_errors()#

Confirm that non-consecutive errors do not break out of the loop.

test_record_error_for_various_errors_1(exception, has_message, has_kafka_error, is_fatal, mock_sleep, mock_logger, mock_set_custom_attribute)#

Covers reporting of an error in the consumer loop for various types of errors.

test_record_error_for_various_errors_2(exception, has_message, has_kafka_error, is_fatal, mock_sleep, mock_logger, mock_set_custom_attribute)#

Covers reporting of an error in the consumer loop for various types of errors.

test_record_error_for_various_errors_3(exception, has_message, has_kafka_error, is_fatal, mock_sleep, mock_logger, mock_set_custom_attribute)#

Covers reporting of an error in the consumer loop for various types of errors.

test_record_error_for_various_errors_4(exception, has_message, has_kafka_error, is_fatal, mock_sleep, mock_logger, mock_set_custom_attribute)#

Covers reporting of an error in the consumer loop for various types of errors.

test_record_error_for_various_errors_5(exception, has_message, has_kafka_error, is_fatal, mock_sleep, mock_logger, mock_set_custom_attribute)#

Covers reporting of an error in the consumer loop for various types of errors.

test_reset_offsets_and_sleep_indefinitely()#
test_reset_offsets_and_sleep_indefinitely_with_none_offset()#
test_unexpected_signal_type_in_header()#
edx_event_bus_kafka.internal.tests.test_consumer.fake_receiver_raises_error(**kwargs)#
edx_event_bus_kafka.internal.tests.test_consumer.fake_receiver_returns_quietly(**kwargs)#
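
The distinction between consecutive and non-consecutive errors exercised by test_consecutive_error_limit() and test_non_consecutive_errors() can be sketched as a counter that resets on every success. This is an illustrative sketch only: `consume_with_error_limit`, its parameters, and its return value are hypothetical and do not reproduce the consumer's actual loop.

```python
from typing import Callable, Iterable


def consume_with_error_limit(steps: Iterable[Callable[[], None]], limit: int) -> int:
    """Hypothetical loop: run each step, stopping after `limit` errors in a row.

    Returns the number of steps attempted before the loop ended.
    """
    consecutive_errors = 0
    attempted = 0
    for step in steps:
        attempted += 1
        try:
            step()
        except Exception:
            consecutive_errors += 1
            if consecutive_errors >= limit:
                break  # too many failures in a row: leave the loop
        else:
            consecutive_errors = 0  # any success resets the streak
    return attempted
```

With a limit of 2, alternating failures and successes never end the loop early, while two failures back to back do.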

edx_event_bus_kafka.internal.tests.test_producer module#

Test the event producer code.

class edx_event_bus_kafka.internal.tests.test_producer.TestCommand(methodName='runTest')#

Bases: TestCase

Test the produce_event management command.

test_command(_, fake_logger)#
class edx_event_bus_kafka.internal.tests.test_producer.TestEventProducer(methodName='runTest')#

Bases: TestCase

Test producer.

setUp()#

Hook method for setting up the test fixture before exercising it.

test_create_producer_configured()#

Creation succeeds when all settings are present.

Also tests basic compliance with the implementation-loader API in openedx-events.

test_create_producer_unconfigured()#

With missing essential settings, just warn and return None.

test_descend_avro_schema()#
test_extract_event_key()#
test_extract_key_schema()#
test_full_event_data_present_in_kafka_error(mock_logger, *args)#
test_full_event_data_present_in_key_extraction_error(mock_logger, *args)#
test_on_event_deliver_1_True(audit_logging, mock_logger)#
test_on_event_deliver_2_False(audit_logging, mock_logger)#
test_polling_loop_robust()#

Test that polling loop continues even if one call raises an exception.

test_polling_loop_terminates()#

Test that polling loop stops as soon as the producer is garbage-collected.

test_send_to_event_bus(mock_get_serializers)#
test_serialize_and_produce_to_same_topic(mock_context)#
test_serializers_configured()#
test_serializers_unconfigured()#
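
The two polling-loop properties tested above (the loop survives an exception from a single call, and it stops once the producer is garbage-collected) can be sketched with a weak reference. The names `SketchProducer` and `poll_until_collected` are hypothetical, and this is one possible way to get that behavior under CPython's reference counting, not the package's implementation.

```python
import time
import weakref


class SketchProducer:
    """Hypothetical stand-in for a producer with a poll() method."""

    def __init__(self):
        self.polls = 0

    def poll(self):
        self.polls += 1


def poll_until_collected(producer_ref: weakref.ref, interval: float = 0.01) -> None:
    """Poll the referenced producer until it has been garbage-collected."""
    while True:
        producer = producer_ref()  # take a temporary strong reference
        if producer is None:
            return  # the producer is gone, so the loop ends
        try:
            producer.poll()
        except Exception:
            pass  # one failing poll must not end the loop
        finally:
            del producer  # drop the strong reference before sleeping
        time.sleep(interval)
```

Holding only a weak reference between iterations means the loop itself never keeps the producer alive.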

edx_event_bus_kafka.internal.tests.test_utils module#

Test header conversion utils.

class edx_event_bus_kafka.internal.tests.test_utils.FakeMessage(topic: str | None = None, partition: int | None = None, offset: int | None = None, headers: list | None = None, key: bytes | None = None, value=None, error=None, timestamp=None)#

Bases: object

A fake confluent_kafka.cimpl.Message that we can actually construct for mocking.

See https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#message

error()#
headers() → list | None#

List of str/bytes key/value pairs.

key() → bytes | None#

Bytes (Avro).

offset() → int | None#
partition() → int | None#
set_value(value)#
timestamp()#
topic() → str | None#
value()#

Event value (bytes or object).
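
As a rough illustration of the pattern, a constructible fake only needs to store its constructor arguments and expose them through the same method-style accessors that a Kafka message offers. The class `SketchMessage` below is a hypothetical sketch modeled on the signature documented above, not the actual FakeMessage source.

```python
from typing import Any, Optional


class SketchMessage:
    """Hypothetical constructible stand-in for a Kafka message."""

    def __init__(self, topic: Optional[str] = None, partition: Optional[int] = None,
                 offset: Optional[int] = None, headers: Optional[list] = None,
                 key: Optional[bytes] = None, value: Any = None,
                 error: Any = None, timestamp: Any = None):
        self._topic = topic
        self._partition = partition
        self._offset = offset
        self._headers = headers
        self._key = key
        self._value = value
        self._error = error
        self._timestamp = timestamp

    # Accessors are methods, not attributes, mirroring the message API.
    def topic(self) -> Optional[str]:
        return self._topic

    def partition(self) -> Optional[int]:
        return self._partition

    def offset(self) -> Optional[int]:
        return self._offset

    def headers(self) -> Optional[list]:
        return self._headers

    def key(self) -> Optional[bytes]:
        return self._key

    def value(self) -> Any:
        return self._value

    def set_value(self, value: Any) -> None:
        self._value = value

    def error(self) -> Any:
        return self._error

    def timestamp(self) -> Any:
        return self._timestamp
```

A test can then build a message directly, for example `SketchMessage(topic="t", value=b"x")`, and hand it to code that calls `msg.topic()` or `msg.value()`.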

class edx_event_bus_kafka.internal.tests.test_utils.TestTestHelpers(methodName='runTest')#

Bases: TestCase

Tests for local unit test utilities.

test_side_effects()#
class edx_event_bus_kafka.internal.tests.test_utils.TestUtils(methodName='runTest')#

Bases: TestCase

Tests for header conversion utils.

TEST_UUID_BYTES = b'ec81cdf0-6712-11ee-a142-0242ac110002'#
test_generate_metadata_fails_with_duplicate_headers()#

Check that we raise if there are duplicate headers.

test_generate_metadata_from_missing_or_bad_headers_1(msg_id, msg_time, source_lib, should_raise, mock_dt)#

Check that we raise an exception iff required headers are missing or unparseable.

test_generate_metadata_from_missing_or_bad_headers_2(msg_id, msg_time, source_lib, should_raise, mock_dt)#

Check that we raise an exception iff required headers are missing or unparseable.

test_generate_metadata_from_missing_or_bad_headers_3(msg_id, msg_time, source_lib, should_raise, mock_dt)#

Check that we raise an exception iff required headers are missing or unparseable.

test_generate_metadata_from_missing_or_bad_headers_4(msg_id, msg_time, source_lib, should_raise, mock_dt)#

Check that we raise an exception iff required headers are missing or unparseable.

test_generate_metadata_from_missing_or_bad_headers_5__None__None__None__True_(msg_id, msg_time, source_lib, should_raise, mock_dt)#

Check that we raise an exception iff required headers are missing or unparseable.

test_headers_from_event_metadata()#

Check that we can generate message headers from an EventsMetadata object.

test_metadata_from_headers()#

Check that we can generate an EventsMetadata object from valid message headers.

edx_event_bus_kafka.internal.tests.test_utils.side_effects(functions: list)#

Given a list of functions, return a new function that will call each one in turn on successive invocations. (The returned function ignores any arguments it is called with.) Each function’s return value will be returned. Behavior is undefined if insufficient functions are supplied.
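
A minimal sketch of a helper with this contract is shown below. The name `side_effects_sketch` is hypothetical and the body is one plausible way to meet the description, not the package's source. In this sketch, running out of functions raises StopIteration, which is just one possible outcome of the case the description leaves undefined.

```python
from typing import Any, Callable, List


def side_effects_sketch(functions: List[Callable[[], Any]]) -> Callable[..., Any]:
    """Return a callable that runs the next function in the list on each call."""
    remaining = iter(functions)

    def call_next(*args: Any, **kwargs: Any) -> Any:
        # Arguments are accepted but deliberately ignored.
        return next(remaining)()

    return call_next
```

A helper like this is convenient as a mock's `side_effect` when successive calls should behave differently, for example returning a value on the first call and raising on the second.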

Module contents#