Source code for wellcome_aws_utils.s3_utils

# -*- encoding: utf-8 -*-

import json
from urllib.parse import unquote

import boto3
from botocore.exceptions import ClientError
import dateutil.parser


def is_object(bucket, key):
    """
    Checks if an object exists in S3.  Returns True/False.

    :param bucket: Bucket of the object to check.
    :param key: Key of the object to check.

    """
    client = boto3.client('s3')
    try:
        client.head_object(Bucket=bucket, Key=key)
    except ClientError as err:
        if err.response['Error']['Code'] == '404':
            return False
        else:
            raise
    else:
        return True


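# Illustrative usage (a sketch, not part of the original module: the bucket
# and key below are hypothetical, and a real call needs AWS credentials with
# permission to HEAD the object):
#
#     >>> is_object(bucket='example-bucket', key='data/objects.json')
#     True
#     >>> is_object(bucket='example-bucket', key='no/such/key')
#     False

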
def copy_object(src_bucket, src_key, dst_bucket, dst_key, lazy=False):
    """
    Copy an object from one S3 bucket to another.

    :param src_bucket: Bucket of the source object.
    :param src_key: Key of the source object.
    :param dst_bucket: Bucket of the destination object.
    :param dst_key: Key of the destination object.
    :param lazy: Do a lazy copy.  This means that the object will only
        be copied if the destination object does not exist, or exists
        but has a different ETag from the source object.

    """
    client = boto3.client('s3')

    if not is_object(bucket=src_bucket, key=src_key):
        raise ValueError(
            f'Tried to copy missing object ({src_bucket}, {src_key})'
        )

    def should_copy():
        if not lazy:
            return True
        if not is_object(bucket=dst_bucket, key=dst_key):
            return True

        # Only copy if the ETags differ: matching ETags mean the
        # destination already holds the same content as the source.
        src_resp = client.head_object(Bucket=src_bucket, Key=src_key)
        dst_resp = client.head_object(Bucket=dst_bucket, Key=dst_key)
        return src_resp['ETag'] != dst_resp['ETag']

    if should_copy():
        return client.copy_object(
            CopySource={'Bucket': src_bucket, 'Key': src_key},
            Bucket=dst_bucket,
            Key=dst_key
        )


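# Illustrative usage (a sketch, not part of the original module; the bucket
# and key names are hypothetical).  With lazy=True, the copy is skipped when
# the destination already exists with a matching ETag:
#
#     >>> copy_object(
#     ...     src_bucket='example-src', src_key='exports/objects.json',
#     ...     dst_bucket='example-dst', dst_key='exports/objects.json',
#     ...     lazy=True,
#     ... )
#
# Note that S3 computes different ETags for multipart uploads, so two
# objects with identical content can still have different ETags; a lazy
# copy may re-copy such objects unnecessarily.

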
def _extract_s3_event(record):
    """Extract the interesting fields from a single S3 event record."""
    event_datetime = dateutil.parser.parse(record["eventTime"])

    return {
        "event_name": record["eventName"],
        "event_time": event_datetime,
        "bucket_name": record["s3"]["bucket"]["name"],
        "object_key": unquote(record["s3"]["object"]["key"]),
        "size": record["s3"]["object"]["size"],
        "versionId": record["s3"]["object"].get("versionId")
    }


def parse_s3_record(event):
    """
    Extracts a simple subset of an S3 update event.
    """
    return [_extract_s3_event(record) for record in event["Records"]]


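# Illustrative usage (a sketch, not part of the original module), applied to
# a trimmed-down version of the event structure that S3 notifications
# deliver to Lambda:
#
#     >>> event = {
#     ...     "Records": [{
#     ...         "eventName": "ObjectCreated:Put",
#     ...         "eventTime": "2018-01-01T12:00:00.000Z",
#     ...         "s3": {
#     ...             "bucket": {"name": "example-bucket"},
#     ...             "object": {"key": "data/objects.json", "size": 1024},
#     ...         },
#     ...     }]
#     ... }
#     >>> parse_s3_record(event)[0]["object_key"]
#     'data/objects.json'

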
def write_objects_to_s3(bucket, key, objects):
    """
    Given an iterable of objects that can be serialised as JSON, serialise
    them as JSON, and write them to a file in S3, one per line.

    :param bucket: S3 bucket to upload the new file to.
    :param key: S3 key to upload the new file to.
    :param objects: An iterable of objects that can be serialised as JSON.

    """
    # We use sort_keys=True to ensure deterministic results.  The separators
    # flag allows us to write more compact JSON, which makes things faster!
    # See https://twitter.com/raymondh/status/842777864193769472
    #
    # json.dumps escapes non-ASCII characters by default (ensure_ascii=True),
    # so encoding the result as ASCII is safe here.
    json_str = b'\n'.join([
        json.dumps(m, sort_keys=True, separators=(',', ':')).encode('ascii')
        for m in objects
    ])

    client = boto3.client('s3')
    client.put_object(Bucket=bucket, Key=key, Body=json_str)


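# Illustrative usage (a sketch, not part of the original module; the bucket
# and key are hypothetical).  Each object becomes one line of compact,
# key-sorted JSON in the uploaded file:
#
#     >>> write_objects_to_s3(
#     ...     bucket='example-bucket',
#     ...     key='data/objects.json',
#     ...     objects=[{'id': 1, 'colour': 'red'}, {'id': 2, 'colour': 'blue'}],
#     ... )
#
# The uploaded body would be:
#
#     {"colour":"red","id":1}
#     {"colour":"blue","id":2}

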
__all__ = [
    'is_object',
    'copy_object',
    'parse_s3_record',
    'write_objects_to_s3',
]