platform_plugin_aspects.sinks package

Submodules

platform_plugin_aspects.sinks.base_sink module

Base classes for event sinks

class platform_plugin_aspects.sinks.base_sink.BaseSink(connection_overrides, log)

Bases: object

Base class for ClickHouse event sinks; allows the default settings to be overridden.

CLICKHOUSE_BULK_INSERT_PARAMS = {'input_format_allow_errors_num': 1, 'input_format_allow_errors_ratio': 0.1}
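ClickHouse's HTTP interface accepts query settings such as `input_format_allow_errors_num` and `input_format_allow_errors_ratio` as URL parameters, which lets a bulk CSV insert tolerate a small number of malformed rows. The sketch below assumes the sink passes these parameters that way; `build_insert_url`, the host, the database, and the table are hypothetical placeholders, not the plugin's configuration.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

# Copied from BaseSink.CLICKHOUSE_BULK_INSERT_PARAMS.
CLICKHOUSE_BULK_INSERT_PARAMS = {
    "input_format_allow_errors_num": 1,
    "input_format_allow_errors_ratio": 0.1,
}


def build_insert_url(base_url, database, table, extra_params=None):
    """Build a ClickHouse HTTP URL for a CSV bulk insert (hypothetical helper)."""
    params = {"query": f"INSERT INTO {database}.{table} FORMAT CSV"}
    params.update(CLICKHOUSE_BULK_INSERT_PARAMS)  # error-tolerance settings
    if extra_params:
        params.update(extra_params)
    return f"{base_url}/?{urlencode(params)}"


# Hypothetical host and database; the plugin reads its real values from settings.
url = build_insert_url("http://clickhouse:8123", "event_sink", "user_profile")
print(url)
```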
class platform_plugin_aspects.sinks.base_sink.ClickHouseAuth(username, password)

Bases: tuple

password

Alias for field number 1

username

Alias for field number 0
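The field aliases above mean `ClickHouseAuth` behaves like a named tuple with `username` at index 0 and `password` at index 1. The following sketch shows an equivalent definition plus one assumed use, turning the credentials into an HTTP Basic `Authorization` header value; whether the plugin builds its header this way is not stated here, and `basic_auth_header` is a hypothetical helper.

```python
import base64
from collections import namedtuple

# Equivalent definition: field 0 is username, field 1 is password.
ClickHouseAuth = namedtuple("ClickHouseAuth", ["username", "password"])


def basic_auth_header(auth):
    """Return an HTTP Basic Authorization header value (hypothetical helper)."""
    token = base64.b64encode(f"{auth.username}:{auth.password}".encode()).decode()
    return f"Basic {token}"


auth = ClickHouseAuth("ch_user", "secret")  # hypothetical credentials
assert auth[0] == auth.username and auth[1] == auth.password
print(basic_auth_header(auth))
```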

class platform_plugin_aspects.sinks.base_sink.ModelBaseSink(connection_overrides, log)

Bases: BaseSink

Base class for ClickHouse event sinks; allows the default settings to be overridden.

This class is used for model-based event sinks, which read data through the Django ORM and write it to ClickHouse.

clickhouse_table_name = None

The name of the ClickHouse table where the events will be written. This should be set to the desired table name for the specific event type.

Type:

str

dump(item_id, many=False, initial=None)

Serialize the item and send it to ClickHouse, then dump any related items to ClickHouse.
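The docstring describes `dump` as serializing an item, sending it to ClickHouse, and then dumping related items. The sketch below models that flow with plain Python stand-ins: `FakeSink`, its in-memory `sent` list, and the way nested sinks are invoked are hypothetical simplifications, not the plugin's implementation.

```python
class FakeSink:
    """Simplified stand-in for ModelBaseSink (hypothetical, for illustration)."""

    name = "Parent"
    nested_sinks = []  # sink classes to run after this one

    def __init__(self, sent):
        self.sent = sent  # shared list standing in for ClickHouse

    def serialize_item(self, item_id):
        return {"id": item_id, "sink": self.name}

    def send_item(self, serialized_item):
        self.sent.append(serialized_item)

    def dump(self, item_id):
        # 1. Serialize the item and send it to "ClickHouse".
        self.send_item(self.serialize_item(item_id))
        # 2. Dump related items through each nested sink class.
        for sink_cls in self.nested_sinks:
            sink_cls(self.sent).dump(item_id)


class ChildSink(FakeSink):
    name = "Child"


class ParentSink(FakeSink):
    name = "Parent"
    nested_sinks = [ChildSink]


sent = []
ParentSink(sent).dump("course-v1:edX+Demo+2024")  # hypothetical course key
print([row["sink"] for row in sent])  # ['Parent', 'Child']
```

This mirrors the relationship documented for `CourseOverviewSink`, which lists `XBlockSink` in its `nested_sinks`.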

fetch_target_items(start_pk=None, ids=None, skip_ids=None, force_dump=False, batch_size=None)

Fetch the items that should be dumped to ClickHouse
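The parameter names suggest a filtering and batching step. The sketch shows one plausible interpretation over an in-memory list: items with a primary key at or after `start_pk`, restricted to `ids`, excluding `skip_ids`, filtered by a `should_dump` predicate unless `force_dump` is set, and yielded in batches of `batch_size`. These semantics are assumptions inferred from the names, and the standalone function signature is hypothetical.

```python
def fetch_target_items(items, should_dump, start_pk=None, ids=None,
                       skip_ids=None, force_dump=False, batch_size=None):
    """Yield batches of items to dump (hypothetical re-creation)."""
    ids = set(ids) if ids else None
    skip_ids = set(skip_ids or ())
    selected = []
    for item in sorted(items, key=lambda i: i["pk"]):
        if start_pk is not None and item["pk"] < start_pk:
            continue  # before the starting primary key
        if ids is not None and item["pk"] not in ids:
            continue  # not in the explicit allow-list
        if item["pk"] in skip_ids:
            continue  # explicitly skipped
        if not force_dump and not should_dump(item):
            continue  # unchanged since the last dump
        selected.append(item)
    size = batch_size or len(selected) or 1
    for start in range(0, len(selected), size):
        yield selected[start:start + size]


rows = [{"pk": n, "changed": n % 2 == 0} for n in range(1, 8)]
batches = list(fetch_target_items(rows, lambda r: r["changed"],
                                  start_pk=2, skip_ids=[4], batch_size=2))
print([[r["pk"] for r in b] for b in batches])  # [[2, 6]]
```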

get_last_dumped_timestamp(item_id)

Return the last timestamp that was dumped to ClickHouse

get_model()

Return the model to be used for the insert

get_object(item_id)

Return the object to be dumped to ClickHouse

get_queryset(start_pk=None)

Return the queryset to be used for the insert

get_serializer()

Return the serializer to be used for the insert

classmethod get_sink_by_model_name(model)

Return the sink instance for the given model

classmethod is_enabled()

Return True if the sink is enabled, False otherwise

model = None

The Django model class representing the structure of the event data. This is used to validate and organize the data before writing it to ClickHouse.

Type:

Model

name = None

A human-readable name for the sink instance. This can be used for logging and identification purposes.

Type:

str

nested_sinks = []

A list of nested sink classes that can be used to further process or route the event data. Nested sinks allow chaining multiple sinks together for more complex event processing pipelines.

Type:

list

pk_format

A callable used to format the primary key of the model.

alias of int

queryset = None

A Django QuerySet that represents the initial set of data to be processed by the sink. It can be used to filter and select specific data for writing to ClickHouse.

Type:

QuerySet

send_item(serialized_item, many=False)

Create the insert query and CSV to send the serialized item to ClickHouse.

We still use a CSV here even when there is only one row, because it handles type serialization for us and keeps the pattern consistent.
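To illustrate why CSV is convenient even for one row: Python's `csv` module takes care of quoting commas and embedded quotes. A minimal sketch, assuming a hypothetical `database.table` name and column order, of building the CSV body and the matching `INSERT ... FORMAT CSV` query; `build_csv_insert` is not part of the plugin.

```python
import csv
import io


def build_csv_insert(table, rows, columns):
    """Return (query, csv_body) for a ClickHouse CSV insert (hypothetical helper)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for row in rows:
        # Write values in a fixed column order so they line up with the table.
        writer.writerow([row[col] for col in columns])
    query = f"INSERT INTO {table} FORMAT CSV"
    return query, buffer.getvalue()


query, body = build_csv_insert(
    "event_sink.course_overviews",  # hypothetical database.table
    [{"course_key": "course-v1:edX+Demo+2024", "display_name": 'Intro, "Part 1"'}],
    ["course_key", "display_name"],
)
print(query)
print(body)
```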

send_item_and_log(item_id, serialized_item, many)

Send the item to ClickHouse and log any errors.

serialize_item(item, many=False, initial=None)

Serialize the data to be sent to ClickHouse

serializer_class = None

The serializer class responsible for converting event data into a format suitable for storage. This should be a Django REST framework serializer, such as the classes in platform_plugin_aspects.sinks.serializers.

Type:

Serializer

should_dump_item(item)

Return True if the item should be dumped to ClickHouse, False otherwise

timestamp_field = None

The name of the field in the model representing the timestamp of the event. It is used to extract the timestamp from the event data for writing to ClickHouse.

Type:

str

unique_key = None

The name of the field that uniquely identifies each item written by the sink (for example, a course key or a block location). It can be used to specify the uniqueness constraint when writing events to ClickHouse.

Type:

str
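Concrete sinks configure ModelBaseSink almost entirely through these class attributes, as the subclasses documented below do. The sketch mimics that pattern with a hypothetical `SketchBaseSink` and shows one possible way a classmethod like `get_sink_by_model_name` could find a sink instance by scanning subclasses; the real lookup mechanism is not documented here.

```python
class SketchBaseSink:
    """Hypothetical stand-in for ModelBaseSink's configuration attributes."""

    model = None
    name = None
    clickhouse_table_name = None
    timestamp_field = None
    unique_key = None
    nested_sinks = []

    @classmethod
    def get_sink_by_model_name(cls, model):
        # Assumed approach: scan direct subclasses for a matching ``model``.
        for sink in cls.__subclasses__():
            if sink.model == model:
                return sink()
        return None


class UserProfileSketchSink(SketchBaseSink):
    # Values copied from UserProfileSink as documented on this page.
    model = "user_profile"
    name = "User Profile"
    clickhouse_table_name = "user_profile"
    timestamp_field = "time_last_dumped"
    unique_key = "id"


print(SketchBaseSink.get_sink_by_model_name("user_profile").name)  # User Profile
```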

platform_plugin_aspects.sinks.course_overview_sink module

Handler for the CMS COURSE_PUBLISHED event

Does the following:

  • Pulls the course structure from modulestore

  • Serializes the XBlocks

  • Sends them to ClickHouse in CSV format

Note that the serialization format does not include all fields as there may be things like LTI passwords and other secrets. We just take the fields necessary for reporting at this time.

class platform_plugin_aspects.sinks.course_overview_sink.CourseOverviewSink(connection_overrides, log)

Bases: ModelBaseSink

Sink for CourseOverview model

clickhouse_table_name = 'course_overviews'

The name of the ClickHouse table where the events will be written. This should be set to the desired table name for the specific event type.

Type:

str

get_course_last_published(course_overview)

Get the approximate last-published date for the given course. We use the ‘modified’ column in the CourseOverview table as a quick and easy (although perhaps inexact) way of determining when a course was last published. This works because CourseOverview rows are rewritten whenever a course is published.

Parameters:

course_overview – the CourseOverview for the course.

Returns: The datetime the course was last published at, stringified.

Uses Python’s default str(…) implementation for datetimes, which is sortable and similar to ISO 8601: https://docs.python.org/3/library/datetime.html#datetime.date.__str__
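To illustrate the point about `str(...)`: for a `datetime`, `str(d)` is equivalent to `d.isoformat(' ')`, so timestamps rendered this way in the same time zone sort correctly as plain strings. The timestamps below are made up.

```python
from datetime import datetime, timezone

# Hypothetical "modified" values for three courses, all in UTC.
modified = [
    datetime(2024, 3, 1, 9, 30, tzinfo=timezone.utc),
    datetime(2023, 12, 15, 17, 5, 42, tzinfo=timezone.utc),
    datetime(2024, 1, 20, 8, 0, tzinfo=timezone.utc),
]

as_text = [str(d) for d in modified]
print(as_text[0])  # 2024-03-01 09:30:00+00:00

# Sorting the strings gives the same order as sorting the datetimes.
assert sorted(as_text) == [str(d) for d in sorted(modified)]
```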

model = 'course_overviews'

The Django model class representing the structure of the event data. This is used to validate and organize the data before writing it to ClickHouse.

Type:

Model

name = 'Course Overview'

A human-readable name for the sink instance. This can be used for logging and identification purposes.

Type:

str

nested_sinks = [<class 'platform_plugin_aspects.sinks.course_overview_sink.XBlockSink'>]

A list of nested sink classes that can be used to further process or route the event data. Nested sinks allow chaining multiple sinks together for more complex event processing pipelines.

Type:

list

pk_format

alias of str

serializer_class

alias of CourseOverviewSerializer

should_dump_item(item)

Only dump the course if it has changed since it was last dumped.

Parameters:

item – a CourseKey object.

Returns:

  • whether this course should be dumped (bool)

  • reason why course needs, or does not need, to be dumped (string)
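The return value above is a pair: a boolean and a human-readable reason. The sketch shows one way that decision could be made by comparing a last-dumped timestamp with a last-published timestamp such as the one returned by `get_course_last_published`. The function name, the reason strings, and the handling of a missing publish date are assumptions, not the plugin's behavior.

```python
from datetime import datetime


def should_dump_course(last_dumped, last_published):
    """Return (should_dump, reason) for stringified timestamps (hypothetical sketch).

    ``last_dumped`` is None when the course has never been dumped.
    """
    if last_dumped is None:
        return True, "Course has never been dumped"
    if last_published is None:
        # Assumed choice: without a publish date, do not re-dump.
        return False, "No publish date found for course"
    # Parse the str(datetime) values so the comparison is chronological.
    if datetime.fromisoformat(last_published) > datetime.fromisoformat(last_dumped):
        return True, f"Course published since last dump at {last_dumped}"
    return False, f"Course not published since last dump at {last_dumped}"


print(should_dump_course("2024-03-01 09:30:00+00:00", "2024-03-02 08:00:00+00:00"))
```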

timestamp_field = 'time_last_dumped'

The name of the field in the model representing the timestamp of the event. It is used to extract the timestamp from the event data for writing to ClickHouse.

Type:

str

unique_key = 'course_key'

The name of the field that uniquely identifies each item written by the sink. It can be used to specify the uniqueness constraint when writing events to ClickHouse.

Type:

str

class platform_plugin_aspects.sinks.course_overview_sink.XBlockSink(connection_overrides, log)

Bases: ModelBaseSink

Sink for XBlock model

clickhouse_table_name = 'course_blocks'

The name of the ClickHouse table where the events will be written. This should be set to the desired table name for the specific event type.

Type:

str

Dump all XBlocks for a course

name = 'XBlock'

A human-readable name for the sink instance. This can be used for logging and identification purposes.

Type:

str

nested_sinks = []

A list of nested sink classes that can be used to further process or route the event data. Nested sinks allow chaining multiple sinks together for more complex event processing pipelines.

Type:

list

serialize_item(item, many=False, initial=None)

Serialize an XBlock into a dict

serialize_xblock(item, index, detached_xblock_types, dump_id, time_last_dumped)

Serialize an XBlock instance into a dict

static strip_branch_and_version(location)

Removes the branch and version information from a location.

Parameters:

location – an xblock’s location.

Returns: that xblock’s location without branch and version information.
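The real method works on location key objects. As a string-level illustration only, the sketch assumes a `block-v1:` usage key in which the branch and version appear as `+branch@...` and `+version@...` segments, and drops those segments; the example key is made up, and this string handling is not how the plugin does it.

```python
def strip_branch_and_version(location):
    """Drop '+branch@...' and '+version@...' segments from a usage-key string."""
    prefix, _, body = location.partition(":")
    kept = [
        part for part in body.split("+")
        if not part.startswith(("branch@", "version@"))
    ]
    return f"{prefix}:{'+'.join(kept)}"


# Hypothetical key with branch and version segments.
key = ("block-v1:edX+DemoX+Demo_Course+branch@draft-branch"
       "+version@65a1b2c3d4e5f6a7b8c9d0e1+type@problem+block@abc123")
print(strip_branch_and_version(key))
# block-v1:edX+DemoX+Demo_Course+type@problem+block@abc123
```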

timestamp_field = 'time_last_dumped'

The name of the field in the model representing the timestamp of the event. It is used to extract the timestamp from the event data for writing to ClickHouse.

Type:

str

unique_key = 'location'

The name of the field that uniquely identifies each item written by the sink. It can be used to specify the uniqueness constraint when writing events to ClickHouse.

Type:

str

platform_plugin_aspects.sinks.external_id_sink module

User external ID sink

class platform_plugin_aspects.sinks.external_id_sink.ExternalIdSink(connection_overrides, log)

Bases: ModelBaseSink

Sink for user external IDs

clickhouse_table_name = 'external_id'

The name of the ClickHouse table where the events will be written. This should be set to the desired table name for the specific event type.

Type:

str

get_queryset(start_pk=None)

Return the queryset to be used for the insert

model = 'external_id'

The Django model class representing the structure of the event data. This is used to validate and organize the data before writing it to ClickHouse.

Type:

Model

name = 'External ID'

A human-readable name for the sink instance. This can be used for logging and identification purposes.

Type:

str

serializer_class

alias of UserExternalIDSerializer

timestamp_field = 'time_last_dumped'

The name of the field in the model representing the timestamp of the event. It is used to extract the timestamp from the event data for writing to ClickHouse.

Type:

str

unique_key = 'id'

The name of the field that uniquely identifies each item written by the sink. It can be used to specify the uniqueness constraint when writing events to ClickHouse.

Type:

str

platform_plugin_aspects.sinks.serializers module

Django serializers for the event_sink_clickhouse app.

class platform_plugin_aspects.sinks.serializers.BaseSinkSerializer(*args, **kwargs)

Bases: Serializer

Base sink serializer for ClickHouse.

class Meta

Bases: object

Meta class for base sink serializer.

fields = ['dump_id', 'time_last_dumped']

get_dump_id(instance)

Return a unique ID for the dump.

get_time_last_dumped(instance)

Return the timestamp for the dump.
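BaseSinkSerializer adds `dump_id` and `time_last_dumped` to every serialized row; in Django REST framework, `get_<field>` methods like these typically back `SerializerMethodField`s. A minimal standalone sketch, assuming the ID is a random UUID4 and the timestamp is the current UTC time rendered with `str()`; the plugin may generate these differently, for example by sharing one value across a whole dump.

```python
import uuid
from datetime import datetime, timezone


def get_dump_id():
    """Return a unique ID for a dump (assumed: a random UUID4 string)."""
    return str(uuid.uuid4())


def get_time_last_dumped():
    """Return the dump timestamp (assumed: current UTC time as a string)."""
    return str(datetime.now(timezone.utc))


# Hypothetical serialized row with the two base fields attached.
row = {"user_id": 42, "dump_id": get_dump_id(), "time_last_dumped": get_time_last_dumped()}
print(row)
```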

class platform_plugin_aspects.sinks.serializers.CourseOverviewSerializer(*args, **kwargs)

Bases: BaseSinkSerializer, ModelSerializer

Serializer for course overview events.

class Meta

Bases: object

Meta class for course overview serializer.

fields = ['org', 'course_key', 'display_name', 'course_start', 'course_end', 'enrollment_start', 'enrollment_end', 'self_paced', 'course_data_json', 'created', 'modified', 'dump_id', 'time_last_dumped']

model = None

get_course_data_json(overview)

Return the course data as a JSON string.

get_course_key(overview)

Return the course key as a string.
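`course_data_json` packs extra course data into a single JSON string column. A sketch, assuming a plain dict in place of a CourseOverview instance and hypothetical key names (the serializer's actual keys are not listed here), using `default=str` so that dates serialize:

```python
import json
from datetime import date


def get_course_data_json(overview):
    """Serialize a few extra course fields into one JSON string (sketch)."""
    course_data = {
        # Hypothetical keys chosen for illustration.
        "advertised_start": overview.get("advertised_start"),
        "has_scheduled_release": overview.get("has_scheduled_release", False),
    }
    # default=str turns dates and other non-JSON values into strings.
    return json.dumps(course_data, default=str, sort_keys=True)


print(get_course_data_json({"advertised_start": date(2024, 9, 1)}))
# {"advertised_start": "2024-09-01", "has_scheduled_release": false}
```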

class platform_plugin_aspects.sinks.serializers.UserExternalIDSerializer(*args, **kwargs)

Bases: BaseSinkSerializer, ModelSerializer

Serializer for user external ID events.

class Meta

Bases: object

Meta class for user external ID serializer.

fields = ['external_user_id', 'external_id_type', 'username', 'user_id', 'dump_id', 'time_last_dumped']

model = None

class platform_plugin_aspects.sinks.serializers.UserProfileSerializer(*args, **kwargs)

Bases: BaseSinkSerializer, ModelSerializer

Serializer for user profile events.

class Meta

Bases: object

Meta class for user profile serializer.

fields = ['id', 'user_id', 'name', 'email', 'meta', 'courseware', 'language', 'location', 'year_of_birth', 'gender', 'level_of_education', 'mailing_address', 'city', 'country', 'state', 'goals', 'bio', 'profile_image_uploaded_at', 'phone_number', 'dump_id', 'time_last_dumped']

model = None

class platform_plugin_aspects.sinks.serializers.UserRetirementSerializer(*args, **kwargs)

Bases: BaseSinkSerializer, ModelSerializer

Serializer for user retirement events.

class Meta

Bases: object

Meta class for user retirement serializer.

fields = ['user_id']

model = None

platform_plugin_aspects.sinks.user_profile_sink module

User profile sink

class platform_plugin_aspects.sinks.user_profile_sink.UserProfileSink(connection_overrides, log)

Bases: ModelBaseSink

Sink for user profile events

clickhouse_table_name = 'user_profile'

The name of the ClickHouse table where the events will be written. This should be set to the desired table name for the specific event type.

Type:

str

get_queryset(start_pk=None)

Return the queryset to be used for the insert

model = 'user_profile'

The Django model class representing the structure of the event data. This is used to validate and organize the data before writing it to ClickHouse.

Type:

Model

name = 'User Profile'

A human-readable name for the sink instance. This can be used for logging and identification purposes.

Type:

str

serializer_class

alias of UserProfileSerializer

timestamp_field = 'time_last_dumped'

The name of the field in the model representing the timestamp of the event. It is used to extract the timestamp from the event data for writing to ClickHouse.

Type:

str

unique_key = 'id'

The name of the field that uniquely identifies each item written by the sink. It can be used to specify the uniqueness constraint when writing events to ClickHouse.

Type:

str

platform_plugin_aspects.sinks.user_retire_sink module

User retirement sink

class platform_plugin_aspects.sinks.user_retire_sink.UserRetirementSink(connection_overrides, log)

Bases: ModelBaseSink

Sink for user retirement events

clickhouse_table_name = 'dummy'

The name of the ClickHouse table where the events will be written. This should be set to the desired table name for the specific event type.

Type:

str

model = 'auth_user'

The Django model class representing the structure of the event data. This is used to validate and organize the data before writing it to ClickHouse.

Type:

Model

name = 'User Retirement'

A human-readable name for the sink instance. This can be used for logging and identification purposes.

Type:

str

send_item(serialized_item, many=False)

Unlike the other data sinks, the User Retirement sink deletes records from the user PII tables in ClickHouse.

Send delete queries to remove the serialized User from ClickHouse.
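Because this sink deletes rows instead of inserting them, its queries look different from the CSV inserts used elsewhere. A minimal sketch of generating ClickHouse `ALTER TABLE ... DELETE WHERE` mutation statements for a retired user; `build_retirement_deletes`, the `event_sink` database, and the choice of the `user_profile` and `external_id` tables as targets are assumptions, while `user_id` is a field of both UserProfileSerializer and UserExternalIDSerializer. Coercing the ID with `int()` is one simple guard against injecting SQL through it.

```python
def build_retirement_deletes(user_id, tables, database="event_sink"):
    """Return one DELETE mutation per table for the given user (hypothetical helper)."""
    user_id = int(user_id)  # reject anything that is not an integer ID
    return [
        f"ALTER TABLE {database}.{table} DELETE WHERE user_id = {user_id}"
        for table in tables
    ]


# Assumed targets: the PII tables written by UserProfileSink and ExternalIdSink.
for query in build_retirement_deletes("42", ["user_profile", "external_id"]):
    print(query)
```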

serializer_class

alias of UserRetirementSerializer

timestamp_field = 'modified'

The name of the field in the model representing the timestamp of the event. It is used to extract the timestamp from the event data for writing to ClickHouse.

Type:

str

unique_key = 'id'

The name of the field that uniquely identifies each item written by the sink. It can be used to specify the uniqueness constraint when writing events to ClickHouse.

Type:

str

Module contents

This module contains the sinks for the platform plugin aspects.