Source code for wellcome_aws_utils.dynamo_event

from enum import Enum

from boto3.dynamodb.types import TypeDeserializer

from wellcome_aws_utils.exceptions import UnWellcomeException


def create_dynamo_events(event):
    for record in event['Records']:
        yield DynamoEvent(record)


[docs]class DynamoEventType(Enum): REMOVE, INSERT, MODIFY = range(3)
class DynamoEvent: def _set_event_type(self, record): if self.record['eventName'] == 'REMOVE': self.event_type = DynamoEventType.REMOVE elif self.record['eventName'] == 'INSERT': self.event_type = DynamoEventType.INSERT elif self.record['eventName'] == 'MODIFY': self.event_type = DynamoEventType.MODIFY else: raise UnWellcomeException( f'Unrecognised eventName found in {record}!' ) def __init__(self, record): self.record = record try: self._set_event_type(record) self.event_source_arn = record['eventSourceARN'] self._keys = record['dynamodb']['Keys'] self._new_image = record['dynamodb'].get('NewImage') self._old_image = record['dynamodb'].get('OldImage') except KeyError as e: raise UnWellcomeException( f'{e} not found in {record}!' ) @staticmethod def _deserialize_values(image): td = TypeDeserializer() return {k: td.deserialize(v) for k, v in image.items()} def keys(self, deserialize_values=False): if deserialize_values and self._keys: return DynamoEvent._deserialize_values(self._keys) return self._keys def new_image(self, deserialize_values=False): if deserialize_values and self._new_image: return DynamoEvent._deserialize_values(self._new_image) return self._new_image def old_image(self, deserialize_values=False): if deserialize_values and self._old_image: return DynamoEvent._deserialize_values(self._old_image) return self._old_image