Source code for wellcome_aws_utils.sns_utils
# -*- encoding: utf-8 -*-
import collections
import datetime
import decimal
import json
import logging
import warnings
from wellcome_aws_utils.exceptions import UnWellcomeException
# Record type pairing the subject of an SNS notification with its message.
SNSEvent = collections.namedtuple('SNSEvent', ['subject', 'message'])

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class EnhancedJSONEncoder(json.JSONEncoder):
    """
    JSON encoder that additionally handles ``datetime.datetime`` and
    ``decimal.Decimal`` values.

    * ``datetime.datetime`` is encoded as the string from ``isoformat()``.
    * ``decimal.Decimal`` is encoded as an ``int`` when it holds a whole
      number, and as a ``float`` otherwise.
    """

    def default(self, obj):
        """
        Return a JSON-serialisable replacement for ``obj``.

        :param obj: A value the standard encoder cannot serialise itself.
        :returns: A ``str`` for datetimes, an ``int`` or ``float`` for
            decimals.
        :raises TypeError: For any other type (raised by the base class).
        """
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()

        if isinstance(obj, decimal.Decimal):
            # Test for a whole number with exact Decimal arithmetic.  Going
            # through float() first rounds large values, so a Decimal with
            # a fractional part could be mistaken for an integer and have
            # that fraction silently dropped by int().
            # is_finite() keeps NaN/Infinity on the float path, since int()
            # cannot represent them.
            if obj.is_finite() and obj == obj.to_integral_value():
                return int(obj)
            return float(obj)

        return super().default(obj)
def publish_sns_message(sns_client,
                        topic_arn,
                        message,
                        subject="default-subject"):
    """
    Publish ``message`` to the SNS topic identified by ``topic_arn``.

    The message is serialised to JSON with ``EnhancedJSONEncoder`` and
    wrapped in a ``{"default": ...}`` envelope, which is sent with
    ``MessageStructure='json'``.

    :param sns_client: Object whose ``publish`` method is called with the
        keyword arguments ``TopicArn``, ``MessageStructure``, ``Message``
        and ``Subject``.
    :param topic_arn: ARN of the topic to publish to.
    :param message: The data to publish.
    :param subject: Subject line for the notification.
    :returns: The response returned by ``sns_client.publish``.
    :raises RuntimeError: If the response's HTTP status code is not 200.
    """
    encoded_message = json.dumps(message, cls=EnhancedJSONEncoder)
    envelope = json.dumps({'default': encoded_message})

    response = sns_client.publish(
        TopicArn=topic_arn,
        MessageStructure='json',
        Message=envelope,
        Subject=subject,
    )

    status_code = response['ResponseMetadata']['HTTPStatusCode']
    if status_code != 200:
        raise RuntimeError(repr(response))

    logger.debug('SNS: sent notification %s', response["MessageId"])
    return response
def extract_sns_messages_from_lambda_event(event):
    """
    Extracts the JSON messages from an SNS event sent to an AWS Lambda.

    This is a generator, so nothing (including validation of ``event``) runs
    until the result is iterated.

    :param event: An event sent to a Lambda from SNS.
    :returns: A generator of SNSEvent instances, one per record, whose
        ``message`` is the record's message parsed as JSON.
    :raises UnWellcomeException: If the event has no ``Records``, if a
        record's ``EventSource`` is missing or is not ``aws:sns``, or if a
        record lacks ``Sns``, ``Subject`` or ``Message``.
    """
    if 'Records' not in event:
        raise UnWellcomeException(f'No records found in {event}')

    for record in event['Records']:
        # .get() so that a record with no EventSource at all is rejected with
        # the same exception as a record from the wrong source, rather than
        # leaking a bare KeyError.
        if record.get('EventSource') != 'aws:sns':
            raise UnWellcomeException(f'Invalid message source for {record}')

        try:
            sns = record['Sns']
            subject = sns['Subject']
            raw_message = sns['Message']
        except KeyError as e:
            # Chain the original KeyError so the traceback shows the lookup
            # that failed.
            raise UnWellcomeException(
                f'Invalid message structure, missing {e} in {record}'
            ) from e

        yield SNSEvent(subject=subject, message=json.loads(raw_message))
def extract_json_message(event):
    """
    Extracts a JSON message from an SNS event sent to a lambda.

    Deprecated in favour of extract_sns_messages_from_lambda_event.

    Only the first record of the event is read.

    :param event: An event sent to a Lambda from SNS.
    :returns: The first record's SNS message, parsed as JSON.
    """
    warnings.warn(
        'Deprecated in favour of extract_sns_messages_from_lambda_event',
        DeprecationWarning,
        # Attribute the warning to the caller, which is the code that needs
        # to migrate, rather than to this line.
        stacklevel=2
    )
    message = event['Records'][0]['Sns']['Message']
    return json.loads(message)